Skip to content

Commit 4b11b16

Browse files
committed
fixed or percentage channel add
1 parent 4f26155 commit 4b11b16

File tree

2 files changed

+51
-13
lines changed

2 files changed

+51
-13
lines changed

google/cloud/bigtable/internal/bigtable_stub_factory.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ std::shared_ptr<BigtableStub> CreateBigtableStubRandomTwoLeastUsed(
8888
return refreshing_channel_stub_factory(id++);
8989
});
9090
return std::make_shared<BigtableRandomTwoLeastUsed>(
91-
cq, refreshing_channel_stub_factory, std::move(children));
91+
std::move(cq), refreshing_channel_stub_factory, std::move(children));
9292
}
9393

9494
std::shared_ptr<BigtableStub> CreateDecoratedStubs(

google/cloud/internal/channel_pool.h

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "google/cloud/completion_queue.h"
1919
#include "google/cloud/internal/random.h"
2020
#include "google/cloud/version.h"
21+
#include <cmath>
2122
#include <functional>
2223
#include <memory>
2324
#include <mutex>
@@ -85,6 +86,15 @@ class DynamicChannelPool
8586
std::chrono::milliseconds pool_resize_cooldown_interval =
8687
std::chrono::milliseconds(60 * 1000);
8788

89+
struct DiscreteChannels {
90+
int number;
91+
};
92+
struct PercentageOfPoolSize {
93+
double percentage;
94+
};
95+
absl::variant<DiscreteChannels, PercentageOfPoolSize>
96+
channels_to_add_per_resize = DiscreteChannels{1};
97+
8898
// If the average number of outstanding RPCs is below this threshold,
8999
// the pool size will be decreased.
90100
int minimum_average_outstanding_rpcs_per_channel = 20;
@@ -154,7 +164,7 @@ class DynamicChannelPool
154164
}
155165

156166
if (channels_.size() == 1) {
157-
return channels_[0];
167+
return channels_.front();
158168
}
159169

160170
std::shared_ptr<StubUsageWrapper<T>> channel_1;
@@ -194,7 +204,7 @@ class DynamicChannelPool
194204
stub_factory_fn_(std::move(stub_factory_fn)),
195205
channels_(),
196206
sizing_policy_(std::move(sizing_policy)),
197-
next_channel_id_(initial_channels.size()) {
207+
next_channel_id_(static_cast<int>(initial_channels.size())) {
198208
std::cout << __PRETTY_FUNCTION__ << ": wrap initial_channels" << std::endl;
199209
channels_.reserve(initial_channels.size());
200210
for (auto& channel : initial_channels) {
@@ -203,21 +213,49 @@ class DynamicChannelPool
203213
}
204214
}
205215

216+
struct ChannelAddVisitor {
217+
std::size_t pool_size;
218+
explicit ChannelAddVisitor(std::size_t pool_size) : pool_size(pool_size) {}
219+
int operator()(typename SizingPolicy::DiscreteChannels const& c) {
220+
return c.number;
221+
}
222+
223+
int operator()(typename SizingPolicy::PercentageOfPoolSize const& c) {
224+
return static_cast<int>(
225+
std::floor(static_cast<double>(pool_size) * c.percentage));
226+
}
227+
};
228+
206229
void ScheduleAddChannel(std::unique_lock<std::mutex> const&) {
230+
auto num_channels_to_add =
231+
absl::visit(ChannelAddVisitor(channels_.size()),
232+
sizing_policy_.channels_to_add_per_resize);
233+
std::vector<int> new_channel_ids;
234+
new_channel_ids.reserve(num_channels_to_add);
235+
for (int i = 0; i < num_channels_to_add; ++i) {
236+
new_channel_ids.push_back(next_channel_id_++);
237+
}
238+
207239
std::weak_ptr<DynamicChannelPool<T>> foo = this->shared_from_this();
208-
cq_.RunAsync(
209-
[new_channel_id = next_channel_id_++, weak = std::move(foo)]() {
210-
if (auto self = weak.lock()) {
211-
self->AddChannel(new_channel_id);
212-
}
213-
});
240+
cq_.RunAsync([new_channel_ids = std::move(new_channel_ids),
241+
weak = std::move(foo)]() {
242+
if (auto self = weak.lock()) {
243+
self->AddChannel(new_channel_ids);
244+
}
245+
});
214246
}
215247

216-
void AddChannel(int new_channel_id) {
217-
auto new_stub = stub_factory_fn_(new_channel_id);
248+
void AddChannel(std::vector<int> const& new_channel_ids) {
249+
std::vector<std::shared_ptr<StubUsageWrapper<T>>> new_stubs;
250+
new_stubs.reserve(new_channel_ids.size());
251+
for (auto const& id : new_channel_ids) {
252+
new_stubs.push_back(
253+
std::make_shared<StubUsageWrapper<T>>(stub_factory_fn_(id)));
254+
}
218255
std::unique_lock<std::mutex> lk(mu_);
219-
channels_.push_back(
220-
std::make_shared<StubUsageWrapper<T>>(std::move(new_stub)));
256+
channels_.insert(channels_.end(),
257+
std::make_move_iterator(new_stubs.begin()),
258+
std::make_move_iterator(new_stubs.end()));
221259
}
222260

223261
void ScheduleRemoveChannel(std::unique_lock<std::mutex> const&) {

0 commit comments

Comments
 (0)