Skip to content

Commit 656ffeb

Browse files
authored
feat: implement ComposeMany (#3239)
This fixes #3016.
1 parent 1042627 commit 656ffeb

File tree

7 files changed

+615
-0
lines changed

7 files changed

+615
-0
lines changed

google/cloud/storage/client.cc

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,36 @@ StatusOr<PolicyDocumentResult> Client::SignPolicyDocument(
336336
internal::Base64Encode(signed_blob->signed_blob)};
337337
}
338338

339+
namespace internal {
340+
341+
ScopedDeleter::ScopedDeleter(std::function<Status(ObjectMetadata)> delete_fun)
342+
: delete_fun_(std::move(delete_fun)) {}
343+
344+
ScopedDeleter::~ScopedDeleter() { ExecuteDelete(); }
345+
346+
void ScopedDeleter::Add(ObjectMetadata object) {
347+
object_list_.emplace_back(std::move(object));
348+
}
349+
350+
Status ScopedDeleter::ExecuteDelete() {
351+
std::vector<ObjectMetadata> object_list;
352+
// make sure the dtor will not do this again
353+
object_list.swap(object_list_);
354+
355+
for (auto& object : object_list) {
356+
Status status = delete_fun_(std::move(object));
357+
// Fail on first error. If the service is unavailable, every deletion
358+
// would potentially keep retrying until the timeout passes - this would
359+
// take way too much time and would be pointless.
360+
if (!status.ok()) {
361+
return status;
362+
}
363+
}
364+
return Status();
365+
}
366+
367+
} // namespace internal
368+
339369
} // namespace STORAGE_CLIENT_NS
340370
} // namespace storage
341371
} // namespace cloud

google/cloud/storage/client.h

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3171,6 +3171,213 @@ Status DeleteByPrefix(Client& client, std::string const& bucket_name,
31713171
return Status();
31723172
}
31733173

