Skip to content

Commit 67120b2

Browse files
authored
impl(bigtable): update BulkMutator to support new OperationContext (#15305)
1 parent 230ddb9 commit 67120b2

File tree

6 files changed

+202
-54
lines changed

6 files changed

+202
-54
lines changed

google/cloud/bigtable/internal/bulk_mutator.cc

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,10 @@ std::vector<bigtable::FailedMutation> BulkMutatorState::OnRetryDone() && {
190190
BulkMutator::BulkMutator(std::string const& app_profile_id,
191191
std::string const& table_name,
192192
bigtable::IdempotentMutationPolicy& idempotent_policy,
193-
bigtable::BulkMutation mut)
194-
: state_(app_profile_id, table_name, idempotent_policy, std::move(mut)) {}
193+
bigtable::BulkMutation mut,
194+
std::shared_ptr<OperationContext> operation_context)
195+
: state_(app_profile_id, table_name, idempotent_policy, std::move(mut)),
196+
operation_context_(std::move(operation_context)) {}
195197

196198
grpc::Status BulkMutator::MakeOneRequest(bigtable::DataClient& client,
197199
grpc::ClientContext& client_context) {
@@ -215,10 +217,10 @@ Status BulkMutator::MakeOneRequest(BigtableStub& stub,
215217
// Send the request to the server.
216218
auto const& mutations = state_.BeforeStart();
217219

218-
// Configure the context
219-
auto context = std::make_shared<grpc::ClientContext>();
220-
google::cloud::internal::ConfigureContext(*context, options);
221-
operation_context_.PreCall(*context);
220+
// Configure the client_context
221+
auto client_context = std::make_shared<grpc::ClientContext>();
222+
google::cloud::internal::ConfigureContext(*client_context, options);
223+
operation_context_->PreCall(*client_context);
222224
bool enable_server_retries = options.get<EnableServerRetriesOption>();
223225

224226
struct UnpackVariant {
@@ -241,11 +243,11 @@ Status BulkMutator::MakeOneRequest(BigtableStub& stub,
241243
limiter.Acquire();
242244

243245
// Read the stream of responses.
244-
auto stream = stub.MutateRows(context, options, mutations);
246+
auto stream = stub.MutateRows(client_context, options, mutations);
245247
while (absl::visit(UnpackVariant{state_, limiter, enable_server_retries},
246248
stream->Read())) {
247249
}
248-
operation_context_.PostCall(*context, {});
250+
operation_context_->PostCall(*client_context, state_.last_status());
249251
return state_.last_status();
250252
}
251253

google/cloud/bigtable/internal/bulk_mutator.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ class BulkMutator {
117117
public:
118118
BulkMutator(std::string const& app_profile_id, std::string const& table_name,
119119
bigtable::IdempotentMutationPolicy& idempotent_policy,
120-
bigtable::BulkMutation mut);
120+
bigtable::BulkMutation mut,
121+
std::shared_ptr<OperationContext> operation_context);
121122

122123
/// Return true if there are pending mutations in the mutator
123124
bool HasPendingMutations() const { return state_.HasPendingMutations(); }
@@ -135,7 +136,7 @@ class BulkMutator {
135136

136137
protected:
137138
BulkMutatorState state_;
138-
OperationContext operation_context_;
139+
std::shared_ptr<OperationContext> operation_context_;
139140
};
140141

141142
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/bigtable/internal/bulk_mutator_test.cc

Lines changed: 159 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/bigtable/internal/bulk_mutator.h"
16+
#include "google/cloud/bigtable/internal/operation_context.h"
17+
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
18+
#include "google/cloud/bigtable/internal/metrics.h"
19+
#include "google/cloud/testing_util/fake_clock.h"
20+
#endif
1621
#include "google/cloud/bigtable/testing/mock_bigtable_stub.h"
1722
#include "google/cloud/bigtable/testing/mock_mutate_rows_limiter.h"
1823
#include "google/cloud/internal/make_status.h"
@@ -75,14 +80,78 @@ v2::MutateRowsResponse MakeResponse(
7580
return resp;
7681
}
7782

83+
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
84+
85+
class MockMetric : public bigtable_internal::Metric {
86+
public:
87+
MOCK_METHOD(void, PreCall,
88+
(opentelemetry::context::Context const&,
89+
bigtable_internal::PreCallParams const&),
90+
(override));
91+
MOCK_METHOD(void, PostCall,
92+
(opentelemetry::context::Context const&,
93+
grpc::ClientContext const&,
94+
bigtable_internal::PostCallParams const&),
95+
(override));
96+
MOCK_METHOD(void, OnDone,
97+
(opentelemetry::context::Context const&,
98+
bigtable_internal::OnDoneParams const&),
99+
(override));
100+
MOCK_METHOD(void, ElementRequest,
101+
(opentelemetry::context::Context const&,
102+
bigtable_internal::ElementRequestParams const&),
103+
(override));
104+
MOCK_METHOD(void, ElementDelivery,
105+
(opentelemetry::context::Context const&,
106+
bigtable_internal::ElementDeliveryParams const&),
107+
(override));
108+
MOCK_METHOD(std::unique_ptr<Metric>, clone,
109+
(bigtable_internal::ResourceLabels resource_labels,
110+
bigtable_internal::DataLabels data_labels),
111+
(const, override));
112+
};
113+
114+
// This class is a vehicle to get a MockMetric into the OperationContext object.
115+
class CloningMetric : public bigtable_internal::Metric {
116+
public:
117+
explicit CloningMetric(std::unique_ptr<MockMetric> metric)
118+
: metric_(std::move(metric)) {}
119+
std::unique_ptr<bigtable_internal::Metric> clone(
120+
bigtable_internal::ResourceLabels,
121+
bigtable_internal::DataLabels) const override {
122+
return std::move(metric_);
123+
}
124+
125+
private:
126+
mutable std::unique_ptr<MockMetric> metric_;
127+
};
128+
129+
#endif // GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
130+
78131
class BulkMutatorTest : public ::testing::Test {
79132
protected:
80133
testing_util::ValidateMetadataFixture metadata_fixture_;
81134
};
82135

83136
TEST_F(BulkMutatorTest, Simple) {
84137
BulkMutation mut(IdempotentMutation("r0"), IdempotentMutation("r1"));
85-
138+
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
139+
auto mock_metric = std::make_unique<MockMetric>();
140+
EXPECT_CALL(*mock_metric, PreCall).Times(1);
141+
EXPECT_CALL(*mock_metric, PostCall).Times(1);
142+
143+
auto fake_metric = std::make_shared<CloningMetric>(std::move(mock_metric));
144+
auto clock = std::make_shared<testing_util::FakeSteadyClock>();
145+
146+
// Normally std::make_shared would be used here, but some weird type deduction
147+
// is preventing it.
148+
// NOLINTNEXTLINE(modernize-make-shared)
149+
auto operation_context = std::shared_ptr<bigtable_internal::OperationContext>(
150+
new bigtable_internal::OperationContext({}, {}, {fake_metric}, clock));
151+
#else
152+
auto operation_context =
153+
std::make_shared<bigtable_internal::OperationContext>();
154+
#endif
86155
auto mock = std::make_shared<MockBigtableStub>();
87156
EXPECT_CALL(*mock, MutateRows)
88157
.WillOnce([this](auto context, auto const&,
@@ -99,7 +168,8 @@ TEST_F(BulkMutatorTest, Simple) {
99168

100169
auto policy = DefaultIdempotentMutationPolicy();
101170
bigtable_internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
102-
std::move(mut));
171+
std::move(mut),
172+
std::move(operation_context));
103173

104174
EXPECT_TRUE(mutator.HasPendingMutations());
105175
bigtable_internal::NoopMutateRowsLimiter limiter;
@@ -113,6 +183,23 @@ TEST_F(BulkMutatorTest, RetryPartialFailure) {
113183
// In this test we create a Mutation for two rows, one of which will fail.
114184
// First create the mutation.
115185
BulkMutation mut(IdempotentMutation("r0"), IdempotentMutation("r1"));
186+
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
187+
auto mock_metric = std::make_unique<MockMetric>();
188+
EXPECT_CALL(*mock_metric, PreCall).Times(2);
189+
EXPECT_CALL(*mock_metric, PostCall).Times(2);
190+
191+
auto fake_metric = std::make_shared<CloningMetric>(std::move(mock_metric));
192+
auto clock = std::make_shared<testing_util::FakeSteadyClock>();
193+
194+
// Normally std::make_shared would be used here, but some weird type deduction
195+
// is preventing it.
196+
// NOLINTNEXTLINE(modernize-make-shared)
197+
auto operation_context = std::shared_ptr<bigtable_internal::OperationContext>(
198+
new bigtable_internal::OperationContext({}, {}, {fake_metric}, clock));
199+
#else
200+
auto operation_context =
201+
std::make_shared<bigtable_internal::OperationContext>();
202+
#endif
116203

117204
auto mock = std::make_shared<MockBigtableStub>();
118205
EXPECT_CALL(*mock, MutateRows)
@@ -144,7 +231,8 @@ TEST_F(BulkMutatorTest, RetryPartialFailure) {
144231

145232
auto policy = DefaultIdempotentMutationPolicy();
146233
bigtable_internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
147-
std::move(mut));
234+
std::move(mut),
235+
std::move(operation_context));
148236

149237
// This work will be in BulkApply(), but this is the test for BulkMutator in
150238
// isolation, so call MakeOneRequest() twice, for the r1, and the r2 cases.
@@ -163,6 +251,23 @@ TEST_F(BulkMutatorTest, PermanentFailure) {
163251
// Create a bulk mutation with two SetCell() mutations.
164252
BulkMutation mut(IdempotentMutation("r0"), IdempotentMutation("r1"));
165253

254+
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
255+
auto mock_metric = std::make_unique<MockMetric>();
256+
EXPECT_CALL(*mock_metric, PreCall).Times(2);
257+
EXPECT_CALL(*mock_metric, PostCall).Times(2);
258+
259+
auto fake_metric = std::make_shared<CloningMetric>(std::move(mock_metric));
260+
auto clock = std::make_shared<testing_util::FakeSteadyClock>();
261+
262+
// Normally std::make_shared would be used here, but some weird type deduction
263+
// is preventing it.
264+
// NOLINTNEXTLINE(modernize-make-shared)
265+
auto operation_context = std::shared_ptr<bigtable_internal::OperationContext>(
266+
new bigtable_internal::OperationContext({}, {}, {fake_metric}, clock));
267+
#else
268+
auto operation_context =
269+
std::make_shared<bigtable_internal::OperationContext>();
270+
#endif
166271
auto mock = std::make_shared<MockBigtableStub>();
167272
EXPECT_CALL(*mock, MutateRows)
168273
// The first RPC return one recoverable and one unrecoverable failure.
@@ -193,7 +298,8 @@ TEST_F(BulkMutatorTest, PermanentFailure) {
193298

194299
auto policy = DefaultIdempotentMutationPolicy();
195300
bigtable_internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
196-
std::move(mut));
301+
std::move(mut),
302+
std::move(operation_context));
197303

198304
// This work will be in BulkApply(), but this is the test for BulkMutator in
199305
// isolation, so call MakeOneRequest() twice, for the r1, and the r2 cases.
@@ -214,6 +320,23 @@ TEST_F(BulkMutatorTest, PartialStream) {
214320
// for all requests. Create a BulkMutation with two entries.
215321
BulkMutation mut(IdempotentMutation("r0"), IdempotentMutation("r1"));
216322

323+
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
324+
auto mock_metric = std::make_unique<MockMetric>();
325+
EXPECT_CALL(*mock_metric, PreCall).Times(2);
326+
EXPECT_CALL(*mock_metric, PostCall).Times(2);
327+
328+
auto fake_metric = std::make_shared<CloningMetric>(std::move(mock_metric));
329+
auto clock = std::make_shared<testing_util::FakeSteadyClock>();
330+
331+
// Normally std::make_shared would be used here, but some weird type deduction
332+
// is preventing it.
333+
// NOLINTNEXTLINE(modernize-make-shared)
334+
auto operation_context = std::shared_ptr<bigtable_internal::OperationContext>(
335+
new bigtable_internal::OperationContext({}, {}, {fake_metric}, clock));
336+
#else
337+
auto operation_context =
338+
std::make_shared<bigtable_internal::OperationContext>();
339+
#endif
217340
auto mock = std::make_shared<MockBigtableStub>();
218341
EXPECT_CALL(*mock, MutateRows)
219342
// This will be the stream returned by the first request. It is missing
@@ -244,7 +367,8 @@ TEST_F(BulkMutatorTest, PartialStream) {
244367

245368
auto policy = DefaultIdempotentMutationPolicy();
246369
bigtable_internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
247-
std::move(mut));
370+
std::move(mut),
371+
std::move(operation_context));
248372

249373
// This work will be in BulkApply(), but this is the test for BulkMutator in
250374
// isolation, so call MakeOneRequest() twice: for the r1 and r2 cases.
@@ -305,8 +429,9 @@ TEST_F(BulkMutatorTest, RetryOnlyIdempotent) {
305429
});
306430

307431
auto policy = DefaultIdempotentMutationPolicy();
308-
bigtable_internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
309-
std::move(mut));
432+
bigtable_internal::BulkMutator mutator(
433+
kAppProfile, kTableName, *policy, std::move(mut),
434+
std::make_shared<bigtable_internal::OperationContext>());
310435

311436
// This work will be in BulkApply(), but this is the test for BulkMutator in
312437
// isolation, so call MakeOneRequest() twice, for the r1, and the r2 cases.
@@ -358,8 +483,9 @@ TEST_F(BulkMutatorTest, RetryInfoHeeded) {
358483
});
359484

360485
auto policy = DefaultIdempotentMutationPolicy();
361-
bigtable_internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
362-
std::move(mut));
486+
bigtable_internal::BulkMutator mutator(
487+
kAppProfile, kTableName, *policy, std::move(mut),
488+
std::make_shared<bigtable_internal::OperationContext>());
363489

364490
for (int i = 0; i != 2; ++i) {
365491
EXPECT_TRUE(mutator.HasPendingMutations());
@@ -390,8 +516,9 @@ TEST_F(BulkMutatorTest, RetryInfoIgnored) {
390516
});
391517

392518
auto policy = DefaultIdempotentMutationPolicy();
393-
bigtable_internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
394-
std::move(mut));
519+
bigtable_internal::BulkMutator mutator(
520+
kAppProfile, kTableName, *policy, std::move(mut),
521+
std::make_shared<bigtable_internal::OperationContext>());
395522

396523
EXPECT_TRUE(mutator.HasPendingMutations());
397524
bigtable_internal::NoopMutateRowsLimiter limiter;
@@ -430,8 +557,9 @@ TEST_F(BulkMutatorTest, UnconfirmedAreFailed) {
430557
// PERMISSION_DENIED (not retryable).
431558

432559
auto policy = DefaultIdempotentMutationPolicy();
433-
bigtable_internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
434-
std::move(mut));
560+
bigtable_internal::BulkMutator mutator(
561+
kAppProfile, kTableName, *policy, std::move(mut),
562+
std::make_shared<bigtable_internal::OperationContext>());
435563

436564
EXPECT_TRUE(mutator.HasPendingMutations());
437565
bigtable_internal::NoopMutateRowsLimiter limiter;
@@ -459,8 +587,9 @@ TEST_F(BulkMutatorTest, ConfiguresContext) {
459587
});
460588

