
Commit e6692f8

feat(bigtable): introduce PartialResultSetSource (#15598)
* feat(bigtable): introduce PartialResultSetSource to read streaming data.
1 parent 393d5a9 commit e6692f8

10 files changed: +829 -2 lines

google/cloud/bigtable/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
@@ -205,6 +205,9 @@ add_library(
     internal/operation_context.h
     internal/operation_context_factory.cc
     internal/operation_context_factory.h
+    internal/partial_result_set_reader.h
+    internal/partial_result_set_source.cc
+    internal/partial_result_set_source.h
     internal/prefix_range_end.cc
     internal/prefix_range_end.h
     internal/rate_limiter.cc
@@ -389,6 +392,7 @@ if (BUILD_TESTING)
     testing/mock_data_client.h
     testing/mock_mutate_rows_limiter.h
     testing/mock_mutate_rows_reader.h
+    testing/mock_partial_result_set_reader.h
     testing/mock_policies.h
     testing/mock_read_rows_reader.h
     testing/mock_response_reader.h
@@ -470,6 +474,7 @@ if (BUILD_TESTING)
     internal/mutate_rows_limiter_test.cc
     internal/operation_context_factory_test.cc
     internal/operation_context_test.cc
+    internal/partial_result_set_source_test.cc
     internal/prefix_range_end_test.cc
     internal/rate_limiter_test.cc
     internal/retry_traits_test.cc

google/cloud/bigtable/bigtable_client_testing.bzl

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ bigtable_client_testing_hdrs = [
     "testing/mock_data_client.h",
     "testing/mock_mutate_rows_limiter.h",
     "testing/mock_mutate_rows_reader.h",
+    "testing/mock_partial_result_set_reader.h",
     "testing/mock_policies.h",
     "testing/mock_read_rows_reader.h",
     "testing/mock_response_reader.h",

google/cloud/bigtable/bigtable_client_unit_tests.bzl

Lines changed: 1 addition & 0 deletions
@@ -64,6 +64,7 @@ bigtable_client_unit_tests = [
    "internal/mutate_rows_limiter_test.cc",
    "internal/operation_context_factory_test.cc",
    "internal/operation_context_test.cc",
+   "internal/partial_result_set_source_test.cc",
    "internal/prefix_range_end_test.cc",
    "internal/rate_limiter_test.cc",
    "internal/retry_traits_test.cc",

google/cloud/bigtable/google_cloud_cpp_bigtable.bzl

Lines changed: 3 additions & 0 deletions
@@ -101,6 +101,8 @@ google_cloud_cpp_bigtable_hdrs = [
    "internal/mutate_rows_limiter.h",
    "internal/operation_context.h",
    "internal/operation_context_factory.h",
+   "internal/partial_result_set_reader.h",
+   "internal/partial_result_set_source.h",
    "internal/prefix_range_end.h",
    "internal/rate_limiter.h",
    "internal/readrowsparser.h",
@@ -211,6 +213,7 @@ google_cloud_cpp_bigtable_srcs = [
    "internal/mutate_rows_limiter.cc",
    "internal/operation_context.cc",
    "internal/operation_context_factory.cc",
+   "internal/partial_result_set_source.cc",
    "internal/prefix_range_end.cc",
    "internal/rate_limiter.cc",
    "internal/readrowsparser.cc",
google/cloud/bigtable/internal/partial_result_set_reader.h

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_READER_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_READER_H

#include "google/cloud/status.h"
#include "google/cloud/version.h"
#include "absl/types/optional.h"
#include <google/bigtable/v2/data.pb.h>
#include <string>

namespace google {
namespace cloud {
namespace bigtable_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

/**
 * The result of a successful `PartialResultSetReader::Read()`, which may be
 * the next partial result of a stream, or a resumption of an interrupted
 * stream from the `resume_token` if it was engaged. In the latter case, the
 * caller should discard any pending state not covered by the token, as that
 * data will be replayed.
 */
struct UnownedPartialResultSet {
  static UnownedPartialResultSet FromPartialResultSet(
      google::bigtable::v2::PartialResultSet& result) {
    return UnownedPartialResultSet{result, false};
  }

  google::bigtable::v2::PartialResultSet& result;
  bool resumption;
};

/**
 * Wrap `grpc::ClientReaderInterface<google::bigtable::v2::PartialResultSet>`.
 *
 * This defines an interface to handle a streaming RPC returning a sequence
 * of `google::bigtable::v2::PartialResultSet`. Its main purpose is to
 * simplify memory management, as each streaming RPC requires two separate
 * `std::unique_ptr<>`. As a side-effect, it is also easier to mock as it
 * has a narrower interface.
 */
class PartialResultSetReader {
 public:
  virtual ~PartialResultSetReader() = default;
  virtual void TryCancel() = 0;
  virtual bool Read(absl::optional<std::string> const& resume_token,
                    UnownedPartialResultSet& result) = 0;
  virtual Status Finish() = 0;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
}  // namespace bigtable_internal
}  // namespace cloud
}  // namespace google

#endif  // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_READER_H
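For context, here is a minimal sketch of how the new testing/mock_partial_result_set_reader.h (added to the build files above, but not shown in this diff) might mock this interface with gmock. The namespace and class name are assumptions for illustration; the actual mock header may differ:

// Hypothetical sketch of testing/mock_partial_result_set_reader.h.
#include "google/cloud/bigtable/internal/partial_result_set_reader.h"
#include <gmock/gmock.h>

namespace google {
namespace cloud {
namespace bigtable_testing {

class MockPartialResultSetReader
    : public bigtable_internal::PartialResultSetReader {
 public:
  MOCK_METHOD(void, TryCancel, (), (override));
  MOCK_METHOD(bool, Read,
              (absl::optional<std::string> const& resume_token,
               bigtable_internal::UnownedPartialResultSet& result),
              (override));
  MOCK_METHOD(Status, Finish, (), (override));
};

}  // namespace bigtable_testing
}  // namespace cloud
}  // namespace google

Because this interface is far narrower than `grpc::ClientReaderInterface<>`, tests can script `Read()`/`Finish()` sequences directly instead of standing up a real streaming RPC.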
google/cloud/bigtable/internal/partial_result_set_source.cc

Lines changed: 209 additions & 0 deletions
@@ -0,0 +1,209 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/bigtable/internal/partial_result_set_source.h"
#include "google/cloud/bigtable/options.h"
#include "google/cloud/internal/absl_str_cat_quiet.h"
#include "google/cloud/internal/make_status.h"
#include "google/cloud/log.h"
#include "absl/types/optional.h"

namespace google {
namespace cloud {
namespace bigtable_internal {

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>>
PartialResultSetSource::Create(
    absl::optional<google::bigtable::v2::ResultSetMetadata> metadata,
    std::unique_ptr<PartialResultSetReader> reader) {
  std::unique_ptr<PartialResultSetSource> source(
      new PartialResultSetSource(std::move(metadata), std::move(reader)));

  // Do an initial read from the stream to determine the fate of the factory.
  auto status = source->ReadFromStream();

  // If the initial read finished the stream, and `Finish()` failed, then
  // creating the `PartialResultSetSource` should fail with the same error.
  if (source->state_ == State::kFinished && !status.ok()) return status;

  return {std::move(source)};
}

PartialResultSetSource::PartialResultSetSource(
    absl::optional<google::bigtable::v2::ResultSetMetadata> metadata,
    std::unique_ptr<PartialResultSetReader> reader)
    : options_(internal::CurrentOptions()),
      reader_(std::move(reader)),
      metadata_(std::move(metadata)) {
  if (metadata_.has_value()) {
    columns_ = std::make_shared<std::vector<std::string>>();
    columns_->reserve(metadata_->proto_schema().columns_size());
    for (auto const& c : metadata_->proto_schema().columns()) {
      columns_->push_back(c.name());
    }
  }
}

PartialResultSetSource::~PartialResultSetSource() {
  internal::OptionsSpan span(options_);
  if (state_ == State::kReading) {
    // Finish() can deadlock if there is still data in the streaming RPC,
    // so before trying to read the final status we need to cancel.
    reader_->TryCancel();
    state_ = State::kEndOfStream;
  }
  if (state_ == State::kEndOfStream) {
    // The user didn't iterate over all the data, so finish the stream on
    // their behalf, although we have no way to communicate error status.
    auto status = reader_->Finish();
    if (!status.ok() && status.code() != StatusCode::kCancelled) {
      GCP_LOG(WARNING)
          << "PartialResultSetSource: Finish() failed in destructor: "
          << status;
    }
    state_ = State::kFinished;
  }
}

StatusOr<bigtable::QueryRow> PartialResultSetSource::NextRow() {
  while (rows_.empty()) {
    if (state_ == State::kFinished) return bigtable::QueryRow();
    internal::OptionsSpan span(options_);
    // Continue fetching if there are more rows in the stream.
    auto status = ReadFromStream();
    if (!status.ok()) return status;
  }
  // Return the row at the front of the queue.
  auto row = std::move(rows_.front());
  rows_.pop_front();
  return row;
}

Status PartialResultSetSource::ReadFromStream() {
  if (state_ == State::kFinished) {
    return internal::InternalError("PartialResultSetSource already finished",
                                   GCP_ERROR_INFO());
  }
  // The application should consume rows_ before calling ReadFromStream again.
  if (!rows_.empty()) {
    return internal::InternalError("PartialResultSetSource has unconsumed rows",
                                   GCP_ERROR_INFO());
  }

  auto* raw_result_set =
      google::protobuf::Arena::Create<google::bigtable::v2::PartialResultSet>(
          &arena_);
  auto result_set =
      UnownedPartialResultSet::FromPartialResultSet(*raw_result_set);

  // The resume_token_ member holds the token from the previous
  // PartialResultSet. It is empty on the first call.
  if (reader_->Read(resume_token_, result_set)) {
    return ProcessDataFromStream(result_set.result);
  }
  state_ = State::kFinished;
  // buffered_rows_ is expected to be empty because the last successful
  // read would have had a sentinel resume_token, causing
  // ProcessDataFromStream to commit them.
  if (!buffered_rows_.empty()) {
    return internal::InternalError("Stream ended with uncommitted rows.",
                                   GCP_ERROR_INFO());
  }
  return reader_->Finish();
}

Status PartialResultSetSource::ProcessDataFromStream(
    google::bigtable::v2::PartialResultSet& result) {
  // If `reset` is true then all the data buffered since the last
  // resume_token should be discarded.
  if (result.reset()) {
    read_buffer_.clear();
    buffered_rows_.clear();
  }

  // Reserve space in the buffer at the start of a new batch of data.
  if (result.estimated_batch_size() > 0 && read_buffer_.empty()) {
    read_buffer_.reserve(result.estimated_batch_size());
  }

  if (result.has_proto_rows_batch()) {
    absl::StrAppend(&read_buffer_, result.proto_rows_batch().batch_data());
  }

  // TODO(#15617): Validate that the checksum matches the contents of `buffer`.
  if (result.has_batch_checksum() && !read_buffer_.empty()) {
    if (proto_rows_.ParseFromString(read_buffer_)) {
      auto status = BufferProtoRows();
      proto_rows_.Clear();
      if (!status.ok()) return status;
    } else {
      read_buffer_.clear();
      buffered_rows_.clear();
      return internal::InternalError("Failed to parse ProtoRows from buffer");
    }
  }

  // Rows in buffered_rows_ are ready to be committed into rows_ once the
  // resume_token is received.
  if (!result.resume_token().empty()) {
    rows_.insert(rows_.end(), buffered_rows_.begin(), buffered_rows_.end());
    buffered_rows_.clear();
    read_buffer_.clear();
    resume_token_ = result.resume_token();
  }
  return {};  // OK
}

Status PartialResultSetSource::BufferProtoRows() {
  if (metadata_.has_value()) {
    auto const& proto_schema = metadata_->proto_schema();
    auto const columns_size = proto_schema.columns_size();
    auto const& proto_values = proto_rows_.values();

    if (columns_size == 0 && !proto_values.empty()) {
      return internal::InternalError(
          "ProtoRows has values but the schema has no columns.",
          GCP_ERROR_INFO());
    }
    // The columns_size != 0 guard avoids a modulo-by-zero when the schema
    // has no columns (and, per the check above, there are no values either).
    if (columns_size != 0 && proto_values.size() % columns_size != 0) {
      return internal::InternalError(
          "The number of values in ProtoRows is not a multiple of the "
          "number of columns in the schema.",
          GCP_ERROR_INFO());
    }

    auto parsed_value = proto_values.begin();
    std::vector<bigtable::Value> values;
    values.reserve(columns_size);

    while (parsed_value != proto_values.end()) {
      for (auto const& column : proto_schema.columns()) {
        auto value = FromProto(column.type(), *parsed_value);
        values.push_back(std::move(value));
        ++parsed_value;
      }
      buffered_rows_.push_back(
          QueryRowFriend::MakeQueryRow(std::move(values), columns_));
      values.clear();
    }
  }
  return {};
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
}  // namespace bigtable_internal
}  // namespace cloud
}  // namespace google
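A hypothetical caller-side sketch (not part of this commit) of the intended consumption pattern. `Create()` performs an initial read, so a stream that fails immediately surfaces its `Finish()` error at construction time; `NextRow()` then yields only rows that a `resume_token` has committed, so a resumed stream never produces duplicates. `Drain` and `IsEndOfStream` are illustrative names, not APIs from this commit:

// Hypothetical usage sketch, assuming the caller already has a
// PartialResultSetReader wrapping an open ExecuteQuery stream.
#include "google/cloud/bigtable/internal/partial_result_set_source.h"
#include "absl/types/optional.h"
#include <functional>
#include <memory>
#include <utility>

namespace btint = ::google::cloud::bigtable_internal;

// Hypothetical predicate: NextRow() returns a default-constructed QueryRow
// once the stream finishes cleanly; detecting that depends on the QueryRow
// API, which this diff does not show.
bool IsEndOfStream(google::cloud::bigtable::QueryRow const& row);

google::cloud::Status Drain(
    absl::optional<google::bigtable::v2::ResultSetMetadata> metadata,
    std::unique_ptr<btint::PartialResultSetReader> reader,
    std::function<void(google::cloud::bigtable::QueryRow)> const& consume) {
  // Create() performs an initial ReadFromStream(); if the stream ended and
  // Finish() failed, that error is returned here instead of a source.
  auto source = btint::PartialResultSetSource::Create(std::move(metadata),
                                                      std::move(reader));
  if (!source) return std::move(source).status();
  for (;;) {
    // NextRow() refills its internal queue from the stream as needed.
    auto row = (*source)->NextRow();
    if (!row) return std::move(row).status();
    if (IsEndOfStream(*row)) break;
    consume(*std::move(row));
  }
  return google::cloud::Status{};
}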
