Skip to content

Commit a202418

Browse files
fix(storage): always use latest write handle for fast resume in gRPC BidiWriteObject (#15795)
1 parent b839b6b commit a202418

File tree

5 files changed

+239
-4
lines changed

5 files changed

+239
-4
lines changed

google/cloud/storage/internal/async/writer_connection_impl.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,10 @@ AsyncWriterConnectionImpl::MakeRequest() {
178178
request.clear_append_object_spec();
179179
}
180180
request.set_write_offset(offset_);
181+
if (latest_write_handle_.has_value()) {
182+
*request.mutable_append_object_spec()->mutable_write_handle() =
183+
*latest_write_handle_;
184+
}
181185
return request;
182186
}
183187

@@ -242,6 +246,9 @@ future<StatusOr<std::int64_t>> AsyncWriterConnectionImpl::OnQuery(
242246
return StatusOr<std::int64_t>(std::move(result));
243247
});
244248
}
249+
if (response->has_write_handle()) {
250+
latest_write_handle_ = response->write_handle();
251+
}
245252
if (response->has_persisted_size()) {
246253
persisted_state_ = response->persisted_size();
247254
return make_ready_future(make_status_or(response->persisted_size()));

google/cloud/storage/internal/async/writer_connection_impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ class AsyncWriterConnectionImpl
105105
// that `on_finish_` is called when `Finish()` is completed.
106106
google::cloud::promise<void> on_finish_;
107107
google::cloud::future<void> finished_;
108+
109+
// Track the latest write handle seen in responses.
110+
absl::optional<google::storage::v2::BidiWriteHandle> latest_write_handle_;
108111
};
109112

110113
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/internal/async/writer_connection_impl_test.cc

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include "google/cloud/storage/internal/async/writer_connection_impl.h"
1616
#include "google/cloud/mocks/mock_async_streaming_read_write_rpc.h"
17+
#include "google/cloud/storage/internal/async/write_object.h"
1718
#include "google/cloud/storage/internal/crc32c.h"
1819
#include "google/cloud/storage/internal/grpc/ctype_cord_workaround.h"
1920
#include "google/cloud/storage/internal/hash_function_impl.h"
@@ -698,6 +699,150 @@ TEST(AsyncWriterConnectionTest, FinalizeAppendableNoChecksum) {
698699
next.first.set_value(true);
699700
}
700701

