Skip to content

Commit c2d51e4

Browse files
committed
kinda working with iota iterators
1 parent a4503f6 commit c2d51e4

14 files changed

+285
-72
lines changed

ci/cloudbuild/builds/integration-production.sh

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export CC=clang
2727
export CXX=clang++
2828

2929
mapfile -t args < <(bazel::common_args)
30-
io::run bazel test "${args[@]}" --test_tag_filters=-integration-test "${BAZEL_TARGETS[@]}"
30+
#io::run bazel test "${args[@]}" --test_tag_filters=-integration-test "${BAZEL_TARGETS[@]}"
3131

3232
excluded_rules=(
3333
"-//examples:grpc_credential_types"
@@ -41,6 +41,6 @@ excluded_rules=(
4141

4242
io::log_h2 "Running the integration tests against prod"
4343
mapfile -t integration_args < <(integration::bazel_args)
44-
io::run bazel test "${args[@]}" "${integration_args[@]}" \
45-
--cache_test_results="auto" --test_tag_filters="integration-test,-ud-only" \
46-
-- "${BAZEL_TARGETS[@]}" "${excluded_rules[@]}"
44+
io::run bazel test "${args[@]}" "${integration_args[@]}" --test_output=all \
45+
--test_filter="*ReadRowsAllRows*" --test_timeout=30 \
46+
//google/cloud/bigtable/tests:data_integration_test

ci/cloudbuild/builds/lib/integration.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ function integration::bazel_args() {
5151

5252
# Integration tests are inherently flaky. Make up to three attempts to get the
5353
# test passing.
54-
args+=(--flaky_test_attempts=3)
54+
#args+=(--flaky_test_attempts=3)
5555

5656
args+=(
5757
# Common settings

google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.cc

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ std::unique_ptr<google::cloud::internal::StreamingReadRpc<
105105
BigtableRandomTwoLeastUsed::SampleRowKeys(
106106
std::shared_ptr<grpc::ClientContext> context, Options const& options,
107107
google::bigtable::v2::SampleRowKeysRequest const& request) {
108+
std::cout << __PRETTY_FUNCTION__ << std::endl;
108109
auto child = Child();
109110
auto stub = child->AcquireStub();
110111
auto result = stub->SampleRowKeys(std::move(context), options, request);
@@ -121,6 +122,7 @@ StatusOr<google::bigtable::v2::MutateRowResponse>
121122
BigtableRandomTwoLeastUsed::MutateRow(
122123
grpc::ClientContext& context, Options const& options,
123124
google::bigtable::v2::MutateRowRequest const& request) {
125+
std::cout << __PRETTY_FUNCTION__ << std::endl;
124126
auto child = Child();
125127
auto stub = child->AcquireStub();
126128
auto result = stub->MutateRow(context, options, request);
@@ -150,6 +152,7 @@ StatusOr<google::bigtable::v2::CheckAndMutateRowResponse>
150152
BigtableRandomTwoLeastUsed::CheckAndMutateRow(
151153
grpc::ClientContext& context, Options const& options,
152154
google::bigtable::v2::CheckAndMutateRowRequest const& request) {
155+
std::cout << __PRETTY_FUNCTION__ << std::endl;
153156
auto child = Child();
154157
auto stub = child->AcquireStub();
155158
auto result = stub->CheckAndMutateRow(context, options, request);
@@ -161,6 +164,7 @@ StatusOr<google::bigtable::v2::PingAndWarmResponse>
161164
BigtableRandomTwoLeastUsed::PingAndWarm(
162165
grpc::ClientContext& context, Options const& options,
163166
google::bigtable::v2::PingAndWarmRequest const& request) {
167+
std::cout << __PRETTY_FUNCTION__ << std::endl;
164168
auto child = Child();
165169
auto stub = child->AcquireStub();
166170
auto result = stub->PingAndWarm(context, options, request);
@@ -172,6 +176,7 @@ StatusOr<google::bigtable::v2::ReadModifyWriteRowResponse>
172176
BigtableRandomTwoLeastUsed::ReadModifyWriteRow(
173177
grpc::ClientContext& context, Options const& options,
174178
google::bigtable::v2::ReadModifyWriteRowRequest const& request) {
179+
std::cout << __PRETTY_FUNCTION__ << std::endl;
175180
auto child = Child();
176181
auto stub = child->AcquireStub();
177182
auto result = stub->ReadModifyWriteRow(context, options, request);
@@ -183,6 +188,7 @@ StatusOr<google::bigtable::v2::PrepareQueryResponse>
183188
BigtableRandomTwoLeastUsed::PrepareQuery(
184189
grpc::ClientContext& context, Options const& options,
185190
google::bigtable::v2::PrepareQueryRequest const& request) {
191+
std::cout << __PRETTY_FUNCTION__ << std::endl;
186192
auto child = Child();
187193
auto stub = child->AcquireStub();
188194
auto result = stub->PrepareQuery(context, options, request);
@@ -195,6 +201,7 @@ std::unique_ptr<google::cloud::internal::StreamingReadRpc<
195201
BigtableRandomTwoLeastUsed::ExecuteQuery(
196202
std::shared_ptr<grpc::ClientContext> context, Options const& options,
197203
google::bigtable::v2::ExecuteQueryRequest const& request) {
204+
std::cout << __PRETTY_FUNCTION__ << std::endl;
198205
auto child = Child();
199206
auto stub = child->AcquireStub();
200207
auto result = stub->ExecuteQuery(std::move(context), options, request);
@@ -214,6 +221,7 @@ BigtableRandomTwoLeastUsed::AsyncReadRows(
214221
std::shared_ptr<grpc::ClientContext> context,
215222
google::cloud::internal::ImmutableOptions options,
216223
google::bigtable::v2::ReadRowsRequest const& request) {
224+
std::cout << __PRETTY_FUNCTION__ << std::endl;
217225
auto child = Child();
218226
auto stub = child->AcquireStub();
219227
auto result =
@@ -234,6 +242,7 @@ BigtableRandomTwoLeastUsed::AsyncSampleRowKeys(
234242
std::shared_ptr<grpc::ClientContext> context,
235243
google::cloud::internal::ImmutableOptions options,
236244
google::bigtable::v2::SampleRowKeysRequest const& request) {
245+
std::cout << __PRETTY_FUNCTION__ << std::endl;
237246
auto child = Child();
238247
auto stub = child->AcquireStub();
239248
auto result = stub->AsyncSampleRowKeys(cq, std::move(context),
@@ -253,6 +262,7 @@ BigtableRandomTwoLeastUsed::AsyncMutateRow(
253262
std::shared_ptr<grpc::ClientContext> context,
254263
google::cloud::internal::ImmutableOptions options,
255264
google::bigtable::v2::MutateRowRequest const& request) {
265+
std::cout << __PRETTY_FUNCTION__ << std::endl;
256266
auto child = Child();
257267
auto stub = child->AcquireStub();
258268
auto result =
@@ -268,6 +278,7 @@ BigtableRandomTwoLeastUsed::AsyncMutateRows(
268278
std::shared_ptr<grpc::ClientContext> context,
269279
google::cloud::internal::ImmutableOptions options,
270280
google::bigtable::v2::MutateRowsRequest const& request) {
281+
std::cout << __PRETTY_FUNCTION__ << std::endl;
271282
auto child = Child();
272283
auto stub = child->AcquireStub();
273284
auto result = stub->AsyncMutateRows(cq, std::move(context),
@@ -288,6 +299,7 @@ BigtableRandomTwoLeastUsed::AsyncCheckAndMutateRow(
288299
std::shared_ptr<grpc::ClientContext> context,
289300
google::cloud::internal::ImmutableOptions options,
290301
google::bigtable::v2::CheckAndMutateRowRequest const& request) {
302+
std::cout << __PRETTY_FUNCTION__ << std::endl;
291303
auto child = Child();
292304
auto stub = child->AcquireStub();
293305
auto result = stub->AsyncCheckAndMutateRow(cq, std::move(context),
@@ -302,6 +314,7 @@ BigtableRandomTwoLeastUsed::AsyncReadModifyWriteRow(
302314
std::shared_ptr<grpc::ClientContext> context,
303315
google::cloud::internal::ImmutableOptions options,
304316
google::bigtable::v2::ReadModifyWriteRowRequest const& request) {
317+
std::cout << __PRETTY_FUNCTION__ << std::endl;
305318
auto child = Child();
306319
auto stub = child->AcquireStub();
307320
auto result = stub->AsyncReadModifyWriteRow(cq, std::move(context),
@@ -316,6 +329,7 @@ BigtableRandomTwoLeastUsed::AsyncPrepareQuery(
316329
std::shared_ptr<grpc::ClientContext> context,
317330
google::cloud::internal::ImmutableOptions options,
318331
google::bigtable::v2::PrepareQueryRequest const& request) {
332+
std::cout << __PRETTY_FUNCTION__ << std::endl;
319333
auto child = Child();
320334
auto stub = child->AcquireStub();
321335
auto result = stub->AsyncPrepareQuery(cq, std::move(context),
@@ -324,7 +338,7 @@ BigtableRandomTwoLeastUsed::AsyncPrepareQuery(
324338
return result;
325339
}
326340

327-
std::shared_ptr<StubUsageWrapper<BigtableStub>>
341+
std::shared_ptr<ChannelUsageWrapper<BigtableStub>>
328342
BigtableRandomTwoLeastUsed::Child() {
329343
std::cout << __PRETTY_FUNCTION__ << std::endl;
330344
return pool_->GetChannelRandomTwoLeastUsed();

google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ class BigtableRandomTwoLeastUsed : public BigtableStub {
130130
google::bigtable::v2::PrepareQueryRequest const& request) override;
131131

132132
private:
133-
std::shared_ptr<StubUsageWrapper<BigtableStub>> Child();
133+
std::shared_ptr<ChannelUsageWrapper<BigtableStub>> Child();
134134

135135
// std::mutex mu_;
136136
std::shared_ptr<DynamicChannelPool<BigtableStub>> pool_;

google/cloud/bigtable/internal/bigtable_stub_factory.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,16 @@ std::shared_ptr<BigtableStub> CreateBigtableStubRandomTwoLeastUsed(
8181
std::function<std::shared_ptr<BigtableStub>(int)>
8282
refreshing_channel_stub_factory,
8383
std::shared_ptr<ConnectionRefreshState> refresh_state) {
84+
std::cout << __PRETTY_FUNCTION__ << std::endl;
8485
std::vector<std::shared_ptr<BigtableStub>> children(
8586
(std::max)(1, options.get<GrpcNumChannelsOption>()));
8687
int id = 0;
8788
std::generate(children.begin(), children.end(),
8889
[&id, &refreshing_channel_stub_factory] {
8990
return refreshing_channel_stub_factory(id++);
9091
});
92+
std::cout << __PRETTY_FUNCTION__ << ": children.size()=" << children.size()
93+
<< std::endl;
9194
return std::make_shared<BigtableRandomTwoLeastUsed>(
9295
std::move(cq), std::move(refresh_state), refreshing_channel_stub_factory,
9396
std::move(children));
@@ -103,9 +106,9 @@ std::shared_ptr<BigtableStub> CreateDecoratedStubs(
103106
options.get<bigtable::MaxConnectionRefreshOption>());
104107

105108
std::shared_ptr<BigtableStub> stub;
106-
if (options.has<ChannelSelectionStrategyOption>() &&
107-
options.get<ChannelSelectionStrategyOption>() ==
108-
ChannelSelectionStrategy::kRandomTwoLeastUsed) {
109+
if (options.has<bigtable::experimental::ChannelPoolTypeOption>() &&
110+
options.get<bigtable::experimental::ChannelPoolTypeOption>() ==
111+
bigtable::experimental::ChannelPoolType::kDynamic) {
109112
auto refreshing_channel_stub_factory = [stub_factory, cq_impl, refresh,
110113
&auth, options](int id) {
111114
auto channel = CreateGrpcChannel(*auth, options, id);

google/cloud/bigtable/internal/bigtable_stub_factory.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ namespace cloud {
2929
namespace bigtable_internal {
3030
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3131

32-
enum class ChannelSelectionStrategy { kNone, kRoundRobin, kRandomTwoLeastUsed };
32+
enum class ChannelSelectionStrategy { kRoundRobin, kRandomTwoLeastUsed };
3333

3434
struct ChannelSelectionStrategyOption {
3535
using Type = ChannelSelectionStrategy;

google/cloud/bigtable/internal/connection_refresh_state.cc

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,20 @@ bool ConnectionRefreshState::enabled() const {
5353
return max_conn_refresh_period_.count() != 0;
5454
}
5555

56+
void LogFailedConnectionRefresh(Status const& conn_status) {
57+
if (!conn_status.ok()) {
58+
GCP_LOG(WARNING) << "Failed to refresh connection. Error: " << conn_status;
59+
}
60+
}
61+
5662
void ScheduleChannelRefresh(
5763
std::shared_ptr<internal::CompletionQueueImpl> const& cq_impl,
5864
std::shared_ptr<ConnectionRefreshState> const& state,
59-
std::shared_ptr<grpc::Channel> const& channel) {
65+
std::shared_ptr<grpc::Channel> const& channel,
66+
std::function<void(Status const&)> connection_status_fn) {
67+
if (!connection_status_fn) {
68+
connection_status_fn = LogFailedConnectionRefresh;
69+
}
6070
// The timers will only hold weak pointers to the channel or to the
6171
// completion queue, so if either of them are destroyed, the timer chain
6272
// will simply not continue.
@@ -66,7 +76,9 @@ void ScheduleChannelRefresh(
6676
using TimerFuture = future<StatusOr<std::chrono::system_clock::time_point>>;
6777
auto timer_future =
6878
cq.MakeRelativeTimer(state->RandomizedRefreshDelay())
69-
.then([weak_channel, weak_cq_impl, state](TimerFuture fut) {
79+
.then([weak_channel, weak_cq_impl, state,
80+
connection_status_fn =
81+
std::move(connection_status_fn)](TimerFuture fut) {
7082
if (!fut.get()) {
7183
// Timer cancelled.
7284
return;
@@ -79,17 +91,18 @@ void ScheduleChannelRefresh(
7991
cq.AsyncWaitConnectionReady(
8092
channel,
8193
std::chrono::system_clock::now() + kConnectionReadyTimeout)
82-
.then([weak_channel, weak_cq_impl, state](future<Status> fut) {
94+
.then([weak_channel, weak_cq_impl, state,
95+
connection_status_fn = std::move(connection_status_fn)](
96+
future<Status> fut) {
8397
auto conn_status = fut.get();
84-
if (!conn_status.ok()) {
85-
GCP_LOG(WARNING) << "Failed to refresh connection. Error: "
86-
<< conn_status;
87-
}
98+
if (connection_status_fn) connection_status_fn(conn_status);
99+
88100
auto channel = weak_channel.lock();
89101
if (!channel) return;
90102
auto cq_impl = weak_cq_impl.lock();
91103
if (!cq_impl) return;
92-
ScheduleChannelRefresh(cq_impl, state, channel);
104+
ScheduleChannelRefresh(cq_impl, state, channel,
105+
std::move(connection_status_fn));
93106
});
94107
});
95108
state->timers().RegisterTimer(std::move(timer_future));

google/cloud/bigtable/internal/connection_refresh_state.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ class ConnectionRefreshState {
8686
void ScheduleChannelRefresh(
8787
std::shared_ptr<internal::CompletionQueueImpl> const& cq,
8888
std::shared_ptr<ConnectionRefreshState> const& state,
89-
std::shared_ptr<grpc::Channel> const& channel);
89+
std::shared_ptr<grpc::Channel> const& channel,
90+
std::function<void(Status const&)> connection_status_fn = {});
9091

9192
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
9293
} // namespace bigtable_internal

0 commit comments

Comments
 (0)