Skip to content

Commit fc57376

Browse files
authored
impl(bigtable): update DefaultRowReader to support new OperationContext (#15308)
1 parent 50cdd43 commit fc57376

File tree

4 files changed

+660
-61
lines changed

4 files changed

+660
-61
lines changed

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,23 @@ inline bool enable_server_retries(Options const& options) {
5959
return options.get<EnableServerRetriesOption>();
6060
}
6161

62+
// This function allows for ReadRow and ReadRowsFull to provide an instance of
63+
// an OperationContext specific to that operation.
64+
bigtable::RowReader ReadRowsHelper(
65+
std::shared_ptr<BigtableStub>& stub,
66+
internal::ImmutableOptions const& current,
67+
bigtable::ReadRowsParams
68+
params, // NOLINT(performance-unnecessary-value-param)
69+
std::shared_ptr<OperationContext>
70+
operation_context) { // NOLINT(performance-unnecessary-value-param)
71+
auto impl = std::make_shared<DefaultRowReader>(
72+
stub, std::move(params.app_profile_id), std::move(params.table_name),
73+
std::move(params.row_set), params.rows_limit, std::move(params.filter),
74+
params.reverse, retry_policy(*current), backoff_policy(*current),
75+
enable_server_retries(*current), std::move(operation_context));
76+
return MakeRowReader(std::move(impl));
77+
}
78+
6279
} // namespace
6380