461589
auto policy = DefaultIdempotentMutationPolicy();
462-
bigtable_internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
463-
std::move(mut));
590+
bigtable_internal::BulkMutator mutator(
591+
kAppProfile, kTableName, *policy, std::move(mut),
592+
std::make_shared<bigtable_internal::OperationContext>());
464593

465594
MockFunction<void(grpc::ClientContext&)> mock_setup;
466595
EXPECT_CALL(mock_setup, Call).Times(1);
@@ -490,8 +619,9 @@ TEST_F(BulkMutatorTest, MutationStatusReportedOnOkStream) {
490619
});
491620

492621
auto policy = DefaultIdempotentMutationPolicy();
493-
bigtable_internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
494-
std::move(mut));
622+
bigtable_internal::BulkMutator mutator(
623+
kAppProfile, kTableName, *policy, std::move(mut),
624+
std::make_shared<bigtable_internal::OperationContext>());
495625

496626
bigtable_internal::NoopMutateRowsLimiter limiter;
497627
auto status = mutator.MakeOneRequest(*mock, limiter, Options{});
@@ -527,8 +657,9 @@ TEST_F(BulkMutatorTest, ReportEitherRetryableMutationFailOrStreamFail) {
527657
});
528658

529659
auto policy = DefaultIdempotentMutationPolicy();
530-
bigtable_internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
531-
std::move(mut));
660+
bigtable_internal::BulkMutator mutator(
661+
kAppProfile, kTableName, *policy, std::move(mut),
662+
std::make_shared<bigtable_internal::OperationContext>());
532663

