Skip to content

Commit a4503f6

Browse files
committed
make refresh_state a member of channel pool
1 parent 434a1a8 commit a4503f6

File tree

5 files changed

+42
-21
lines changed

5 files changed

+42
-21
lines changed

google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,12 @@ class AsyncStreamingReadRpcTracking
7474
} // namespace
7575

7676
BigtableRandomTwoLeastUsed::BigtableRandomTwoLeastUsed(
77-
CompletionQueue cq,
77+
CompletionQueue cq, std::shared_ptr<ConnectionRefreshState> refresh_state,
7878
DynamicChannelPool<BigtableStub>::StubFactoryFn
7979
refreshing_channel_stub_factory_fn,
8080
std::vector<std::shared_ptr<BigtableStub>> children)
8181
: pool_(DynamicChannelPool<BigtableStub>::Create(
82-
std::move(cq), std::move(children),
82+
std::move(cq), std::move(children), std::move(refresh_state),
8383
std::move(refreshing_channel_stub_factory_fn))) {}
8484

8585
std::unique_ptr<google::cloud::internal::StreamingReadRpc<

google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3030
class BigtableRandomTwoLeastUsed : public BigtableStub {
3131
public:
3232
BigtableRandomTwoLeastUsed(
33-
CompletionQueue cq,
33+
CompletionQueue cq, std::shared_ptr<ConnectionRefreshState> refresh_state,
3434
DynamicChannelPool<BigtableStub>::StubFactoryFn
3535
refreshing_channel_stub_factory_fn,
3636
std::vector<std::shared_ptr<BigtableStub>> children);

google/cloud/bigtable/internal/bigtable_stub_factory.cc

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ std::shared_ptr<BigtableStub> CreateBigtableStubRoundRobin(
7979
std::shared_ptr<BigtableStub> CreateBigtableStubRandomTwoLeastUsed(
8080
CompletionQueue cq, Options const& options,
8181
std::function<std::shared_ptr<BigtableStub>(int)>
82-
refreshing_channel_stub_factory) {
82+
refreshing_channel_stub_factory,
83+
std::shared_ptr<ConnectionRefreshState> refresh_state) {
8384
std::vector<std::shared_ptr<BigtableStub>> children(
8485
(std::max)(1, options.get<GrpcNumChannelsOption>()));
8586
int id = 0;
@@ -88,7 +89,8 @@ std::shared_ptr<BigtableStub> CreateBigtableStubRandomTwoLeastUsed(
8889
return refreshing_channel_stub_factory(id++);
8990
});
9091
return std::make_shared<BigtableRandomTwoLeastUsed>(
91-
std::move(cq), refreshing_channel_stub_factory, std::move(children));
92+
std::move(cq), std::move(refresh_state), refreshing_channel_stub_factory,
93+
std::move(children));
9294
}
9395