6481
bigtable::Row TransformReadModifyWriteRowResponse(
@@ -99,7 +116,8 @@ DataConnectionImpl::DataConnectionImpl(
99116
std::string client_uid =
100117
internal::Sample(gen, 16, "abcdefghijklmnopqrstuvwxyz0123456789");
101118
operation_context_factory_ =
102-
std::make_unique<MetricsOperationContextFactory>(std::move(client_uid));
119+
std::make_unique<MetricsOperationContextFactory>(std::move(client_uid),
120+
options_);
103121
} else {
104122
operation_context_factory_ =
105123
std::make_unique<SimpleOperationContextFactory>();
@@ -231,12 +249,9 @@ DataConnectionImpl::AsyncBulkApply(std::string const& table_name,
231249
bigtable::RowReader DataConnectionImpl::ReadRowsFull(
232250
bigtable::ReadRowsParams params) {
233251
auto current = google::cloud::internal::SaveCurrentOptions();
234-
auto impl = std::make_shared<DefaultRowReader>(
235-
stub_, std::move(params.app_profile_id), std::move(params.table_name),
236-
std::move(params.row_set), params.rows_limit, std::move(params.filter),
237-
params.reverse, retry_policy(*current), backoff_policy(*current),
238-
enable_server_retries(*current));
239-
return MakeRowReader(std::move(impl));
252+
auto operation_context = std::make_shared<OperationContext>();
253+
return ReadRowsHelper(stub_, current, std::move(params),
254+
std::move(operation_context));
240255
}
241256

242257
StatusOr<std::pair<bool, bigtable::Row>> DataConnectionImpl::ReadRow(
@@ -245,9 +260,16 @@ StatusOr<std::pair<bool, bigtable::Row>> DataConnectionImpl::ReadRow(
245260
auto current = google::cloud::internal::SaveCurrentOptions();
246261
bigtable::RowSet row_set(std::move(row_key));
247262
std::int64_t const rows_limit = 1;
248-
auto reader = ReadRowsFull(bigtable::ReadRowsParams{
249-
table_name, app_profile_id(*current), std::move(row_set), rows_limit,
250-
std::move(filter)});
263+
// TODO(sdhart): ensure OperationContext::OnDone is called correctly.
264+
// TODO(sdhart): add ReadRow tests once we call
265+
// OperationContextFactory::ReadRow to create the operation_context.
266+
auto operation_context = std::make_shared<OperationContext>();
267+
auto reader =
268+
ReadRowsHelper(stub_, current,
269+
bigtable::ReadRowsParams{
270+
table_name, app_profile_id(*current),
271+
std::move(row_set), rows_limit, std::move(filter)},
272+
std::move(operation_context));
251273

252274
auto it = reader.begin();
253275
if (it == reader.end()) return std::make_pair(false, bigtable::Row("", {}));

google/cloud/bigtable/internal/default_row_reader.cc

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ DefaultRowReader::DefaultRowReader(
2929
bigtable::Filter filter, bool reverse,
3030
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
3131
std::unique_ptr<BackoffPolicy> backoff_policy, bool enable_server_retries,
32-
Sleeper sleeper)
32+
std::shared_ptr<OperationContext> operation_context, Sleeper sleeper)
3333
: stub_(std::move(stub)),
3434
app_profile_id_(std::move(app_profile_id)),
3535
table_name_(std::move(table_name)),
@@ -40,7 +40,8 @@ DefaultRowReader::DefaultRowReader(
4040
retry_policy_(std::move(retry_policy)),
4141
backoff_policy_(std::move(backoff_policy)),
4242
enable_server_retries_(enable_server_retries),
43-
sleeper_(std::move(sleeper)) {}
43+
sleeper_(std::move(sleeper)),
44+
operation_context_(std::move(operation_context)) {}
4445

4546
void DefaultRowReader::MakeRequest() {
4647
response_ = {};
@@ -62,10 +63,11 @@ void DefaultRowReader::MakeRequest() {
6263
}
6364

6465
auto const& options = internal::CurrentOptions();
65-
context_ = std::make_shared<grpc::ClientContext>();
66-
internal::ConfigureContext(*context_, options);
67-
operation_context_.PreCall(*context_);
68-
stream_ = stub_->ReadRows(context_, options, request);
66+
client_context_ = std::make_shared<grpc::ClientContext>();
67+
internal::ConfigureContext(*client_context_, options);
68+
operation_context_->PreCall(*client_context_);
69+
called_post_call_ = false;
70+
stream_ = stub_->ReadRows(client_context_, options, request);
6971
stream_is_open_ = true;
7072

7173
parser_ = bigtable::internal::ReadRowsParserFactory().Create(reverse_);
@@ -78,9 +80,9 @@ bool DefaultRowReader::NextChunk() {
7880
auto v = stream_->Read();
7981
if (absl::holds_alternative<Status>(v)) {
8082
last_status_ = absl::get<Status>(std::move(v));
83+
operation_context_->PostCall(*client_context_, last_status_);
84+
called_post_call_ = true;
8185
response_ = {};
82-
operation_context_.PostCall(*context_, {});
83-
context_.reset();
8486
return false;
8587
}
8688
response_ = absl::get<google::bigtable::v2::ReadRowsResponse>(std::move(v));
@@ -92,6 +94,11 @@ bool DefaultRowReader::NextChunk() {
9294
}
9395

9496
absl::variant<Status, bigtable::Row> DefaultRowReader::Advance() {
97+
// We only want to call ElementRequest if an RPC has previously been
98+
// made.
99+
if (stream_is_open_) {
100+
operation_context_->ElementRequest(*client_context_);
101+
}
95102
if (operation_cancelled_) {
96103
return internal::CancelledError(
97104
"call cancelled",
@@ -100,6 +107,7 @@ absl::variant<Status, bigtable::Row> DefaultRowReader::Advance() {
100107
while (true) {
101108
auto variant = AdvanceOrFail();
102109
if (absl::holds_alternative<bigtable::Row>(variant)) {
110+
operation_context_->ElementDelivery(*client_context_);
103111
return absl::get<bigtable::Row>(std::move(variant));
104112
}
105113

@@ -143,16 +151,19 @@ absl::variant<Status, bigtable::Row> DefaultRowReader::Advance() {
143151
}
144152

145153
absl::variant<Status, bigtable::Row> DefaultRowReader::AdvanceOrFail() {
146-
grpc::Status status;
147-
if (!stream_) {
148-
MakeRequest();
149-
}
154+
grpc::Status grpc_status;
155+
if (!stream_) MakeRequest();
150156
while (!parser_->HasNext()) {
151157
if (NextChunk()) {
152158
parser_->HandleChunk(
153159
std::move(*(response_.mutable_chunks(processed_chunks_count_))),
154-
status);
155-
if (!status.ok()) return MakeStatusFromRpcError(status);
160+
grpc_status);
161+
if (!grpc_status.ok()) {
162+
auto status = MakeStatusFromRpcError(grpc_status);
163+
operation_context_->PostCall(*client_context_, status);
164+
called_post_call_ = true;
165+
return status;
166+
}
156167
continue;
157168
}
158169

@@ -161,21 +172,30 @@ absl::variant<Status, bigtable::Row> DefaultRowReader::AdvanceOrFail() {
161172
// fails during cleanup.
162173
stream_is_open_ = false;
163174
if (!last_status_.ok()) return last_status_;
164-
parser_->HandleEndOfStream(status);
165-
return MakeStatusFromRpcError(status);
175+
parser_->HandleEndOfStream(grpc_status);
176+
return MakeStatusFromRpcError(grpc_status);
166177
}
167178

168179
// We have a complete row in the parser.
169-
bigtable::Row parsed_row = parser_->Next(status);
180+
bigtable::Row parsed_row = parser_->Next(grpc_status);
170181

171-
if (!status.ok()) return MakeStatusFromRpcError(status);
182+
if (!grpc_status.ok()) return MakeStatusFromRpcError(grpc_status);
172183

173184
++rows_count_;
174185
last_read_row_key_ = parsed_row.row_key();
175186
return parsed_row;
176187
}
177188

178189
void DefaultRowReader::Cancel() {
190+
// As the destructor also calls Cancel, we want to call OnDone exactly once.
191+
// If parser_ == nullptr, then we know no RPC was ever attempted.
192+
if (!operation_cancelled_ && parser_) {
193+
if (!called_post_call_) {
194+
called_post_call_ = true;
195+
operation_context_->PostCall(*client_context_, last_status_);
196+
}
197+
operation_context_->OnDone(last_status_);
198+
}
179199
operation_cancelled_ = true;
180200
if (!stream_is_open_) return;
181201
stream_->Cancel();
@@ -185,7 +205,7 @@ void DefaultRowReader::Cancel() {
185205
stream_->Read())) {
186206
}
187207

188-
context_.reset();
208+
client_context_.reset();
189209
stream_is_open_ = false;
190210
}
191211

google/cloud/bigtable/internal/default_row_reader.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class DefaultRowReader : public RowReaderImpl {
5454
bigtable::Filter filter, bool reverse,
5555
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
5656
std::unique_ptr<BackoffPolicy> backoff_policy, bool enable_server_retries,
57+
std::shared_ptr<OperationContext> operation_context,
5758
Sleeper sleeper = [](auto d) { std::this_thread::sleep_for(d); });
5859

5960
~DefaultRowReader() override;
@@ -101,15 +102,17 @@ class DefaultRowReader : public RowReaderImpl {
101102
std::unique_ptr<BackoffPolicy> backoff_policy_;
102103
bool enable_server_retries_;
103104
Sleeper sleeper_;
104-
std::shared_ptr<grpc::ClientContext> context_;
105-
OperationContext operation_context_;
105+
std::shared_ptr<grpc::ClientContext> client_context_;
106+
std::shared_ptr<OperationContext> operation_context_;
106107

107108
std::unique_ptr<bigtable::internal::ReadRowsParser> parser_;
108109
std::unique_ptr<
109110
internal::StreamingReadRpc<google::bigtable::v2::ReadRowsResponse>>
110111
stream_;
111112
bool stream_is_open_ = false;
112113
bool operation_cancelled_ = false;
114+
// TODO(#15314): Refactor state machine to not require this flag.
115+
bool called_post_call_ = false;
113116

114117
/// The end of stream Status.
115118
Status last_status_;

0 commit comments

Comments
 (0)