Skip to content

Commit 2d8e23a

Browse files
authored
feat(storage): Add unit test for AppendableUploadConnectionImpl (#15200)
* feat(storage) : Add unit test for AppendableUploadConnectionImpl * Remove the unwanted comment. * Remove unnecesary declerations * Update the copyright year * remove the unwanted declaration
1 parent acd4884 commit 2d8e23a

File tree

3 files changed

+357
-0
lines changed

3 files changed

+357
-0
lines changed

google/cloud/storage/google_cloud_cpp_storage_grpc.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,7 @@ set(storage_client_grpc_unit_tests
430430
async/token_test.cc
431431
async/writer_test.cc
432432
grpc_plugin_test.cc
433+
internal/async/connection_impl_appendable_upload_test.cc
433434
internal/async/connection_impl_insert_test.cc
434435
internal/async/connection_impl_open_test.cc
435436
internal/async/connection_impl_read_hash_test.cc
Lines changed: 355 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,355 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/storage/async/retry_policy.h"
16+
#include "google/cloud/storage/async/writer_connection.h"
17+
#include "google/cloud/storage/internal/async/connection_impl.h"
18+
#include "google/cloud/storage/internal/async/default_options.h"
19+
#include "google/cloud/storage/testing/canonical_errors.h"
20+
#include "google/cloud/storage/testing/mock_storage_stub.h"
21+
#include "google/cloud/common_options.h"
22+
#include "google/cloud/grpc_options.h"
23+
#include "google/cloud/internal/background_threads_impl.h"
24+
#include "google/cloud/testing_util/async_sequencer.h"
25+
#include "google/cloud/testing_util/is_proto_equal.h"
26+
#include "google/cloud/testing_util/mock_completion_queue_impl.h"
27+
#include "google/cloud/testing_util/status_matchers.h"
28+
#include <google/protobuf/text_format.h>
29+
#include <gmock/gmock.h>
30+
31+
namespace google {
32+
namespace cloud {
33+
namespace storage_internal {
34+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
35+
namespace {
36+
37+
using ::google::cloud::storage::testing::MockAsyncBidiWriteObjectStream;
38+
using ::google::cloud::storage::testing::canonical_errors::PermanentError;
39+
using ::google::cloud::storage::testing::canonical_errors::TransientError;
40+
using ::google::cloud::testing_util::AsyncSequencer;
41+
using ::google::cloud::testing_util::StatusIs;
42+
using ::google::protobuf::TextFormat;
43+
44+
using AsyncBidiWriteObjectStream = ::google::cloud::AsyncStreamingReadWriteRpc<
45+
google::storage::v2::BidiWriteObjectRequest,
46+
google::storage::v2::BidiWriteObjectResponse>;
47+
48+
// A test fixture for the appendable upload tests.
49+
class AsyncConnectionImplAppendableTest : public ::testing::Test {};
50+
51+
// Common options for all tests.
52+
auto TestOptions(Options options = {}) {
53+
using ms = std::chrono::milliseconds;
54+
options = internal::MergeOptions(
55+
std::move(options),
56+
Options{}
57+
.set<GrpcNumChannelsOption>(1)
58+
.set<storage_experimental::AsyncRetryPolicyOption>(
59+
storage_experimental::LimitedErrorCountRetryPolicy(2).clone())
60+
.set<storage::BackoffPolicyOption>(
61+
storage::ExponentialBackoffPolicy(ms(1), ms(2), 2.0).clone()));
62+
return DefaultOptionsAsync(std::move(options));
63+
}
64+
65+
// Creates a test connection with a mock stub.
66+
std::shared_ptr<storage_experimental::AsyncConnection> MakeTestConnection(
67+
CompletionQueue cq, std::shared_ptr<storage::testing::MockStorageStub> mock,
68+
Options options = {}) {
69+
return MakeAsyncConnection(std::move(cq), std::move(mock),
70+
TestOptions(std::move(options)));
71+
}
72+
73+
// Creates a mock bidirectional stream that simulates a successful append flow.
74+
std::unique_ptr<AsyncBidiWriteObjectStream> MakeSuccessfulAppendStream(
75+
AsyncSequencer<bool>& sequencer, std::int64_t persisted_size) {
76+
auto stream = std::make_unique<MockAsyncBidiWriteObjectStream>();
77+
EXPECT_CALL(*stream, Start).WillOnce([&] {
78+
return sequencer.PushBack("Start");
79+
});
80+
// The first write is a "state lookup" write. It should not contain a payload.
81+
// The server responds with the current persisted size of the object.
82+
EXPECT_CALL(*stream, Write)
83+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
84+
grpc::WriteOptions wopt) {
85+
EXPECT_TRUE(request.state_lookup());
86+
EXPECT_FALSE(wopt.is_last_message());
87+
return sequencer.PushBack("Write(StateLookup)");
88+
})
89+
// Subsequent writes carry data.
90+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const&,
91+
grpc::WriteOptions wopt) {
92+
EXPECT_FALSE(wopt.is_last_message());
93+
return sequencer.PushBack("Write(data)");
94+
})
95+
// The finalize write marks the end of the stream.
96+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
97+
grpc::WriteOptions wopt) {
98+
EXPECT_TRUE(request.finish_write());
99+
EXPECT_TRUE(wopt.is_last_message());
100+
return sequencer.PushBack("Write(Finalize)");
101+
});
102+
103+
// The first `Read()` call after the state lookup confirms the persisted size.
104+
EXPECT_CALL(*stream, Read)
105+
.WillOnce([&, persisted_size] {
106+
return sequencer.PushBack("Read(PersistedSize)")
107+
.then([persisted_size](auto) {
108+
auto response = google::storage::v2::BidiWriteObjectResponse{};
109+
response.mutable_resource()->set_size(persisted_size);
110+
return absl::make_optional(std::move(response));
111+
});
112+
})
113+
// The second `Read()` call, after the final write, returns the full
114+
// object metadata.
115+
.WillOnce([&, persisted_size] {
116+
return sequencer.PushBack("Read(FinalObject)")
117+
.then([persisted_size](auto) {
118+
auto response = google::storage::v2::BidiWriteObjectResponse{};
119+
response.mutable_resource()->set_bucket(
120+
"projects/_/buckets/test-bucket");
121+
response.mutable_resource()->set_name("test-object");
122+
// The final size should be greater than the persisted size.
123+
response.mutable_resource()->set_size(persisted_size + 1024);
124+
return absl::make_optional(std::move(response));
125+
});
126+
});
127+
128+
EXPECT_CALL(*stream, Cancel).Times(1);
129+
EXPECT_CALL(*stream, Finish).WillOnce([&] {
130+
return sequencer.PushBack("Finish").then([](auto) { return Status{}; });
131+
});
132+
133+
return std::unique_ptr<AsyncBidiWriteObjectStream>(std::move(stream));
134+
}
135+
136+
// Creates a mock stream that returns an error.
137+
std::unique_ptr<AsyncBidiWriteObjectStream> MakeErrorBidiWriteStream(
138+
AsyncSequencer<bool>& sequencer, Status const& status) {
139+
auto stream = std::make_unique<MockAsyncBidiWriteObjectStream>();
140+
EXPECT_CALL(*stream, Start).WillOnce([&] {
141+
return sequencer.PushBack("Start");
142+
});
143+
EXPECT_CALL(*stream, Finish).WillOnce([&, status] {
144+
return sequencer.PushBack("Finish").then([status](auto) { return status; });
145+
});
146+
return std::unique_ptr<AsyncBidiWriteObjectStream>(std::move(stream));
147+
}
148+
149+
TEST_F(AsyncConnectionImplAppendableTest, StartAppendableObjectUploadSuccess) {
150+
auto constexpr kRequestText = R"pb(
151+
write_object_spec {
152+
resource {
153+
bucket: "projects/_/buckets/test-bucket"
154+
name: "test-object"
155+
content_type: "text/plain"
156+
}
157+
}
158+
)pb";
159+
AsyncSequencer<bool> sequencer;
160+
auto mock = std::make_shared<storage::testing::MockStorageStub>();
161+
162+
// Simulate one transient failure, followed by a success.
163+
EXPECT_CALL(*mock, AsyncBidiWriteObject)
164+
.WillOnce(
165+
[&] { return MakeErrorBidiWriteStream(sequencer, TransientError()); })
166+
.WillOnce([&] { return MakeSuccessfulAppendStream(sequencer, 0); });
167+
168+
internal::AutomaticallyCreatedBackgroundThreads pool(1);
169+
auto connection = MakeTestConnection(pool.cq(), mock);
170+
171+
auto request = google::storage::v2::BidiWriteObjectRequest{};
172+
ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request));
173+
auto pending = connection->StartAppendableObjectUpload(
174+
{std::move(request), connection->options()});
175+
176+
// First attempt fails.
177+
auto next = sequencer.PopFrontWithName();
178+
EXPECT_EQ(next.second, "Start");
179+
next.first.set_value(false); // The stream fails to start.
180+
181+
next = sequencer.PopFrontWithName();
182+
EXPECT_EQ(next.second, "Finish");
183+
// Fulfill the promise. The future will complete with the TransientError
184+
// provided in the mock setup, which the retry loop will handle.
185+
next.first.set_value(true);
186+
187+
// Retry attempt succeeds.
188+
next = sequencer.PopFrontWithName();
189+
EXPECT_EQ(next.second, "Start");
190+
next.first.set_value(true);
191+
192+
next = sequencer.PopFrontWithName();
193+
EXPECT_EQ(next.second, "Write(StateLookup)");
194+
next.first.set_value(true);
195+
196+
next = sequencer.PopFrontWithName();
197+
EXPECT_EQ(next.second, "Read(PersistedSize)");
198+
next.first.set_value(true);
199+
200+
auto r = pending.get();
201+
ASSERT_STATUS_OK(r);
202+
auto writer = *std::move(r);
203+
EXPECT_EQ(absl::get<std::int64_t>(writer->PersistedState()), 0);
204+
205+
// Write some data.
206+
// An empty payload might be a no-op in the implementation, which would
207+
// prevent the mock from being triggered and cause the sequencer to hang.
208+
// We provide a non-empty payload to ensure the Write RPC is sent.
209+
auto w1 = writer->Write(storage_experimental::WritePayload("some data"));
210+
next = sequencer.PopFrontWithName();
211+
EXPECT_EQ(next.second, "Write(data)");
212+
next.first.set_value(true);
213+
EXPECT_STATUS_OK(w1.get());
214+
215+
// Finalize the upload.
216+
auto w2 = writer->Finalize({});
217+
next = sequencer.PopFrontWithName();
218+
EXPECT_EQ(next.second, "Write(Finalize)");
219+
next.first.set_value(true);
220+
next = sequencer.PopFrontWithName();
221+
EXPECT_EQ(next.second, "Read(FinalObject)");
222+
next.first.set_value(true);
223+
224+
auto response = w2.get();
225+
ASSERT_STATUS_OK(response);
226+
EXPECT_EQ(response->bucket(), "projects/_/buckets/test-bucket");
227+
EXPECT_EQ(response->name(), "test-object");
228+
EXPECT_EQ(response->size(), 1024);
229+
230+
writer.reset();
231+
next = sequencer.PopFrontWithName();
232+
EXPECT_EQ(next.second, "Finish");
233+
next.first.set_value(true);
234+
}
235+
236+
TEST_F(AsyncConnectionImplAppendableTest, ResumeAppendableObjectUploadSuccess) {
237+
auto constexpr kRequestText = R"pb(
238+
append_object_spec { object: "test-object" }
239+
)pb";
240+
AsyncSequencer<bool> sequencer;
241+
auto mock = std::make_shared<storage::testing::MockStorageStub>();
242+
243+
// In a resume, the server should report the already persisted size.
244+
// We'll simulate 16384 bytes are already uploaded.
245+
constexpr std::int64_t kPersistedSize = 16384;
246+
EXPECT_CALL(*mock, AsyncBidiWriteObject).WillOnce([&] {
247+
return MakeSuccessfulAppendStream(sequencer, kPersistedSize);
248+
});
249+
250+
internal::AutomaticallyCreatedBackgroundThreads pool(1);
251+
auto connection = MakeTestConnection(pool.cq(), mock);
252+
253+
auto request = google::storage::v2::BidiWriteObjectRequest{};
254+
ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request));
255+
auto pending = connection->ResumeAppendableObjectUpload(
256+
{std::move(request), connection->options()});
257+
258+
// The stream starts, performs state lookup, and reports the persisted size.
259+
auto next = sequencer.PopFrontWithName();
260+
EXPECT_EQ(next.second, "Start");
261+
next.first.set_value(true);
262+
next = sequencer.PopFrontWithName();
263+
EXPECT_EQ(next.second, "Write(StateLookup)");
264+
next.first.set_value(true);
265+
next = sequencer.PopFrontWithName();
266+
EXPECT_EQ(next.second, "Read(PersistedSize)");
267+
next.first.set_value(true);
268+
269+
auto r = pending.get();
270+
ASSERT_STATUS_OK(r);
271+
auto writer = *std::move(r);
272+
// Verify the persisted state is correctly reported.
273+
EXPECT_EQ(absl::get<std::int64_t>(writer->PersistedState()), kPersistedSize);
274+
275+
auto w1 = writer->Write(storage_experimental::WritePayload("some more data"));
276+
next = sequencer.PopFrontWithName();
277+
EXPECT_EQ(next.second, "Write(data)");
278+
next.first.set_value(true);
279+
EXPECT_STATUS_OK(w1.get());
280+
281+
// Finalize the upload.
282+
auto w2 = writer->Finalize({});
283+
next = sequencer.PopFrontWithName();
284+
EXPECT_EQ(next.second, "Write(Finalize)");
285+
next.first.set_value(true);
286+
next = sequencer.PopFrontWithName();
287+
EXPECT_EQ(next.second, "Read(FinalObject)");
288+
next.first.set_value(true);
289+
290+
auto response = w2.get();
291+
ASSERT_STATUS_OK(response);
292+
EXPECT_EQ(response->size(), kPersistedSize + 1024);
293+
294+
writer.reset();
295+
next = sequencer.PopFrontWithName();
296+
EXPECT_EQ(next.second, "Finish");
297+
next.first.set_value(true);
298+
}
299+
300+
TEST_F(AsyncConnectionImplAppendableTest, AppendableUploadTooManyTransients) {
301+
AsyncSequencer<bool> sequencer;
302+
auto mock = std::make_shared<storage::testing::MockStorageStub>();
303+
// The retry policy is configured for 3 attempts total.
304+
EXPECT_CALL(*mock, AsyncBidiWriteObject).Times(3).WillRepeatedly([&] {
305+
return MakeErrorBidiWriteStream(sequencer, TransientError());
306+
});
307+
308+
internal::AutomaticallyCreatedBackgroundThreads pool(1);
309+
auto connection = MakeTestConnection(pool.cq(), mock);
310+
auto pending = connection->StartAppendableObjectUpload(
311+
{google::storage::v2::BidiWriteObjectRequest{}, connection->options()});
312+
313+
for (int i = 0; i != 3; ++i) {
314+
auto next = sequencer.PopFrontWithName();
315+
EXPECT_EQ(next.second, "Start");
316+
next.first.set_value(false);
317+
318+
next = sequencer.PopFrontWithName();
319+
EXPECT_EQ(next.second, "Finish");
320+
next.first.set_value(true);
321+
}
322+
323+
auto r = pending.get();
324+
EXPECT_THAT(r, StatusIs(TransientError().code()));
325+
}
326+
327+
TEST_F(AsyncConnectionImplAppendableTest, AppendableUploadPermanentError) {
328+
AsyncSequencer<bool> sequencer;
329+
auto mock = std::make_shared<storage::testing::MockStorageStub>();
330+
EXPECT_CALL(*mock, AsyncBidiWriteObject).WillOnce([&] {
331+
return MakeErrorBidiWriteStream(sequencer, PermanentError());
332+
});
333+
334+
internal::AutomaticallyCreatedBackgroundThreads pool(1);
335+
auto connection = MakeTestConnection(pool.cq(), mock);
336+
auto pending = connection->StartAppendableObjectUpload(
337+
{google::storage::v2::BidiWriteObjectRequest{}, connection->options()});
338+
339+
auto next = sequencer.PopFrontWithName();
340+
EXPECT_EQ(next.second, "Start");
341+
next.first.set_value(false);
342+
343+
next = sequencer.PopFrontWithName();
344+
EXPECT_EQ(next.second, "Finish");
345+
next.first.set_value(true);
346+
347+
auto r = pending.get();
348+
EXPECT_THAT(r, StatusIs(PermanentError().code()));
349+
}
350+
351+
} // namespace
352+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
353+
} // namespace storage_internal
354+
} // namespace cloud
355+
} // namespace google

google/cloud/storage/storage_client_grpc_unit_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ storage_client_grpc_unit_tests = [
2929
"async/token_test.cc",
3030
"async/writer_test.cc",
3131
"grpc_plugin_test.cc",
32+
"internal/async/connection_impl_appendable_upload_test.cc",
3233
"internal/async/connection_impl_insert_test.cc",
3334
"internal/async/connection_impl_open_test.cc",
3435
"internal/async/connection_impl_read_hash_test.cc",

0 commit comments

Comments
 (0)