9496
std::shared_ptr<BigtableStub> CreateDecoratedStubs(
@@ -99,28 +101,35 @@ std::shared_ptr<BigtableStub> CreateDecoratedStubs(
99101
auto refresh = std::make_shared<ConnectionRefreshState>(
100102
cq_impl, options.get<bigtable::MinConnectionRefreshOption>(),
101103
options.get<bigtable::MaxConnectionRefreshOption>());
102-
auto refreshing_channel_stub_factory = [stub_factory, cq_impl, refresh, &auth,
103-
options](int id) {
104-
auto channel = CreateGrpcChannel(*auth, options, id);
105-
if (refresh->enabled()) ScheduleChannelRefresh(cq_impl, refresh, channel);
106-
return stub_factory(std::move(channel));
107-
};
108104

109105
std::shared_ptr<BigtableStub> stub;
110106
if (options.has<ChannelSelectionStrategyOption>() &&
111107
options.get<ChannelSelectionStrategyOption>() ==
112108
ChannelSelectionStrategy::kRandomTwoLeastUsed) {
109+
auto refreshing_channel_stub_factory = [stub_factory, cq_impl, refresh,
110+
&auth, options](int id) {
111+
auto channel = CreateGrpcChannel(*auth, options, id);
112+
ScheduleChannelRefresh(cq_impl, refresh, channel);
113+
return stub_factory(std::move(channel));
114+
};
113115
stub = CreateBigtableStubRandomTwoLeastUsed(
114-
cq, options, std::move(refreshing_channel_stub_factory));
116+
cq, options, std::move(refreshing_channel_stub_factory),
117+
std::move(refresh));
115118
} else {
119+
auto refreshing_channel_stub_factory = [stub_factory, cq_impl, refresh,
120+
&auth, options](int id) {
121+
auto channel = CreateGrpcChannel(*auth, options, id);
122+
if (refresh->enabled()) ScheduleChannelRefresh(cq_impl, refresh, channel);
123+
return stub_factory(std::move(channel));
124+
};
116125
stub = CreateBigtableStubRoundRobin(
117126
options, std::move(refreshing_channel_stub_factory));
127+
if (refresh->enabled()) {
128+
stub = std::make_shared<BigtableChannelRefresh>(std::move(stub),
129+
std::move(refresh));
130+
}
118131
}
119132

120-
if (refresh->enabled()) {
121-
stub = std::make_shared<BigtableChannelRefresh>(std::move(stub),
122-
std::move(refresh));
123-
}
124133
if (auth->RequiresConfigureContext()) {
125134
stub = std::make_shared<BigtableAuth>(std::move(auth), std::move(stub));
126135
}

google/cloud/bigtable/internal/bigtable_stub_factory.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_BIGTABLE_STUB_FACTORY_H
1717

1818
#include "google/cloud/bigtable/internal/bigtable_stub.h"
19+
#include "google/cloud/bigtable/internal/connection_refresh_state.h"
1920
#include "google/cloud/completion_queue.h"
2021
#include "google/cloud/internal/unified_grpc_credentials.h"
2122
#include "google/cloud/options.h"
@@ -44,7 +45,8 @@ std::shared_ptr<BigtableStub> CreateBigtableStubRoundRobin(
4445
std::shared_ptr<BigtableStub> CreateBigtableStubRandomTwoLeastUsed(
4546
CompletionQueue cq, Options const& options,
4647
std::function<std::shared_ptr<BigtableStub>(int)>
47-
refreshing_channel_stub_factory);
48+
refreshing_channel_stub_factory,
49+
std::shared_ptr<ConnectionRefreshState> refresh_state);
4850

4951
/// Used in testing to create decorated mocks.
5052
std::shared_ptr<BigtableStub> CreateDecoratedStubs(

google/cloud/bigtable/internal/dynamic_channel_pool.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_DYNAMIC_CHANNEL_POOL_H
1616
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_DYNAMIC_CHANNEL_POOL_H
1717

18+
#include "google/cloud/bigtable/internal/connection_refresh_state.h"
1819
#include "google/cloud/completion_queue.h"
1920
#include "google/cloud/internal/random.h"
2021
#include "google/cloud/version.h"
@@ -98,24 +99,28 @@ class DynamicChannelPool
9899

99100
static std::shared_ptr<DynamicChannelPool> Create(
100101
CompletionQueue cq, std::size_t initial_size,
101-
StubFactoryFn stub_factory_fn, SizingPolicy sizing_policy = {}) {
102+
StubFactoryFn stub_factory_fn,
103+
std::shared_ptr<ConnectionRefreshState> refresh_state,
104+
SizingPolicy sizing_policy = {}) {
102105
std::cout << __PRETTY_FUNCTION__ << std::endl;
103106
std::vector<std::shared_ptr<StubUsageWrapper<T>>> initial_wrapped_channels;
104107
for (std::size_t i = 0; i < initial_size; ++i) {
105108
initial_wrapped_channels.emplace_back(stub_factory_fn());
106109
}
107110
auto pool = std::shared_ptr<DynamicChannelPool>(new DynamicChannelPool(
108111
std::move(cq), std::move(initial_wrapped_channels),
109-
std::move(stub_factory_fn), std::move(sizing_policy)));
112+
std::move(refresh_state), std::move(stub_factory_fn),
113+
std::move(sizing_policy)));
110114
}
111115

112116
static std::shared_ptr<DynamicChannelPool> Create(
113117
CompletionQueue cq, std::vector<std::shared_ptr<T>> initial_channels,
118+
std::shared_ptr<ConnectionRefreshState> refresh_state,
114119
StubFactoryFn stub_factory_fn, SizingPolicy sizing_policy = {}) {
115120
std::cout << __PRETTY_FUNCTION__ << std::endl;
116121
auto pool = std::shared_ptr<DynamicChannelPool>(new DynamicChannelPool(
117-
std::move(cq), std::move(initial_channels), std::move(stub_factory_fn),
118-
std::move(sizing_policy)));
122+
std::move(cq), std::move(initial_channels), std::move(refresh_state),
123+
std::move(stub_factory_fn), std::move(sizing_policy)));
119124
return pool;
120125
}
121126

@@ -166,17 +171,21 @@ class DynamicChannelPool
166171
DynamicChannelPool(CompletionQueue cq,
167172
std::vector<std::shared_ptr<StubUsageWrapper<T>>>
168173
initial_wrapped_channels,
174+
std::shared_ptr<ConnectionRefreshState> refresh_state,
169175
StubFactoryFn stub_factory_fn, SizingPolicy sizing_policy)
170176
: cq_(std::move(cq)),
177+
refresh_state_(std::move(refresh_state)),
171178
stub_factory_fn_(std::move(stub_factory_fn)),
172179
channels_(std::move(initial_wrapped_channels)),
173180
sizing_policy_(std::move(sizing_policy)),
174181
next_channel_id_(channels_.size()) {}
175182

176183
DynamicChannelPool(CompletionQueue cq,
177184
std::vector<std::shared_ptr<T>> initial_channels,
185+
std::shared_ptr<ConnectionRefreshState> refresh_state,
178186
StubFactoryFn stub_factory_fn, SizingPolicy sizing_policy)
179187
: cq_(std::move(cq)),
188+
refresh_state_(std::move(refresh_state)),
180189
stub_factory_fn_(std::move(stub_factory_fn)),
181190
channels_(),
182191
sizing_policy_(std::move(sizing_policy)),
@@ -299,6 +308,7 @@ class DynamicChannelPool
299308
mutable std::mutex mu_;
300309
CompletionQueue cq_;
301310
google::cloud::internal::DefaultPRNG rng_;
311+
std::shared_ptr<ConnectionRefreshState> refresh_state_;
302312
StubFactoryFn stub_factory_fn_;
303313
std::vector<std::shared_ptr<StubUsageWrapper<T>>> channels_;
304314
SizingPolicy sizing_policy_;

0 commit comments

Comments
 (0)