Skip to content

Commit 01eefca

Browse files
authored
feat(bigtable): ensure that buffer is cleaned after being parsed. (#15646)
* feat(bigtable): ensure that the buffer is cleaned after being parsed
1 parent a5d8a1d commit 01eefca

File tree

2 files changed

+166
-75
lines changed

2 files changed

+166
-75
lines changed

google/cloud/bigtable/internal/partial_result_set_resume_test.cc

Lines changed: 165 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,39 @@ MATCHER_P(IsValidAndEquals, expected,
6262
return arg && *arg == expected;
6363
}
6464

65+
auto constexpr kResultMetadataText = R"pb(
66+
proto_schema {
67+
columns {
68+
name: "TestColumn"
69+
type { string_type {} }
70+
}
71+
}
72+
)pb";
73+
74+
void MakeResponse(google::bigtable::v2::PartialResultSet& response,
75+
std::vector<std::string> const& values,
76+
absl::optional<std::string> resume_token, bool reset) {
77+
google::bigtable::v2::ProtoRows proto_rows;
78+
for (auto const& v : values) {
79+
proto_rows.add_values()->set_string_value(v);
80+
}
81+
std::string binary_batch_data = proto_rows.SerializeAsString();
82+
auto text = absl::Substitute(
83+
R"pb(
84+
proto_rows_batch: {
85+
batch_data: "$0",
86+
},
87+
$1 $2
88+
estimated_batch_size: 31,
89+
batch_checksum: 123456
90+
)pb",
91+
binary_batch_data,
92+
resume_token ? absl::Substitute(R"(resume_token: "$0")", *resume_token)
93+
: "",
94+
reset ? "reset: true," : "");
95+
ASSERT_TRUE(TextFormat::ParseFromString(text, &response));
96+
}
97+
6598
// Helper function for MockPartialResultSetReader::Read to return true and
6699
// populate result
67100
auto ReadAction(google::bigtable::v2::PartialResultSet& response_proto,
@@ -123,31 +156,11 @@ TEST(PartialResultSetResume, Success) {
123156
}
124157

125158
TEST(PartialResultSetResume, SuccessWithRestart) {
126-
auto make_response = [](std::vector<std::string> const& values,
127-
std::string const& resume_token) {
128-
google::bigtable::v2::PartialResultSet response;
129-
google::bigtable::v2::ProtoRows proto_rows;
130-
for (auto const& v : values) {
131-
proto_rows.add_values()->set_string_value(v);
132-
}
133-
std::string binary_batch_data = proto_rows.SerializeAsString();
134-
auto text = absl::Substitute(
135-
R"pb(
136-
proto_rows_batch: {
137-
batch_data: "$0",
138-
},
139-
resume_token: "$1",
140-
reset: true,
141-
estimated_batch_size: 31,
142-
batch_checksum: 123456
143-
)pb",
144-
binary_batch_data, resume_token);
145-
EXPECT_TRUE(TextFormat::ParseFromString(text, &response));
146-
return response;
147-
};
159+
google::bigtable::v2::PartialResultSet r12;
160+
google::bigtable::v2::PartialResultSet r34;
148161

149-
auto r12 = make_response({"value-1", "value-2"}, "resume-after-2");
150-
auto r34 = make_response({"value-3", "value-4"}, "resume-after-4");
162+
MakeResponse(r12, {"value-1", "value-2"}, "resume-after-2", true);
163+
MakeResponse(r34, {"value-3", "value-4"}, "resume-after-4", true);
151164

152165
MockFactory mock_factory;
153166
EXPECT_CALL(mock_factory, MakeReader)
@@ -195,30 +208,8 @@ TEST(PartialResultSetResume, SuccessWithRestart) {
195208
}
196209

197210
TEST(PartialResultSetResume, PermanentError) {
198-
auto make_response = [](std::vector<std::string> const& values,
199-
std::string const& resume_token) {
200-
google::bigtable::v2::PartialResultSet response;
201-
google::bigtable::v2::ProtoRows proto_rows;
202-
for (auto const& v : values) {
203-
proto_rows.add_values()->set_string_value(v);
204-
}
205-
std::string binary_batch_data = proto_rows.SerializeAsString();
206-
auto text = absl::Substitute(
207-
R"pb(
208-
proto_rows_batch: {
209-
batch_data: "$0",
210-
},
211-
resume_token: "$1",
212-
reset: true,
213-
estimated_batch_size: 31,
214-
batch_checksum: 123456
215-
)pb",
216-
binary_batch_data, resume_token);
217-
EXPECT_TRUE(TextFormat::ParseFromString(text, &response));
218-
return response;
219-
};
220-
221-
auto r12 = make_response({"value-1", "value-2"}, "resume-after-2");
211+
google::bigtable::v2::PartialResultSet r12;
212+
MakeResponse(r12, {"value-1", "value-2"}, "resume-after-2", true);
222213

223214
MockFactory mock_factory;
224215
EXPECT_CALL(mock_factory, MakeReader)
@@ -256,30 +247,8 @@ TEST(PartialResultSetResume, PermanentError) {
256247
}
257248

258249
TEST(PartialResultSetResume, TransientNonIdempotent) {
259-
auto make_response = [](std::vector<std::string> const& values,
260-
std::string const& resume_token) {
261-
google::bigtable::v2::PartialResultSet response;
262-
google::bigtable::v2::ProtoRows proto_rows;
263-
for (auto const& v : values) {
264-
proto_rows.add_values()->set_string_value(v);
265-
}
266-
std::string binary_batch_data = proto_rows.SerializeAsString();
267-
auto text = absl::Substitute(
268-
R"pb(
269-
proto_rows_batch: {
270-
batch_data: "$0",
271-
},
272-
resume_token: "$1",
273-
reset: true,
274-
estimated_batch_size: 31,
275-
batch_checksum: 123456
276-
)pb",
277-
binary_batch_data, resume_token);
278-
EXPECT_TRUE(TextFormat::ParseFromString(text, &response));
279-
return response;
280-
};
281-
282-
auto r12 = make_response({"value-1", "value-2"}, "resume-after-2");
250+
google::bigtable::v2::PartialResultSet r12;
251+
MakeResponse(r12, {"value-1", "value-2"}, "resume-after-2", true);
283252

284253
MockFactory mock_factory;
285254
EXPECT_CALL(mock_factory, MakeReader)
@@ -333,6 +302,130 @@ TEST(PartialResultSetResume, TooManyTransients) {
333302
StatusIs(StatusCode::kUnavailable, HasSubstr("Try again")));
334303
}
335304