533664
bigtable_internal::NoopMutateRowsLimiter limiter;
534665
auto status = mutator.MakeOneRequest(*mock, limiter, Options{});
@@ -574,8 +705,9 @@ TEST_F(BulkMutatorTest, ReportOnlyLatestMutationStatus) {
574705
});
575706

576707
auto policy = DefaultIdempotentMutationPolicy();
577-
bigtable_internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
578-
std::move(mut));
708+
bigtable_internal::BulkMutator mutator(
709+
kAppProfile, kTableName, *policy, std::move(mut),
710+
std::make_shared<bigtable_internal::OperationContext>());
579711

580712
bigtable_internal::NoopMutateRowsLimiter limiter;
581713
auto status = mutator.MakeOneRequest(*mock, limiter, Options{});
@@ -616,8 +748,9 @@ TEST_F(BulkMutatorTest, Throttling) {
616748
}
617749

618750
auto policy = DefaultIdempotentMutationPolicy();
619-
bigtable_internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
620-
std::move(mut));
751+
bigtable_internal::BulkMutator mutator(
752+
kAppProfile, kTableName, *policy, std::move(mut),
753+
std::make_shared<bigtable_internal::OperationContext>());
621754

622755
EXPECT_TRUE(mutator.HasPendingMutations());
623756
auto status = mutator.MakeOneRequest(*mock_stub, *mock_limiter, Options{});
@@ -657,8 +790,9 @@ TEST_F(BulkMutatorTest, BigtableCookies) {
657790
});
658791

659792
auto policy = DefaultIdempotentMutationPolicy();
660-
bigtable_internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
661-
std::move(mut));
793+
bigtable_internal::BulkMutator mutator(
794+
kAppProfile, kTableName, *policy, std::move(mut),
795+
std::make_shared<bigtable_internal::OperationContext>());
662796

663797
EXPECT_TRUE(mutator.HasPendingMutations());
664798
bigtable_internal::NoopMutateRowsLimiter limiter;
@@ -669,7 +803,6 @@ TEST_F(BulkMutatorTest, BigtableCookies) {
669803
status = mutator.MakeOneRequest(*mock, limiter, Options{});
670804
EXPECT_THAT(status, StatusIs(StatusCode::kPermissionDenied));
671805
}
672-
673806
} // namespace
674807
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
675808
} // namespace bigtable

0 commit comments

Comments
 (0)