Skip to content

Commit e5a1ff8

Browse files
committed
factor out EvictBadChannels
1 parent 81631aa commit e5a1ff8

File tree

1 file changed

+50
-17
lines changed

1 file changed

+50
-17
lines changed

google/cloud/bigtable/internal/dynamic_channel_pool.h

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -187,66 +187,72 @@ class DynamicChannelPool
187187
auto shuffle_iter = iterators.begin();
188188
// typename
189189
// std::vector<std::shared_ptr<ChannelUsageWrapper<T>>>::iterator
190-
auto channel_1 = *shuffle_iter;
191-
std::shared_ptr<ChannelUsageWrapper<T>> c = *channel_1;
190+
auto channel_1_iter = *shuffle_iter;
191+
std::shared_ptr<ChannelUsageWrapper<T>> c = *channel_1_iter;
192192
// std::cout << __PRETTY_FUNCTION__
193193
// << ": check channel 1=" << c.get() << std::endl;
194194
auto channel_1_rpcs = shuffle_iter != iterators.end()
195-
? (*channel_1)->outstanding_rpcs()
195+
? (*channel_1_iter)->outstanding_rpcs()
196196
: Status{StatusCode::kNotFound, ""};
197197
++shuffle_iter;
198198
// typename
199199
// std::vector<std::shared_ptr<ChannelUsageWrapper<T>>>::iterator
200-
auto channel_2 = *shuffle_iter;
200+
auto channel_2_iter = *shuffle_iter;
201201
// We want to snapshot these outstanding_rpcs values.
202202
// std::cout << __PRETTY_FUNCTION__
203203
// << ": check channel 2=" << (channel_2)->get() << std::endl;
204204
auto channel_2_rpcs = shuffle_iter != iterators.end()
205-
? (*channel_2)->outstanding_rpcs()
205+
? (*channel_2_iter)->outstanding_rpcs()
206206
: Status{StatusCode::kNotFound, ""};
207207
// This is the ideal (and most common ) case so we try it first.
208208
// std::cout << __PRETTY_FUNCTION__ << ": compare channel rpcs" <<
209209
// std::endl;
210210
if (channel_1_rpcs.ok() && channel_2_rpcs.ok()) {
211211
std::cout << __PRETTY_FUNCTION__ << ": 2 ok channels, returning smaller"
212212
<< std::endl;
213-
return *channel_1_rpcs < *channel_2_rpcs ? *channel_1 : *channel_2;
213+
return *channel_1_rpcs < *channel_2_rpcs ? *channel_1_iter
214+
: *channel_2_iter;
214215
}
215216

216217
// We have one or more bad channels. Spending time finding a good channel
217218
// will be cheaper than trying to use a bad channel in the long run.
218219
std::vector<
219220
typename std::vector<std::shared_ptr<ChannelUsageWrapper<T>>>::iterator>
220-
bad_channels;
221+
bad_channel_iters;
222+
221223
while (!channel_1_rpcs.ok() && shuffle_iter != iterators.end()) {
222-
bad_channels.push_back(channel_1);
224+
bad_channel_iters.push_back(channel_1_iter);
223225
++shuffle_iter;
224-
channel_1 = *shuffle_iter;
226+
channel_1_iter = *shuffle_iter;
225227
channel_1_rpcs = shuffle_iter != iterators.end()
226-
? (*channel_1)->outstanding_rpcs()
228+
? (*channel_1_iter)->outstanding_rpcs()
227229
: Status{StatusCode::kNotFound, ""};
228230
}
229231

230232
while (!channel_2_rpcs.ok() && shuffle_iter != iterators.end()) {
231-
bad_channels.push_back(channel_2);
233+
bad_channel_iters.push_back(channel_2_iter);
232234
++shuffle_iter;
233-
channel_2 = *shuffle_iter;
235+
channel_2_iter = *shuffle_iter;
234236
channel_2_rpcs = shuffle_iter != iterators.end()
235-
? (*channel_2)->outstanding_rpcs()
237+
? (*channel_2_iter)->outstanding_rpcs()
236238
: Status{StatusCode::kNotFound, ""};
237239
}
238240