305+
TEST(PartialResultSetResume, ResumptionStart) {
306+
google::bigtable::v2::ResultSetMetadata metadata;
307+
ASSERT_TRUE(TextFormat::ParseFromString(kResultMetadataText, &metadata));
308+
309+
std::vector<google::bigtable::v2::PartialResultSet> response;
310+
google::bigtable::v2::PartialResultSet r12;
311+
google::bigtable::v2::PartialResultSet r34;
312+
google::bigtable::v2::PartialResultSet r56;
313+
MakeResponse(r12, {"value-1", "value-2"}, absl::nullopt, true);
314+
MakeResponse(r34, {"value-3", "value-4"}, absl::nullopt, false);
315+
MakeResponse(r56, {"value-5", "value-6"}, "end-of-stream", false);
316+
response.push_back(r12);
317+
response.push_back(r34);
318+
response.push_back(r56);
319+
320+
MockFactory mock_factory;
321+
EXPECT_CALL(mock_factory, MakeReader)
322+
.WillOnce([&response](std::string const& token) {
323+
EXPECT_TRUE(token.empty());
324+
auto mock = std::make_unique<MockPartialResultSetReader>();
325+
EXPECT_CALL(*mock, Read(_, _))
326+
.WillOnce(ReadAction(response[0], false))
327+
.WillOnce(ReadAction(response[1], false))
328+
.WillOnce(Return(false));
329+
EXPECT_CALL(*mock, TryCancel()).Times(0);
330+
EXPECT_CALL(*mock, Finish())
331+
.WillOnce(Return(Status(StatusCode::kUnavailable, "Try again")));
332+
return mock;
333+
})
334+
.WillOnce([&response](std::string const& token) {
335+
EXPECT_TRUE(token.empty());
336+
auto mock = std::make_unique<MockPartialResultSetReader>();
337+
EXPECT_CALL(*mock, Read(_, _))
338+
.WillOnce(ReadAction(response[0], false))
339+
.WillOnce(ReadAction(response[1], false))
340+
.WillOnce(ReadAction(response[2], false))
341+
.WillOnce(Return(false));
342+
EXPECT_CALL(*mock, TryCancel()).Times(0);
343+
EXPECT_CALL(*mock, Finish()).WillOnce(Return(Status()));
344+
return mock;
345+
});
346+
347+
auto factory = [&mock_factory](std::string const& token) {
348+
return mock_factory.MakeReader(token);
349+
};
350+
auto grpc_reader = MakeTestResume(factory, Idempotency::kIdempotent);
351+
auto reader =
352+
PartialResultSetSource::Create(metadata, std::move(grpc_reader));
353+
ASSERT_STATUS_OK(reader);
354+
355+
// Verify the returned rows are correct, despite the resumption from the
356+
// beginning of the stream after the transient error.
357+
for (auto const* s :
358+
{"value-1", "value-2", "value-3", "value-4", "value-5", "value-6"}) {
359+
auto row = (*reader)->NextRow();
360+
EXPECT_THAT(row, IsValidAndEquals(bigtable_mocks::MakeQueryRow(
361+
{{"TestColumn", bigtable::Value(s)}})));
362+
}
363+
364+
auto row = (*reader)->NextRow();
365+
EXPECT_THAT(row, IsValidAndEquals(bigtable::QueryRow{}));
366+
}
367+
368+
TEST(PartialResultSetResume, ResumptionMidway) {
369+
google::bigtable::v2::ResultSetMetadata metadata;
370+
ASSERT_TRUE(TextFormat::ParseFromString(kResultMetadataText, &metadata));
371+
372+
std::vector<google::bigtable::v2::PartialResultSet> response;
373+
google::bigtable::v2::PartialResultSet r12;
374+
google::bigtable::v2::PartialResultSet r34;
375+
google::bigtable::v2::PartialResultSet r56;
376+
MakeResponse(r12, {"value-1", "value-2"}, absl::nullopt, true);
377+
MakeResponse(r34, {"value-3", "value-4"}, "resume-after-4", false);
378+
MakeResponse(r56, {"value-5", "value-6"}, "end-of-stream", false);
379+
response.push_back(r12);
380+
response.push_back(r34);
381+
response.push_back(r56);
382+
383+
MockFactory mock_factory;
384+
EXPECT_CALL(mock_factory, MakeReader)
385+
.WillOnce([&response](std::string const& token) {
386+
EXPECT_TRUE(token.empty());
387+
auto mock = std::make_unique<MockPartialResultSetReader>();
388+
EXPECT_CALL(*mock, Read(_, _))
389+
.WillOnce(ReadAction(response[0], false))
390+
.WillOnce(ReadAction(response[1], false))
391+
.WillOnce(Return(false));
392+
EXPECT_CALL(*mock, TryCancel()).Times(0);
393+
EXPECT_CALL(*mock, Finish())
394+
.WillOnce(Return(Status(StatusCode::kUnavailable, "Try again")));
395+
return mock;
396+
})
397+
.WillOnce([&response](std::string const& token) {
398+
EXPECT_EQ("resume-after-4", token);
399+
auto mock = std::make_unique<MockPartialResultSetReader>();
400+
EXPECT_CALL(*mock, Read(_, _))
401+
.WillOnce(ReadAction(response[2], false))
402+
.WillOnce(Return(false));
403+
EXPECT_CALL(*mock, TryCancel()).Times(0);
404+
EXPECT_CALL(*mock, Finish()).WillOnce(Return(Status()));
405+
return mock;
406+
});
407+
408+
auto factory = [&mock_factory](std::string const& token) {
409+
return mock_factory.MakeReader(token);
410+
};
411+
auto grpc_reader = MakeTestResume(factory, Idempotency::kIdempotent);
412+
auto reader =
413+
PartialResultSetSource::Create(metadata, std::move(grpc_reader));
414+
ASSERT_STATUS_OK(reader);
415+
416+
// Verify the returned rows are correct, despite the resumption from a
417+
// midway point in the stream after the transient error.
418+
for (auto const* s :
419+
{"value-1", "value-2", "value-3", "value-4", "value-5", "value-6"}) {
420+
auto row = (*reader)->NextRow();
421+
EXPECT_THAT(row, IsValidAndEquals(bigtable_mocks::MakeQueryRow(
422+
{{"TestColumn", bigtable::Value(s)}})));
423+
}
424+
425+
auto row = (*reader)->NextRow();
426+
EXPECT_THAT(row, IsValidAndEquals(bigtable::QueryRow{}));
427+
}
428+
336429
} // namespace
337430
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
338431
} // namespace bigtable_internal

