Skip to content

Commit 785b05e

Browse files
committed
Always Use Latest Write Handle for Fast Resume in gRPC BidiWriteObject
1 parent 4ac3fe1 commit 785b05e

File tree

4 files changed

+55
-0
lines changed

4 files changed

+55
-0
lines changed

google/cloud/storage/internal/async/writer_connection_impl.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,12 @@ AsyncWriterConnectionImpl::MakeRequest() {
177177
request.clear_append_object_spec();
178178
}
179179
request.set_write_offset(offset_);
180+
if (latest_write_handle_.has_value()) {
181+
if (request.has_append_object_spec()) {
182+
*request.mutable_append_object_spec()->mutable_write_handle() =
183+
*latest_write_handle_;
184+
}
185+
}
180186
return request;
181187
}
182188

@@ -241,6 +247,9 @@ future<StatusOr<std::int64_t>> AsyncWriterConnectionImpl::OnQuery(
241247
return StatusOr<std::int64_t>(std::move(result));
242248
});
243249
}
250+
if (response->has_write_handle()) {
251+
latest_write_handle_ = response->write_handle();
252+
}
244253
if (response->has_persisted_size()) {
245254
persisted_state_ = response->persisted_size();
246255
return make_ready_future(make_status_or(response->persisted_size()));

google/cloud/storage/internal/async/writer_connection_impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ class AsyncWriterConnectionImpl
105105
// that `on_finish_` is called when `Finish()` is completed.
106106
google::cloud::promise<void> on_finish_;
107107
google::cloud::future<void> finished_;
108+
109+
// Track the latest write handle seen in responses.
110+
absl::optional<google::storage::v2::BidiWriteHandle> latest_write_handle_;
108111
};
109112

110113
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/internal/async/writer_connection_impl_test.cc

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,44 @@ TEST(AsyncWriterConnectionTest, UnexpectedQueryFailsWithoutError) {
648648
EXPECT_THAT(query.get(), StatusIs(StatusCode::kInternal));
649649
}
650650

651+
TEST(AsyncWriterConnectionImpl, WriteHandleIsPropagatedAfterQuery) {
652+
AsyncSequencer<bool> sequencer;
653+
auto mock = std::make_unique<MockStream>();
654+
EXPECT_CALL(*mock, Write)
655+
.WillOnce([&](Request const& req, grpc::WriteOptions) {
656+
EXPECT_TRUE(req.has_append_object_spec());
657+
EXPECT_TRUE(req.append_object_spec().has_write_handle());
658+
EXPECT_EQ(req.append_object_spec().write_handle().handle(),
659+
"test-handle");
660+
return sequencer.PushBack("Write");
661+
});
662+
EXPECT_CALL(*mock, Read).WillOnce([&]() {
663+
Response resp;
664+
resp.mutable_write_handle()->set_handle("test-handle");
665+
resp.set_persisted_size(42);
666+
return make_ready_future(absl::make_optional(std::move(resp)));
667+
});
668+
EXPECT_CALL(*mock, Cancel).Times(1);
669+
EXPECT_CALL(*mock, Finish).WillOnce([] {
670+
return make_ready_future(Status{});
671+
});
672+
673+
auto hash = std::make_shared<MockHashFunction>();
674+
google::storage::v2::BidiWriteObjectRequest req;
675+
req.mutable_append_object_spec()->set_bucket("bucket");
676+
req.mutable_append_object_spec()->set_object("object");
677+
678+
auto tested = std::make_unique<AsyncWriterConnectionImpl>(
679+
TestOptions(), req, std::move(mock), hash, 0);
680+
681+
EXPECT_THAT(tested->Query().get(), IsOkAndHolds(42));
682+
auto result = tested->Write(WritePayload("payload"));
683+
auto next = sequencer.PopFrontWithName();
684+
ASSERT_THAT(next.second, "Write");
685+
next.first.set_value(true);
686+
EXPECT_STATUS_OK(result.get());
687+
}
688+
651689
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
652690
} // namespace storage_internal
653691
} // namespace cloud

google/cloud/storage/internal/async/writer_connection_resumed.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,11 @@ class AsyncWriterConnectionResumedState
363363
void OnResume(Status const& original_status, bool was_finalizing,
364364
StatusOr<WriteObject::WriteResult> res) {
365365
std::unique_lock<std::mutex> lk(mu_);
366+
// Update write_handle from any resume response that contains it.
367+
if (res && res.value().first_response.has_write_handle()) {
368+
*first_response_.mutable_write_handle() =
369+
res.value().first_response.write_handle();
370+
}
366371

367372
if (was_finalizing) {
368373
// If resuming due to a finalization error, we *must* complete the

0 commit comments

Comments
 (0)