Skip to content

Commit 5b15436

Browse files
bajajneha27ddelgrosso1
authored andcommitted
Impl(ACv2): add appendable takeover (#102)
1 parent dfddfd1 commit 5b15436

File tree

9 files changed

+109
-5
lines changed

9 files changed

+109
-5
lines changed

google/cloud/storage/async/client.cc

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,33 @@ AsyncClient::StartAppendableObjectUpload(
144144
});
145145
}
146146

147+
future<StatusOr<std::pair<AsyncWriter, AsyncToken>>>
148+
AsyncClient::ResumeAppendableObjectUpload(BucketName const& bucket_name,
149+
std::string object_name,
150+
std::int64_t generation,
151+
Options opts) {
152+
auto request = google::storage::v2::BidiWriteObjectRequest{};
153+
auto& append_object_spec = *request.mutable_append_object_spec();
154+
155+
append_object_spec.set_bucket(BucketName(bucket_name).FullName());
156+
append_object_spec.set_object(std::move(object_name));
157+
append_object_spec.set_generation(std::move(generation));
158+
159+
return connection_
160+
->ResumeAppendableObjectUpload(
161+
{std::move(request),
162+
internal::MergeOptions(std::move(opts), connection_->options())})
163+
.then([](auto f) -> StatusOr<std::pair<AsyncWriter, AsyncToken>> {
164+
auto w = f.get();
165+
if (!w) return std::move(w).status();
166+
auto t = absl::holds_alternative<google::storage::v2::Object>(
167+
(*w)->PersistedState())
168+
? AsyncToken()
169+
: storage_internal::MakeAsyncToken(w->get());
170+
return std::make_pair(AsyncWriter(*std::move(w)), std::move(t));
171+
});
172+
}
173+
147174
future<StatusOr<std::pair<AsyncWriter, AsyncToken>>>
148175
AsyncClient::StartBufferedUpload(BucketName const& bucket_name,
149176
std::string object_name, Options opts) {

google/cloud/storage/async/client.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,23 @@ class AsyncClient {
426426
StartAppendableObjectUpload(
427427
google::storage::v2::BidiWriteObjectRequest request, Options opts = {});
428428

429+
/**
430+
* Resume a resumable upload session for appendable objects and automatic
431+
* recovery from transient failures.
432+
*
433+
* @snippet{doc} async/client.h resume-appendable-object-upload
434+
*
435+
* @param bucket_name the name of the bucket that contains the object.
436+
* @param object_name the name of the object to be uploaded.
437+
* @param generation the object generation to be uploaded.
438+
* @param opts options controlling the behaviour of this RPC, for example the
439+
* application may change the retry policy.
440+
*/
441+
future<StatusOr<std::pair<AsyncWriter, AsyncToken>>>
442+
ResumeAppendableObjectUpload(BucketName const& bucket_name,
443+
std::string object_name, std::int64_t generation,
444+
Options opts = {});
445+
429446
/*
430447
[start-buffered-upload-common]
431448
This function always uses [resumable uploads][resumable-link]. The objects

google/cloud/storage/async/connection.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ class AsyncConnection {
131131
StatusOr<std::unique_ptr<storage_experimental::AsyncWriterConnection>>>
132132
StartAppendableObjectUpload(AppendableUploadParams p) = 0;
133133

134+
/// Resume an appendable upload configured for persistent sources.
135+
virtual future<
136+
StatusOr<std::unique_ptr<storage_experimental::AsyncWriterConnection>>>
137+
ResumeAppendableObjectUpload(AppendableUploadParams p) = 0;
138+
134139
/**
135140
* A thin wrapper around the `WriteObject()` parameters.
136141
*

google/cloud/storage/internal/async/connection_impl.cc

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,17 @@ AsyncConnectionImpl::ReadObjectRange(ReadObjectParams p) {
293293

294294
future<StatusOr<std::unique_ptr<storage_experimental::AsyncWriterConnection>>>
295295
AsyncConnectionImpl::StartAppendableObjectUpload(AppendableUploadParams p) {
296+
return AppendableObjectUploadImpl(std::move(p));
297+
}
298+
299+
future<StatusOr<std::unique_ptr<storage_experimental::AsyncWriterConnection>>>
300+
AsyncConnectionImpl::ResumeAppendableObjectUpload(AppendableUploadParams p) {
301+
return AppendableObjectUploadImpl(std::move(p), true);
302+
}
303+
304+
future<StatusOr<std::unique_ptr<storage_experimental::AsyncWriterConnection>>>
305+
AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p,
306+
bool takeover) {
296307
auto current = internal::MakeImmutableOptions(std::move(p.options));
297308
auto request = p.request;
298309
std::int64_t persisted_size = 0;
@@ -313,10 +324,10 @@ AsyncConnectionImpl::StartAppendableObjectUpload(AppendableUploadParams p) {
313324

314325
auto factory = WriteResultFactory(
315326
[stub = stub_, cq = cq_, retry = std::move(retry),
316-
backoff = std::move(backoff), current, function_name = __func__](
317-
google::storage::v2::BidiWriteObjectRequest req) {
318-
auto call = [stub, request = std::move(req)](
319-
CompletionQueue& cq,
327+
backoff = std::move(backoff), current, function_name = __func__,
328+
takeover](google::storage::v2::BidiWriteObjectRequest req) {
329+
auto call = [stub, request = std::move(req), takeover](
330+
google::cloud::CompletionQueue& cq,
320331
std::shared_ptr<grpc::ClientContext> context,
321332
google::cloud::internal::ImmutableOptions options,
322333
RequestPlaceholder const&) mutable
@@ -325,7 +336,13 @@ AsyncConnectionImpl::StartAppendableObjectUpload(AppendableUploadParams p) {
325336
options->get<storage::TransferStallTimeoutOption>(),
326337
options->get<storage::TransferStallMinimumRateOption>(),
327338
google::storage::v2::ServiceConstants::MAX_WRITE_CHUNK_BYTES);
328-
ApplyRoutingHeaders(*context, request.write_object_spec());
339+
340+
// Apply the routing header
341+
if (takeover)
342+
ApplyRoutingHeaders(*context, request.append_object_spec());
343+
else
344+
ApplyRoutingHeaders(*context, request.write_object_spec());
345+
329346
auto rpc = stub->AsyncBidiWriteObject(cq, std::move(context),
330347
std::move(options));
331348
rpc = std::make_unique<StreamingRpcTimeout>(cq, timeout, timeout,

google/cloud/storage/internal/async/connection_impl.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ class AsyncConnectionImpl
7373
future<StatusOr<std::unique_ptr<storage_experimental::AsyncWriterConnection>>>
7474
StartAppendableObjectUpload(AppendableUploadParams p) override;
7575

76+
future<StatusOr<std::unique_ptr<storage_experimental::AsyncWriterConnection>>>
77+
ResumeAppendableObjectUpload(AppendableUploadParams p) override;
78+
7679
future<StatusOr<std::unique_ptr<storage_experimental::AsyncWriterConnection>>>
7780
StartUnbufferedUpload(UploadParams p) override;
7881

@@ -137,6 +140,9 @@ class AsyncConnectionImpl
137140
std::shared_ptr<storage::internal::HashFunction> hash_function,
138141
std::int64_t persisted_size);
139142

143+
future<StatusOr<std::unique_ptr<storage_experimental::AsyncWriterConnection>>>
144+
AppendableObjectUploadImpl(AppendableUploadParams p, bool takeover = false);
145+
140146
CompletionQueue cq_;
141147
std::shared_ptr<GrpcChannelRefresh> refresh_;
142148
std::shared_ptr<StorageStub> stub_;

google/cloud/storage/internal/async/connection_tracing.cc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,23 @@ class AsyncConnectionTracing : public storage_experimental::AsyncConnection {
113113
});
114114
}
115115

116+
future<StatusOr<std::unique_ptr<storage_experimental::AsyncWriterConnection>>>
117+
ResumeAppendableObjectUpload(AppendableUploadParams p) override {
118+
auto span = internal::MakeSpan(
119+
"storage::AsyncConnection::ResumeAppendableObjectUpload");
120+
internal::OTelScope scope(span);
121+
return impl_->ResumeAppendableObjectUpload(std::move(p))
122+
.then([oc = opentelemetry::context::RuntimeContext::GetCurrent(),
123+
span = std::move(span)](auto f)
124+
-> StatusOr<std::unique_ptr<
125+
storage_experimental::AsyncWriterConnection>> {
126+
auto w = f.get();
127+
internal::DetachOTelContext(oc);
128+
if (!w) return internal::EndSpan(*span, std::move(w).status());
129+
return MakeTracingWriterConnection(span, *std::move(w));
130+
});
131+
}
132+
116133
future<StatusOr<std::unique_ptr<storage_experimental::AsyncWriterConnection>>>
117134
StartUnbufferedUpload(UploadParams p) override {
118135
auto span =

google/cloud/storage/internal/grpc/configure_client_context.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ void ApplyRoutingHeaders(grpc::ClientContext& context,
5050
"bucket=" + google::cloud::internal::UrlEncode(spec.resource().bucket()));
5151
}
5252

53+
void ApplyRoutingHeaders(grpc::ClientContext& context,
54+
google::storage::v2::AppendObjectSpec const& spec) {
55+
context.AddMetadata(
56+
"x-goog-request-params",
57+
"bucket=" + google::cloud::internal::UrlEncode(spec.bucket()));
58+
}
59+
5360
void ApplyRoutingHeaders(grpc::ClientContext& context,
5461
storage::internal::UploadChunkRequest const& request) {
5562
ApplyResumableUploadRoutingHeader(context, request.upload_session_url());

google/cloud/storage/internal/grpc/configure_client_context.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ void ApplyRoutingHeaders(
7979
void ApplyRoutingHeaders(grpc::ClientContext& context,
8080
google::storage::v2::WriteObjectSpec const& spec);
8181

82+
/// @copydoc ApplyRoutingHeaders(grpc::ClientContext&,)
83+
void ApplyRoutingHeaders(grpc::ClientContext& context,
84+
google::storage::v2::AppendObjectSpec const& spec);
85+
8286
/**
8387
* The generated `StorageMetadata` stub can not handle dynamic routing headers
8488
* for client side streaming. So we manually match and extract the headers in

google/cloud/storage/mocks/mock_async_connection.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ class MockAsyncConnection : public storage_experimental::AsyncConnection {
4848
future<StatusOr<
4949
std::unique_ptr<storage_experimental::AsyncWriterConnection>>>,
5050
StartAppendableObjectUpload, (AppendableUploadParams), (override));
51+
MOCK_METHOD(
52+
future<StatusOr<
53+
std::unique_ptr<storage_experimental::AsyncWriterConnection>>>,
54+
ResumeAppendableObjectUpload, (AppendableUploadParams), (override));
5155
MOCK_METHOD(
5256
future<StatusOr<
5357
std::unique_ptr<storage_experimental::AsyncWriterConnection>>>,

0 commit comments

Comments
 (0)