Skip to content

Commit 8a2d049

Browse files
committed
adding test for multiple responses having latest handle
1 parent 9119270 commit 8a2d049

File tree

4 files changed

+184
-19
lines changed

4 files changed

+184
-19
lines changed

google/cloud/storage/internal/async/writer_connection_impl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ AsyncWriterConnectionImpl::MakeRequest() {
177177
request.clear_append_object_spec();
178178
}
179179
request.set_write_offset(offset_);
180-
if (latest_write_handle_.has_value() && request.has_append_object_spec()) {
180+
if (latest_write_handle_.has_value()) {
181181
*request.mutable_append_object_spec()->mutable_write_handle() =
182182
*latest_write_handle_;
183183
}

google/cloud/storage/internal/async/writer_connection_impl_test.cc

Lines changed: 106 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include "google/cloud/storage/internal/async/writer_connection_impl.h"
1616
#include "google/cloud/mocks/mock_async_streaming_read_write_rpc.h"
17+
#include "google/cloud/storage/internal/async/write_object.h"
1718
#include "google/cloud/storage/internal/crc32c.h"
1819
#include "google/cloud/storage/internal/grpc/ctype_cord_workaround.h"
1920
#include "google/cloud/storage/internal/hash_function_impl.h"
@@ -648,55 +649,148 @@ TEST(AsyncWriterConnectionTest, UnexpectedQueryFailsWithoutError) {
648649
EXPECT_THAT(query.get(), StatusIs(StatusCode::kInternal));
649650
}
650651

651-
TEST(AsyncWriterConnectionImpl, WriteHandleIsPropagatedAfterQuery) {
652+
TEST(AsyncWriterConnectionTest, WriteHandleIsUpdatedAfterQuery) {
652653
AsyncSequencer<bool> sequencer;
653654
auto mock = std::make_unique<MockStream>();
654-
int write_call_count = 0;
655+
std::vector<std::string> seen_handles;
656+
655657
EXPECT_CALL(*mock, Write)
656-
.Times(2)
658+
.Times(3)
657659
.WillRepeatedly([&](Request const& req, grpc::WriteOptions) {
658660
EXPECT_TRUE(req.has_append_object_spec());
659661
EXPECT_TRUE(req.append_object_spec().has_write_handle());
660-
EXPECT_EQ(req.append_object_spec().write_handle().handle(),
661-
"test-handle");
662-
++write_call_count;
662+
seen_handles.push_back(
663+
req.append_object_spec().write_handle().handle());
663664
return sequencer.PushBack("Write");
664665
});
665-
EXPECT_CALL(*mock, Read).WillOnce([&]() {
666+
667+
int read_call_count = 0;
668+
EXPECT_CALL(*mock, Read).Times(2).WillRepeatedly([&]() {
666669
Response resp;
667-
resp.mutable_write_handle()->set_handle("test-handle");
668-
resp.set_persisted_size(42);
670+
if (read_call_count == 0) {
671+
resp.mutable_write_handle()->set_handle("handle1");
672+
resp.set_persisted_size(42);
673+
} else {
674+
resp.mutable_write_handle()->set_handle("handle2");
675+
resp.set_persisted_size(43);
676+
}
677+
++read_call_count;
669678
return make_ready_future(absl::make_optional(std::move(resp)));
670679
});
680+
681+
auto hash = std::make_shared<MockHashFunction>();
671682
EXPECT_CALL(*mock, Cancel).Times(1);
672683
EXPECT_CALL(*mock, Finish).WillOnce([] {
673684
return make_ready_future(Status{});
674685
});
686+
EXPECT_CALL(*hash, Update(_, An<absl::Cord const&>(), _)).Times(3);
675687

676-
auto hash = std::make_shared<MockHashFunction>();
677688
google::storage::v2::BidiWriteObjectRequest req;
678689
req.mutable_append_object_spec()->set_bucket("bucket");
679690
req.mutable_append_object_spec()->set_object("object");
680691

681692
auto tested = std::make_unique<AsyncWriterConnectionImpl>(
682693
TestOptions(), req, std::move(mock), hash, 0);
683694

695+
// First Query sets handle1.
684696
EXPECT_THAT(tested->Query().get(), IsOkAndHolds(42));
685697

698+
// First Write uses handle1.
686699
auto result1 = tested->Write(WritePayload("payload1"));
687700
auto next1 = sequencer.PopFrontWithName();
688701
ASSERT_THAT(next1.second, "Write");
689702
next1.first.set_value(true);
690703
EXPECT_STATUS_OK(result1.get());
691704

692-
// Second write should also include write_handle.
705+
// Second Query sets handle2.
706+
EXPECT_THAT(tested->Query().get(), IsOkAndHolds(43));
707+
708+
// Second Write uses handle2.
709+
auto result2 = tested->Write(WritePayload("payload2"));
710+
auto next2 = sequencer.PopFrontWithName();
711+
ASSERT_THAT(next2.second, "Write");
712+
next2.first.set_value(true);
713+
EXPECT_STATUS_OK(result2.get());
714+
715+
// Third Write also uses handle2.
716+
auto result3 = tested->Write(WritePayload("payload3"));
717+
auto next3 = sequencer.PopFrontWithName();
718+
ASSERT_THAT(next3.second, "Write");
719+
next3.first.set_value(true);
720+
EXPECT_STATUS_OK(result3.get());
721+
722+
ASSERT_EQ(seen_handles.size(), 3);
723+
EXPECT_EQ(seen_handles[0], "handle1");
724+
EXPECT_EQ(seen_handles[1], "handle2");
725+
EXPECT_EQ(seen_handles[2], "handle2");
726+
}
727+
728+
TEST(AsyncWriterConnectionTest, WriteHandleIsUpdatedAfterResume) {
729+
AsyncSequencer<bool> sequencer;
730+
auto mock = std::make_unique<MockStream>();
731+
std::vector<std::string> seen_handles;
732+
733+
EXPECT_CALL(*mock, Write)
734+
.Times(2)
735+
.WillRepeatedly([&](Request const& req, grpc::WriteOptions) {
736+
EXPECT_TRUE(req.has_append_object_spec());
737+
EXPECT_TRUE(req.append_object_spec().has_write_handle());
738+
seen_handles.push_back(
739+
req.append_object_spec().write_handle().handle());
740+
return sequencer.PushBack("Write");
741+
});
742+
743+
EXPECT_CALL(*mock, Read)
744+
.WillOnce([&]() {
745+
Response resp;
746+
resp.mutable_write_handle()->set_handle("handle1");
747+
resp.set_persisted_size(42);
748+
return make_ready_future(absl::make_optional(std::move(resp)));
749+
})
750+
.WillOnce([&]() {
751+
Response resp;
752+
resp.mutable_write_handle()->set_handle("handle2");
753+
resp.set_persisted_size(43);
754+
return make_ready_future(absl::make_optional(std::move(resp)));
755+
});
756+
757+
EXPECT_CALL(*mock, Cancel).Times(1);
758+
EXPECT_CALL(*mock, Finish).WillOnce([] {
759+
return make_ready_future(Status{});
760+
});
761+
762+
auto hash = std::make_shared<MockHashFunction>();
763+
EXPECT_CALL(*hash, Update(_, An<absl::Cord const&>(), _)).Times(2);
764+
765+
google::storage::v2::BidiWriteObjectRequest req;
766+
req.mutable_append_object_spec()->set_bucket("bucket");
767+
req.mutable_append_object_spec()->set_object("object");
768+
769+
auto tested = std::make_unique<AsyncWriterConnectionImpl>(
770+
TestOptions(), req, std::move(mock), hash, 0);
771+
772+
// First Query sets handle1.
773+
EXPECT_THAT(tested->Query().get(), IsOkAndHolds(42));
774+
775+
// First Write uses handle1 but fails.
776+
auto result1 = tested->Write(WritePayload("payload1"));
777+
auto next1 = sequencer.PopFrontWithName();
778+
ASSERT_THAT(next1.second, "Write");
779+
next1.first.set_value(false);
780+
781+
// Simulate resume by calling Query again which returns handle2.
782+
EXPECT_THAT(tested->Query().get(), IsOkAndHolds(43));
783+
784+
// Second Write should use handle2.
693785
auto result2 = tested->Write(WritePayload("payload2"));
694786
auto next2 = sequencer.PopFrontWithName();
695787
ASSERT_THAT(next2.second, "Write");
696788
next2.first.set_value(true);
697789
EXPECT_STATUS_OK(result2.get());
698790

699-
EXPECT_EQ(write_call_count, 2);
791+
ASSERT_EQ(seen_handles.size(), 2);
792+
EXPECT_EQ(seen_handles[0], "handle1");
793+
EXPECT_EQ(seen_handles[1], "handle2");
700794
}
701795

702796
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/internal/async/writer_connection_resumed.cc

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,13 @@ class AsyncWriterConnectionResumedState
8282
} else {
8383
buffer_offset_ = absl::get<std::int64_t>(state);
8484
}
85+
if (first_response_.has_write_handle()) {
86+
latest_write_handle_ = first_response_.write_handle();
87+
} else if (initial_request_.has_append_object_spec() &&
88+
initial_request_.append_object_spec().has_write_handle()) {
89+
latest_write_handle_ =
90+
initial_request_.append_object_spec().write_handle();
91+
}
8592
}
8693

8794
void Cancel() {
@@ -336,9 +343,8 @@ class AsyncWriterConnectionResumedState
336343
}
337344
// Include write_handle to enable fast resume instead of slow
338345
// takeover. Without handle, server performs full state validation.
339-
if (first_response_.has_write_handle()) {
340-
*append_object_spec.mutable_write_handle() =
341-
first_response_.write_handle();
346+
if (latest_write_handle_) {
347+
*append_object_spec.mutable_write_handle() = *latest_write_handle_;
342348
}
343349
append_object_spec.set_generation(first_response_.resource().generation());
344350
ApplyWriteRedirectErrors(append_object_spec, std::move(proto_status));
@@ -365,8 +371,7 @@ class AsyncWriterConnectionResumedState
365371
std::unique_lock<std::mutex> lk(mu_);
366372
// Update write_handle from any resume response that contains it.
367373
if (res && res->first_response.has_write_handle()) {
368-
*first_response_.mutable_write_handle() =
369-
res->first_response.write_handle();
374+
latest_write_handle_ = res->first_response.write_handle();
370375
}
371376

372377
if (was_finalizing) {
@@ -607,6 +612,9 @@ class AsyncWriterConnectionResumedState
607612

608613
// Tracks if the final promise (`finalized_`) has been completed.
609614
bool finalized_promise_completed_ = false;
615+
616+
// Track the latest write handle seen in responses.
617+
absl::optional<google::storage::v2::BidiWriteHandle> latest_write_handle_;
610618
};
611619

612620
/**

google/cloud/storage/internal/async/writer_connection_resumed_test.cc

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ TEST(WriteConnectionResumed, ResumeUsesWriteObjectSpecFromInitialRequest) {
268268
"test-object");
269269

270270
google::storage::v2::BidiWriteObjectResponse first_response;
271-
first_response.mutable_write_handle();
271+
first_response.mutable_write_handle()->set_handle("test-handle");
272272
first_response.mutable_resource()->set_generation(12345);
273273

274274
EXPECT_CALL(*mock, PersistedState)
@@ -320,6 +320,8 @@ TEST(WriteConnectionResumed, ResumeUsesWriteObjectSpecFromInitialRequest) {
320320
EXPECT_FALSE(captured_request.has_write_object_spec());
321321
EXPECT_TRUE(captured_request.has_append_object_spec());
322322
EXPECT_TRUE(captured_request.append_object_spec().has_write_handle());
323+
EXPECT_EQ(captured_request.append_object_spec().write_handle().handle(),
324+
"test-handle");
323325
EXPECT_EQ(captured_request.append_object_spec().generation(), 12345);
324326
EXPECT_EQ(captured_request.append_object_spec().object(), "test-object");
325327
EXPECT_EQ(captured_request.append_object_spec().bucket(),
@@ -392,6 +394,67 @@ TEST(WriteConnectionResumed, ResumeUsesAppendObjectSpecFromInitialRequest) {
392394
"projects/_/buckets/test-bucket");
393395
}
394396

397+
TEST(WriteConnectionResumed, WriteHandleAssignmentAfterResume) {
398+
struct {
399+
bool use_write_object_spec;
400+
std::string bucket, object, handle;
401+
std::int64_t generation;
402+
} cases[] = {
403+
{false, "projects/_/buckets/test-bucket", "test-object", "expected-handle", 12345},
404+
{true, "bucket1", "object1", "handle1", 111},
405+
{false, "bucket2", "object2", "handle2", 222},
406+
};
407+
408+
for (auto const& tc : cases) {
409+
AsyncSequencer<bool> sequencer;
410+
auto mock = std::make_unique<MockAsyncWriterConnection>();
411+
google::storage::v2::BidiWriteObjectRequest req;
412+
if (tc.use_write_object_spec) {
413+
req.mutable_write_object_spec()->mutable_resource()->set_bucket(tc.bucket);
414+
req.mutable_write_object_spec()->mutable_resource()->set_name(tc.object);
415+
} else {
416+
req.mutable_append_object_spec()->set_bucket(tc.bucket);
417+
req.mutable_append_object_spec()->set_object(tc.object);
418+
}
419+
google::storage::v2::BidiWriteObjectResponse resp;
420+
resp.mutable_write_handle()->set_handle(tc.handle);
421+
resp.mutable_resource()->set_generation(tc.generation);
422+
423+
EXPECT_CALL(*mock, PersistedState).WillRepeatedly(Return(MakePersistedState(0)));
424+
EXPECT_CALL(*mock, Flush(_)).WillOnce([&](auto) {
425+
return sequencer.PushBack("Flush").then([](auto f) {
426+
if (f.get()) return google::cloud::Status{};
427+
return TransientError();
428+
});
429+
});
430+
431+
MockFactory mock_factory;
432+
google::storage::v2::BidiWriteObjectRequest captured;
433+
EXPECT_CALL(mock_factory, Call(_))
434+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest r) {
435+
captured = std::move(r);
436+
return sequencer.PushBack("Factory").then([](auto) {
437+
return StatusOr<WriteObject::WriteResult>(
438+
internal::AbortedError("stop test", GCP_ERROR_INFO()));
439+
});
440+
});
441+
442+
auto conn = MakeWriterConnectionResumed(
443+
mock_factory.AsStdFunction(), std::move(mock), req, nullptr, resp, Options{});
444+
auto write = conn->Write(TestPayload(1));
445+
sequencer.PopFrontWithName().first.set_value(false);
446+
sequencer.PopFrontWithName().first.set_value(true);
447+
448+
EXPECT_THAT(write.get(), StatusIs(StatusCode::kAborted));
449+
EXPECT_TRUE(captured.has_append_object_spec());
450+
EXPECT_EQ(captured.append_object_spec().bucket(), tc.bucket);
451+
EXPECT_EQ(captured.append_object_spec().object(), tc.object);
452+
EXPECT_EQ(captured.append_object_spec().generation(), tc.generation);
453+
EXPECT_TRUE(captured.append_object_spec().has_write_handle());
454+
EXPECT_EQ(captured.append_object_spec().write_handle().handle(), tc.handle);
455+
}
456+
}
457+
395458
} // namespace
396459
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
397460
} // namespace storage_internal

0 commit comments

Comments
 (0)