Skip to content

Commit 4f26155

Browse files
committed
add and remove implemented
1 parent 984599c commit 4f26155

File tree

5 files changed

+180
-131
lines changed

5 files changed

+180
-131
lines changed

google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.cc

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,12 @@ class AsyncStreamingReadRpcTracking
7979

8080
BigtableRandomTwoLeastUsed::BigtableRandomTwoLeastUsed(
8181
CompletionQueue cq,
82-
internal::DynamicChannelPool<BigtableStub>::StubFactoryFn factory_fn,
82+
internal::DynamicChannelPool<BigtableStub>::StubFactoryFn
83+
refreshing_channel_stub_factory_fn,
8384
std::vector<std::shared_ptr<BigtableStub>> children)
8485
: pool_(internal::DynamicChannelPool<BigtableStub>::Create(
85-
std::move(cq), std::move(children), std::move(factory_fn))) {}
86+
std::move(cq), std::move(children),
87+
std::move(refreshing_channel_stub_factory_fn))) {}
8688

8789
std::unique_ptr<google::cloud::internal::StreamingReadRpc<
8890
google::bigtable::v2::ReadRowsResponse>>
@@ -93,8 +95,7 @@ BigtableRandomTwoLeastUsed::ReadRows(
9395
auto child = Child();
9496
auto stub = child->AcquireStub();
9597
auto result = stub->ReadRows(std::move(context), options, request);
96-
std::weak_ptr<internal::StubWrapper<BigtableStub>> weak = child;
97-
auto release_fn = [weak = std::move(weak)]() {
98+
auto release_fn = [weak = child->MakeWeak()] {
9899
auto child = weak.lock();
99100
if (child) child->ReleaseStub();
100101
};
@@ -111,8 +112,7 @@ BigtableRandomTwoLeastUsed::SampleRowKeys(
111112
auto child = Child();
112113
auto stub = child->AcquireStub();
113114
auto result = stub->SampleRowKeys(std::move(context), options, request);
114-
std::weak_ptr<internal::StubWrapper<BigtableStub>> weak = child;
115-
auto release_fn = [weak = std::move(weak)]() {
115+
auto release_fn = [weak = child->MakeWeak()] {
116116
auto child = weak.lock();
117117
if (child) child->ReleaseStub();
118118
};
@@ -141,8 +141,7 @@ BigtableRandomTwoLeastUsed::MutateRows(
141141
auto child = Child();
142142
auto stub = child->AcquireStub();
143143
auto result = stub->MutateRows(std::move(context), options, request);
144-
std::weak_ptr<internal::StubWrapper<BigtableStub>> weak = child;
145-
auto release_fn = [weak = std::move(weak)]() {
144+
auto release_fn = [weak = child->MakeWeak()] {
146145
auto child = weak.lock();
147146
if (child) child->ReleaseStub();
148147
};
@@ -203,8 +202,7 @@ BigtableRandomTwoLeastUsed::ExecuteQuery(
203202
auto child = Child();
204203
auto stub = child->AcquireStub();
205204
auto result = stub->ExecuteQuery(std::move(context), options, request);
206-
std::weak_ptr<internal::StubWrapper<BigtableStub>> weak = child;
207-
auto release_fn = [weak = std::move(weak)]() {
205+
auto release_fn = [weak = child->MakeWeak()] {
208206
auto child = weak.lock();
209207
if (child) child->ReleaseStub();
210208
};
@@ -224,8 +222,7 @@ BigtableRandomTwoLeastUsed::AsyncReadRows(
224222
auto stub = child->AcquireStub();
225223
auto result =
226224
stub->AsyncReadRows(cq, std::move(context), std::move(options), request);
227-
std::weak_ptr<internal::StubWrapper<BigtableStub>> weak = child;
228-
auto release_fn = [weak = std::move(weak)]() {
225+
auto release_fn = [weak = child->MakeWeak()] {
229226
auto child = weak.lock();
230227
if (child) child->ReleaseStub();
231228
};
@@ -245,8 +242,7 @@ BigtableRandomTwoLeastUsed::AsyncSampleRowKeys(
245242
auto stub = child->AcquireStub();
246243
auto result = stub->AsyncSampleRowKeys(cq, std::move(context),
247244
std::move(options), request);
248-
std::weak_ptr<internal::StubWrapper<BigtableStub>> weak = child;
249-
auto release_fn = [weak = std::move(weak)]() {
245+
auto release_fn = [weak = child->MakeWeak()] {
250246
auto child = weak.lock();
251247
if (child) child->ReleaseStub();
252248
};
@@ -280,9 +276,7 @@ BigtableRandomTwoLeastUsed::AsyncMutateRows(
280276
auto stub = child->AcquireStub();
281277
auto result = stub->AsyncMutateRows(cq, std::move(context),
282278
std::move(options), request);
283-
284-
std::weak_ptr<internal::StubWrapper<BigtableStub>> weak = child;
285-
auto release_fn = [weak = std::move(weak)]() {
279+
auto release_fn = [weak = child->MakeWeak()] {
286280
auto child = weak.lock();
287281
if (child) child->ReleaseStub();
288282
};
@@ -334,21 +328,10 @@ BigtableRandomTwoLeastUsed::AsyncPrepareQuery(
334328
return result;
335329
}
336330

337-
std::shared_ptr<internal::StubWrapper<BigtableStub>>
331+
std::shared_ptr<internal::StubUsageWrapper<BigtableStub>>
338332
BigtableRandomTwoLeastUsed::Child() {
339333
std::cout << __PRETTY_FUNCTION__ << std::endl;
340334
return pool_->GetChannelRandomTwoLeastUsed();
341-
// std::unique_lock<std::mutex> lk(mu_);
342-
// std::vector<std::size_t> indices(pool_->size(lk) - 1);
343-
// // TODO(sdhart): Maybe use iota on iterators instead of indices
344-
// std::iota(indices.begin(), indices.end(), 0);
345-
// std::shuffle(indices.begin(), indices.end(), rng_);
346-
// auto channel_1 = pool_->GetChannel(lk, indices[0]);
347-
// auto channel_2 = pool_->GetChannel(lk, indices[1]);
348-
//
349-
// return channel_1->outstanding_rpcs(lk) < channel_2->outstanding_rpcs(lk)
350-
// ? channel_1
351-
// : channel_2;
352335
}
353336

354337
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ class BigtableRandomTwoLeastUsed : public BigtableStub {
3535
public:
3636
BigtableRandomTwoLeastUsed(
3737
CompletionQueue cq,
38-
internal::DynamicChannelPool<BigtableStub>::StubFactoryFn factory_fn,
38+
internal::DynamicChannelPool<BigtableStub>::StubFactoryFn
39+
refreshing_channel_stub_factory_fn,
3940
std::vector<std::shared_ptr<BigtableStub>> children);
4041
~BigtableRandomTwoLeastUsed() override = default;
4142

@@ -133,7 +134,7 @@ class BigtableRandomTwoLeastUsed : public BigtableStub {
133134
google::bigtable::v2::PrepareQueryRequest const& request) override;
134135

135136
private:
136-
std::shared_ptr<internal::StubWrapper<BigtableStub>> Child();
137+
std::shared_ptr<internal::StubUsageWrapper<BigtableStub>> Child();
137138

138139
// std::mutex mu_;
139140
std::shared_ptr<internal::DynamicChannelPool<BigtableStub>> pool_;

google/cloud/bigtable/internal/bigtable_stub_factory.cc

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,46 +64,59 @@ std::string FeaturesMetadata() {
6464
} // namespace
6565

6666
std::shared_ptr<BigtableStub> CreateBigtableStubRoundRobin(
67-
Options const& options,
68-
std::function<std::shared_ptr<BigtableStub>(int)> child_factory) {
67+
Options const& options, std::function<std::shared_ptr<BigtableStub>(int)>
68+
refreshing_channel_stub_factory) {
6969
std::vector<std::shared_ptr<BigtableStub>> children(
7070
(std::max)(1, options.get<GrpcNumChannelsOption>()));
7171
int id = 0;
7272
std::generate(children.begin(), children.end(),
73-
[&id, &child_factory] { return child_factory(id++); });
73+
[&id, &refreshing_channel_stub_factory] {
74+
return refreshing_channel_stub_factory(id++);
75+
});
7476
return std::make_shared<BigtableRoundRobin>(std::move(children));
7577
}
7678

7779
std::shared_ptr<BigtableStub> CreateBigtableStubRandomTwoLeastUsed(
78-
Options const& options, CompletionQueue cq,
79-
std::function<std::shared_ptr<BigtableStub>(int)> child_factory) {
80+
CompletionQueue cq, Options const& options,
81+
std::function<std::shared_ptr<BigtableStub>(int)>
82+
refreshing_channel_stub_factory) {
8083
std::vector<std::shared_ptr<BigtableStub>> children(
8184
(std::max)(1, options.get<GrpcNumChannelsOption>()));
8285
int id = 0;
8386
std::generate(children.begin(), children.end(),
84-
[&id, &child_factory] { return child_factory(id++); });
85-
return std::make_shared<BigtableRandomTwoLeastUsed>(cq, child_factory,
86-
std::move(children));
87+
[&id, &refreshing_channel_stub_factory] {
88+
return refreshing_channel_stub_factory(id++);
89+
});
90+
return std::make_shared<BigtableRandomTwoLeastUsed>(
91+
cq, refreshing_channel_stub_factory, std::move(children));
8792
}
8893

8994
std::shared_ptr<BigtableStub> CreateDecoratedStubs(
9095
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
9196
CompletionQueue const& cq, Options const& options,
92-
BaseBigtableStubFactory const& base_factory) {
97+
BaseBigtableStubFactory const& stub_factory) {
9398
auto cq_impl = internal::GetCompletionQueueImpl(cq);
9499
auto refresh = std::make_shared<ConnectionRefreshState>(
95100
cq_impl, options.get<bigtable::MinConnectionRefreshOption>(),
96101
options.get<bigtable::MaxConnectionRefreshOption>());
97-
auto child_factory = [base_factory, cq_impl, refresh, &auth,
98-
options](int id) {
102+
auto refreshing_channel_stub_factory = [stub_factory, cq_impl, refresh, &auth,
103+
options](int id) {
99104
auto channel = CreateGrpcChannel(*auth, options, id);
100105
if (refresh->enabled()) ScheduleChannelRefresh(cq_impl, refresh, channel);
101-
return base_factory(std::move(channel));
106+
return stub_factory(std::move(channel));
102107
};
103-
// auto stub = CreateBigtableStubRoundRobin(options,
104-
// std::move(child_factory));
105-
auto stub = CreateBigtableStubRandomTwoLeastUsed(options, cq,
106-
std::move(child_factory));
108+
109+
std::shared_ptr<BigtableStub> stub;
110+
if (options.has<ChannelSelectionStrategyOption>() &&
111+
options.get<ChannelSelectionStrategyOption>() ==
112+
ChannelSelectionStrategy::kRandomTwoLeastUsed) {
113+
stub = CreateBigtableStubRandomTwoLeastUsed(
114+
cq, options, std::move(refreshing_channel_stub_factory));
115+
} else {
116+
stub = CreateBigtableStubRoundRobin(
117+
options, std::move(refreshing_channel_stub_factory));
118+
}
119+
107120
if (refresh->enabled()) {
108121
stub = std::make_shared<BigtableChannelRefresh>(std::move(stub),
109122
std::move(refresh));

google/cloud/bigtable/internal/bigtable_stub_factory.h

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,29 @@ namespace cloud {
2828
namespace bigtable_internal {
2929
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3030

31+
enum class ChannelSelectionStrategy { kNone, kRoundRobin, kRandomTwoLeastUsed };
32+
33+
struct ChannelSelectionStrategyOption {
34+
using Type = ChannelSelectionStrategy;
35+
};
36+
3137
using BaseBigtableStubFactory = std::function<std::shared_ptr<BigtableStub>(
3238
std::shared_ptr<grpc::Channel>)>;
3339

3440
std::shared_ptr<BigtableStub> CreateBigtableStubRoundRobin(
35-
Options const& options,
36-
std::function<std::shared_ptr<BigtableStub>(int)> child_factory);
41+
Options const& options, std::function<std::shared_ptr<BigtableStub>(int)>
42+
refreshing_channel_stub_factory);
43+
44+
std::shared_ptr<BigtableStub> CreateBigtableStubRandomTwoLeastUsed(
45+
CompletionQueue cq, Options const& options,
46+
std::function<std::shared_ptr<BigtableStub>(int)>
47+
refreshing_channel_stub_factory);
3748

3849
/// Used in testing to create decorated mocks.
3950
std::shared_ptr<BigtableStub> CreateDecoratedStubs(
4051
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
4152
CompletionQueue const& cq, Options const& options,
42-
BaseBigtableStubFactory const& base_factory);
53+
BaseBigtableStubFactory const& stub_factory);
4354

4455
/// Default function used by `DataConnectionImpl`.
4556
std::shared_ptr<BigtableStub> CreateBigtableStub(

0 commit comments

Comments
 (0)