google/cloud/bigtable/internal/partial_result_set_source.cc

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,11 @@ StatusOr<bigtable::QueryRow> PartialResultSetSource::NextRow() {
9494

9595
Status PartialResultSetSource::ReadFromStream() {
9696
if (state_ == State::kFinished) {
97-
std::cout << "HELLO FINISHED: " << std::endl;
9897
return internal::InternalError("PartialResultSetSource already finished",
9998
GCP_ERROR_INFO());
10099
}
101100
// The application should consume rows_ before calling ReadFromStream again.
102101
if (!rows_.empty()) {
103-
std::cout << "HELLO row not empty: " << std::endl;
104102
return internal::InternalError("PartialResultSetSource has unconsumed rows",
105103
GCP_ERROR_INFO());
106104
}
@@ -121,7 +119,6 @@ Status PartialResultSetSource::ReadFromStream() {
121119
// read would have had a sentinel resume_token, causing
122120
// ProcessDataFromStream to commit them.
123121
if (!buffered_rows_.empty()) {
124-
std::cout << "BUFFERED ROWS NOT EMPTY: " << std::endl;
125122
return internal::InternalError("Stream ended with uncommitted rows.",
126123
GCP_ERROR_INFO());
127124
}
@@ -151,6 +148,7 @@ Status PartialResultSetSource::ProcessDataFromStream(
151148
if (proto_rows_.ParseFromString(read_buffer_)) {
152149
auto status = BufferProtoRows();
153150
proto_rows_.Clear();
151+
read_buffer_.clear();
154152
if (!status.ok()) return status;
155153
} else {
156154
read_buffer_.clear();

0 commit comments

Comments
 (0)