Skip to content

Commit ca57e7f

Browse files
bajajneha27ddelgrosso1
authored andcommitted
impl(ACv2): [ Appendable write ] Resume stream with write_handle (#94)
1 parent e7b31bc commit ca57e7f

File tree

6 files changed

+638
-38
lines changed

6 files changed

+638
-38
lines changed

google/cloud/storage/google_cloud_cpp_storage_grpc.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ google_cloud_cpp_storage_grpc_hdrs = [
6565
"internal/async/writer_connection_buffered.h",
6666
"internal/async/writer_connection_finalized.h",
6767
"internal/async/writer_connection_impl.h",
68+
"internal/async/writer_connection_resumed.h",
6869
"internal/async/writer_connection_tracing.h",
6970
"internal/grpc/bucket_access_control_parser.h",
7071
"internal/grpc/bucket_metadata_parser.h",
@@ -137,6 +138,7 @@ google_cloud_cpp_storage_grpc_srcs = [
137138
"internal/async/writer_connection_buffered.cc",
138139
"internal/async/writer_connection_finalized.cc",
139140
"internal/async/writer_connection_impl.cc",
141+
"internal/async/writer_connection_resumed.cc",
140142
"internal/async/writer_connection_tracing.cc",
141143
"internal/grpc/bucket_access_control_parser.cc",
142144
"internal/grpc/bucket_metadata_parser.cc",

google/cloud/storage/google_cloud_cpp_storage_grpc.cmake

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ add_library(
151151
internal/async/writer_connection_finalized.h
152152
internal/async/writer_connection_impl.cc
153153
internal/async/writer_connection_impl.h
154+
internal/async/writer_connection_resumed.cc
155+
internal/async/writer_connection_resumed.h
154156
internal/async/writer_connection_tracing.cc
155157
internal/async/writer_connection_tracing.h
156158
internal/grpc/bucket_access_control_parser.cc

google/cloud/storage/internal/async/connection_impl.cc

Lines changed: 53 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "google/cloud/storage/internal/async/writer_connection_buffered.h"
3434
#include "google/cloud/storage/internal/async/writer_connection_finalized.h"
3535
#include "google/cloud/storage/internal/async/writer_connection_impl.h"
36+
#include "google/cloud/storage/internal/async/writer_connection_resumed.h"
3637
#include "google/cloud/storage/internal/crc32c.h"
3738
#include "google/cloud/storage/internal/grpc/channel_refresh.h"
3839
#include "google/cloud/storage/internal/grpc/configure_client_context.h"
@@ -295,49 +296,63 @@ AsyncConnectionImpl::StartAppendableObjectUpload(AppendableUploadParams p) {
295296
auto current = internal::MakeImmutableOptions(std::move(p.options));
296297
auto request = p.request;
297298
std::int64_t persisted_size = 0;
298-
auto hash_function = CreateHashFunction(*current);
299-
auto retry = retry_policy(*current);
300-
auto backoff = backoff_policy(*current);
299+
std::shared_ptr<storage::internal::HashFunction> hash_function =
300+
CreateHashFunction(*current);
301+
auto retry = std::shared_ptr<storage::RetryPolicy>(retry_policy(*current));
302+
auto backoff =
303+
std::shared_ptr<storage::BackoffPolicy>(backoff_policy(*current));
301304
struct RequestPlaceholder {};
302305

303-
auto call = [stub = stub_, request = std::move(request)](
304-
CompletionQueue& cq,
305-
std::shared_ptr<grpc::ClientContext> context,
306-
google::cloud::internal::ImmutableOptions options,
307-
RequestPlaceholder const&) mutable
308-
-> future<StatusOr<WriteObject::WriteResult>> {
309-
auto rpc =
310-
stub->AsyncBidiWriteObject(cq, std::move(context), std::move(options));
311-
request.set_state_lookup(true);
312-
auto open = std::make_shared<WriteObject>(std::move(rpc), request);
313-
return open->Call().then([open, &request](auto f) mutable {
314-
auto response = f.get();
315-
if (!response) {
316-
EnsureFirstMessageAppendObjectSpec(request);
317-
ApplyWriteRedirectErrors(*request.mutable_append_object_spec(),
318-
ExtractGrpcStatus(response.status()));
319-
}
320-
return response;
321-
});
322-
};
306+
using WriteResultFactory =
307+
std::function<future<StatusOr<WriteObject::WriteResult>>(
308+
google::storage::v2::BidiWriteObjectRequest)>;
323309

324-
auto transform = [current, request, persisted_size,
325-
hash = std::move(hash_function)](auto f) mutable
310+
auto factory = WriteResultFactory(
311+
[stub = stub_, cq = cq_, retry = std::move(retry),
312+
backoff = std::move(backoff), current, function_name = __func__](
313+
google::storage::v2::BidiWriteObjectRequest req) {
314+
auto call = [stub, request = std::move(req)](
315+
CompletionQueue& cq_ref,
316+
std::shared_ptr<grpc::ClientContext> context,
317+
google::cloud::internal::ImmutableOptions options,
318+
RequestPlaceholder const&) mutable
319+
-> future<StatusOr<WriteObject::WriteResult>> {
320+
auto rpc = stub->AsyncBidiWriteObject(cq_ref, std::move(context),
321+
std::move(options));
322+
request.set_state_lookup(true);
323+
auto open = std::make_shared<WriteObject>(std::move(rpc), request);
324+
return open->Call().then([open, &request](auto f) {
325+
auto response = f.get();
326+
if (!response) {
327+
EnsureFirstMessageAppendObjectSpec(request);
328+
ApplyWriteRedirectErrors(*request.mutable_append_object_spec(),
329+
ExtractGrpcStatus(response.status()));
330+
}
331+
return response;
332+
});
333+
};
334+
335+
return google::cloud::internal::AsyncRetryLoop(
336+
retry->clone(), backoff->clone(), Idempotency::kIdempotent, cq,
337+
std::move(call), std::move(current), RequestPlaceholder{},
338+
function_name);
339+
});
340+
341+
auto pending = factory(std::move(request));
342+
return pending.then(
343+
[current, request = std::move(p.request), persisted_size,
344+
hash = std::move(hash_function), fa = std::move(factory)](auto f) mutable
326345
-> StatusOr<
327346
std::unique_ptr<storage_experimental::AsyncWriterConnection>> {
328-
auto rpc = f.get();
329-
if (!rpc) return std::move(rpc).status();
330-
return std::unique_ptr<storage_experimental::AsyncWriterConnection>(
331-
std::make_unique<AsyncWriterConnectionImpl>(
332-
current, std::move(request), std::move(rpc->stream),
333-
std::move(hash), persisted_size, false));
334-
};
335-
336-
return google::cloud::internal::AsyncRetryLoop(
337-
std::move(retry), std::move(backoff), Idempotency::kIdempotent,
338-
cq_, std::move(call), std::move(current), RequestPlaceholder{},
339-
__func__)
340-
.then(std::move(transform));
347+
auto rpc = f.get();
348+
if (!rpc) return std::move(rpc).status();
349+
auto impl = std::make_unique<AsyncWriterConnectionImpl>(
350+
current, request, std::move(rpc->stream), hash, persisted_size,
351+
false);
352+
return MakeWriterConnectionResumed(std::move(fa), std::move(impl),
353+
std::move(request), std::move(hash),
354+
*current);
355+
});
341356
}
342357

343358
future<StatusOr<std::unique_ptr<storage_experimental::AsyncWriterConnection>>>

google/cloud/storage/internal/async/handle_redirect_error.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ void ApplyWriteRedirectErrors(google::storage::v2::AppendObjectSpec& spec,
5555
if (!any.UnpackTo(&error)) continue;
5656
*spec.mutable_write_handle() = std::move(*error.mutable_write_handle());
5757
*spec.mutable_routing_token() = std::move(*error.mutable_routing_token());
58+
if (error.has_generation()) spec.set_generation(error.generation());
5859
}
5960
}
6061

0 commit comments

Comments
 (0)