Skip to content

Commit 9a05b0e

Browse files
authored
impl(bigtable): BulkMutator can use Stub (#9114)
1 parent 41d558b commit 9a05b0e

File tree

8 files changed

+767
-364
lines changed

8 files changed

+767
-364
lines changed

google/cloud/bigtable/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ if (BUILD_TESTING)
340340
internal/default_row_reader_test.cc
341341
internal/defaults_test.cc
342342
internal/google_bytes_traits_test.cc
343+
internal/legacy_bulk_mutator_test.cc
343344
internal/legacy_row_reader_test.cc
344345
internal/logging_data_client_test.cc
345346
internal/prefix_range_end_test.cc

google/cloud/bigtable/bigtable_client_unit_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ bigtable_client_unit_tests = [
5050
"internal/default_row_reader_test.cc",
5151
"internal/defaults_test.cc",
5252
"internal/google_bytes_traits_test.cc",
53+
"internal/legacy_bulk_mutator_test.cc",
5354
"internal/legacy_row_reader_test.cc",
5455
"internal/logging_data_client_test.cc",
5556
"internal/prefix_range_end_test.cc",

google/cloud/bigtable/internal/async_bulk_apply.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ void AsyncRetryBulkApply::StartIteration(CompletionQueue cq) {
7676

7777
void AsyncRetryBulkApply::OnRead(
7878
google::bigtable::v2::MutateRowsResponse response) {
79-
state_.OnRead(response);
79+
state_.OnRead(std::move(response));
8080
}
8181

8282
void AsyncRetryBulkApply::OnFinish(CompletionQueue cq, Status const& status) {

google/cloud/bigtable/internal/bulk_mutator.cc

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ google::bigtable::v2::MutateRowsRequest const& BulkMutatorState::BeforeStart() {
7979
}
8080

8181
std::vector<int> BulkMutatorState::OnRead(
82-
google::bigtable::v2::MutateRowsResponse& response) {
82+
google::bigtable::v2::MutateRowsResponse response) {
8383
std::vector<int> res;
8484
for (auto& entry : *response.mutable_entries()) {
8585
// The type of `entry.index()` is a 64-bit int. But we can never create more
@@ -197,14 +197,42 @@ grpc::Status BulkMutator::MakeOneRequest(bigtable::DataClient& client,
197197
// Read the stream of responses.
198198
btproto::MutateRowsResponse response;
199199
while (stream->Read(&response)) {
200-
state_.OnRead(response);
200+
state_.OnRead(std::move(response));
201201
}
202202
// Handle any errors in the stream.
203203
auto grpc_status = stream->Finish();
204204
state_.OnFinish(MakeStatusFromRpcError(grpc_status));
205205
return grpc_status;
206206
}
207207

208+
Status BulkMutator::MakeOneRequest(bigtable_internal::BigtableStub& stub) {
209+
// Send the request to the server.
210+
auto const& mutations = state_.BeforeStart();
211+
212+
// Configure the context
213+
auto const& options = google::cloud::internal::CurrentOptions();
214+
auto context = absl::make_unique<grpc::ClientContext>();
215+
google::cloud::internal::ConfigureContext(*context, options);
216+
217+
struct UnpackVariant {
218+
BulkMutatorState& state;
219+
bool operator()(btproto::MutateRowsResponse r) {
220+
state.OnRead(std::move(r));
221+
return true;
222+
}
223+
bool operator()(Status s) {
224+
state.OnFinish(std::move(s));
225+
return false;
226+
}
227+
};
228+
229+
// Read the stream of responses.
230+
auto stream = stub.MutateRows(std::move(context), mutations);
231+
while (absl::visit(UnpackVariant{state_}, stream->Read())) {
232+
}
233+
return state_.last_status();
234+
}
235+
208236
std::vector<FailedMutation> BulkMutator::OnRetryDone() && {
209237
return std::move(state_).OnRetryDone();
210238
}

google/cloud/bigtable/internal/bulk_mutator.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "google/cloud/bigtable/completion_queue.h"
1919
#include "google/cloud/bigtable/data_client.h"
2020
#include "google/cloud/bigtable/idempotent_mutation_policy.h"
21+
#include "google/cloud/bigtable/internal/bigtable_stub.h"
2122
#include "google/cloud/bigtable/version.h"
2223
#include "google/cloud/internal/invoke_result.h"
2324
#include "google/cloud/internal/retry_policy.h"
@@ -49,14 +50,17 @@ class BulkMutatorState {
4950
*
5051
* Returns the original index of any successful operations.
5152
*/
52-
std::vector<int> OnRead(google::bigtable::v2::MutateRowsResponse& response);
53+
std::vector<int> OnRead(google::bigtable::v2::MutateRowsResponse response);
5354

5455
/// Handle the result of a `Finish()` operation on the MutateRows() RPC.
5556
void OnFinish(google::cloud::Status finish_status);
5657

5758
/// Terminate the retry loop and return all the failures.
5859
std::vector<FailedMutation> OnRetryDone() &&;
5960

61+
/// The status of the most recent stream.
62+
Status last_status() const { return last_status_; };
63+
6064
private:
6165
/// The current request proto.
6266
google::bigtable::v2::MutateRowsRequest mutations_;
@@ -116,6 +120,9 @@ class BulkMutator {
116120
grpc::Status MakeOneRequest(bigtable::DataClient& client,
117121
grpc::ClientContext& client_context);
118122

123+
/// Synchronously send one batch request to the given stub.
124+
Status MakeOneRequest(bigtable_internal::BigtableStub& stub);
125+
119126
/// Give up on any pending mutations, move them to the failures array.
120127
std::vector<FailedMutation> OnRetryDone() &&;
121128

0 commit comments

Comments
 (0)