Skip to content

Commit 984599c

Browse files
committed
working integration tests with new stub and pool
1 parent e7a2773 commit 984599c

File tree

9 files changed

+63
-12
lines changed

9 files changed

+63
-12
lines changed

google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ std::unique_ptr<google::cloud::internal::StreamingReadRpc<
8989
BigtableRandomTwoLeastUsed::ReadRows(
9090
std::shared_ptr<grpc::ClientContext> context, Options const& options,
9191
google::bigtable::v2::ReadRowsRequest const& request) {
92+
std::cout << __PRETTY_FUNCTION__ << std::endl;
9293
auto child = Child();
9394
auto stub = child->AcquireStub();
9495
auto result = stub->ReadRows(std::move(context), options, request);
@@ -136,6 +137,7 @@ std::unique_ptr<google::cloud::internal::StreamingReadRpc<
136137
BigtableRandomTwoLeastUsed::MutateRows(
137138
std::shared_ptr<grpc::ClientContext> context, Options const& options,
138139
google::bigtable::v2::MutateRowsRequest const& request) {
140+
std::cout << __PRETTY_FUNCTION__ << std::endl;
139141
auto child = Child();
140142
auto stub = child->AcquireStub();
141143
auto result = stub->MutateRows(std::move(context), options, request);
@@ -334,6 +336,7 @@ BigtableRandomTwoLeastUsed::AsyncPrepareQuery(
334336

335337
std::shared_ptr<internal::StubWrapper<BigtableStub>>
336338
BigtableRandomTwoLeastUsed::Child() {
339+
std::cout << __PRETTY_FUNCTION__ << std::endl;
337340
return pool_->GetChannelRandomTwoLeastUsed();
338341
// std::unique_lock<std::mutex> lk(mu_);
339342
// std::vector<std::size_t> indices(pool_->size(lk) - 1);

google/cloud/bigtable/internal/bigtable_stub.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ std::unique_ptr<google::cloud::internal::StreamingReadRpc<
3636
DefaultBigtableStub::ReadRows(
3737
std::shared_ptr<grpc::ClientContext> context, Options const&,
3838
google::bigtable::v2::ReadRowsRequest const& request) {
39+
std::cout << __PRETTY_FUNCTION__ << std::endl;
3940
auto stream = grpc_stub_->ReadRows(context.get(), request);
4041
return std::make_unique<google::cloud::internal::StreamingReadRpcImpl<
4142
google::bigtable::v2::ReadRowsResponse>>(std::move(context),
@@ -70,6 +71,7 @@ std::unique_ptr<google::cloud::internal::StreamingReadRpc<
7071
DefaultBigtableStub::MutateRows(
7172
std::shared_ptr<grpc::ClientContext> context, Options const&,
7273
google::bigtable::v2::MutateRowsRequest const& request) {
74+
std::cout << __PRETTY_FUNCTION__ << std::endl;
7375
auto stream = grpc_stub_->MutateRows(context.get(), request);
7476
return std::make_unique<google::cloud::internal::StreamingReadRpcImpl<
7577
google::bigtable::v2::MutateRowsResponse>>(std::move(context),

google/cloud/bigtable/internal/bigtable_stub_factory.cc

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "google/cloud/bigtable/internal/bigtable_channel_refresh.h"
1818
#include "google/cloud/bigtable/internal/bigtable_logging_decorator.h"
1919
#include "google/cloud/bigtable/internal/bigtable_metadata_decorator.h"
20+
#include "google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h"
2021
#include "google/cloud/bigtable/internal/bigtable_round_robin_decorator.h"
2122
#include "google/cloud/bigtable/internal/bigtable_tracing_stub.h"
2223
#include "google/cloud/bigtable/internal/connection_refresh_state.h"
@@ -73,6 +74,18 @@ std::shared_ptr<BigtableStub> CreateBigtableStubRoundRobin(
7374
return std::make_shared<BigtableRoundRobin>(std::move(children));
7475
}
7576

77+
std::shared_ptr<BigtableStub> CreateBigtableStubRandomTwoLeastUsed(
78+
Options const& options, CompletionQueue cq,
79+
std::function<std::shared_ptr<BigtableStub>(int)> child_factory) {
80+
std::vector<std::shared_ptr<BigtableStub>> children(
81+
(std::max)(1, options.get<GrpcNumChannelsOption>()));
82+
int id = 0;
83+
std::generate(children.begin(), children.end(),
84+
[&id, &child_factory] { return child_factory(id++); });
85+
return std::make_shared<BigtableRandomTwoLeastUsed>(cq, child_factory,
86+
std::move(children));
87+
}
88+
7689
std::shared_ptr<BigtableStub> CreateDecoratedStubs(
7790
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
7891
CompletionQueue const& cq, Options const& options,
@@ -87,7 +100,10 @@ std::shared_ptr<BigtableStub> CreateDecoratedStubs(
87100
if (refresh->enabled()) ScheduleChannelRefresh(cq_impl, refresh, channel);
88101
return base_factory(std::move(channel));
89102
};
90-
auto stub = CreateBigtableStubRoundRobin(options, std::move(child_factory));
103+
// auto stub = CreateBigtableStubRoundRobin(options,
104+
// std::move(child_factory));
105+
auto stub = CreateBigtableStubRandomTwoLeastUsed(options, cq,
106+
std::move(child_factory));
91107
if (refresh->enabled()) {
92108
stub = std::make_shared<BigtableChannelRefresh>(std::move(stub),
93109
std::move(refresh));

google/cloud/bigtable/internal/bulk_mutator.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ grpc::Status BulkMutator::MakeOneRequest(bigtable::DataClient& client,
214214
Status BulkMutator::MakeOneRequest(BigtableStub& stub,
215215
MutateRowsLimiter& limiter,
216216
Options const& options) {
217+
std::cout << __PRETTY_FUNCTION__ << std::endl;
217218
// Send the request to the server.
218219
auto const& mutations = state_.BeforeStart();
219220

@@ -226,8 +227,10 @@ Status BulkMutator::MakeOneRequest(BigtableStub& stub,
226227
// Potentially throttle the request
227228
limiter.Acquire();
228229

230+
std::cout << __PRETTY_FUNCTION__ << ": pre-stub" << std::endl;
229231
// Read the stream of responses.
230232
auto stream = stub.MutateRows(client_context, options, mutations);
233+
std::cout << __PRETTY_FUNCTION__ << ": post-stub" << std::endl;
231234
absl::optional<Status> status;
232235
while (true) {
233236
btproto::MutateRowsResponse response;

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ bigtable::RowReader ReadRowsHelper(
102102
params, // NOLINT(performance-unnecessary-value-param)
103103
std::shared_ptr<OperationContext>
104104
operation_context) { // NOLINT(performance-unnecessary-value-param)
105+
std::cout << __PRETTY_FUNCTION__ << std::endl;
105106
auto impl = std::make_shared<DefaultRowReader>(
106107
stub, std::move(params.app_profile_id), std::move(params.table_name),
107108
std::move(params.row_set), params.rows_limit, std::move(params.filter),
@@ -338,6 +339,7 @@ future<Status> DataConnectionImpl::AsyncApply(std::string const& table_name,
338339

339340
std::vector<bigtable::FailedMutation> DataConnectionImpl::BulkApply(
340341
std::string const& table_name, bigtable::BulkMutation mut) {
342+
std::cout << __PRETTY_FUNCTION__ << std::endl;
341343
auto current = google::cloud::internal::SaveCurrentOptions();
342344
if (mut.empty()) return {};
343345
auto operation_context = operation_context_factory_->MutateRows(
@@ -350,6 +352,7 @@ std::vector<bigtable::FailedMutation> DataConnectionImpl::BulkApply(
350352
std::unique_ptr<bigtable::DataRetryPolicy> retry;
351353
std::unique_ptr<BackoffPolicy> backoff;
352354
Status status;
355+
std::cout << __PRETTY_FUNCTION__ << ": pre-loop" << std::endl;
353356
while (true) {
354357
status = mutator.MakeOneRequest(*stub_, *limiter_, *current);
355358
if (!mutator.HasPendingMutations()) break;
@@ -361,6 +364,7 @@ std::vector<bigtable::FailedMutation> DataConnectionImpl::BulkApply(
361364
if (!delay) break;
362365
std::this_thread::sleep_for(*delay);
363366
}
367+
std::cout << __PRETTY_FUNCTION__ << ": post-loop" << std::endl;
364368
operation_context->OnDone(status);
365369
return std::move(mutator).OnRetryDone();
366370
}
@@ -380,6 +384,7 @@ DataConnectionImpl::AsyncBulkApply(std::string const& table_name,
380384

381385
bigtable::RowReader DataConnectionImpl::ReadRowsFull(
382386
bigtable::ReadRowsParams params) {
387+
std::cout << __PRETTY_FUNCTION__ << std::endl;
383388
auto current = google::cloud::internal::SaveCurrentOptions();
384389
auto operation_context = operation_context_factory_->ReadRows(
385390
params.table_name, params.app_profile_id);
@@ -660,6 +665,7 @@ void DataConnectionImpl::AsyncReadRows(
660665
std::function<future<bool>(bigtable::Row)> on_row,
661666
std::function<void(Status)> on_finish, bigtable::RowSet row_set,
662667
std::int64_t rows_limit, bigtable::Filter filter) {
668+
std::cout << __PRETTY_FUNCTION__ << std::endl;
663669
auto current = google::cloud::internal::SaveCurrentOptions();
664670
auto operation_context = operation_context_factory_->ReadRows(
665671
table_name, app_profile_id(*current));

google/cloud/bigtable/table.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ RowReader Table::ReadRows(RowSet row_set, Filter filter, Options opts) {
222222

223223
RowReader Table::ReadRows(RowSet row_set, std::int64_t rows_limit,
224224
Filter filter, Options opts) {
225+
std::cout << __PRETTY_FUNCTION__ << std::endl;
225226
if (connection_) {
226227
OptionsSpan span(MergeOptions(std::move(opts), options_));
227228
return connection_->ReadRows(table_name_, std::move(row_set), rows_limit,

google/cloud/bigtable/testing/table_integration_test.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ void TableAdminTestEnvironment::TearDown() {
122122
}
123123

124124
void TableIntegrationTest::SetUp() {
125+
std::cout << __PRETTY_FUNCTION__ << std::endl;
125126
data_connection_ = MakeDataConnection();
126127
data_client_ = bigtable::MakeDataClient(TableTestEnvironment::project_id(),
127128
TableTestEnvironment::instance_id());

google/cloud/bigtable/tests/data_integration_test.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ TEST_P(DataIntegrationTest, TableReadRowNotExistTest) {
206206
}
207207

208208
TEST_P(DataIntegrationTest, TableReadRowsAllRows) {
209+
std::cout << __PRETTY_FUNCTION__ << std::endl;
209210
auto table = GetTable(GetParam());
210211
std::string const row_key1 = "row-key-1";
211212
std::string const row_key2 = "row-key-2";

google/cloud/internal/channel_pool.h

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,27 +74,30 @@ template <typename T>
7474
class DynamicChannelPool
7575
: public std::enable_shared_from_this<DynamicChannelPool<T>> {
7676
public:
77-
using StubFactoryFn = std::function<std::shared_ptr<T>()>;
77+
using StubFactoryFn = std::function<std::shared_ptr<T>(int id)>;
7878
struct SizingPolicy {
7979
// To avoid channel churn, the pool will not add or remove channels more
8080
// frequently that this period.
81-
std::chrono::milliseconds resize_cooldown_interval;
81+
std::chrono::milliseconds resize_cooldown_interval =
82+
std::chrono::milliseconds(60 * 1000);
8283

8384
// If the average number of outstanding RPCs is below this threshold,
8485
// the pool size will be decreased.
85-
std::size_t minimum_average_outstanding_rpcs_per_channel;
86+
std::size_t minimum_average_outstanding_rpcs_per_channel = 20;
8687
// If the average number of outstanding RPCs is above this threshold,
8788
// the pool size will be increased.
88-
std::size_t maximum_average_outstanding_rpcs_per_channel;
89+
std::size_t maximum_average_outstanding_rpcs_per_channel = 80;
8990

9091
// When channels are removed from the pool, we have to wait until all
9192
// outstanding RPCs on that channel are completed before destroying it.
92-
std::chrono::milliseconds decommissioned_channel_polling_interval;
93+
std::chrono::milliseconds decommissioned_channel_polling_interval =
94+
std::chrono::milliseconds(30 * 1000);
9395
};
9496

9597
static std::shared_ptr<DynamicChannelPool> Create(
9698
CompletionQueue cq, std::size_t initial_size, StubFactoryFn factory_fn,
9799
SizingPolicy sizing_policy = {}) {
100+
std::cout << __PRETTY_FUNCTION__ << std::endl;
98101
std::vector<std::shared_ptr<StubWrapper<T>>> initial_wrapped_channels;
99102
for (std::size_t i = 0; i < initial_size; ++i) {
100103
initial_wrapped_channels.emplace_back(factory_fn());
@@ -103,9 +106,11 @@ class DynamicChannelPool
103106
std::move(cq), std::move(initial_wrapped_channels),
104107
std::move(factory_fn), std::move(sizing_policy)));
105108
}
109+
106110
static std::shared_ptr<DynamicChannelPool> Create(
107111
CompletionQueue cq, std::vector<std::shared_ptr<T>> initial_channels,
108112
StubFactoryFn factory_fn, SizingPolicy sizing_policy = {}) {
113+
std::cout << __PRETTY_FUNCTION__ << std::endl;
109114
auto pool = std::shared_ptr<DynamicChannelPool>(new DynamicChannelPool(
110115
std::move(cq), std::move(initial_channels), std::move(factory_fn),
111116
std::move(sizing_policy)));
@@ -132,16 +137,20 @@ class DynamicChannelPool
132137
// }
133138

134139
std::shared_ptr<StubWrapper<T>> GetChannelRandomTwoLeastUsed() {
140+
std::cout << __PRETTY_FUNCTION__ << std::endl;
135141
std::unique_lock<std::mutex> lk(mu_);
136142

143+
std::cout << __PRETTY_FUNCTION__ << ": channels_size()=" << channels_.size()
144+
<< std::endl;
137145
// TODO: check if resize is needed.
138146

139147
std::vector<std::size_t> indices(channels_.size());
140148
// TODO(sdhart): Maybe use iota on iterators instead of indices
141149
std::iota(indices.begin(), indices.end(), 0);
142150
std::shuffle(indices.begin(), indices.end(), rng_);
143-
auto channel_1 = channels_[indices[0]];
144-
auto channel_2 = channels_[indices[1]];
151+
152+
std::shared_ptr<StubWrapper<T>> channel_1 = channels_[indices[0]];
153+
std::shared_ptr<StubWrapper<T>> channel_2 = channels_[indices[1]];
145154

146155
return channel_1->outstanding_rpcs(lk) < channel_2->outstanding_rpcs(lk)
147156
? channel_1
@@ -156,18 +165,26 @@ class DynamicChannelPool
156165
: cq_(std::move(cq)),
157166
factory_fn_(std::move(factory_fn)),
158167
channels_(std::move(initial_wrapped_channels)),
159-
sizing_policy_(std::move(sizing_policy)) {}
168+
sizing_policy_(std::move(sizing_policy)),
169+
next_channel_id_(channels_.size()) {}
160170

161171
DynamicChannelPool(CompletionQueue cq,
162172
std::vector<std::shared_ptr<T>> initial_channels,
163173
StubFactoryFn factory_fn, SizingPolicy sizing_policy)
164174
: cq_(std::move(cq)),
165175
factory_fn_(std::move(factory_fn)),
166-
channels_(initial_channels.size()),
167-
sizing_policy_(std::move(sizing_policy)) {
176+
channels_(),
177+
sizing_policy_(std::move(sizing_policy)),
178+
next_channel_id_(initial_channels.size()) {
179+
std::cout << __PRETTY_FUNCTION__ << ": wrap initial_channels" << std::endl;
180+
channels_.reserve(initial_channels.size());
168181
for (auto& channel : initial_channels) {
169-
channels_.push_back(std::make_shared<StubWrapper<T>>(channel));
182+
channels_.push_back(std::make_shared<StubWrapper<T>>(std::move(channel)));
170183
}
184+
// for (auto i = 0; i < channels_.size(); ++i) {
185+
// std::cout << __PRETTY_FUNCTION__ << ": channels_[" << i <<
186+
// "].get()=" << channels_[i].get() << std::endl;
187+
// }
171188
}
172189

173190
void ScheduleAddChannel() {}
@@ -234,6 +251,7 @@ class DynamicChannelPool
234251
SizingPolicy sizing_policy_;
235252
std::vector<std::shared_ptr<StubWrapper<T>>> decommissioned_channels_;
236253
future<StatusOr<std::chrono::system_clock::time_point>> decommission_timer_;
254+
int next_channel_id_;
237255
};
238256

239257
} // namespace internal

0 commit comments

Comments
 (0)