241+
EvictBadChannels(lk, bad_channel_iters);
242+
ScheduleRemoveChannel(lk);
243+
239244
if (channel_1_rpcs.ok() && channel_2_rpcs.ok()) {
240245
std::cout << __PRETTY_FUNCTION__ << ": 2 ok channels" << std::endl;
241-
return *channel_1_rpcs < *channel_2_rpcs ? *channel_1 : *channel_2;
246+
return *channel_1_rpcs < *channel_2_rpcs ? *channel_1_iter
247+
: *channel_2_iter;
242248
}
243249
if (channel_1_rpcs.ok()) {
244250
std::cout << __PRETTY_FUNCTION__ << ": ONLY channel_1 ok" << std::endl;
245-
return *channel_1;
251+
return *channel_1_iter;
246252
}
247253
if (channel_2_rpcs.ok()) {
248254
std::cout << __PRETTY_FUNCTION__ << ": ONLY channel_2 ok" << std::endl;
249-
return *channel_2;
255+
return *channel_2_iter;
250256
}
251257

252258
// TODO(sdhart): we have no usable channels in the entire pool; this is bad.
@@ -324,6 +330,9 @@ class DynamicChannelPool
324330
}
325331

326332
void ScheduleRemoveChannel(std::unique_lock<std::mutex> const&) {
333+
if (remove_channel_poll_timer_.valid()) return;
334+
std::cout << __PRETTY_FUNCTION__ << ": set remove_channel_poll_timer"
335+
<< std::endl;
327336
std::weak_ptr<DynamicChannelPool<T>> foo = this->shared_from_this();
328337
remove_channel_poll_timer_ =
329338
cq_.MakeRelativeTimer(sizing_policy_.remove_channel_polling_interval)
@@ -351,7 +360,7 @@ class DynamicChannelPool
351360
});
352361
while (!draining_channels_.empty()) {
353362
auto outstanding_rpcs = draining_channels_.back()->outstanding_rpcs();
354-
if (outstanding_rpcs.ok() && *outstanding_rpcs != 0) {
363+
if (outstanding_rpcs.ok() && *outstanding_rpcs > 0) {
355364
ScheduleRemoveChannel(lk);
356365
return;
357366
}
@@ -363,6 +372,28 @@ class DynamicChannelPool
363372
// difference between iterators_.capacity and channels_.size
364373
}
365374

375+
void EvictBadChannels(std::unique_lock<std::mutex> const&,
376+
std::vector<typename std::vector<
377+
std::shared_ptr<ChannelUsageWrapper<T>>>::iterator>&
378+
bad_channel_iters) {
379+
auto back_iter = channels_.rbegin();
380+
for (auto& bad_channel_iter : bad_channel_iters) {
381+
bool swapped = false;
382+
while (!swapped) {
383+
auto b = (*back_iter)->outstanding_rpcs();
384+
if (b.ok()) {
385+
std::swap(*back_iter, *bad_channel_iter);
386+
draining_channels_.push_back(std::move(*back_iter));
387+
swapped = true;
388+
}
389+
++back_iter;
390+
}
391+
}
392+
for (std::size_t i = 0; i < bad_channel_iters.size(); ++i) {
393+
channels_.pop_back();
394+
}
395+
}
396+
366397
void SetResizeCooldownTimer(std::unique_lock<std::mutex> const&) {
367398
pool_resize_cooldown_timer_ =
368399
cq_.MakeRelativeTimer(sizing_policy_.pool_resize_cooldown_interval);
@@ -380,6 +411,7 @@ class DynamicChannelPool
380411
<< ": channels_.size()=" << channels_.size()
381412
<< "; sizing_policy_.minimum_channel_pool_size="
382413
<< sizing_policy_.minimum_channel_pool_size << std::endl;
414+
// TODO(sdhart): do we need to check if we're over max pool size here?
383415
if (average_rpc_per_channel <
384416
sizing_policy_.minimum_average_outstanding_rpcs_per_channel &&
385417
channels_.size() > sizing_policy_.minimum_channel_pool_size) {
@@ -391,6 +423,7 @@ class DynamicChannelPool
391423
ScheduleRemoveChannel(lk);
392424
SetResizeCooldownTimer(lk);
393425
}
426+
// TODO(sdhart): do we need to check if we're under min pool size here?
394427
if (average_rpc_per_channel >
395428
sizing_policy_.maximum_average_outstanding_rpcs_per_channel &&
396429
channels_.size() < sizing_policy_.maximum_channel_pool_size) {

0 commit comments

Comments
 (0)