3174+
namespace internal {
3175+
3176+
// Just a wrapper to allow for use in `google::cloud::internal::apply`.
3177+
struct ComposeApplyHelper {
3178+
template <typename... Options>
3179+
StatusOr<ObjectMetadata> operator()(Options... options) const {
3180+
return client.ComposeObject(
3181+
std::move(bucket_name), std::move(source_objects),
3182+
std::move(destination_object_name), std::move(options)...);
3183+
}
3184+
3185+
Client& client;
3186+
std::string bucket_name;
3187+
std::vector<ComposeSourceObject> source_objects;
3188+
std::string destination_object_name;
3189+
};
3190+
3191+
// A helper to defer deletion of temporary GCS objects.
3192+
class ScopedDeleter {
3193+
public:
3194+
// The actual deletion depends on local's types in a very non-trivial way,
3195+
// so we abstract this away by providing the function to delete one object.
3196+
ScopedDeleter(std::function<Status(ObjectMetadata)> delete_fun);
3197+
~ScopedDeleter();
3198+
3199+
/// Defer object's deletion to this objects destruction (or ExecuteDelete())
3200+
void Add(ObjectMetadata object);
3201+
3202+
/// Execute all the deferred deletions now.
3203+
Status ExecuteDelete();
3204+
3205+
private:
3206+
std::function<Status(ObjectMetadata)> delete_fun_;
3207+
std::vector<ObjectMetadata> object_list_;
3208+
};
3209+
3210+
} // namespace internal
3211+
3212+
/**
3213+
* Compose existing objects into a new object in the same bucket.
3214+
*
3215+
* Contrary to `Client::ComposeObject`, this function doesn't have a limit on
3216+
* the number of source objects.
3217+
*
3218+
* The implementation may need to perform multiple Client::ComposeObject calls
3219+
* to create intermediate, temporary objects which are then further composed.
3220+
* Due to the lack of atomicity of this series of operations, stray temporary
3221+
* objects might be left over if there are transient failuers. In order to allow
3222+
* the user to easily control for such situations, the user is expected to
3223+
* provide a unique @p prefix parameter, which will become the prefix of all the
3224+
* temporary objects created by this function. Once this function finishes, the
3225+
* user may safely remove all objects with the provided prefix (e.g. via
3226+
* DeleteByPrefix()). We recommend using CreateRandomPrefix() for selecting a
3227+
* random prefix within a bucket.
3228+
*
3229+
* @param client the client on which to perform the operations needed by this
3230+
* function
3231+
* @param bucket_name the name of the bucket used for source object and
3232+
* destination object.
3233+
* @param source_objects objects used to compose `destination_object_name`.
3234+
* @param destination_object_name the composed object name.
3235+
* @param prefix prefix for temporary objects created by this function; the user
3236+
* should guarantee that there are no objects with this prefix (except for a
3237+
* marker, as created by `CreateRandomPrefix`)
3238+
* @param ignore_cleanup_failures if the composition succeeds but cleanup of
3239+
* temporary objects fails, depending on this parameter either a success
3240+
* will be returned (`true`) or the relevant cleanup error (`false`)
3241+
* @param options a list of optional query parameters and/or request headers.
3242+
* Valid types for this operation include `DestinationPredefinedAcl`,
3243+
* `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`
3244+
* `KmsKeyName`, `QuotaUser`, `UserIp`, `UserProject` and
3245+
* `WithObjectMetadata`.
3246+
*
3247+
* @par Idempotency
3248+
* This operation is not idempotent. While each request performed by this
3249+
* function is retried based on the client policies, the operation itself stops
3250+
* on the first request that fails.
3251+
*
3252+
* @par Example
3253+
* @snippet storage_object_samples.cc compose object from many
3254+
*/
3255+
template <typename... Options>
3256+
StatusOr<ObjectMetadata> ComposeMany(
3257+
Client& client, std::string const& bucket_name,
3258+
std::vector<ComposeSourceObject> source_objects, std::string const& prefix,
3259+
std::string destination_object_name, bool ignore_cleanup_failures,
3260+
Options&&... options) {
3261+
using internal::Among;
3262+
using internal::NotAmong;
3263+
using internal::StaticTupleFilter;
3264+
std::size_t const max_num_objects = 32;
3265+
3266+
if (prefix.empty()) {
3267+
// In theory, nothing prevents us from using an empty prefix, but this is
3268+
// most likely a misuse. We recommend running `DeleteByPrefix` on this
3269+
// prefix, so this might not be the best idea.
3270+
return Status(StatusCode::kInvalidArgument,
3271+
"ComposeMany requires a non-empty prefix to create temporary "
3272+
"objects. Please check the documentation.");
3273+
}
3274+
3275+
if (source_objects.empty()) {
3276+
return Status(StatusCode::kInvalidArgument,
3277+
"ComposeMany requires at least one source object.");
3278+
}
3279+
3280+
auto all_options = std::tie(options...);
3281+
3282+
// TODO(#3247): this list of type should somehow be generated
3283+
static_assert(
3284+
std::tuple_size<decltype(
3285+
StaticTupleFilter<NotAmong<
3286+
DestinationPredefinedAcl, EncryptionKey, IfGenerationMatch,
3287+
IfMetagenerationMatch, KmsKeyName, QuotaUser, UserIp,
3288+
UserProject, WithObjectMetadata>::TPred>(
3289+
all_options))>::value == 0,
3290+
"This functions accepts only options of type DestinationPredefinedAcl, "
3291+
"EncryptionKey, IfGenerationMatch, IfMetagenerationMatch, KmsKeyName, "
3292+
"QuotaUser, UserIp, UserProject or WithObjectMetadata.");
3293+
3294+
internal::ScopedDeleter deleter([&](ObjectMetadata const& object) {
3295+
return google::cloud::internal::apply(
3296+
internal::DeleteApplyHelper{client, bucket_name, object.name()},
3297+
std::tuple_cat(
3298+
std::make_tuple(IfGenerationMatch(object.generation())),
3299+
StaticTupleFilter<NotAmong<Versions>::TPred>(all_options)));
3300+
});
3301+
3302+
std::size_t num_tmp_objects = 0;
3303+
auto tmpobject_name_gen = [&num_tmp_objects, &prefix] {
3304+
return prefix + ".compose-tmp-" + std::to_string(num_tmp_objects++);
3305+
};
3306+
3307+
auto to_source_objects = [](std::vector<ObjectMetadata> objects) {
3308+
std::vector<ComposeSourceObject> sources(objects.size());
3309+
std::transform(objects.begin(), objects.end(), sources.begin(),
3310+
[](ObjectMetadata m) {
3311+
return ComposeSourceObject{m.name(), m.generation(), {}};
3312+
});
3313+
return sources;
3314+
};
3315+
3316+
auto composer = [&](std::vector<ComposeSourceObject> compose_range,
3317+
bool is_final) -> StatusOr<ObjectMetadata> {
3318+
if (is_final) {
3319+
return google::cloud::internal::apply(
3320+
internal::ComposeApplyHelper{client, bucket_name,
3321+
std::move(compose_range),
3322+
std::move(destination_object_name)},
3323+
std::tuple_cat(std::make_tuple(IfGenerationMatch(0)), all_options));
3324+
}
3325+
return google::cloud::internal::apply(
3326+
internal::ComposeApplyHelper{client, bucket_name,
3327+
std::move(compose_range),
3328+
tmpobject_name_gen()},
3329+
StaticTupleFilter<
3330+
NotAmong<IfGenerationMatch, IfMetagenerationMatch>::TPred>(
3331+
all_options));
3332+
};
3333+
3334+
auto reduce = [&](std::vector<ComposeSourceObject> source_objects)
3335+
-> StatusOr<std::vector<ObjectMetadata>> {
3336+
std::vector<ObjectMetadata> objects;
3337+
for (auto range_begin = source_objects.begin();
3338+
range_begin != source_objects.end();) {
3339+
std::size_t range_size = std::min<std::size_t>(
3340+
std::distance(range_begin, source_objects.end()), max_num_objects);
3341+
auto range_end = std::next(range_begin, range_size);
3342+
std::vector<ComposeSourceObject> compose_range(range_size);
3343+
std::move(range_begin, range_end, compose_range.begin());
3344+
3345+
bool const is_final_composition =
3346+
source_objects.size() <= max_num_objects;
3347+
auto object = composer(std::move(compose_range), is_final_composition);
3348+
if (!object) {
3349+
return std::move(object).status();
3350+
}
3351+
objects.push_back(*std::move(object));
3352+
if (!is_final_composition) {
3353+
deleter.Add(objects.back());
3354+
}
3355+
range_begin = range_end;
3356+
}
3357+
return objects;
3358+
};
3359+
3360+
StatusOr<ObjectMetadata> result;
3361+
do {
3362+
StatusOr<std::vector<ObjectMetadata>> objects = reduce(source_objects);
3363+
if (!objects) {
3364+
return objects.status();
3365+
}
3366+
if (objects->size() == 1) {
3367+
if (!ignore_cleanup_failures) {
3368+
auto delete_status = deleter.ExecuteDelete();
3369+
if (!delete_status.ok()) {
3370+
return delete_status;
3371+
}
3372+
}
3373+
result = std::move((*objects)[0]);
3374+
break;
3375+
}
3376+
source_objects = to_source_objects(*std::move(objects));
3377+
} while (source_objects.size() > 1);
3378+
return result;
3379+
}
3380+
31743381
} // namespace STORAGE_CLIENT_NS
31753382
} // namespace storage
31763383
} // namespace cloud