702+
TEST(AsyncWriterConnectionTest, WriteHandleIsUpdatedAfterQuery) {
703+
AsyncSequencer<bool> sequencer;
704+
auto mock = std::make_unique<MockStream>();
705+
std::vector<std::string> seen_handles;
706+
707+
EXPECT_CALL(*mock, Write)
708+
.Times(3)
709+
.WillRepeatedly([&](Request const& req, grpc::WriteOptions) {
710+
EXPECT_TRUE(req.has_append_object_spec());
711+
EXPECT_TRUE(req.append_object_spec().has_write_handle());
712+
seen_handles.push_back(
713+
req.append_object_spec().write_handle().handle());
714+
return sequencer.PushBack("Write");
715+
});
716+
717+
int read_call_count = 0;
718+
EXPECT_CALL(*mock, Read).Times(2).WillRepeatedly([&]() {
719+
Response resp;
720+
if (read_call_count == 0) {
721+
resp.mutable_write_handle()->set_handle("handle1");
722+
resp.set_persisted_size(42);
723+
} else {
724+
resp.mutable_write_handle()->set_handle("handle2");
725+
resp.set_persisted_size(43);
726+
}
727+
++read_call_count;
728+
return make_ready_future(absl::make_optional(std::move(resp)));
729+
});
730+
731+
auto hash = std::make_shared<MockHashFunction>();
732+
EXPECT_CALL(*mock, Cancel).Times(1);
733+
EXPECT_CALL(*mock, Finish).WillOnce([] {
734+
return make_ready_future(Status{});
735+
});
736+
EXPECT_CALL(*hash, Update(_, An<absl::Cord const&>(), _)).Times(3);
737+
738+
google::storage::v2::BidiWriteObjectRequest req;
739+
req.mutable_append_object_spec()->set_bucket("bucket");
740+
req.mutable_append_object_spec()->set_object("object");
741+
742+
auto tested = std::make_unique<AsyncWriterConnectionImpl>(
743+
TestOptions(), req, std::move(mock), hash, 0);
744+
745+
// First Query sets handle1.
746+
EXPECT_THAT(tested->Query().get(), IsOkAndHolds(42));
747+
748+
// First Write uses handle1.
749+
auto result1 = tested->Write(WritePayload("payload1"));
750+
auto next1 = sequencer.PopFrontWithName();
751+
ASSERT_THAT(next1.second, "Write");
752+
next1.first.set_value(true);
753+
EXPECT_STATUS_OK(result1.get());
754+
755+
// Second Query sets handle2.
756+
EXPECT_THAT(tested->Query().get(), IsOkAndHolds(43));
757+
758+
// Second Write uses handle2.
759+
auto result2 = tested->Write(WritePayload("payload2"));
760+
auto next2 = sequencer.PopFrontWithName();
761+
ASSERT_THAT(next2.second, "Write");
762+
next2.first.set_value(true);
763+
EXPECT_STATUS_OK(result2.get());
764+
765+
// Third Write also uses handle2.
766+
auto result3 = tested->Write(WritePayload("payload3"));
767+
auto next3 = sequencer.PopFrontWithName();
768+
ASSERT_THAT(next3.second, "Write");
769+
next3.first.set_value(true);
770+
EXPECT_STATUS_OK(result3.get());
771+
772+
ASSERT_EQ(seen_handles.size(), 3);
773+
EXPECT_EQ(seen_handles[0], "handle1");
774+
EXPECT_EQ(seen_handles[1], "handle2");
775+
EXPECT_EQ(seen_handles[2], "handle2");
776+
}
777+
778+
TEST(AsyncWriterConnectionTest, WriteHandleIsUpdatedAfterResume) {
779+
AsyncSequencer<bool> sequencer;
780+
auto mock = std::make_unique<MockStream>();
781+
std::vector<std::string> seen_handles;
782+
783+
EXPECT_CALL(*mock, Write)
784+
.Times(2)
785+
.WillRepeatedly([&](Request const& req, grpc::WriteOptions) {
786+
EXPECT_TRUE(req.has_append_object_spec());
787+
EXPECT_TRUE(req.append_object_spec().has_write_handle());
788+
seen_handles.push_back(
789+
req.append_object_spec().write_handle().handle());
790+
return sequencer.PushBack("Write");
791+
});
792+
793+
EXPECT_CALL(*mock, Read)
794+
.WillOnce([&]() {
795+
Response resp;
796+
resp.mutable_write_handle()->set_handle("handle1");
797+
resp.set_persisted_size(42);
798+
return make_ready_future(absl::make_optional(std::move(resp)));
799+
})
800+
.WillOnce([&]() {
801+
Response resp;
802+
resp.mutable_write_handle()->set_handle("handle2");
803+
resp.set_persisted_size(43);
804+
return make_ready_future(absl::make_optional(std::move(resp)));
805+
});
806+
807+
EXPECT_CALL(*mock, Cancel).Times(1);
808+
EXPECT_CALL(*mock, Finish).WillOnce([] {
809+
return make_ready_future(Status{});
810+
});
811+
812+
auto hash = std::make_shared<MockHashFunction>();
813+
EXPECT_CALL(*hash, Update(_, An<absl::Cord const&>(), _)).Times(2);
814+
815+
google::storage::v2::BidiWriteObjectRequest req;
816+
req.mutable_append_object_spec()->set_bucket("bucket");
817+
req.mutable_append_object_spec()->set_object("object");
818+
819+
auto tested = std::make_unique<AsyncWriterConnectionImpl>(
820+
TestOptions(), req, std::move(mock), hash, 0);
821+
822+
// First Query sets handle1.
823+
EXPECT_THAT(tested->Query().get(), IsOkAndHolds(42));
824+
825+
// First Write uses handle1 but fails.
826+
auto result1 = tested->Write(WritePayload("payload1"));
827+
auto next1 = sequencer.PopFrontWithName();
828+
ASSERT_THAT(next1.second, "Write");
829+
next1.first.set_value(false);
830+
831+
// Simulate resume by calling Query again which returns handle2.
832+
EXPECT_THAT(tested->Query().get(), IsOkAndHolds(43));
833+
834+
// Second Write should use handle2.
835+
auto result2 = tested->Write(WritePayload("payload2"));
836+
auto next2 = sequencer.PopFrontWithName();
837+
ASSERT_THAT(next2.second, "Write");
838+
next2.first.set_value(true);
839+
EXPECT_STATUS_OK(result2.get());
840+
841+
ASSERT_EQ(seen_handles.size(), 2);
842+
EXPECT_EQ(seen_handles[0], "handle1");
843+
EXPECT_EQ(seen_handles[1], "handle2");
844+
}
845+
701846
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
702847
} // namespace storage_internal
703848
} // namespace cloud

