Skip to content

Commit a85d425

Browse files
authored
feat(bigtable): add AsyncWaitForConsistency() helper for Table Admin (#9310)
1 parent b926c6a commit a85d425

File tree

8 files changed

+114
-92
lines changed

8 files changed

+114
-92
lines changed

google/cloud/bigtable/CMakeLists.txt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,6 @@ add_library(
205205
internal/rpc_policy_parameters.h
206206
internal/rpc_policy_parameters.inc
207207
internal/unary_client_utils.h
208-
internal/wait_for_consistency.cc
209-
internal/wait_for_consistency.h
210208
metadata_update_policy.cc
211209
metadata_update_policy.h
212210
mutation_batcher.cc
@@ -241,7 +239,9 @@ add_library(
241239
table_config.h
242240
version.cc
243241
version.h
244-
version_info.h)
242+
version_info.h
243+
wait_for_consistency.cc
244+
wait_for_consistency.h)
245245
target_link_libraries(
246246
google_cloud_cpp_bigtable
247247
PUBLIC absl::memory
@@ -356,7 +356,6 @@ if (BUILD_TESTING)
356356
internal/legacy_row_reader_test.cc
357357
internal/logging_data_client_test.cc
358358
internal/prefix_range_end_test.cc
359-
internal/wait_for_consistency_test.cc
360359
legacy_table_test.cc
361360
metadata_update_policy_test.cc
362361
mocks/mock_row_reader_test.cc
@@ -381,7 +380,8 @@ if (BUILD_TESTING)
381380
table_sample_row_keys_test.cc
382381
table_test.cc
383382
testing/cleanup_stale_resources_test.cc
384-
testing/random_names_test.cc)
383+
testing/random_names_test.cc
384+
wait_for_consistency_test.cc)
385385

