Skip to content

Commit 51f9c10

Browse files
authored
feat(storage): Add generation in append_object_spec in resume operation (#15395)
1 parent a4d34f3 commit 51f9c10

File tree

4 files changed

+97
-18
lines changed

4 files changed

+97
-18
lines changed

google/cloud/storage/internal/async/connection_impl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
390390
false);
391391
return MakeWriterConnectionResumed(std::move(fa), std::move(impl),
392392
std::move(request), std::move(hash),
393-
*current);
393+
rpc->first_response, *current);
394394
});
395395
}
396396

google/cloud/storage/internal/async/writer_connection_resumed.cc

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,14 @@ class AsyncWriterConnectionResumedState
6464
std::unique_ptr<storage_experimental::AsyncWriterConnection> impl,
6565
google::storage::v2::BidiWriteObjectRequest initial_request,
6666
std::shared_ptr<storage::internal::HashFunction> hash_function,
67+
google::storage::v2::BidiWriteObjectResponse const& first_response,
6768
Options const& options, std::size_t buffer_size_lwm,
6869
std::size_t buffer_size_hwm)
6970
: factory_(std::move(factory)),
7071
impl_(std::move(impl)),
7172
initial_request_(std::move(initial_request)),
7273
hash_function_(std::move(hash_function)),
74+
first_response_(std::move(first_response)),
7375
buffer_size_lwm_(buffer_size_lwm),
7476
buffer_size_hwm_(buffer_size_hwm) {
7577
finalized_future_ = finalized_.get_future();
@@ -330,6 +332,7 @@ class AsyncWriterConnectionResumedState
330332
auto& append_object_spec = *request.mutable_append_object_spec();
331333
append_object_spec.set_bucket(spec.resource().bucket());
332334
append_object_spec.set_object(spec.resource().name());
335+
append_object_spec.set_generation(first_response_.resource().generation());
333336
ApplyWriteRedirectErrors(append_object_spec, std::move(proto_status));
334337

335338
// Capture the finalization state *before* starting the async resume.
@@ -531,6 +534,8 @@ class AsyncWriterConnectionResumedState
531534

532535
google::cloud::internal::ImmutableOptions options_;
533536

537+
google::storage::v2::BidiWriteObjectResponse first_response_;
538+
534539
// Request a server-side flush if the buffer goes over this threshold.
535540
std::size_t const buffer_size_lwm_;
536541

@@ -636,10 +641,11 @@ class AsyncWriterConnectionResumed
636641
std::unique_ptr<storage_experimental::AsyncWriterConnection> impl,
637642
google::storage::v2::BidiWriteObjectRequest initial_request,
638643
std::shared_ptr<storage::internal::HashFunction> hash_function,
644+
google::storage::v2::BidiWriteObjectResponse const& first_response,
639645
Options const& options)
640646
: state_(std::make_shared<AsyncWriterConnectionResumedState>(
641647
std::move(factory), std::move(impl), std::move(initial_request),
642-
std::move(hash_function), options,
648+
std::move(hash_function), first_response, options,
643649
options.get<storage_experimental::BufferedUploadLwmOption>(),
644650
options.get<storage_experimental::BufferedUploadHwmOption>())) {}
645651

@@ -683,10 +689,11 @@ MakeWriterConnectionResumed(
683689
std::unique_ptr<storage_experimental::AsyncWriterConnection> impl,
684690
google::storage::v2::BidiWriteObjectRequest initial_request,
685691
std::shared_ptr<storage::internal::HashFunction> hash_function,
692+
google::storage::v2::BidiWriteObjectResponse const& first_response,
686693
Options const& options) {
687694
return absl::make_unique<AsyncWriterConnectionResumed>(
688695
std::move(factory), std::move(impl), std::move(initial_request),
689-
std::move(hash_function), std::move(options));
696+
std::move(hash_function), std::move(first_response), std::move(options));
690697
}
691698

692699
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/internal/async/writer_connection_resumed.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ MakeWriterConnectionResumed(
4242
std::unique_ptr<storage_experimental::AsyncWriterConnection> impl,
4343
google::storage::v2::BidiWriteObjectRequest initial_request,
4444
std::shared_ptr<storage::internal::HashFunction> hash_function,
45+
google::storage::v2::BidiWriteObjectResponse const& first_response,
4546
Options const& options);
4647

4748
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/internal/async/writer_connection_resumed_test.cc

Lines changed: 86 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ using ::google::cloud::testing_util::AsyncSequencer;
3535
using ::google::cloud::testing_util::IsOkAndHolds;
3636
using ::google::cloud::testing_util::IsProtoEqual;
3737
using ::google::cloud::testing_util::StatusIs;
38+
using ::testing::_;
3839
using ::testing::Return;
3940
using ::testing::VariantWith;
4041