google/cloud/storage/examples/run_examples_utils.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,11 @@ run_all_object_examples() {
398398
"${object_name}"
399399
run_example ./storage_object_samples delete-object \
400400
"${bucket_name}" "${composed_object_name}"
401+
run_example ./storage_object_samples compose-object-from-many \
402+
"${bucket_name}" "${composed_object_name}" "${object_name}" \
403+
"${object_name}"
404+
run_example ./storage_object_samples delete-object \
405+
"${bucket_name}" "${composed_object_name}"
401406
run_example ./storage_object_samples copy-object \
402407
"${bucket_name}" "${object_name}" \
403408
"${bucket_name}" "${copied_object_name}"

google/cloud/storage/examples/storage_object_samples.cc

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,6 +1097,48 @@ void ComposeObjectFromEncryptedObjects(google::cloud::storage::Client client,
10971097
std::move(compose_objects));
10981098
}
10991099

1100+
void ComposeObjectFromMany(google::cloud::storage::Client client, int& argc,
1101+
char* argv[]) {
1102+
if (argc < 4) {
1103+
throw Usage{
1104+
"compose-object-from-many <bucket-name> <destination-object-name>"
1105+
" <object_1> ..."};
1106+
}
1107+
auto bucket_name = ConsumeArg(argc, argv);
1108+
auto destination_object_name = ConsumeArg(argc, argv);
1109+
std::vector<google::cloud::storage::ComposeSourceObject> compose_objects;
1110+
while (argc > 1) {
1111+
compose_objects.push_back({ConsumeArg(argc, argv), {}, {}});
1112+
}
1113+
//! [compose object from many] [START storage_compose_file_from_many]
1114+
namespace gcs = google::cloud::storage;
1115+
using ::google::cloud::StatusOr;
1116+
[](gcs::Client client, std::string bucket_name,
1117+
std::string destination_object_name,
1118+
std::vector<gcs::ComposeSourceObject> compose_objects) {
1119+
StatusOr<gcs::ObjectMetadata> prefix_md =
1120+
gcs::CreateRandomPrefix(client, bucket_name, ".tmpfiles");
1121+
if (!prefix_md) {
1122+
throw std::runtime_error(prefix_md.status().message());
1123+
}
1124+
std::string const prefix = prefix_md->name();
1125+
StatusOr<gcs::ObjectMetadata> composed_object =
1126+
ComposeMany(client, bucket_name, compose_objects, prefix,
1127+
destination_object_name, false);
1128+
1129+
if (!composed_object) {
1130+
throw std::runtime_error(composed_object.status().message());
1131+
}
1132+
1133+
std::cout << "Composed new object " << composed_object->name()
1134+
<< " in bucket " << composed_object->bucket()
1135+
<< "\nFull metadata: " << *composed_object << "\n";
1136+
}
1137+
//! [compose object from many] [END storage_compose_file_from_many]
1138+
(std::move(client), bucket_name, destination_object_name,
1139+
std::move(compose_objects));
1140+
}
1141+
11001142
void WriteObjectWithKmsKey(google::cloud::storage::Client client, int& argc,
11011143
char* argv[]) {
11021144
if (argc != 4) {
@@ -1777,6 +1819,7 @@ int main(int argc, char* argv[]) try {
17771819
{"compose-object", &ComposeObject},
17781820
{"compose-object-from-encrypted-objects",
17791821
&ComposeObjectFromEncryptedObjects},
1822+
{"compose-object-from-many", &ComposeObjectFromMany},
17801823
{"write-object-with-kms-key", &WriteObjectWithKmsKey},
17811824
{"rewrite-object", &RewriteObject},
17821825
{"rewrite-object-non-blocking", &RewriteObjectNonBlocking},

google/cloud/storage/internal/tuple_filter.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,18 @@ StaticTupleFilter(Tuple&& t) {
142142
std::forward<Tuple>(t)));
143143
}
144144

145+
/**
146+
* A factory of template predicates checking for presence on a type list
147+
*
148+
* @tparam Types the list of types which for which the predicate returns true.
149+
*/
150+
template <typename... Types>
151+
struct Among {
152+
template <typename T>
153+
using TPred = google::cloud::internal::disjunction<
154+
std::is_same<typename std::decay<T>::type, Types>...>;
155+
};
156+
145157
/**
146158
* A factory of template predicates checking for lack of presence on a type list
147159
*

0 commit comments

Comments
 (0)