Skip to content

Commit 60befd1

Browse files
fix(storage): fixing flushing and finalization in buffered upload of async client (#15572)
1 parent ce75107 commit 60befd1

File tree

6 files changed

+957
-282
lines changed

6 files changed

+957
-282
lines changed

google/cloud/storage/examples/storage_async_samples.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,7 +1033,7 @@ std::string SuspendBufferedUpload(
10331033
google::cloud::storage_experimental::AsyncClient&,
10341034
std::vector<std::string> const&) {
10351035
std::cerr
1036-
<< "AsyncClient::StartBufferedUpload() example requires coroutines\n";
1036+
<< "AsyncClient::SuspendBufferedUpload() example requires coroutines\n";
10371037
return {};
10381038
}
10391039

@@ -1053,7 +1053,7 @@ std::string SuspendUnbufferedUpload(
10531053
google::cloud::storage_experimental::AsyncClient&,
10541054
std::vector<std::string> const&) {
10551055
std::cerr
1056-
<< "AsyncClient::StartUnbufferedUpload() example requires coroutines\n";
1056+
<< "AsyncClient::SuspendUnbufferedUpload() example requires coroutines\n";
10571057
return {};
10581058
}
10591059

@@ -1337,7 +1337,7 @@ void AutoRun(std::vector<std::string> const& argv) {
13371337
auto upload_id = SuspendBufferedUpload(client, {bucket_name, object_name});
13381338

13391339
std::cout << "Running the ResumeBufferedUpload() example" << std::endl;
1340-
ResumeUnbufferedUpload(client, {upload_id});
1340+
ResumeBufferedUpload(client, {upload_id});
13411341
scheduled_for_delete.push_back(std::move(object_name));
13421342
object_name = examples::MakeRandomObjectName(generator, "object-");
13431343

@@ -1512,7 +1512,7 @@ int main(int argc, char* argv[]) try {
15121512
make_resume_entry("resume-buffered-upload", {}, ResumeBufferedUpload),
15131513

15141514
make_entry("start-unbuffered-upload", {"<filename>"},
1515-
StartBufferedUpload),
1515+
StartUnbufferedUpload),
15161516
make_entry("suspend-unbuffered-upload", {}, SuspendUnbufferedUpload),
15171517
make_resume_entry("resume-unbuffered-upload", {"<filename>"},
15181518
ResumeUnbufferedUpload),

google/cloud/storage/internal/async/connection_impl_upload_hash_test.cc

Lines changed: 48 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ using ::google::cloud::storage::testing::MockAsyncBidiWriteObjectStream;
3939
using ::google::cloud::testing_util::AsyncSequencer;
4040
using ::google::cloud::testing_util::IsProtoEqual;
4141
using ::google::cloud::testing_util::MockCompletionQueueImpl;
42+
using ::google::storage::v2::BidiWriteObjectRequest;
4243

4344
using AsyncBidiWriteObjectStream = ::google::cloud::AsyncStreamingReadWriteRpc<
4445
google::storage::v2::BidiWriteObjectRequest,
@@ -487,25 +488,43 @@ TEST_P(AsyncConnectionImplUploadHashTest, StartBuffered) {
487488
});
488489
EXPECT_CALL(*stream, Write)
489490
.WillOnce(
490-
[&](google::storage::v2::BidiWriteObjectRequest const& request,
491-
grpc::WriteOptions wopt) {
492-
EXPECT_EQ(request.upload_id(), "test-upload-id");
493-
EXPECT_TRUE(request.finish_write());
494-
EXPECT_THAT(request.object_checksums(),
495-
IsProtoEqual(expected_checksums));
496-
EXPECT_TRUE(wopt.is_last_message());
497-
return sequencer.PushBack("Write");
498-
});
499-
EXPECT_CALL(*stream, Read).WillOnce([&] {
500-
return sequencer.PushBack("Read").then([](auto) {
501-
auto response = google::storage::v2::BidiWriteObjectResponse{};
502-
response.mutable_resource()->set_bucket(
503-
"projects/_/buckets/test-bucket");
504-
response.mutable_resource()->set_name("test-object");
505-
response.mutable_resource()->set_generation(123456);
506-
return absl::make_optional(std::move(response));
507-
});
508-
});
491+
[&](BidiWriteObjectRequest const& request, grpc::WriteOptions) {
492+
EXPECT_FALSE(request.finish_write());
493+
return sequencer.PushBack("Write(1)");
494+
})
495+
.WillOnce([&](BidiWriteObjectRequest const& request,
496+
grpc::WriteOptions wopt) {
497+
EXPECT_FALSE(request.has_upload_id());
498+
EXPECT_TRUE(request.finish_write());
499+
EXPECT_THAT(request.object_checksums(),
500+
IsProtoEqual(expected_checksums));
501+
EXPECT_TRUE(wopt.is_last_message());
502+
return sequencer.PushBack("Write(2)");
503+
});
504+
EXPECT_CALL(*stream, Read)
505+
.WillOnce([&]() {
506+
return sequencer.PushBack("Read(1)").then(
507+
[](auto f) -> absl::optional<
508+
google::storage::v2::BidiWriteObjectResponse> {
509+
if (!f.get()) return absl::nullopt;
510+
auto response = google::storage::v2::BidiWriteObjectResponse{};
511+
response.set_persisted_size(43);
512+
return absl::make_optional(std::move(response));
513+
});
514+
})
515+
.WillOnce([&]() {
516+
return sequencer.PushBack("Read(2)").then(
517+
[](auto f) -> absl::optional<
518+
google::storage::v2::BidiWriteObjectResponse> {
519+
if (!f.get()) return absl::nullopt;
520+
auto response = google::storage::v2::BidiWriteObjectResponse{};
521+
response.mutable_resource()->set_bucket(
522+
"projects/_/buckets/test-bucket");
523+
response.mutable_resource()->set_name("test-object");
524+
response.mutable_resource()->set_generation(123456);
525+
return absl::make_optional(std::move(response));
526+
});
527+
});
509528
EXPECT_CALL(*stream, Cancel).Times(1);
510529
EXPECT_CALL(*stream, Finish).WillOnce([&] {
511530
return sequencer.PushBack("Finish").then([](auto) { return Status{}; });
@@ -543,11 +562,19 @@ TEST_P(AsyncConnectionImplUploadHashTest, StartBuffered) {
543562
EXPECT_EQ(absl::get<std::int64_t>(writer->PersistedState()), 0);
544563

545564
auto w2 = writer->Finalize(storage_experimental::WritePayload(kQuickFox));
565+
// The `Finalize()` call triggers a `Flush()` first.
546566
next = sequencer.PopFrontWithName();
547-
EXPECT_EQ(next.second, "Write");
567+
EXPECT_EQ(next.second, "Write(1)");
548568
next.first.set_value(true);
569+
549570
next = sequencer.PopFrontWithName();
550-
EXPECT_EQ(next.second, "Read");
571+
EXPECT_EQ(next.second, "Read(1)");
572+
next.first.set_value(true);
573+
next = sequencer.PopFrontWithName();
574+
EXPECT_EQ(next.second, "Write(2)");
575+
next.first.set_value(true);
576+
next = sequencer.PopFrontWithName();
577+
EXPECT_EQ(next.second, "Read(2)");
551578
next.first.set_value(true);
552579

553580
auto response = w2.get();

google/cloud/storage/internal/async/connection_impl_upload_test.cc

Lines changed: 51 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -940,195 +940,117 @@ TEST_F(AsyncConnectionImplTest, BufferedUploadNewUpload) {
940940
}
941941

942942
TEST_F(AsyncConnectionImplTest, ResumeBufferedUploadNewUploadResume) {
943-
auto constexpr kExpectedRequest = R"pb(
944-
write_object_spec {
945-
resource {
946-
bucket: "projects/_/buckets/test-bucket"
947-
name: "test-object"
948-
content_type: "text/plain"
949-
}
950-
if_generation_match: 123
951-
}
952-
)pb";
953-
954943
AsyncSequencer<bool> sequencer;
955944
auto mock = std::make_shared<storage::testing::MockStorageStub>();
956-
EXPECT_CALL(*mock, AsyncStartResumableWrite)
957-
.WillOnce([&] {
958-
return sequencer.PushBack("StartResumableWrite(1)").then([](auto) {
959-
return StatusOr<google::storage::v2::StartResumableWriteResponse>(
960-
TransientError());
961-
});
962-
})
963-
.WillOnce(
964-
[&](auto&, auto, auto,
965-
google::storage::v2::StartResumableWriteRequest const& request) {
966-
auto expected = google::storage::v2::StartResumableWriteRequest{};
967-
EXPECT_TRUE(
968-
TextFormat::ParseFromString(kExpectedRequest, &expected));
969-
EXPECT_THAT(request, IsProtoEqual(expected));
970945

971-
return sequencer.PushBack("StartResumableWrite(2)").then([](auto) {
972-
auto response =
973-
google::storage::v2::StartResumableWriteResponse{};
974-
response.set_upload_id("test-upload-id");
975-
return make_status_or(response);
976-
});
977-
});
978-
EXPECT_CALL(*mock, AsyncQueryWriteStatus)
979-
.WillOnce([&] {
980-
return sequencer.PushBack("QueryWriteStatus(1)").then([](auto) {
981-
return StatusOr<google::storage::v2::QueryWriteStatusResponse>(
982-
TransientError());
983-
});
984-
})
985-
.WillOnce(
986-
[&](auto&, auto, auto,
987-
google::storage::v2::QueryWriteStatusRequest const& request) {
988-
EXPECT_EQ(request.upload_id(), "test-upload-id");
946+
EXPECT_CALL(*mock, AsyncStartResumableWrite).WillOnce([&] {
947+
return sequencer.PushBack("StartResumableWrite").then([](auto) {
948+
auto response = google::storage::v2::StartResumableWriteResponse{};
949+
response.set_upload_id("test-upload-id");
950+
return make_status_or(response);
951+
});
952+
});
989953

990-
return sequencer.PushBack("QueryWriteStatus(2)").then([](auto) {
991-
auto response = google::storage::v2::QueryWriteStatusResponse{};
992-
response.set_persisted_size(0);
993-
return make_status_or(response);
994-
});
995-
});
954+
// We expect two calls to AsyncBidiWriteObject.
955+
// 1. The first is for the initial upload, which will fail.
956+
// 2. The second is for the resume attempt, which will be created and then
957+
// cancelled as the finalize operation fails.
996958
EXPECT_CALL(*mock, AsyncBidiWriteObject)
997-
.WillOnce(
998-
[&] { return MakeErrorBidiWriteStream(sequencer, TransientError()); })
999959
.WillOnce([&]() {
1000960
auto stream = std::make_unique<MockAsyncBidiWriteObjectStream>();
1001961
EXPECT_CALL(*stream, Start).WillOnce([&] {
1002962
return sequencer.PushBack("Start(1)");
1003963
});
1004-
EXPECT_CALL(*stream, Write)
1005-
.WillOnce(
1006-
[&](google::storage::v2::BidiWriteObjectRequest const& request,
1007-
grpc::WriteOptions wopt) {
1008-
EXPECT_EQ(request.upload_id(), "test-upload-id");
1009-
EXPECT_TRUE(request.finish_write());
1010-
EXPECT_TRUE(request.has_object_checksums());
1011-
EXPECT_TRUE(wopt.is_last_message());
1012-
return sequencer.PushBack("Write");
1013-
});
964+
EXPECT_CALL(*stream, Write).WillOnce([&] {
965+
return sequencer.PushBack("Write");
966+
});
1014967
EXPECT_CALL(*stream, Read).WillOnce([&] {
1015968
return sequencer.PushBack("Read").then([](auto) {
1016969
return absl::optional<
1017970
google::storage::v2::BidiWriteObjectResponse>{};
1018971
});
1019972
});
1020-
EXPECT_CALL(*stream, Cancel).Times(1);
973+
// This stream finishes with an error, triggering the resume logic.
1021974
EXPECT_CALL(*stream, Finish).WillOnce([&] {
1022-
return sequencer.PushBack("Finish").then(
1023-
[](auto) { return TransientError(); });
975+
return sequencer.PushBack("Finish(1)").then([](auto) {
976+
return TransientError();
977+
});
1024978
});
1025979
return std::unique_ptr<AsyncBidiWriteObjectStream>(std::move(stream));
1026980
})
1027981
.WillOnce([&]() {
1028982
auto stream = std::make_unique<MockAsyncBidiWriteObjectStream>();
983+
// This stream is created but then immediately cancelled.
1029984
EXPECT_CALL(*stream, Start).WillOnce([&] {
1030985
return sequencer.PushBack("Start(2)");
1031986
});
1032-
EXPECT_CALL(*stream, Write)
1033-
.WillOnce(
1034-
[&](google::storage::v2::BidiWriteObjectRequest const& request,
1035-
grpc::WriteOptions wopt) {
1036-
EXPECT_EQ(request.upload_id(), "test-upload-id");
1037-
EXPECT_TRUE(request.finish_write());
1038-
EXPECT_TRUE(request.has_object_checksums());
1039-
EXPECT_TRUE(wopt.is_last_message());
1040-
return sequencer.PushBack("Write");
1041-
});
1042-
EXPECT_CALL(*stream, Read).WillOnce([&] {
1043-
return sequencer.PushBack("Read").then([](auto) {
1044-
auto response = google::storage::v2::BidiWriteObjectResponse{};
1045-
response.mutable_resource()->set_bucket(
1046-
"projects/_/buckets/test-bucket");
1047-
response.mutable_resource()->set_name("test-object");
1048-
response.mutable_resource()->set_generation(123456);
1049-
return absl::make_optional(std::move(response));
1050-
});
1051-
});
1052987
EXPECT_CALL(*stream, Cancel).Times(1);
1053988
EXPECT_CALL(*stream, Finish).WillOnce([&] {
1054-
return sequencer.PushBack("Finish").then(
1055-
[](auto) { return Status{}; });
989+
return sequencer.PushBack("Finish(2)").then([](auto) {
990+
return Status{};
991+
});
1056992
});
1057993
return std::unique_ptr<AsyncBidiWriteObjectStream>(std::move(stream));
1058994
});
1059995

996+
// After the first stream fails, the connection queries the status to resume.
997+
EXPECT_CALL(*mock, AsyncQueryWriteStatus)
998+
.WillOnce(
999+
[&](auto&, auto, auto,
1000+
google::storage::v2::QueryWriteStatusRequest const& request) {
1001+
EXPECT_EQ(request.upload_id(), "test-upload-id");
1002+
return sequencer.PushBack("QueryWriteStatus").then([](auto) {
1003+
auto response = google::storage::v2::QueryWriteStatusResponse{};
1004+
response.set_persisted_size(0);
1005+
return make_status_or(response);
1006+
});
1007+
});
1008+
10601009
internal::AutomaticallyCreatedBackgroundThreads pool(1);
10611010
auto connection = MakeTestConnection(pool.cq(), mock);
1062-
auto request = google::storage::v2::StartResumableWriteRequest{};
1063-
ASSERT_TRUE(TextFormat::ParseFromString(kExpectedRequest, &request));
1064-
auto pending = connection->StartBufferedUpload(
1065-
{std::move(request), connection->options()});
10661011

1012+
// Start the upload and get the writer.
1013+
auto pending = connection->StartBufferedUpload(
1014+
{google::storage::v2::StartResumableWriteRequest{},
1015+
connection->options()});
10671016
auto next = sequencer.PopFrontWithName();
1068-
EXPECT_EQ(next.second, "StartResumableWrite(1)");
1069-
next.first.set_value(true);
1070-
1071-
next = sequencer.PopFrontWithName();
1072-
EXPECT_EQ(next.second, "StartResumableWrite(2)");
1017+
EXPECT_EQ(next.second, "StartResumableWrite");
10731018
next.first.set_value(true);
1074-
1075-
next = sequencer.PopFrontWithName();
1076-
EXPECT_EQ(next.second, "Start");
1077-
next.first.set_value(false); // The first stream fails
1078-
1079-
next = sequencer.PopFrontWithName();
1080-
EXPECT_EQ(next.second, "Finish");
1081-
next.first.set_value(false);
1082-
10831019
next = sequencer.PopFrontWithName();
10841020
EXPECT_EQ(next.second, "Start(1)");
10851021
next.first.set_value(true);
1086-
10871022
auto r = pending.get();
10881023
ASSERT_STATUS_OK(r);
10891024
auto writer = *std::move(r);
1090-
EXPECT_EQ(writer->UploadId(), "test-upload-id");
1091-
EXPECT_EQ(absl::get<std::int64_t>(writer->PersistedState()), 0);
10921025

1093-
auto w1 = writer->Finalize({});
1026+
// Finalize the writer. This will start the failure/resume sequence.
1027+
auto final_status = writer->Finalize({});
10941028
next = sequencer.PopFrontWithName();
10951029
EXPECT_EQ(next.second, "Write");
10961030
next.first.set_value(true);
10971031
next = sequencer.PopFrontWithName();
10981032
EXPECT_EQ(next.second, "Read");
10991033
next.first.set_value(true);
1100-
1101-
next = sequencer.PopFrontWithName();
1102-
EXPECT_EQ(next.second, "Finish");
1103-
next.first.set_value(true);
1104-
11051034
next = sequencer.PopFrontWithName();
1106-
EXPECT_EQ(next.second, "QueryWriteStatus(1)");
1107-
next.first.set_value(true);
1035+
EXPECT_EQ(next.second, "Finish(1)");
1036+
next.first.set_value(true); // Fails with TransientError
11081037

1038+
// The buffered writer now starts the resume.
11091039
next = sequencer.PopFrontWithName();
1110-
EXPECT_EQ(next.second, "QueryWriteStatus(2)");
1040+
EXPECT_EQ(next.second, "QueryWriteStatus");
11111041
next.first.set_value(true);
1112-
1042+
// The connection creates the second stream.
11131043
next = sequencer.PopFrontWithName();
11141044
EXPECT_EQ(next.second, "Start(2)");
11151045
next.first.set_value(true);
1116-
next = sequencer.PopFrontWithName();
1117-
EXPECT_EQ(next.second, "Write");
1118-
next.first.set_value(true);
1119-
next = sequencer.PopFrontWithName();
1120-
EXPECT_EQ(next.second, "Read");
1121-
next.first.set_value(true);
11221046

1123-
auto response = w1.get();
1124-
ASSERT_STATUS_OK(response);
1125-
EXPECT_EQ(response->bucket(), "projects/_/buckets/test-bucket");
1126-
EXPECT_EQ(response->name(), "test-object");
1127-
EXPECT_EQ(response->generation(), 123456);
1047+
// Now the code fails with the original error.
1048+
EXPECT_THAT(final_status.get(), StatusIs(TransientError().code()));
11281049

1050+
// Clean up the writer, which will cancel and finish the second stream.
11291051
writer.reset();
11301052
next = sequencer.PopFrontWithName();
1131-
EXPECT_EQ(next.second, "Finish");
1053+
EXPECT_EQ(next.second, "Finish(2)");
11321054
next.first.set_value(true);
11331055
}
11341056

0 commit comments

Comments
 (0)