@@ -65,6 +66,7 @@ TEST(WriteConnectionResumed, FinalizeEmpty) {
6566
AsyncSequencer<bool> sequencer;
6667
auto mock = std::make_unique<MockAsyncWriterConnection>();
6768
auto initial_request = google::storage::v2::BidiWriteObjectRequest{};
69+
auto first_response = google::storage::v2::BidiWriteObjectResponse{};
6870

6971
EXPECT_CALL(*mock, UploadId).WillOnce(Return("test-upload-id"));
7072
EXPECT_CALL(*mock, PersistedState)
@@ -79,9 +81,9 @@ TEST(WriteConnectionResumed, FinalizeEmpty) {
7981
MockFactory mock_factory;
8082
EXPECT_CALL(mock_factory, Call).Times(0);
8183

82-
auto connection =
83-
MakeWriterConnectionResumed(mock_factory.AsStdFunction(), std::move(mock),
84-
initial_request, nullptr, Options{});
84+
auto connection = MakeWriterConnectionResumed(
85+
mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr,
86+
first_response, Options{});
8587
EXPECT_EQ(connection->UploadId(), "test-upload-id");
8688
EXPECT_THAT(connection->PersistedState(), VariantWith<std::int64_t>(0));
8789

@@ -95,6 +97,7 @@ TEST(WriteConnectionResumed, FinalizeEmpty) {
9597
TEST(WriteConnectionResumed, FinalizedOnConstruction) {
9698
AsyncSequencer<bool> sequencer;
9799
auto initial_request = google::storage::v2::BidiWriteObjectRequest{};
100+
auto first_response = google::storage::v2::BidiWriteObjectResponse{};
98101
auto mock = std::make_unique<MockAsyncWriterConnection>();
99102
EXPECT_CALL(*mock, UploadId).WillRepeatedly(Return("test-upload-id"));
100103
EXPECT_CALL(*mock, PersistedState).WillRepeatedly(Return(TestObject()));
@@ -103,9 +106,9 @@ TEST(WriteConnectionResumed, FinalizedOnConstruction) {
103106
MockFactory mock_factory;
104107
EXPECT_CALL(mock_factory, Call).Times(0);
105108

106-
auto connection =
107-
MakeWriterConnectionResumed(mock_factory.AsStdFunction(), std::move(mock),
108-
initial_request, nullptr, Options{});
109+
auto connection = MakeWriterConnectionResumed(
110+
mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr,
111+
first_response, Options{});
109112
EXPECT_EQ(connection->UploadId(), "test-upload-id");
110113
EXPECT_THAT(
111114
connection->PersistedState(),
@@ -119,6 +122,7 @@ TEST(WriteConnectionResumed, FinalizedOnConstruction) {
119122
TEST(WriteConnectionResumed, Cancel) {
120123
AsyncSequencer<bool> sequencer;
121124
auto initial_request = google::storage::v2::BidiWriteObjectRequest{};
125+
auto first_response = google::storage::v2::BidiWriteObjectResponse{};
122126
auto mock = std::make_unique<MockAsyncWriterConnection>();
123127
EXPECT_CALL(*mock, UploadId).WillRepeatedly(Return("test-upload-id"));
124128
EXPECT_CALL(*mock, PersistedState)
@@ -132,9 +136,9 @@ TEST(WriteConnectionResumed, Cancel) {
132136
MockFactory mock_factory;
133137
EXPECT_CALL(mock_factory, Call).Times(0);
134138

135-
auto connection =
136-
MakeWriterConnectionResumed(mock_factory.AsStdFunction(), std::move(mock),
137-
initial_request, nullptr, Options{});
139+
auto connection = MakeWriterConnectionResumed(
140+
mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr,
141+
first_response, Options{});
138142

139143
auto write = connection->Write(TestPayload(64 * 1024));
140144
ASSERT_FALSE(write.is_ready());
@@ -152,6 +156,7 @@ TEST(WriteConnectionResumed, Cancel) {
152156
TEST(WriterConnectionResumed, FlushEmpty) {
153157
AsyncSequencer<bool> sequencer;
154158
auto initial_request = google::storage::v2::BidiWriteObjectRequest{};
159+
auto first_response = google::storage::v2::BidiWriteObjectResponse{};
155160

156161
auto mock = std::make_unique<MockAsyncWriterConnection>();
157162
EXPECT_CALL(*mock, PersistedState)
@@ -174,9 +179,9 @@ TEST(WriterConnectionResumed, FlushEmpty) {
174179
MockFactory mock_factory;
175180
EXPECT_CALL(mock_factory, Call).Times(0);
176181

177-
auto connection =
178-
MakeWriterConnectionResumed(mock_factory.AsStdFunction(), std::move(mock),
179-
initial_request, nullptr, Options{});
182+
auto connection = MakeWriterConnectionResumed(
183+
mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr,
184+
first_response, Options{});
180185
EXPECT_THAT(connection->PersistedState(), VariantWith<std::int64_t>(0));
181186

182187
auto flush = connection->Flush({});
@@ -195,6 +200,7 @@ TEST(WriteConnectionResumed, FlushNonEmpty) {
195200
AsyncSequencer<bool> sequencer;
196201
auto mock = std::make_unique<MockAsyncWriterConnection>();
197202
auto initial_request = google::storage::v2::BidiWriteObjectRequest{};
203+
auto first_response = google::storage::v2::BidiWriteObjectResponse{};
198204
auto const payload = TestPayload(1024);
199205

200206
EXPECT_CALL(*mock, PersistedState)
@@ -217,9 +223,9 @@ TEST(WriteConnectionResumed, FlushNonEmpty) {
217223
MockFactory mock_factory;
218224
EXPECT_CALL(mock_factory, Call).Times(0);
219225

220-
auto connection =
221-
MakeWriterConnectionResumed(mock_factory.AsStdFunction(), std::move(mock),
222-
initial_request, nullptr, Options{});
226+
auto connection = MakeWriterConnectionResumed(
227+
mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr,
228+
first_response, Options{});
223229
EXPECT_THAT(connection->PersistedState(), VariantWith<std::int64_t>(0));
224230

225231
auto write = connection->Write(payload);
@@ -243,6 +249,71 @@ TEST(WriteConnectionResumed, FlushNonEmpty) {
243249
EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk));
244250
}
245251

252+
TEST(WriteConnectionResumed, ResumeUsesGenerationFromFirstResponse) {
253+
AsyncSequencer<bool> sequencer;
254+
auto mock = std::make_unique<MockAsyncWriterConnection>();
255+
auto initial_request = google::storage::v2::BidiWriteObjectRequest{};
256+
initial_request.mutable_write_object_spec()->mutable_resource()->set_bucket(
257+
"projects/_/buckets/test-bucket");
258+
initial_request.mutable_write_object_spec()->mutable_resource()->set_name(
259+
"test-object");
260+
261+
google::storage::v2::BidiWriteObjectResponse first_response;
262+
first_response.mutable_resource()->set_generation(12345);
263+
264+
EXPECT_CALL(*mock, PersistedState)
265+
.WillRepeatedly(Return(MakePersistedState(0)));
266+
267+
// Expect Flush to be called because flush_ is true by default.
268+
// Make it fail to trigger Resume().
269+
EXPECT_CALL(*mock, Flush(_)).WillOnce([&](auto) {
270+
return sequencer.PushBack("Flush").then([](auto f) {
271+
if (f.get()) return google::cloud::Status{}; // Should not be true
272+
return TransientError();
273+
});
274+
});
275+
276+
MockFactory mock_factory;
277+
google::storage::v2::BidiWriteObjectRequest captured_request;
278+
EXPECT_CALL(mock_factory, Call(_))
279+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest request) {
280+
captured_request = std::move(request);
281+
return sequencer.PushBack("Factory").then([](auto) {
282+
return StatusOr<WriteObject::WriteResult>(
283+
internal::AbortedError("stop test", GCP_ERROR_INFO()));
284+
});
285+
});
286+
287+
auto connection = MakeWriterConnectionResumed(
288+
mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr,
289+
first_response, Options{});
290+
291+
// This will call FlushStep -> mock->Flush()
292+
auto write = connection->Write(TestPayload(1));
293+
ASSERT_FALSE(write.is_ready());
294+
295+
// Trigger the Flush error in the mock
296+
auto next = sequencer.PopFrontWithName();
297+
EXPECT_EQ(next.second, "Flush");
298+
next.first.set_value(false); // This makes the lambda return TransientError
299+
300+
// The error in OnFlush triggers Resume(), which calls the mock_factory.
301+
// Allow the factory callback to proceed.
302+
next = sequencer.PopFrontWithName();
303+
EXPECT_EQ(next.second, "Factory");
304+
next.first.set_value(true);
305+
306+
// The write future should now be ready, containing the error from the
307+
// factory.
308+
EXPECT_THAT(write.get(), StatusIs(StatusCode::kAborted));
309+
310+
EXPECT_TRUE(captured_request.has_append_object_spec());
311+
EXPECT_EQ(captured_request.append_object_spec().generation(), 12345);
312+
EXPECT_EQ(captured_request.append_object_spec().object(), "test-object");
313+
EXPECT_EQ(captured_request.append_object_spec().bucket(),
314+
"projects/_/buckets/test-bucket");
315+
}
316+
246317
} // namespace
247318
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
248319
} // namespace storage_internal

0 commit comments

Comments
 (0)