Skip to content
This repository was archived by the owner on Dec 8, 2021. It is now read-only.

Commit e33b4f8

Browse files
authored
fix: avoid deadlock waiting for Session allocation (#1170)
* fix: avoid deadlock waiting for Session allocation

  Track the number of threads waiting for a `Session` to ensure we always
  notify properly; the prior implementation had race conditions that could
  lead to a thread waiting forever even when a `Session` was available
  (see #1000 for details).

  I also removed the workaround for this bug in the benchmark. With the
  workaround removed, I ran the benchmark using the flags below, both with
  and without the `SessionPool` fix. Without the fix it hangs reliably;
  with the fix it never hangs.

  ```
  .build/google/cloud/spanner/benchmarks/single_row_throughput_benchmark \
    --project=${GOOGLE_CLOUD_PROJECT} \
    --instance=${GOOGLE_CLOUD_CPP_SPANNER_INSTANCE} \
    --table-size=1000 \
    --minimum-clients=1 --maximum-clients=1 \
    --minimum-threads=1000 --maximum-threads=1000 \
    --iteration-duration=2 --samples=100 \
    --experiment=read
  ```

  fixes #1000

* Factor "Wait" logic out to a private member function.
1 parent 48494ad commit e33b4f8

File tree

3 files changed

+19
-14
lines changed

3 files changed

+19
-14
lines changed

google/cloud/spanner/benchmarks/single_row_throughput_benchmark.cc

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,8 @@ void FillTable(Config const& config, cloud_spanner::Database const& database,
213213
}
214214

215215
int ClientCount(Config const& config,
216-
google::cloud::internal::DefaultPRNG& generator,
217-
int thread_count) {
218-
// TODO(#1000) - avoid deadlocks with more than 100 threads per client
219-
auto min_clients = (std::max)(thread_count / 100 + 1, config.minimum_clients);
216+
google::cloud::internal::DefaultPRNG& generator) {
217+
auto min_clients = config.minimum_clients;
220218
auto const max_clients = config.maximum_clients;
221219
if (min_clients <= max_clients) {
222220
return min_clients;
@@ -249,7 +247,7 @@ class InsertOrUpdateExperiment : public Experiment {
249247

250248
for (int i = 0; i != config.samples; ++i) {
251249
auto const thread_count = thread_count_gen(generator);
252-
auto const client_count = ClientCount(config, generator, thread_count);
250+
auto const client_count = ClientCount(config, generator);
253251
std::vector<cloud_spanner::Client> iteration_clients(
254252
clients.begin(), clients.begin() + client_count);
255253
RunIteration(config, iteration_clients, thread_count, sink, generator);
@@ -354,7 +352,7 @@ class ReadExperiment : public Experiment {
354352

355353
for (int i = 0; i != config.samples; ++i) {
356354
auto const thread_count = thread_count_gen(generator_);
357-
auto const client_count = ClientCount(config, generator_, thread_count);
355+
auto const client_count = ClientCount(config, generator_);
358356
std::vector<cloud_spanner::Client> iteration_clients(
359357
clients.begin(), clients.begin() + client_count);
360358
RunIteration(config, iteration_clients, thread_count, sink);
@@ -466,7 +464,7 @@ class UpdateDmlExperiment : public Experiment {
466464

467465
for (int i = 0; i != config.samples; ++i) {
468466
auto const thread_count = thread_count_gen(generator);
469-
auto const client_count = ClientCount(config, generator, thread_count);
467+
auto const client_count = ClientCount(config, generator);
470468
std::vector<cloud_spanner::Client> iteration_clients(
471469
clients.begin(), clients.begin() + client_count);
472470
RunIteration(config, iteration_clients, thread_count, sink, generator);
@@ -581,7 +579,7 @@ class SelectExperiment : public Experiment {
581579

582580
for (int i = 0; i != config.samples; ++i) {
583581
auto const thread_count = thread_count_gen(generator_);
584-
auto const client_count = ClientCount(config, generator_, thread_count);
582+
auto const client_count = ClientCount(config, generator_);
585583
std::vector<cloud_spanner::Client> iteration_clients(
586584
clients.begin(), clients.begin() + client_count);
587585
RunIteration(config, iteration_clients, thread_count, sink);

google/cloud/spanner/internal/session_pool.cc

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
110110
if (options_.action_on_exhaustion() == ActionOnExhaustion::FAIL) {
111111
return Status(StatusCode::kResourceExhausted, "session pool exhausted");
112112
}
113-
cond_.wait(lk, [this] {
113+
Wait(lk, [this] {
114114
return !sessions_.empty() || total_sessions_ < max_pool_size_;
115115
});
116116
continue;
@@ -123,8 +123,7 @@ StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
123123
// simultaneous calls if additional sessions are needed. We can also use the
124124
// number of waiters in the `sessions_to_create` calculation below.
125125
if (create_in_progress_) {
126-
cond_.wait(lk,
127-
[this] { return !sessions_.empty() || !create_in_progress_; });
126+
Wait(lk, [this] { return !sessions_.empty() || !create_in_progress_; });
128127
continue;
129128
}
130129

@@ -166,10 +165,8 @@ void SessionPool::Release(std::unique_ptr<Session> session) {
166165
--total_sessions_;
167166
return;
168167
}
169-
bool notify = sessions_.empty();
170168
sessions_.push_back(std::move(session));
171-
// If sessions_ was empty, wake up someone who was waiting for a session.
172-
if (notify) {
169+
if (num_waiting_for_session_ > 0) {
173170
lk.unlock();
174171
cond_.notify_one();
175172
}

google/cloud/spanner/internal/session_pool.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,15 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
100100
// Release session back to the pool.
101101
void Release(std::unique_ptr<Session> session);
102102

103+
// Called when a thread needs to wait for a `Session` to become available.
104+
// @p p specifies the condition to wait for.
105+
template <typename Predicate>
106+
void Wait(std::unique_lock<std::mutex>& lk, Predicate&& p) {
107+
++num_waiting_for_session_;
108+
cond_.wait(lk, std::forward<Predicate>(p));
109+
--num_waiting_for_session_;
110+
}
111+
103112
Status CreateSessions(std::unique_lock<std::mutex>& lk, ChannelInfo& channel,
104113
std::map<std::string, std::string> const& labels,
105114
int num_sessions); // EXCLUSIVE_LOCKS_REQUIRED(mu_)
@@ -120,6 +129,7 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
120129
std::vector<std::unique_ptr<Session>> sessions_; // GUARDED_BY(mu_)
121130
int total_sessions_ = 0; // GUARDED_BY(mu_)
122131
bool create_in_progress_ = false; // GUARDED_BY(mu_)
132+
int num_waiting_for_session_ = 0; // GUARDED_BY(mu_)
123133

124134
// `channels_` is guaranteed to be non-empty and will not be resized after
125135
// the constructor runs (so the iterators are guaranteed to always be valid).

0 commit comments

Comments
 (0)