google/cloud/storage/internal/async/writer_connection_resumed.cc

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,13 @@ class AsyncWriterConnectionResumedState
8282
} else {
8383
buffer_offset_ = absl::get<std::int64_t>(state);
8484
}
85+
if (first_response_.has_write_handle()) {
86+
latest_write_handle_ = first_response_.write_handle();
87+
} else if (initial_request_.has_append_object_spec() &&
88+
initial_request_.append_object_spec().has_write_handle()) {
89+
latest_write_handle_ =
90+
initial_request_.append_object_spec().write_handle();
91+
}
8592
}
8693

8794
void Cancel() {
@@ -336,9 +343,8 @@ class AsyncWriterConnectionResumedState
336343
}
337344
// Include write_handle to enable fast resume instead of slow
338345
// takeover. Without handle, server performs full state validation.
339-
if (first_response_.has_write_handle()) {
340-
*append_object_spec.mutable_write_handle() =
341-
first_response_.write_handle();
346+
if (latest_write_handle_) {
347+
*append_object_spec.mutable_write_handle() = *latest_write_handle_;
342348
}
343349
append_object_spec.set_generation(first_response_.resource().generation());
344350
ApplyWriteRedirectErrors(append_object_spec, std::move(proto_status));
@@ -363,6 +369,10 @@ class AsyncWriterConnectionResumedState
363369
void OnResume(Status const& original_status, bool was_finalizing,
364370
StatusOr<WriteObject::WriteResult> res) {
365371
std::unique_lock<std::mutex> lk(mu_);
372+
// Update write_handle from any resume response that contains it.
373+
if (res && res->first_response.has_write_handle()) {
374+
latest_write_handle_ = res->first_response.write_handle();
375+
}
366376

367377
if (was_finalizing) {
368378
// If resuming due to a finalization error, we *must* complete the
@@ -602,6 +612,9 @@ class AsyncWriterConnectionResumedState
602612

603613
// Tracks if the final promise (`finalized_`) has been completed.
604614
bool finalized_promise_completed_ = false;
615+
616+
// Track the latest write handle seen in responses.
617+
absl::optional<google::storage::v2::BidiWriteHandle> latest_write_handle_;
605618
};
606619

607620
/**

google/cloud/storage/internal/async/writer_connection_resumed_test.cc

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ TEST(WriteConnectionResumed, ResumeUsesWriteObjectSpecFromInitialRequest) {
268268
"test-object");
269269

270270
google::storage::v2::BidiWriteObjectResponse first_response;
271-
first_response.mutable_write_handle();
271+
first_response.mutable_write_handle()->set_handle("test-handle");
272272
first_response.mutable_resource()->set_generation(12345);
273273

274274
EXPECT_CALL(*mock, PersistedState)
@@ -320,6 +320,8 @@ TEST(WriteConnectionResumed, ResumeUsesWriteObjectSpecFromInitialRequest) {
320320
EXPECT_FALSE(captured_request.has_write_object_spec());
321321
EXPECT_TRUE(captured_request.has_append_object_spec());
322322
EXPECT_TRUE(captured_request.append_object_spec().has_write_handle());
323+
EXPECT_EQ(captured_request.append_object_spec().write_handle().handle(),
324+
"test-handle");
323325
EXPECT_EQ(captured_request.append_object_spec().generation(), 12345);
324326
EXPECT_EQ(captured_request.append_object_spec().object(), "test-object");
325327
EXPECT_EQ(captured_request.append_object_spec().bucket(),
@@ -392,6 +394,71 @@ TEST(WriteConnectionResumed, ResumeUsesAppendObjectSpecFromInitialRequest) {
392394
"projects/_/buckets/test-bucket");
393395
}
394396

397+
TEST(WriteConnectionResumed, WriteHandleAssignmentAfterResume) {
398+
struct {
399+
bool use_write_object_spec;
400+
std::string bucket, object, handle;
401+
std::int64_t generation;
402+
} cases[] = {
403+
{false, "projects/_/buckets/test-bucket", "test-object",
404+
"expected-handle", 12345},
405+
{true, "bucket1", "object1", "handle1", 111},
406+
{false, "bucket2", "object2", "handle2", 222},
407+
};
408+
409+
for (auto const& tc : cases) {
410+
AsyncSequencer<bool> sequencer;
411+
auto mock = std::make_unique<MockAsyncWriterConnection>();
412+
google::storage::v2::BidiWriteObjectRequest req;
413+
if (tc.use_write_object_spec) {
414+
req.mutable_write_object_spec()->mutable_resource()->set_bucket(
415+
tc.bucket);
416+
req.mutable_write_object_spec()->mutable_resource()->set_name(tc.object);
417+
} else {
418+
req.mutable_append_object_spec()->set_bucket(tc.bucket);
419+
req.mutable_append_object_spec()->set_object(tc.object);
420+
}
421+
google::storage::v2::BidiWriteObjectResponse resp;
422+
resp.mutable_write_handle()->set_handle(tc.handle);
423+
resp.mutable_resource()->set_generation(tc.generation);
424+
425+
EXPECT_CALL(*mock, PersistedState)
426+
.WillRepeatedly(Return(MakePersistedState(0)));
427+
EXPECT_CALL(*mock, Flush(_)).WillOnce([&](auto) {
428+
return sequencer.PushBack("Flush").then([](auto f) {
429+
if (f.get()) return google::cloud::Status{};
430+
return TransientError();
431+
});
432+
});
433+
434+
MockFactory mock_factory;
435+
google::storage::v2::BidiWriteObjectRequest captured;
436+
EXPECT_CALL(mock_factory, Call(_))
437+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest r) {
438+
captured = std::move(r);
439+
return sequencer.PushBack("Factory").then([](auto) {
440+
return StatusOr<WriteObject::WriteResult>(
441+
internal::AbortedError("stop test", GCP_ERROR_INFO()));
442+
});
443+
});
444+
445+
auto conn = MakeWriterConnectionResumed(mock_factory.AsStdFunction(),
446+
std::move(mock), req, nullptr, resp,
447+
Options{});
448+
auto write = conn->Write(TestPayload(1));
449+
sequencer.PopFrontWithName().first.set_value(false);
450+
sequencer.PopFrontWithName().first.set_value(true);
451+
452+
EXPECT_THAT(write.get(), StatusIs(StatusCode::kAborted));
453+
EXPECT_TRUE(captured.has_append_object_spec());
454+
EXPECT_EQ(captured.append_object_spec().bucket(), tc.bucket);
455+
EXPECT_EQ(captured.append_object_spec().object(), tc.object);
456+
EXPECT_EQ(captured.append_object_spec().generation(), tc.generation);
457+
EXPECT_TRUE(captured.append_object_spec().has_write_handle());
458+
EXPECT_EQ(captured.append_object_spec().write_handle().handle(), tc.handle);
459+
}
460+
}
461+
395462
} // namespace
396463
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
397464
} // namespace storage_internal

0 commit comments

Comments
 (0)