Skip to content

Commit 81631aa

Browse files
committed
connection refresh callback sets status
1 parent c2d51e4 commit 81631aa

File tree

8 files changed

+78
-99
lines changed

8 files changed

+78
-99
lines changed

ci/cloudbuild/builds/integration-production.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ excluded_rules=(
3939
"-//google/cloud/storagecontrol:v2_samples_storage_control_anywhere_cache_samples"
4040
)
4141

42+
# --test_filter="*ReadRowsAllRows*" --test_timeout=30 \
4243
io::log_h2 "Running the integration tests against prod"
4344
mapfile -t integration_args < <(integration::bazel_args)
4445
io::run bazel test "${args[@]}" "${integration_args[@]}" --test_output=all \
45-
--test_filter="*ReadRowsAllRows*" --test_timeout=30 \
4646
//google/cloud/bigtable/tests:data_integration_test

google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.cc

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,6 @@ class AsyncStreamingReadRpcTracking
7373

7474
} // namespace
7575

76-
BigtableRandomTwoLeastUsed::BigtableRandomTwoLeastUsed(
77-
CompletionQueue cq, std::shared_ptr<ConnectionRefreshState> refresh_state,
78-
DynamicChannelPool<BigtableStub>::StubFactoryFn
79-
refreshing_channel_stub_factory_fn,
80-
std::vector<std::shared_ptr<BigtableStub>> children)
81-
: pool_(DynamicChannelPool<BigtableStub>::Create(
82-
std::move(cq), std::move(children), std::move(refresh_state),
83-
std::move(refreshing_channel_stub_factory_fn))) {}
84-
8576
std::unique_ptr<google::cloud::internal::StreamingReadRpc<
8677
google::bigtable::v2::ReadRowsResponse>>
8778
BigtableRandomTwoLeastUsed::ReadRows(

google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,10 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2929

3030
class BigtableRandomTwoLeastUsed : public BigtableStub {
3131
public:
32-
BigtableRandomTwoLeastUsed(
33-
CompletionQueue cq, std::shared_ptr<ConnectionRefreshState> refresh_state,
34-
DynamicChannelPool<BigtableStub>::StubFactoryFn
35-
refreshing_channel_stub_factory_fn,
36-
std::vector<std::shared_ptr<BigtableStub>> children);
32+
explicit BigtableRandomTwoLeastUsed(
33+
std::shared_ptr<DynamicChannelPool<BigtableStub>> pool)
34+
: pool_(std::move(pool)) {}
35+
3736
~BigtableRandomTwoLeastUsed() override = default;
3837

3938
std::unique_ptr<google::cloud::internal::StreamingReadRpc<

google/cloud/bigtable/internal/bigtable_stub_factory.cc

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -77,23 +77,47 @@ std::shared_ptr<BigtableStub> CreateBigtableStubRoundRobin(
7777
}
7878

7979
std::shared_ptr<BigtableStub> CreateBigtableStubRandomTwoLeastUsed(
80-
CompletionQueue cq, Options const& options,
81-
std::function<std::shared_ptr<BigtableStub>(int)>
82-
refreshing_channel_stub_factory,
80+
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
81+
std::shared_ptr<internal::CompletionQueueImpl> cq_impl,
82+
Options const& options, BaseBigtableStubFactory stub_factory,
8383
std::shared_ptr<ConnectionRefreshState> refresh_state) {
8484
std::cout << __PRETTY_FUNCTION__ << std::endl;
85-
std::vector<std::shared_ptr<BigtableStub>> children(
86-
(std::max)(1, options.get<GrpcNumChannelsOption>()));
87-
int id = 0;
85+
86+
auto refreshing_channel_stub_factory =
87+
[stub_factory = std::move(stub_factory), cq_impl, refresh_state,
88+
auth = std::move(auth), options](std::uint32_t id)
89+
-> std::shared_ptr<ChannelUsageWrapper<BigtableStub>> {
90+
auto wrapper = std::make_shared<ChannelUsageWrapper<BigtableStub>>();
91+
auto connection_status_fn = [weak = wrapper->MakeWeak()](Status const& s) {
92+
if (auto self = weak.lock()) {
93+
self->SetLastRefreshStatus(s);
94+
}
95+
if (!s.ok()) {
96+
GCP_LOG(WARNING) << "Failed to refresh connection. Error: " << s;
97+
}
98+
};
99+
auto channel = CreateGrpcChannel(*auth, options, id);
100+
ScheduleChannelRefresh(cq_impl, refresh_state, channel,
101+
std::move(connection_status_fn));
102+
wrapper->set_channel(stub_factory(std::move(channel)));
103+
return wrapper;
104+
};
105+
106+
std::vector<std::shared_ptr<ChannelUsageWrapper<BigtableStub>>> children(
107+
std::max(1, options.get<GrpcNumChannelsOption>()));
108+
std::uint32_t id = 0;
88109
std::generate(children.begin(), children.end(),
89110
[&id, &refreshing_channel_stub_factory] {
90111
return refreshing_channel_stub_factory(id++);
91112
});
92-
std::cout << __PRETTY_FUNCTION__ << ": children.size()=" << children.size()
93-
<< std::endl;
113+
94114
return std::make_shared<BigtableRandomTwoLeastUsed>(
95-
std::move(cq), std::move(refresh_state), refreshing_channel_stub_factory,
96-
std::move(children));
115+
DynamicChannelPool<BigtableStub>::Create(
116+
CompletionQueue(std::move(cq_impl)), std::move(children),
117+
std::move(refresh_state),
118+
std::move(refreshing_channel_stub_factory)));
119+
// CompletionQueue(cq_impl), std::move(refresh_state),
120+
// std::move(refreshing_channel_stub_factory), std::move(children));
97121
}
98122

99123
std::shared_ptr<BigtableStub> CreateDecoratedStubs(
@@ -109,14 +133,9 @@ std::shared_ptr<BigtableStub> CreateDecoratedStubs(
109133
if (options.has<bigtable::experimental::ChannelPoolTypeOption>() &&
110134
options.get<bigtable::experimental::ChannelPoolTypeOption>() ==
111135
bigtable::experimental::ChannelPoolType::kDynamic) {
112-
auto refreshing_channel_stub_factory = [stub_factory, cq_impl, refresh,
113-
&auth, options](int id) {
114-
auto channel = CreateGrpcChannel(*auth, options, id);
115-
ScheduleChannelRefresh(cq_impl, refresh, channel);
116-
return stub_factory(std::move(channel));
117-
};
118136
stub = CreateBigtableStubRandomTwoLeastUsed(
119-
cq, options, std::move(refreshing_channel_stub_factory),
137+
auth, std::move(cq_impl), options, stub_factory,
138+
// std::move(refreshing_channel_stub_factory),
120139
std::move(refresh));
121140
} else {
122141
auto refreshing_channel_stub_factory = [stub_factory, cq_impl, refresh,

google/cloud/bigtable/internal/bigtable_stub_factory.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@ std::shared_ptr<BigtableStub> CreateBigtableStubRoundRobin(
4343
refreshing_channel_stub_factory);
4444

4545
std::shared_ptr<BigtableStub> CreateBigtableStubRandomTwoLeastUsed(
46-
CompletionQueue cq, Options const& options,
47-
std::function<std::shared_ptr<BigtableStub>(int)>
48-
refreshing_channel_stub_factory,
46+
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
47+
std::shared_ptr<internal::CompletionQueueImpl> cq_impl,
48+
Options const& options, BaseBigtableStubFactory stub_factory,
49+
// std::function<std::shared_ptr<BigtableStub>(int)>
50+
// refreshing_channel_stub_factory,
4951
std::shared_ptr<ConnectionRefreshState> refresh_state);
5052

5153
/// Used in testing to create decorated mocks.

google/cloud/bigtable/internal/connection_refresh_state.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,7 @@ void ScheduleChannelRefresh(
9595
connection_status_fn = std::move(connection_status_fn)](
9696
future<Status> fut) {
9797
auto conn_status = fut.get();
98-
if (connection_status_fn) connection_status_fn(conn_status);
99-
98+
connection_status_fn(conn_status);
10099
auto channel = weak_channel.lock();
101100
if (!channel) return;
102101
auto cq_impl = weak_cq_impl.lock();

google/cloud/bigtable/internal/dynamic_channel_pool.h

Lines changed: 26 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ template <typename T>
3434
class ChannelUsageWrapper
3535
: public std::enable_shared_from_this<ChannelUsageWrapper<T>> {
3636
public:
37+
ChannelUsageWrapper() = default;
3738
explicit ChannelUsageWrapper(std::shared_ptr<T> stub)
3839
: stub_(std::move(stub)) {}
3940

@@ -55,6 +56,11 @@ class ChannelUsageWrapper
5556
last_refresh_status_ = std::move(s);
5657
}
5758

59+
ChannelUsageWrapper& set_channel(std::shared_ptr<T> channel) {
60+
stub_ = std::move(channel);
61+
return *this;
62+
}
63+
5864
std::weak_ptr<ChannelUsageWrapper<T>> MakeWeak() {
5965
return this->shared_from_this();
6066
}
@@ -81,7 +87,8 @@ template <typename T>
8187
class DynamicChannelPool
8288
: public std::enable_shared_from_this<DynamicChannelPool<T>> {
8389
public:
84-
using StubFactoryFn = std::function<std::shared_ptr<T>(int id)>;
90+
using StubFactoryFn =
91+
std::function<std::shared_ptr<ChannelUsageWrapper<T>>(std::uint32_t id)>;
8592
struct SizingPolicy {
8693
// To avoid channel churn, the pool will not add or remove channels more
8794
// frequently that this period.
@@ -117,26 +124,8 @@ class DynamicChannelPool
117124
};
118125

119126
static std::shared_ptr<DynamicChannelPool> Create(
120-
CompletionQueue cq, std::size_t initial_size,
121-
StubFactoryFn stub_factory_fn,
122-
std::shared_ptr<ConnectionRefreshState> refresh_state,
123-
SizingPolicy sizing_policy = {}) {
124-
std::cout << __PRETTY_FUNCTION__ << ": enter" << std::endl;
125-
std::vector<std::shared_ptr<ChannelUsageWrapper<T>>>
126-
initial_wrapped_channels;
127-
for (std::size_t i = 0; i < initial_size; ++i) {
128-
initial_wrapped_channels.emplace_back(stub_factory_fn());
129-
}
130-
auto pool = std::shared_ptr<DynamicChannelPool>(new DynamicChannelPool(
131-
std::move(cq), std::move(initial_wrapped_channels),
132-
std::move(refresh_state), std::move(stub_factory_fn),
133-
std::move(sizing_policy)));
134-
std::cout << __PRETTY_FUNCTION__ << ": return pool" << std::endl;
135-
return pool;
136-
}
137-
138-
static std::shared_ptr<DynamicChannelPool> Create(
139-
CompletionQueue cq, std::vector<std::shared_ptr<T>> initial_channels,
127+
CompletionQueue cq,
128+
std::vector<std::shared_ptr<ChannelUsageWrapper<T>>> initial_channels,
140129
std::shared_ptr<ConnectionRefreshState> refresh_state,
141130
StubFactoryFn stub_factory_fn, SizingPolicy sizing_policy = {}) {
142131
std::cout << __PRETTY_FUNCTION__ << ": enter" << std::endl;
@@ -193,26 +182,22 @@ class DynamicChannelPool
193182
std::iota(iterators.begin(), iterators.end(), channels_.begin());
194183
std::shuffle(iterators.begin(), iterators.end(), rng_);
195184

196-
// std::cout << __PRETTY_FUNCTION__ << ": shuffled iterators" <<
197-
// std::endl;
198-
// std::vector<
199-
// typename
200-
// std::vector<std::shared_ptr<ChannelUsageWrapper<T>>>::iterator>
201-
// iterators;
202-
typename std::vector<typename std::vector<
203-
std::shared_ptr<ChannelUsageWrapper<T>>>::iterator>::iterator
204-
shuffle_iter = iterators.begin();
205-
typename std::vector<std::shared_ptr<ChannelUsageWrapper<T>>>::iterator
206-
channel_1 = *shuffle_iter;
185+
// typename std::vector<typename std::vector<
186+
// std::shared_ptr<ChannelUsageWrapper<T>>>::iterator>::iterator
187+
auto shuffle_iter = iterators.begin();
188+
// typename
189+
// std::vector<std::shared_ptr<ChannelUsageWrapper<T>>>::iterator
190+
auto channel_1 = *shuffle_iter;
207191
std::shared_ptr<ChannelUsageWrapper<T>> c = *channel_1;
208192
// std::cout << __PRETTY_FUNCTION__
209193
// << ": check channel 1=" << c.get() << std::endl;
210194
auto channel_1_rpcs = shuffle_iter != iterators.end()
211195
? (*channel_1)->outstanding_rpcs()
212196
: Status{StatusCode::kNotFound, ""};
213197
++shuffle_iter;
214-
typename std::vector<std::shared_ptr<ChannelUsageWrapper<T>>>::iterator
215-
channel_2 = *shuffle_iter;
198+
// typename
199+
// std::vector<std::shared_ptr<ChannelUsageWrapper<T>>>::iterator
200+
auto channel_2 = *shuffle_iter;
216201
// We want to snapshot these outstanding_rpcs values.
217202
// std::cout << __PRETTY_FUNCTION__
218203
// << ": check channel 2=" << (channel_2)->get() << std::endl;
@@ -264,7 +249,7 @@ class DynamicChannelPool
264249
return *channel_2;
265250
}
266251

267-
// TODO: we have no usable channels in the entire pool; this is bad.
252+
// TODO(sdhart): we have no usable channels in the entire pool; this is bad.
268253
std::cout << __PRETTY_FUNCTION__ << ": NO USABLE CHANNELS" << std::endl;
269254
return nullptr;
270255
}
@@ -280,26 +265,7 @@ class DynamicChannelPool
280265
stub_factory_fn_(std::move(stub_factory_fn)),
281266
channels_(std::move(initial_wrapped_channels)),
282267
sizing_policy_(std::move(sizing_policy)),
283-
next_channel_id_(channels_.size()) {
284-
sizing_policy_.minimum_channel_pool_size = channels_.size();
285-
}
286-
287-
DynamicChannelPool(CompletionQueue cq,
288-
std::vector<std::shared_ptr<T>> initial_channels,
289-
std::shared_ptr<ConnectionRefreshState> refresh_state,
290-
StubFactoryFn stub_factory_fn, SizingPolicy sizing_policy)
291-
: cq_(std::move(cq)),
292-
refresh_state_(std::move(refresh_state)),
293-
stub_factory_fn_(std::move(stub_factory_fn)),
294-
channels_(),
295-
sizing_policy_(std::move(sizing_policy)),
296-
next_channel_id_(static_cast<int>(initial_channels.size())) {
297-
std::cout << __PRETTY_FUNCTION__ << ": wrap initial_channels" << std::endl;
298-
channels_.reserve(initial_channels.size());
299-
for (auto& channel : initial_channels) {
300-
channels_.push_back(
301-
std::make_shared<ChannelUsageWrapper<T>>(std::move(channel)));
302-
}
268+
next_channel_id_(static_cast<std::uint32_t>(channels_.size())) {
303269
sizing_policy_.minimum_channel_pool_size = channels_.size();
304270
}
305271

@@ -348,8 +314,8 @@ class DynamicChannelPool
348314
std::vector<std::shared_ptr<ChannelUsageWrapper<T>>> new_stubs;
349315
new_stubs.reserve(new_channel_ids.size());
350316
for (auto const& id : new_channel_ids) {
351-
new_stubs.push_back(
352-
std::make_shared<ChannelUsageWrapper<T>>(stub_factory_fn_(id)));
317+
new_stubs.push_back(stub_factory_fn_(id));
318+
// std::make_shared<ChannelUsageWrapper<T>>(stub_factory_fn_(id)));
353319
}
354320
std::unique_lock<std::mutex> lk(mu_);
355321
channels_.insert(channels_.end(),
@@ -391,7 +357,8 @@ class DynamicChannelPool
391357
}
392358
draining_channels_.pop_back();
393359
}
394-
// TODO: If iterators becomes a member variable perhaps add logic to call
360+
// TODO(sdhart): If iterators becomes a member variable perhaps add logic to
361+
// call
395362
// shrink_to_fit on iterators_ if there's a large
396363
// difference between iterators_.capacity and channels_.size
397364
}
@@ -446,7 +413,7 @@ class DynamicChannelPool
446413
future<void> remove_channel_poll_timer_;
447414
absl::optional<future<StatusOr<std::chrono::system_clock::time_point>>>
448415
pool_resize_cooldown_timer_ = absl::nullopt;
449-
int next_channel_id_;
416+
std::uint32_t next_channel_id_;
450417
// std::vector<
451418
// typename
452419
// std::vector<std::shared_ptr<ChannelUsageWrapper<T>>>::iterator>

google/cloud/bigtable/internal/dynamic_channel_pool_test.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@ TEST(DynamicChannelPoolTest, GetChannelRandomTwoLeastUsed) {
3434
fake_cq_impl, std::chrono::milliseconds(1),
3535
std::chrono::milliseconds(10));
3636

37-
auto stub_factory_fn = [](int) -> std::shared_ptr<BigtableStub> {
38-
return std::make_shared<MockBigtableStub>();
37+
auto stub_factory_fn =
38+
[](int) -> std::shared_ptr<ChannelUsageWrapper<BigtableStub>> {
39+
auto mock = std::make_shared<MockBigtableStub>();
40+
return std::make_shared<ChannelUsageWrapper<BigtableStub>>(mock);
3941
};
4042

4143
DynamicChannelPool<BigtableStub>::SizingPolicy sizing_policy;
4244

43-
std::vector<std::shared_ptr<BigtableStub>> channels(10);
45+
std::vector<std::shared_ptr<ChannelUsageWrapper<BigtableStub>>> channels(10);
4446
int id = 0;
4547
std::generate(channels.begin(), channels.end(),
4648
[&]() { return stub_factory_fn(id++); });

0 commit comments

Comments
 (0)