Skip to content

Commit 50cdd43

Browse files
authored
impl(bigtable): update AsyncRowReader to support new OperationContext (#15302)
1 parent 67120b2 commit 50cdd43

File tree

4 files changed

+332
-62
lines changed

4 files changed

+332
-62
lines changed

google/cloud/bigtable/internal/async_row_reader.cc

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,19 +45,24 @@ void AsyncRowReader::MakeRequest() {
4545
parser_ = bigtable::internal::ReadRowsParserFactory().Create(reverse_);
4646

4747
internal::ScopedCallContext scope(call_context_);
48-
context_ = std::make_shared<grpc::ClientContext>();
49-
internal::ConfigureContext(*context_, *call_context_.options);
50-
operation_context_->PreCall(*context_);
48+
client_context_ = std::make_shared<grpc::ClientContext>();
49+
internal::ConfigureContext(*client_context_, *call_context_.options);
50+
operation_context_->PreCall(*client_context_);
5151

5252
auto self = this->shared_from_this();
5353
PerformAsyncStreamingRead(
54-
stub_->AsyncReadRows(cq_, context_, options_, request),
54+
stub_->AsyncReadRows(cq_, client_context_, options_, request),
5555
[self](v2::ReadRowsResponse r) {
5656
return self->OnDataReceived(std::move(r));
5757
},
5858
[self](Status s) { self->OnStreamFinished(std::move(s)); });
5959
}
6060

61+
void AsyncRowReader::UserWantsRows() {
62+
operation_context_->ElementRequest(*client_context_);
63+
TryGiveRowToUser();
64+
}
65+
6166
void AsyncRowReader::TryGiveRowToUser() {
6267
// The user is likely to ask for more rows immediately after receiving a
6368
// row, which means that this function will be called recursively. The depth
@@ -102,6 +107,7 @@ void AsyncRowReader::TryGiveRowToUser() {
102107

103108
auto self = this->shared_from_this();
104109
bool const break_recursion = recursion_level_ >= 100;
110+
operation_context_->ElementDelivery(*client_context_);
105111
on_row_(std::move(row)).then([self, break_recursion](future<bool> fut) {
106112
bool should_cancel;
107113
#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
@@ -165,6 +171,7 @@ future<bool> AsyncRowReader::OnDataReceived(
165171
}
166172

167173
void AsyncRowReader::OnStreamFinished(Status status) {
174+
operation_context_->PostCall(*client_context_, status);
168175
// assert(!continue_reading_);
169176
if (status_.ok()) {
170177
status_ = std::move(status);
@@ -206,6 +213,7 @@ void AsyncRowReader::OnStreamFinished(Status status) {
206213
if (status_.ok()) {
207214
// We've successfully finished the scan.
208215
whole_op_finished_ = true;
216+
operation_context_->OnDone(status_);
209217
TryGiveRowToUser();
210218
return;
211219
}
@@ -217,11 +225,11 @@ void AsyncRowReader::OnStreamFinished(Status status) {
217225
// Can't retry.
218226
status_ = std::move(delay).status();
219227
whole_op_finished_ = true;
228+
operation_context_->OnDone(status_);
220229
TryGiveRowToUser();
221230
return;
222231
}
223-
operation_context_->PostCall(*context_, {});
224-
context_.reset();
232+
client_context_.reset();
225233
auto self = this->shared_from_this();
226234
internal::TracedAsyncBackoff(cq_, *call_context_.options, *delay,
227235
"Async Backoff")

google/cloud/bigtable/internal/async_row_reader.h

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,14 @@ class AsyncRowReader : public std::enable_shared_from_this<AsyncRowReader> {
6464
bigtable::Filter filter, bool reverse,
6565
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
6666
std::unique_ptr<BackoffPolicy> backoff_policy,
67-
bool enable_server_retries) {
67+
bool enable_server_retries,
68+
std::shared_ptr<OperationContext> operation_context) {
6869
auto reader = std::shared_ptr<AsyncRowReader>(new AsyncRowReader(
6970
std::move(cq), std::move(stub), std::move(app_profile_id),
7071
std::move(table_name), std::move(on_row), std::move(on_finish),
7172
std::move(row_set), rows_limit, std::move(filter), reverse,
7273
std::move(retry_policy), std::move(backoff_policy),
73-
enable_server_retries));
74+
enable_server_retries, std::move(operation_context)));
7475
reader->MakeRequest();
7576
}
7677

@@ -82,7 +83,8 @@ class AsyncRowReader : public std::enable_shared_from_this<AsyncRowReader> {
8283
bigtable::Filter filter, bool reverse,
8384
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
8485
std::unique_ptr<BackoffPolicy> backoff_policy,
85-
bool enable_server_retries)
86+
bool enable_server_retries,
87+
std::shared_ptr<OperationContext> operation_context)
8688
: cq_(std::move(cq)),
8789
stub_(std::move(stub)),
8890
app_profile_id_(std::move(app_profile_id)),
@@ -97,15 +99,16 @@ class AsyncRowReader : public std::enable_shared_from_this<AsyncRowReader> {
9799
backoff_policy_(std::move(backoff_policy)),
98100
enable_server_retries_(enable_server_retries),
99101
options_(internal::SaveCurrentOptions()),
100-
call_context_(options_) {}
102+
call_context_(options_),
103+
operation_context_(std::move(operation_context)) {}
101104

102105
void MakeRequest();
103106

104107
/**
105108
* Called when the user asks for more rows via satisfying the future returned
106109
* from the row callback.
107110
*/
108-
void UserWantsRows() { TryGiveRowToUser(); }
111+
void UserWantsRows();
109112

110113
/**
111114
* Attempt to call a user callback.
@@ -175,9 +178,8 @@ class AsyncRowReader : public std::enable_shared_from_this<AsyncRowReader> {
175178
int recursion_level_ = 0;
176179
internal::ImmutableOptions options_;
177180
internal::CallContext call_context_;
178-
std::shared_ptr<grpc::ClientContext> context_;
179-
std::shared_ptr<OperationContext> operation_context_ =
180-
std::make_shared<OperationContext>();
181+
std::shared_ptr<grpc::ClientContext> client_context_;
182+
std::shared_ptr<OperationContext> operation_context_;
181183
};
182184

183185
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

0 commit comments

Comments
 (0)