386386
# Export the list of unit tests so the Bazel BUILD file can pick it up.
387387
export_list_to_bazel("bigtable_client_unit_tests.bzl"

google/cloud/bigtable/bigtable_client_unit_tests.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ bigtable_client_unit_tests = [
5858
"internal/legacy_row_reader_test.cc",
5959
"internal/logging_data_client_test.cc",
6060
"internal/prefix_range_end_test.cc",
61-
"internal/wait_for_consistency_test.cc",
6261
"legacy_table_test.cc",
6362
"metadata_update_policy_test.cc",
6463
"mocks/mock_row_reader_test.cc",
@@ -84,4 +83,5 @@ bigtable_client_unit_tests = [
8483
"table_test.cc",
8584
"testing/cleanup_stale_resources_test.cc",
8685
"testing/random_names_test.cc",
86+
"wait_for_consistency_test.cc",
8787
]

google/cloud/bigtable/examples/table_admin_snippets.cc

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "google/cloud/bigtable/table_admin.h"
1818
#include "google/cloud/bigtable/testing/cleanup_stale_resources.h"
1919
#include "google/cloud/bigtable/testing/random_names.h"
20+
#include "google/cloud/bigtable/wait_for_consistency.h"
2021
#include "google/cloud/internal/getenv.h"
2122
#include "google/cloud/log.h"
2223
#include <chrono>
@@ -622,38 +623,43 @@ void DropRowsByPrefix(
622623
(std::move(admin), argv.at(0), argv.at(1), argv.at(2), argv.at(3));
623624
}
624625

625-
// TODO(#7732) - update this sample to use the helper method
626626
void WaitForConsistencyCheck(
627-
google::cloud::bigtable_admin::BigtableTableAdminClient const&,
627+
google::cloud::bigtable_admin::BigtableTableAdminClient admin,
628628
std::vector<std::string> const& argv) {
629-
auto old_admin = google::cloud::bigtable::TableAdmin(
630-
google::cloud::bigtable::MakeAdminClient(argv.at(0)), argv.at(1));
631-
632629
//! [wait for consistency check]
633630
namespace cbt = ::google::cloud::bigtable;
631+
namespace cbta = ::google::cloud::bigtable_admin;
632+
using ::google::cloud::CompletionQueue;
634633
using ::google::cloud::future;
634+
using ::google::cloud::Status;
635635
using ::google::cloud::StatusOr;
636-
[](cbt::TableAdmin admin, std::string const& table_id) {
637-
StatusOr<std::string> consistency_token =
638-
admin.GenerateConsistencyToken(table_id);
636+
[](cbta::BigtableTableAdminClient admin, std::string const& project_id,
637+
std::string const& instance_id, std::string const& table_id) {
638+
std::string table_name = cbt::TableName(project_id, instance_id, table_id);
639+
StatusOr<google::bigtable::admin::v2::GenerateConsistencyTokenResponse>
640+
consistency_token = admin.GenerateConsistencyToken(table_name);
639641
if (!consistency_token) {
640642
throw std::runtime_error(consistency_token.status().message());
641643
}
642-
future<StatusOr<cbt::Consistency>> consistent_future =
643-
admin.WaitForConsistency(table_id, *consistency_token);
644-
future<void> fut = consistent_future.then(
645-
[&consistency_token](future<StatusOr<cbt::Consistency>> f) {
646-
auto is_consistent = f.get();
647-
if (!is_consistent) {
648-
throw std::runtime_error(is_consistent.status().message());
649-
}
650-
std::cout << "Table is consistent with token " << *consistency_token
651-
<< "\n";
652-
});
653-
fut.get(); // simplify example by blocking until operation is done.
644+
// Start a thread to perform the background work.
645+
CompletionQueue cq;
646+
std::thread cq_runner([&cq] { cq.Run(); });
647+
648+
std::string token = consistency_token->consistency_token();
649+
future<Status> consistent_future =
650+
cbta::AsyncWaitForConsistency(cq, admin, table_name, token);
651+
652+
// Simplify the example by blocking until the operation is done.
653+
Status status = consistent_future.get();
654+
if (!status.ok()) throw std::runtime_error(status.message());
655+
std::cout << "Table is consistent with token " << token << "\n";
656+
657+
// Shutdown the work queue and join the background thread
658+
cq.Shutdown();
659+
cq_runner.join();
654660
}
655661
//! [wait for consistency check]
656-
(std::move(old_admin), argv.at(2));
662+
(std::move(admin), argv.at(0), argv.at(1), argv.at(2));
657663
}
658664

659665
void CheckConsistency(

google/cloud/bigtable/google_cloud_cpp_bigtable.bzl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ google_cloud_cpp_bigtable_hdrs = [
9595
"internal/rpc_policy_parameters.h",
9696
"internal/rpc_policy_parameters.inc",
9797
"internal/unary_client_utils.h",
98-
"internal/wait_for_consistency.h",
9998
"metadata_update_policy.h",
10099
"mutation_batcher.h",
101100
"mutation_branch.h",
@@ -117,6 +116,7 @@ google_cloud_cpp_bigtable_hdrs = [
117116
"table_config.h",
118117
"version.h",
119118
"version_info.h",
119+
"wait_for_consistency.h",
120120
]
121121

122122
google_cloud_cpp_bigtable_srcs = [
@@ -179,7 +179,6 @@ google_cloud_cpp_bigtable_srcs = [
179179
"internal/logging_data_client.cc",
180180
"internal/prefix_range_end.cc",
181181
"internal/readrowsparser.cc",
182-
"internal/wait_for_consistency.cc",
183182
"metadata_update_policy.cc",
184183
"mutation_batcher.cc",
185184
"mutations.cc",
@@ -194,4 +193,5 @@ google_cloud_cpp_bigtable_srcs = [
194193
"table_admin.cc",
195194
"table_config.cc",
196195
"version.cc",
196+
"wait_for_consistency.cc",
197197
]

google/cloud/bigtable/table_admin.cc

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/bigtable/table_admin.h"
16-
#include "google/cloud/bigtable/internal/wait_for_consistency.h"
16+
#include "google/cloud/bigtable/admin/bigtable_table_admin_client.h"
17+
#include "google/cloud/bigtable/wait_for_consistency.h"
1718
#include "google/cloud/internal/retry_policy.h"
1819
#include "google/cloud/internal/time_utils.h"
1920
#include <google/protobuf/duration.pb.h>
@@ -226,9 +227,16 @@ Status TableAdmin::DropRowsByPrefix(std::string const& table_id,
226227

227228
future<StatusOr<Consistency>> TableAdmin::WaitForConsistency(
228229
std::string const& table_id, std::string const& consistency_token) {
229-
return bigtable_admin_internal::AsyncWaitForConsistency(
230-
cq_, connection_, TableName(table_id), consistency_token,
231-
policies_)
230+
// We avoid lifetime issues due to ownership cycles, by holding the
231+
// `BackgroundThreads` which run the `CompletionQueue` outside of the
232+
// operation, in this class. If the `BackgroundThreads` running the
233+
// `CompletionQueue` were instead owned by the Connection, we would have an
234+
// ownership cycle. We have made this mistake before. See #7740 for more
235+
// details.
236+
auto client = bigtable_admin::BigtableTableAdminClient(connection_);
237+
return bigtable_admin::AsyncWaitForConsistency(cq_, std::move(client),
238+
TableName(table_id),
239+
consistency_token, policies_)
232240
.then([](future<Status> f) -> StatusOr<Consistency> {
233241
auto s = f.get();
234242
if (!s.ok()) return s;

google/cloud/bigtable/internal/wait_for_consistency.cc renamed to google/cloud/bigtable/wait_for_consistency.cc

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,31 +12,30 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
#include "google/cloud/bigtable/internal/wait_for_consistency.h"
15+
#include "google/cloud/bigtable/wait_for_consistency.h"
1616
#include "google/cloud/bigtable/admin/bigtable_table_admin_options.h"
1717
#include "google/cloud/bigtable/admin/internal/bigtable_table_admin_option_defaults.h"
1818
#include <chrono>
1919

2020
namespace google {
2121
namespace cloud {
22-
namespace bigtable_admin_internal {
22+
namespace bigtable_admin {
2323
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2424
namespace {
2525

2626
// This class borrows heavily from `google::cloud::internal::AsyncRetryLoop`
2727
class AsyncWaitForConsistencyImpl
2828
: public std::enable_shared_from_this<AsyncWaitForConsistencyImpl> {
2929
public:
30-
AsyncWaitForConsistencyImpl(
31-
CompletionQueue cq,
32-
std::shared_ptr<bigtable_admin::BigtableTableAdminConnection> connection,
33-
std::string table_name, std::string consistency_token, Options options)
30+
AsyncWaitForConsistencyImpl(CompletionQueue cq,
31+
BigtableTableAdminClient client,
32+
std::string table_name,
33+
std::string consistency_token, Options options)
3434
: cq_(std::move(cq)),
35-
connection_(std::move(connection)),
36-
options_(BigtableTableAdminDefaultOptions(std::move(options))),
37-
polling_policy_(
38-
options_
39-
.get<bigtable_admin::BigtableTableAdminPollingPolicyOption>()) {
35+
client_(std::move(client)),
36+
options_(bigtable_admin_internal::BigtableTableAdminDefaultOptions(
37+
std::move(options))),
38+
polling_policy_(options_.get<BigtableTableAdminPollingPolicyOption>()) {
4039
request_.set_name(std::move(table_name));
4140
request_.set_consistency_token(std::move(consistency_token));
4241
}
@@ -73,13 +72,13 @@ class AsyncWaitForConsistencyImpl
7372
}
7473

7574
void StartAttempt() {
76-
internal::OptionsSpan span(options_);
7775
auto self = this->shared_from_this();
7876
auto state = StartOperation();
7977
if (state.cancelled) return;
80-
SetPending(state.operation,
81-
connection_->AsyncCheckConsistency(request_).then(
82-
[self](future<RespType> f) { self->OnAttempt(f.get()); }));
78+
SetPending(
79+
state.operation,
80+
client_.AsyncCheckConsistency(request_, options_)
81+
.then([self](future<RespType> f) { self->OnAttempt(f.get()); }));
8382
}
8483

8584
void StartBackoff() {
@@ -152,7 +151,7 @@ class AsyncWaitForConsistencyImpl
152151

153152
CompletionQueue cq_;
154153
bigtable::admin::v2::CheckConsistencyRequest request_;
155-
std::shared_ptr<bigtable_admin::BigtableTableAdminConnection> connection_;
154+
BigtableTableAdminClient client_;
156155
Options options_;
157156
std::shared_ptr<PollingPolicy> polling_policy_;
158157
promise<Status> result_;
@@ -169,17 +168,18 @@ class AsyncWaitForConsistencyImpl
169168

170169
} // namespace
171170

172-
future<Status> AsyncWaitForConsistency(
173-
CompletionQueue cq,
174-
std::shared_ptr<bigtable_admin::BigtableTableAdminConnection> connection,
175-
std::string table_name, std::string consistency_token, Options options) {
171+
future<Status> AsyncWaitForConsistency(CompletionQueue cq,
172+
BigtableTableAdminClient client,
173+
std::string table_name,
174+
std::string consistency_token,
175+
Options options) {
176176
auto loop = std::make_shared<AsyncWaitForConsistencyImpl>(
177-
std::move(cq), std::move(connection), std::move(table_name),
177+
std::move(cq), std::move(client), std::move(table_name),
178178
std::move(consistency_token), std::move(options));
179179
return loop->Start();
180180
}
181181

182182
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
183-
} // namespace bigtable_admin_internal
183+
} // namespace bigtable_admin
184184
} // namespace cloud
185185
} // namespace google

google/cloud/bigtable/internal/wait_for_consistency.h renamed to google/cloud/bigtable/wait_for_consistency.h

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,18 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_WAIT_FOR_CONSISTENCY_H
16-
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_WAIT_FOR_CONSISTENCY_H
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_WAIT_FOR_CONSISTENCY_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_WAIT_FOR_CONSISTENCY_H
1717

18-
#include "google/cloud/bigtable/admin/bigtable_table_admin_connection.h"
18+
#include "google/cloud/bigtable/admin/bigtable_table_admin_client.h"
1919

2020
namespace google {
2121
namespace cloud {
22-
namespace bigtable_admin_internal {
22+
namespace bigtable_admin {
2323
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2424

2525
/**
26-
* Checks consistency of a table with multiple calls using background threads
26+
* Polls until a table is consistent, or until the polling policy has expired.
2727
*
2828
* @param cq the completion queue that will execute the asynchronous
2929
* calls. The application must ensure that one or more threads are
@@ -34,19 +34,20 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3434
* @param consistency_token the consistency token of the table.
3535
* @param options (optional) configuration options. Users who wish to modify the
3636
* default polling behavior can supply a custom polling policy with
37-
* `BigtableTableAdminPollingPolicyOption`.
37+
* `BigtableTableAdminPollingPolicyOption`. Note that the client's polling
38+
* policy is not used for this operation.
3839
* @return the consistency status for the table. The status is OK if and only if
3940
* the table is consistent.
4041
*/
41-
future<Status> AsyncWaitForConsistency(
42-
CompletionQueue cq,
43-
std::shared_ptr<bigtable_admin::BigtableTableAdminConnection> connection,
44-
std::string table_name, std::string consistency_token,
45-
Options options = {});
42+
future<Status> AsyncWaitForConsistency(CompletionQueue cq,
43+
BigtableTableAdminClient client,
44+
std::string table_name,
45+
std::string consistency_token,
46+
Options options = {});
4647

4748
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
48-
} // namespace bigtable_admin_internal
49+
} // namespace bigtable_admin
4950
} // namespace cloud
5051
} // namespace google
5152

52-
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_WAIT_FOR_CONSISTENCY_H
53+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_WAIT_FOR_CONSISTENCY_H

0 commit comments

Comments
 (0)