Skip to content

Commit 72b4764

Browse files
bajajneha27ddelgrosso1
authored andcommitted
impl(ACv2): Add retry with routing_token for rpc Start (#91)
1 parent 1eacbfb commit 72b4764

File tree

9 files changed

+256
-31
lines changed

9 files changed

+256
-31
lines changed

google/cloud/storage/google_cloud_cpp_storage_grpc.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ google_cloud_cpp_storage_grpc_hdrs = [
5959
"internal/async/rewriter_connection_impl.h",
6060
"internal/async/rewriter_connection_tracing.h",
6161
"internal/async/token_impl.h",
62+
"internal/async/write_object.h",
6263
"internal/async/write_payload_fwd.h",
6364
"internal/async/write_payload_impl.h",
6465
"internal/async/writer_connection_buffered.h",
@@ -132,6 +133,7 @@ google_cloud_cpp_storage_grpc_srcs = [
132133
"internal/async/rewriter_connection_impl.cc",
133134
"internal/async/rewriter_connection_tracing.cc",
134135
"internal/async/token_impl.cc",
136+
"internal/async/write_object.cc",
135137
"internal/async/writer_connection_buffered.cc",
136138
"internal/async/writer_connection_finalized.cc",
137139
"internal/async/writer_connection_impl.cc",

google/cloud/storage/google_cloud_cpp_storage_grpc.cmake

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@ add_library(
141141
internal/async/rewriter_connection_tracing.h
142142
internal/async/token_impl.cc
143143
internal/async/token_impl.h
144+
internal/async/write_object.cc
145+
internal/async/write_object.h
144146
internal/async/write_payload_fwd.h
145147
internal/async/write_payload_impl.h
146148
internal/async/writer_connection_buffered.cc

google/cloud/storage/internal/async/connection_impl.cc

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "google/cloud/storage/internal/async/reader_connection_impl.h"
2929
#include "google/cloud/storage/internal/async/reader_connection_resume.h"
3030
#include "google/cloud/storage/internal/async/rewriter_connection_impl.h"
31+
#include "google/cloud/storage/internal/async/write_object.h"
3132
#include "google/cloud/storage/internal/async/write_payload_impl.h"
3233
#include "google/cloud/storage/internal/async/writer_connection_buffered.h"
3334
#include "google/cloud/storage/internal/async/writer_connection_finalized.h"
@@ -292,44 +293,46 @@ AsyncConnectionImpl::ReadObjectRange(ReadObjectParams p) {
292293
future<StatusOr<std::unique_ptr<storage_experimental::AsyncWriterConnection>>>
293294
AsyncConnectionImpl::StartAppendableObjectUpload(AppendableUploadParams p) {
294295
auto current = internal::MakeImmutableOptions(std::move(p.options));
296+
auto request = p.request;
295297
std::int64_t persisted_size = 0;
296298
auto hash_function = CreateHashFunction(*current);
297-
using StreamingRpc = AsyncWriterConnectionImpl::StreamingRpc;
299+
auto retry = retry_policy(*current);
300+
auto backoff = backoff_policy(*current);
298301
struct RequestPlaceholder {};
299302

300-
auto call = [stub = stub_](CompletionQueue& cq,
301-
std::shared_ptr<grpc::ClientContext> context,
302-
google::cloud::internal::ImmutableOptions options,
303-
RequestPlaceholder const&)
304-
-> future<StatusOr<std::unique_ptr<StreamingRpc>>> {
303+
auto call = [stub = stub_, request = std::move(request)](
304+
CompletionQueue& cq,
305+
std::shared_ptr<grpc::ClientContext> context,
306+
google::cloud::internal::ImmutableOptions options,
307+
RequestPlaceholder const&) mutable
308+
-> future<StatusOr<WriteObject::WriteResult>> {
305309
auto rpc =
306310
stub->AsyncBidiWriteObject(cq, std::move(context), std::move(options));
307-
auto start = rpc->Start();
308-
return start.then([rpc = std::move(rpc)](auto f) mutable {
309-
if (f.get()) return make_ready_future(make_status_or(std::move(rpc)));
310-
auto pending = rpc->Finish();
311-
return pending.then([rpc = std::move(rpc)](auto f) mutable {
312-
rpc.reset();
313-
auto status = f.get();
314-
return StatusOr<std::unique_ptr<StreamingRpc>>(std::move(status));
315-
});
311+
request.set_state_lookup(true);
312+
auto open = std::make_shared<WriteObject>(std::move(rpc), request);
313+
return open->Call().then([open, &request](auto f) mutable {
314+
auto response = f.get();
315+
if (!response) {
316+
EnsureFirstMessageAppendObjectSpec(request);
317+
ApplyWriteRedirectErrors(*request.mutable_append_object_spec(),
318+
ExtractGrpcStatus(response.status()));
319+
}
320+
return response;
316321
});
317322
};
318323

319-
auto transform = [current, request = std::move(p.request), persisted_size,
324+
auto transform = [current, request, persisted_size,
320325
hash = std::move(hash_function)](auto f) mutable
321326
-> StatusOr<
322327
std::unique_ptr<storage_experimental::AsyncWriterConnection>> {
323328
auto rpc = f.get();
324329
if (!rpc) return std::move(rpc).status();
325330
return std::unique_ptr<storage_experimental::AsyncWriterConnection>(
326331
std::make_unique<AsyncWriterConnectionImpl>(
327-
current, std::move(request), *std::move(rpc), std::move(hash),
328-
persisted_size));
332+
current, std::move(request), std::move(rpc->stream),
333+
std::move(hash), persisted_size, false));
329334
};
330335

331-
auto retry = retry_policy(*current);
332-
auto backoff = backoff_policy(*current);
333336
return google::cloud::internal::AsyncRetryLoop(
334337
std::move(retry), std::move(backoff), Idempotency::kIdempotent,
335338
cq_, std::move(call), std::move(current), RequestPlaceholder{},
@@ -716,7 +719,7 @@ AsyncConnectionImpl::UnbufferedUploadImpl(
716719
return std::unique_ptr<storage_experimental::AsyncWriterConnection>(
717720
std::make_unique<AsyncWriterConnectionImpl>(
718721
current, std::move(request), *std::move(rpc), std::move(hash),
719-
persisted_size));
722+
persisted_size, true));
720723
};
721724

722725
auto retry = retry_policy(*current);

google/cloud/storage/internal/async/handle_redirect_error.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,16 @@ namespace cloud {
2020
namespace storage_internal {
2121
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2222

23+
void EnsureFirstMessageAppendObjectSpec(
24+
google::storage::v2::BidiWriteObjectRequest& request) {
25+
if (request.has_write_object_spec()) {
26+
auto spec = request.write_object_spec();
27+
auto& append_object_spec = *request.mutable_append_object_spec();
28+
append_object_spec.set_bucket(spec.resource().bucket());
29+
append_object_spec.set_object(spec.resource().name());
30+
}
31+
}
32+
2333
google::rpc::Status ExtractGrpcStatus(Status const& status) {
2434
auto proto_status = google::rpc::Status{};
2535
auto payload = google::cloud::internal::GetPayload(
@@ -38,6 +48,16 @@ void ApplyRedirectErrors(google::storage::v2::BidiReadObjectSpec& spec,
3848
}
3949
}
4050

51+
void ApplyWriteRedirectErrors(google::storage::v2::AppendObjectSpec& spec,
52+
google::rpc::Status const& rpc_status) {
53+
for (auto const& any : rpc_status.details()) {
54+
auto error = google::storage::v2::BidiWriteObjectRedirectedError{};
55+
if (!any.UnpackTo(&error)) continue;
56+
*spec.mutable_write_handle() = std::move(*error.mutable_write_handle());
57+
*spec.mutable_routing_token() = std::move(*error.mutable_routing_token());
58+
}
59+
}
60+
4161
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
4262
} // namespace storage_internal
4363
} // namespace cloud

google/cloud/storage/internal/async/handle_redirect_error.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,17 @@ namespace cloud {
2525
namespace storage_internal {
2626
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2727

28+
void EnsureFirstMessageAppendObjectSpec(
29+
google::storage::v2::BidiWriteObjectRequest& request);
30+
2831
google::rpc::Status ExtractGrpcStatus(Status const& status);
2932

3033
void ApplyRedirectErrors(google::storage::v2::BidiReadObjectSpec& spec,
3134
google::rpc::Status const& rpc_status);
3235

36+
void ApplyWriteRedirectErrors(google::storage::v2::AppendObjectSpec& spec,
37+
google::rpc::Status const& rpc_status);
38+
3339
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
3440
} // namespace storage_internal
3541
} // namespace cloud
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/storage/internal/async/write_object.h"
16+
#include "google/cloud/internal/absl_str_cat_quiet.h"
17+
#include "google/cloud/internal/make_status.h"
18+
#include <utility>
19+
20+
namespace google {
21+
namespace cloud {
22+
namespace storage_internal {
23+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
24+
25+
WriteObject::WriteObject(std::unique_ptr<StreamingRpc> rpc,
26+
google::storage::v2::BidiWriteObjectRequest request)
27+
: rpc_(std::move(rpc)), initial_request_(std::move(request)) {}
28+
29+
future<StatusOr<WriteObject::WriteResult>> WriteObject::Call() {
30+
auto future = promise_.get_future();
31+
rpc_->Start().then([w = WeakFromThis()](auto f) {
32+
if (auto self = w.lock()) self->OnStart(f.get());
33+
});
34+
return future;
35+
}
36+
37+
std::weak_ptr<WriteObject> WriteObject::WeakFromThis() {
38+
return shared_from_this();
39+
}
40+
41+
void WriteObject::OnStart(bool ok) {
42+
if (!ok) return DoFinish();
43+
rpc_->Write(initial_request_, grpc::WriteOptions{})
44+
.then([w = WeakFromThis()](auto f) {
45+
if (auto self = w.lock()) self->OnWrite(f.get());
46+
});
47+
}
48+
49+
void WriteObject::OnWrite(bool ok) {
50+
if (!ok) return DoFinish();
51+
rpc_->Read().then([w = WeakFromThis()](auto f) {
52+
if (auto self = w.lock()) self->OnRead(f.get());
53+
});
54+
}
55+
56+
void WriteObject::OnRead(
57+
absl::optional<google::storage::v2::BidiWriteObjectResponse> response) {
58+
if (!response) return DoFinish();
59+
promise_.set_value(WriteResult{std::move(rpc_), std::move(*response)});
60+
}
61+
62+
void WriteObject::DoFinish() {
63+
rpc_->Finish().then([w = WeakFromThis()](auto f) {
64+
if (auto self = w.lock()) self->OnFinish(f.get());
65+
});
66+
}
67+
68+
void WriteObject::OnFinish(Status status) {
69+
if (!status.ok()) return promise_.set_value(std::move(status));
70+
// This should not happen, it indicates an EOF on the stream, but we
71+
// did not ask to close it.
72+
promise_.set_value(google::cloud::internal::InternalError(
73+
"could not open stream, but the stream closed successfully",
74+
GCP_ERROR_INFO()));
75+
}
76+
77+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
78+
} // namespace storage_internal
79+
} // namespace cloud
80+
} // namespace google
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_WRITE_OBJECT_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_WRITE_OBJECT_H
17+
18+
#include "google/cloud/storage/internal/storage_stub.h"
19+
#include "google/cloud/completion_queue.h"
20+
#include "google/cloud/future.h"
21+
#include "google/cloud/options.h"
22+
#include "google/cloud/status.h"
23+
#include "google/cloud/status_or.h"
24+
#include "google/cloud/version.h"
25+
#include <google/storage/v2/storage.pb.h>
26+
#include <grpcpp/grpcpp.h>
27+
#include <memory>
28+
#include <string>
29+
30+
namespace google {
31+
namespace cloud {
32+
namespace storage_internal {
33+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
34+
35+
/**
36+
* Performs a single attempt to open a bidi-streaming write RPC.
37+
*
38+
* Before we can use a bidi-streaming write RPC we must call `Start()`, and
39+
* then call `Read()` to check the RPC start was successful.
40+
*
41+
* Using C++20 coroutines we would write this as:
42+
*
43+
* @code
44+
* using StreamingRpc = google::cloud::AsyncStreamingReadWriteRpc<
45+
* google::storage::v2::BidiWriteObjectRequest,
46+
* google::storage::v2::BidiWriteObjectResponse>;
47+
*
48+
*
49+
* future<StatusOr<google::storage::v2::BidiWriteObjectResponse>> Call(
50+
* StreamingRpc rpc,
51+
* google::storage::v2::BidiWriteObjectRequest request) {
52+
* auto start = co_await rpc->Start();
53+
* if (!start) co_return co_await rpc->Finish();
54+
* auto read = co_await rpc->Read();
55+
* if (!read) co_return co_await rpc->Finish();
56+
* co_return std::move(*read);
57+
* }
58+
* @endcode
59+
*
60+
* As usual, all `co_await` calls become a callback. And all `co_return` calls
61+
* must set the value in an explicit `google::cloud::promise<>` object.
62+
*/
63+
class WriteObject : public std::enable_shared_from_this<WriteObject> {
64+
public:
65+
using StreamingRpc = google::cloud::AsyncStreamingReadWriteRpc<
66+
google::storage::v2::BidiWriteObjectRequest,
67+
google::storage::v2::BidiWriteObjectResponse>;
68+
69+
using ReturnType = google::storage::v2::BidiWriteObjectResponse;
70+
/// Create a coroutine to create an open a bidi streaming write RPC.
71+
WriteObject(std::unique_ptr<StreamingRpc> rpc,
72+
google::storage::v2::BidiWriteObjectRequest request);
73+
74+
struct WriteResult {
75+
std::unique_ptr<StreamingRpc> stream;
76+
google::storage::v2::BidiWriteObjectResponse first_response;
77+
};
78+
79+
/// Start the coroutine.
80+
future<StatusOr<WriteResult>> Call();
81+
82+
private:
83+
std::weak_ptr<WriteObject> WeakFromThis();
84+
85+
void OnStart(bool ok);
86+
void OnWrite(bool ok);
87+
void OnRead(
88+
absl::optional<google::storage::v2::BidiWriteObjectResponse> response);
89+
void DoFinish();
90+
void OnFinish(Status status);
91+
92+
std::unique_ptr<StreamingRpc> rpc_;
93+
promise<StatusOr<WriteResult>> promise_;
94+
google::storage::v2::BidiWriteObjectRequest initial_request_;
95+
};
96+
97+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
98+
} // namespace storage_internal
99+
} // namespace cloud
100+
} // namespace google
101+
102+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_WRITE_OBJECT_H

0 commit comments

Comments
 (0)