Skip to content

Commit ea0c971

Browse files
authored
impl(bigtable): update AsyncBulkApply to support new OperationContext (#15301)
1 parent 25fb431 commit ea0c971

File tree

4 files changed

+238
-47
lines changed

4 files changed

+238
-47
lines changed

google/cloud/bigtable/internal/async_bulk_apply.cc

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,17 @@ future<std::vector<bigtable::FailedMutation>> AsyncBulkApplier::Create(
2929
std::unique_ptr<BackoffPolicy> backoff_policy, bool enable_server_retries,
3030
bigtable::IdempotentMutationPolicy& idempotent_policy,
3131
std::string const& app_profile_id, std::string const& table_name,
32-
bigtable::BulkMutation mut) {
32+
bigtable::BulkMutation mut,
33+
std::shared_ptr<OperationContext> operation_context) {
3334
if (mut.empty()) {
3435
return make_ready_future(std::vector<bigtable::FailedMutation>{});
3536
}
3637

3738
std::shared_ptr<AsyncBulkApplier> bulk_apply(new AsyncBulkApplier(
3839
std::move(cq), std::move(stub), std::move(limiter),
3940
std::move(retry_policy), std::move(backoff_policy), enable_server_retries,
40-
idempotent_policy, app_profile_id, table_name, std::move(mut)));
41+
idempotent_policy, app_profile_id, table_name, std::move(mut),
42+
std::move(operation_context)));
4143
bulk_apply->StartIteration();
4244
return bulk_apply->promise_.get_future();
4345
}
@@ -49,7 +51,8 @@ AsyncBulkApplier::AsyncBulkApplier(
4951
std::unique_ptr<BackoffPolicy> backoff_policy, bool enable_server_retries,
5052
bigtable::IdempotentMutationPolicy& idempotent_policy,
5153
std::string const& app_profile_id, std::string const& table_name,
52-
bigtable::BulkMutation mut)
54+
bigtable::BulkMutation mut,
55+
std::shared_ptr<OperationContext> operation_context)
5356
: cq_(std::move(cq)),
5457
stub_(std::move(stub)),
5558
limiter_(std::move(limiter)),
@@ -59,7 +62,8 @@ AsyncBulkApplier::AsyncBulkApplier(
5962
state_(app_profile_id, table_name, idempotent_policy, std::move(mut)),
6063
promise_([this] { keep_reading_ = false; }),
6164
options_(internal::SaveCurrentOptions()),
62-
call_context_(options_) {}
65+
call_context_(options_),
66+
operation_context_(std::move(operation_context)) {}
6367

6468
void AsyncBulkApplier::StartIteration() {
6569
auto self = this->shared_from_this();
@@ -71,13 +75,14 @@ void AsyncBulkApplier::StartIteration() {
7175

7276
void AsyncBulkApplier::MakeRequest() {
7377
internal::ScopedCallContext scope(call_context_);
74-
context_ = std::make_shared<grpc::ClientContext>();
75-
internal::ConfigureContext(*context_, *call_context_.options);
76-
operation_context_->PreCall(*context_);
78+
client_context_ = std::make_shared<grpc::ClientContext>();
79+
internal::ConfigureContext(*client_context_, *call_context_.options);
80+
operation_context_->PreCall(*client_context_);
7781

7882
auto self = this->shared_from_this();
7983
PerformAsyncStreamingRead(
80-
stub_->AsyncMutateRows(cq_, context_, options_, state_.BeforeStart()),
84+
stub_->AsyncMutateRows(cq_, client_context_, options_,
85+
state_.BeforeStart()),
8186
[self](google::bigtable::v2::MutateRowsResponse r) {
8287
self->OnRead(std::move(r));
8388
return make_ready_future(self->keep_reading_.load());
@@ -92,21 +97,24 @@ void AsyncBulkApplier::OnRead(
9297
}
9398

9499
void AsyncBulkApplier::OnFinish(Status const& status) {
100+
operation_context_->PostCall(*client_context_, status);
95101
state_.OnFinish(status);
102+
96103
if (!state_.HasPendingMutations()) {
97104
SetPromise();
105+
operation_context_->OnDone(status);
98106
return;
99107
}
100108
auto delay = internal::Backoff(status, "AsyncBulkApply", *retry_policy_,
101109
*backoff_policy_, Idempotency::kIdempotent,
102110
enable_server_retries_);
103111
if (!delay) {
104112
SetPromise();
113+
operation_context_->OnDone(status);
105114
return;
106115
}
107116

108-
operation_context_->PostCall(*context_, {});
109-
context_.reset();
117+
client_context_.reset();
110118
auto self = this->shared_from_this();
111119
internal::TracedAsyncBackoff(cq_, *call_context_.options, *delay,
112120
"Async Backoff")

google/cloud/bigtable/internal/async_bulk_apply.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ class AsyncBulkApplier : public std::enable_shared_from_this<AsyncBulkApplier> {
4848
std::unique_ptr<BackoffPolicy> backoff_policy, bool enable_server_retries,
4949
bigtable::IdempotentMutationPolicy& idempotent_policy,
5050
std::string const& app_profile_id, std::string const& table_name,
51-
bigtable::BulkMutation mut);
51+
bigtable::BulkMutation mut,
52+
std::shared_ptr<OperationContext> operation_context);
5253

5354
private:
5455
AsyncBulkApplier(CompletionQueue cq, std::shared_ptr<BigtableStub> stub,
@@ -58,7 +59,8 @@ class AsyncBulkApplier : public std::enable_shared_from_this<AsyncBulkApplier> {
5859
bool enable_server_retries,
5960
bigtable::IdempotentMutationPolicy& idempotent_policy,
6061
std::string const& app_profile_id,
61-
std::string const& table_name, bigtable::BulkMutation mut);
62+
std::string const& table_name, bigtable::BulkMutation mut,
63+
std::shared_ptr<OperationContext> operation_context);
6264

6365
void StartIteration();
6466
void MakeRequest();
@@ -77,9 +79,8 @@ class AsyncBulkApplier : public std::enable_shared_from_this<AsyncBulkApplier> {
7779
promise<std::vector<bigtable::FailedMutation>> promise_;
7880
internal::ImmutableOptions options_;
7981
internal::CallContext call_context_;
80-
std::shared_ptr<grpc::ClientContext> context_;
81-
std::shared_ptr<OperationContext> operation_context_ =
82-
std::make_shared<OperationContext>();
82+
std::shared_ptr<grpc::ClientContext> client_context_;
83+
std::shared_ptr<OperationContext> operation_context_;
8384
};
8485

8586
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

0 commit comments

Comments
 (0)