Skip to content

Commit 9b48103

Browse files
authored
impl(bigtable): update AsyncRowSampler to support new OperationContext (#15304)
1 parent ea0c971 commit 9b48103

File tree

4 files changed

+251
-70
lines changed

4 files changed

+251
-70
lines changed

google/cloud/bigtable/internal/async_row_sampler.cc

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ future<StatusOr<std::vector<bigtable::RowKeySample>>> AsyncRowSampler::Create(
3131
CompletionQueue cq, std::shared_ptr<BigtableStub> stub,
3232
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
3333
std::unique_ptr<BackoffPolicy> backoff_policy, bool enable_server_retries,
34-
std::string const& app_profile_id, std::string const& table_name) {
35-
std::shared_ptr<AsyncRowSampler> sampler(
36-
new AsyncRowSampler(std::move(cq), std::move(stub),
37-
std::move(retry_policy), std::move(backoff_policy),
38-
enable_server_retries, app_profile_id, table_name));
34+
std::string const& app_profile_id, std::string const& table_name,
35+
std::shared_ptr<OperationContext> operation_context) {
36+
std::shared_ptr<AsyncRowSampler> sampler(new AsyncRowSampler(
37+
std::move(cq), std::move(stub), std::move(retry_policy),
38+
std::move(backoff_policy), enable_server_retries, app_profile_id,
39+
table_name, std::move(operation_context)));
3940
sampler->StartIteration();
4041
return sampler->promise_.get_future();
4142
}
@@ -44,7 +45,8 @@ AsyncRowSampler::AsyncRowSampler(
4445
CompletionQueue cq, std::shared_ptr<BigtableStub> stub,
4546
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
4647
std::unique_ptr<BackoffPolicy> backoff_policy, bool enable_server_retries,
47-
std::string const& app_profile_id, std::string const& table_name)
48+
std::string const& app_profile_id, std::string const& table_name,
49+
std::shared_ptr<OperationContext> operation_context)
4850
: cq_(std::move(cq)),
4951
stub_(std::move(stub)),
5052
retry_policy_(std::move(retry_policy)),
@@ -54,21 +56,22 @@ AsyncRowSampler::AsyncRowSampler(
5456
table_name_(std::move(table_name)),
5557
promise_([this] { keep_reading_ = false; }),
5658
options_(internal::SaveCurrentOptions()),
57-
call_context_(options_) {}
59+
call_context_(options_),
60+
operation_context_(std::move(operation_context)) {}
5861

5962
void AsyncRowSampler::StartIteration() {
6063
v2::SampleRowKeysRequest request;
6164
request.set_app_profile_id(app_profile_id_);
6265
request.set_table_name(table_name_);
6366

6467
internal::ScopedCallContext scope(call_context_);
65-
context_ = std::make_shared<grpc::ClientContext>();
66-
internal::ConfigureContext(*context_, *call_context_.options);
67-
operation_context_->PreCall(*context_);
68+
client_context_ = std::make_shared<grpc::ClientContext>();
69+
internal::ConfigureContext(*client_context_, *call_context_.options);
70+
operation_context_->PreCall(*client_context_);
6871

6972
auto self = this->shared_from_this();
7073
PerformAsyncStreamingRead<v2::SampleRowKeysResponse>(
71-
stub_->AsyncSampleRowKeys(cq_, context_, options_, request),
74+
stub_->AsyncSampleRowKeys(cq_, client_context_, options_, request),
7275
[self](v2::SampleRowKeysResponse response) {
7376
return self->OnRead(std::move(response));
7477
},
@@ -84,20 +87,23 @@ future<bool> AsyncRowSampler::OnRead(v2::SampleRowKeysResponse response) {
8487
}
8588

8689
void AsyncRowSampler::OnFinish(Status const& status) {
90+
operation_context_->PostCall(*client_context_, status);
91+
8792
if (status.ok()) {
8893
promise_.set_value(std::move(samples_));
94+
operation_context_->OnDone(status);
8995
return;
9096
}
9197
auto delay = internal::Backoff(status, "AsyncSampleRows", *retry_policy_,
9298
*backoff_policy_, Idempotency::kIdempotent,
9399
enable_server_retries_);
94100
if (!delay) {
95101
promise_.set_value(std::move(delay).status());
102+
operation_context_->OnDone(status);
96103
return;
97104
}
98105

99-
operation_context_->PostCall(*context_, {});
100-
context_.reset();
106+
client_context_.reset();
101107
samples_.clear();
102108
auto self = this->shared_from_this();
103109
internal::TracedAsyncBackoff(cq_, *call_context_.options, *delay,

google/cloud/bigtable/internal/async_row_sampler.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,16 @@ class AsyncRowSampler : public std::enable_shared_from_this<AsyncRowSampler> {
4343
CompletionQueue cq, std::shared_ptr<BigtableStub> stub,
4444
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
4545
std::unique_ptr<BackoffPolicy> backoff_policy, bool enable_server_retries,
46-
std::string const& app_profile_id, std::string const& table_name);
46+
std::string const& app_profile_id, std::string const& table_name,
47+
std::shared_ptr<OperationContext> operation_context);
4748

4849
private:
4950
AsyncRowSampler(CompletionQueue cq, std::shared_ptr<BigtableStub> stub,
5051
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
5152
std::unique_ptr<BackoffPolicy> backoff_policy,
5253
bool enable_server_retries, std::string const& app_profile_id,
53-
std::string const& table_name);
54+
std::string const& table_name,
55+
std::shared_ptr<OperationContext> operation_context);
5456

5557
void StartIteration();
5658
future<bool> OnRead(google::bigtable::v2::SampleRowKeysResponse response);
@@ -69,9 +71,8 @@ class AsyncRowSampler : public std::enable_shared_from_this<AsyncRowSampler> {
6971
promise<StatusOr<std::vector<bigtable::RowKeySample>>> promise_;
7072
internal::ImmutableOptions options_;
7173
internal::CallContext call_context_;
72-
std::shared_ptr<grpc::ClientContext> context_;
73-
std::shared_ptr<OperationContext> operation_context_ =
74-
std::make_shared<OperationContext>();
74+
std::shared_ptr<grpc::ClientContext> client_context_;
75+
std::shared_ptr<OperationContext> operation_context_;
7576
};
7677

7778
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

0 commit comments

Comments
 (0)