Skip to content
This repository was archived by the owner on Dec 8, 2021. It is now read-only.

Commit d250c48

Browse files
authored
feat: use BackgroundThreads in SessionPool (#1259)
* feat: use `BackgroundThreads` in `SessionPool` Currently this just schedules a recurring 5s timer and cancels it in the destructor. * address review comments * rewrite the comment in `~SessionPool` * point to the proper background_threads also change some ;'s to .'s per bww * API/ABI update (internal namespace break only)
1 parent 9574fca commit d250c48

File tree

5 files changed

+69
-3
lines changed

5 files changed

+69
-3
lines changed
3.22 KB
Binary file not shown.

google/cloud/spanner/internal/connection_impl.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ ConnectionImpl::ConnectionImpl(Database db,
9898
backoff_policy_prototype_(std::move(backoff_policy)),
9999
session_pool_(MakeSessionPool(db_, std::move(stubs),
100100
std::move(session_pool_options),
101+
options.background_threads_factory()(),
101102
retry_policy_prototype_->clone(),
102103
backoff_policy_prototype_->clone())),
103104
rpc_stream_tracing_enabled_(options.tracing_enabled("rpc-streams")),

google/cloud/spanner/internal/session_pool.cc

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,15 @@
1616
#include "google/cloud/spanner/internal/connection_impl.h"
1717
#include "google/cloud/spanner/internal/retry_loop.h"
1818
#include "google/cloud/spanner/internal/session.h"
19+
#include "google/cloud/grpc_utils/completion_queue.h"
20+
#include "google/cloud/internal/background_threads_impl.h"
1921
#include "google/cloud/internal/make_unique.h"
22+
#include "google/cloud/log.h"
2023
#include "google/cloud/status.h"
2124
#include <algorithm>
25+
#include <chrono>
2226
#include <random>
27+
#include <thread>
2328

2429
namespace google {
2530
namespace cloud {
@@ -31,23 +36,28 @@ namespace spanner_proto = ::google::spanner::v1;
3136

3237
std::shared_ptr<SessionPool> MakeSessionPool(
3338
Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
34-
SessionPoolOptions options, std::unique_ptr<RetryPolicy> retry_policy,
39+
SessionPoolOptions options,
40+
std::unique_ptr<BackgroundThreads> background_threads,
41+
std::unique_ptr<RetryPolicy> retry_policy,
3542
std::unique_ptr<BackoffPolicy> backoff_policy) {
3643
auto pool = std::make_shared<SessionPool>(
3744
std::move(db), std::move(stubs), std::move(options),
38-
std::move(retry_policy), std::move(backoff_policy));
45+
std::move(background_threads), std::move(retry_policy),
46+
std::move(backoff_policy));
3947
pool->Initialize();
4048
return pool;
4149
}
4250

4351
SessionPool::SessionPool(Database db,
4452
std::vector<std::shared_ptr<SpannerStub>> stubs,
4553
SessionPoolOptions options,
54+
std::unique_ptr<BackgroundThreads> background_threads,
4655
std::unique_ptr<RetryPolicy> retry_policy,
4756
std::unique_ptr<BackoffPolicy> backoff_policy)
4857
: db_(std::move(db)),
4958
options_(std::move(
5059
options.EnforceConstraints(static_cast<int>(stubs.size())))),
60+
background_threads_(std::move(background_threads)),
5161
retry_policy_prototype_(std::move(retry_policy)),
5262
backoff_policy_prototype_(std::move(backoff_policy)),
5363
max_pool_size_(options_.max_sessions_per_channel() *
@@ -89,6 +99,43 @@ void SessionPool::Initialize() {
8999
(void)CreateSessions(lk, channel, options_.labels(), num_sessions);
90100
}
91101
}
102+
ScheduleBackgroundWork(std::chrono::seconds(5));
103+
}
104+
105+
SessionPool::~SessionPool() {
106+
// All references to this object are via `shared_ptr`; since we're in the
107+
// destructor that implies there can be no concurrent accesses to any member
108+
// variables, including `current_timer_`.
109+
//
110+
// Note that it *is* possible the timer lambda in `ScheduleBackgroundWork`
111+
// is executing concurrently. However, since we are in the destructor we know
112+
// that the lambda must not have yet successfully finished a call to `lock()`
113+
// on the `weak_ptr` to `this` it holds. Any subsequent or in-progress calls
114+
// must return `nullptr`, and the lambda will not do any work nor reschedule
115+
// the timer.
116+
current_timer_.cancel();
117+
}
118+
119+
void SessionPool::ScheduleBackgroundWork(std::chrono::seconds relative_time) {
120+
// See the comment in the destructor about the thread safety of this method.
121+
std::weak_ptr<SessionPool> pool = shared_from_this();
122+
current_timer_ =
123+
background_threads_->cq()
124+
.MakeRelativeTimer(relative_time)
125+
.then([pool](future<StatusOr<std::chrono::system_clock::time_point>>
126+
result) {
127+
if (result.get().ok()) {
128+
if (auto shared_pool = pool.lock()) {
129+
shared_pool->DoBackgroundWork();
130+
}
131+
}
132+
});
133+
}
134+
135+
void SessionPool::DoBackgroundWork() {
136+
// TODO(#1171) Implement SessionPool session refresh
137+
// TODO(#1172) maintain desired SessionPool size
138+
ScheduleBackgroundWork(std::chrono::seconds(5));
92139
}
93140

94141
StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
@@ -219,6 +266,7 @@ Status SessionPool::CreateSessions(
219266
}
220267

221268
void SessionPool::UpdateNextChannelForCreateSessions() {
269+
// `mu_` must be held by the caller.
222270
next_channel_for_create_sessions_ = channels_.begin();
223271
for (auto it = channels_.begin(); it != channels_.end(); ++it) {
224272
if (it->session_count < next_channel_for_create_sessions_->session_count) {

google/cloud/spanner/internal/session_pool.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@
2222
#include "google/cloud/spanner/retry_policy.h"
2323
#include "google/cloud/spanner/session_pool_options.h"
2424
#include "google/cloud/spanner/version.h"
25+
#include "google/cloud/background_threads.h"
26+
#include "google/cloud/future.h"
2527
#include "google/cloud/status_or.h"
2628
#include <google/spanner/v1/spanner.pb.h>
29+
#include <chrono>
2730
#include <condition_variable>
2831
#include <cstddef>
2932
#include <map>
@@ -62,8 +65,12 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
6265
*/
6366
SessionPool(Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
6467
SessionPoolOptions options,
68+
std::unique_ptr<BackgroundThreads> background_threads,
6569
std::unique_ptr<RetryPolicy> retry_policy,
6670
std::unique_ptr<BackoffPolicy> backoff_policy);
71+
72+
~SessionPool();
73+
6774
/**
6875
* If using the factory method is not possible, call `Initialize()` exactly
6976
* once, immediately after constructing the pool. This is necessary because
@@ -118,8 +125,12 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
118125

119126
void UpdateNextChannelForCreateSessions(); // EXCLUSIVE_LOCKS_REQUIRED(mu_)
120127

128+
void ScheduleBackgroundWork(std::chrono::seconds relative_time);
129+
void DoBackgroundWork();
130+
121131
Database const db_;
122132
SessionPoolOptions const options_;
133+
std::unique_ptr<BackgroundThreads> const background_threads_;
123134
std::unique_ptr<RetryPolicy const> retry_policy_prototype_;
124135
std::unique_ptr<BackoffPolicy const> backoff_policy_prototype_;
125136
int const max_pool_size_;
@@ -130,6 +141,7 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
130141
int total_sessions_ = 0; // GUARDED_BY(mu_)
131142
bool create_in_progress_ = false; // GUARDED_BY(mu_)
132143
int num_waiting_for_session_ = 0; // GUARDED_BY(mu_)
144+
future<void> current_timer_;
133145

134146
// `channels_` is guaranteed to be non-empty and will not be resized after
135147
// the constructor runs (so the iterators are guaranteed to always be valid).
@@ -150,7 +162,9 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
150162
*/
151163
std::shared_ptr<SessionPool> MakeSessionPool(
152164
Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
153-
SessionPoolOptions options, std::unique_ptr<RetryPolicy> retry_policy,
165+
SessionPoolOptions options,
166+
std::unique_ptr<BackgroundThreads> background_threads,
167+
std::unique_ptr<RetryPolicy> retry_policy,
154168
std::unique_ptr<BackoffPolicy> backoff_policy);
155169

156170
} // namespace internal

google/cloud/spanner/internal/session_pool_test.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "google/cloud/spanner/internal/session_pool.h"
1616
#include "google/cloud/spanner/internal/session.h"
1717
#include "google/cloud/spanner/testing/mock_spanner_stub.h"
18+
#include "google/cloud/internal/background_threads_impl.h"
1819
#include "google/cloud/internal/make_unique.h"
1920
#include "google/cloud/status.h"
2021
#include "google/cloud/testing_util/assert_ok.h"
@@ -67,6 +68,8 @@ std::shared_ptr<SessionPool> MakeSessionPool(
6768
SessionPoolOptions options) {
6869
return MakeSessionPool(
6970
std::move(db), std::move(stubs), std::move(options),
71+
google::cloud::internal::make_unique<
72+
google::cloud::internal::AutomaticallyCreatedBackgroundThreads>(),
7073
google::cloud::internal::make_unique<LimitedTimeRetryPolicy>(
7174
std::chrono::minutes(10)),
7275
google::cloud::internal::make_unique<ExponentialBackoffPolicy>(

0 commit comments

Comments
 (0)