@@ -63,12 +63,28 @@ SessionPool::SessionPool(spanner::Database db,
6363 opts_.get<spanner::SessionPoolMaxSessionsPerChannelOption>() *
6464 static_cast<int>(stubs.size())),
6565 random_generator_(std::random_device()()),
66+ multiplexed_session_replacement_interval_(
67+ opts_.has<
68+ spanner_internal::MultiplexedSessionReplacementIntervalOption>()
69+ ? opts_.get<spanner_internal::
70+ MultiplexedSessionReplacementIntervalOption>()
71+ : std::chrono::hours(24 * 7 )),
72+ multiplexed_session_background_interval_(
73+ opts_.has<spanner_internal::
74+ MultiplexedSessionBackgroundWorkIntervalOption>()
75+ ? opts_.get<spanner_internal::
76+ MultiplexedSessionBackgroundWorkIntervalOption>()
77+ : std::chrono::minutes(10 )),
6678 channels_(stubs.size()) {
6779 if (stubs.empty ()) {
6880 google::cloud::internal::ThrowInvalidArgument (
6981 " SessionPool requires a non-empty set of stubs" );
7082 }
7183
84+ multiplexed_session_ = internal::NotFoundError (
85+ " no multiplexed session is currently available; try again later" ,
86+ GCP_ERROR_INFO ());
87+
7288 for (auto i = 0U ; i < stubs.size (); ++i) {
7389 channels_[i] = std::make_shared<Channel>(std::move (stubs[i]));
7490 }
@@ -78,13 +94,19 @@ SessionPool::SessionPool(spanner::Database db,
7894
7995void SessionPool::Initialize () {
8096 internal::OptionsSpan span (opts_);
81- CreateMultiplexedSession ();
82- auto const min_sessions = opts_.get <spanner::SessionPoolMinSessionsOption>();
83- if (min_sessions > 0 ) {
97+ if (opts_.has <spanner_experimental::EnableMultiplexedSessionOption>()) {
8498 std::unique_lock<std::mutex> lk (mu_);
85- Grow (lk, min_sessions, WaitForSessionAllocation::kWait );
99+ CreateMultiplexedSession (lk);
100+ ScheduleMultiplexedBackgroundWork (multiplexed_session_background_interval_);
101+ } else {
102+ auto const min_sessions =
103+ opts_.get <spanner::SessionPoolMinSessionsOption>();
104+ if (min_sessions > 0 ) {
105+ std::unique_lock<std::mutex> lk (mu_);
106+ Grow (lk, min_sessions, WaitForSessionAllocation::kWait );
107+ }
108+ ScheduleBackgroundWork (std::chrono::seconds (5 ));
86109 }
87- ScheduleBackgroundWork (std::chrono::seconds (5 ));
88110}
89111
90112SessionPool::~SessionPool () {
@@ -98,21 +120,60 @@ SessionPool::~SessionPool() {
98120 // they must not have successfully finished a call to `lock()` on the
99121 // `weak_ptr` to `this` they hold. Any in-progress or subsequent `lock()`
100122 // will now return `nullptr`, in which case no work is done.
101- current_timer_.cancel ();
123+ if (opts_.has <spanner_experimental::EnableMultiplexedSessionOption>()) {
124+ current_multiplexed_timer_.cancel ();
125+ } else {
126+ current_timer_.cancel ();
127+ }
102128
103129 // Send fire-and-forget `AsyncDeleteSession()` calls for all sessions.
104- if (HasValidMultiplexedSession (std::unique_lock<std::mutex>(mu_))) {
105- AsyncDeleteSession (cq_, GetStub (*multiplexed_session_),
106- multiplexed_session_->session_name ())
107- .then ([](auto result) { auto status = result.get (); });
108- }
130+ // Multiplexed Sessions do not require an explicit Delete call.
109131 for (auto const & session : sessions_) {
110132 if (session->is_bad ()) continue ;
111133 AsyncDeleteSession (cq_, GetStub (*session), session->session_name ())
112134 .then ([](auto result) { auto status = result.get (); });
113135 }
114136}
115137
138+ void SessionPool::ScheduleMultiplexedBackgroundWork (
139+ std::chrono::seconds relative_time) {
140+ std::weak_ptr<SessionPool> pool = shared_from_this ();
141+ current_multiplexed_timer_ =
142+ cq_.MakeRelativeTimer (relative_time)
143+ .then ([pool](future<StatusOr<std::chrono::system_clock::time_point>>
144+ result) {
145+ if (result.get ().ok ()) {
146+ if (auto shared_pool = pool.lock ()) {
147+ shared_pool->DoMultiplexedBackgroundWork ();
148+ }
149+ }
150+ });
151+ }
152+
153+ void SessionPool::ReplaceMultiplexedSession () {
154+ auto now = clock_->Now ();
155+ auto refresh_limit = now - multiplexed_session_replacement_interval_;
156+ std::unique_lock<std::mutex> lk (mu_);
157+ if (create_calls_in_progress_ == 0 &&
158+ (*multiplexed_session_)->creation_time () <= refresh_limit) {
159+ ++create_calls_in_progress_;
160+ auto stub = GetStub (std::move (lk));
161+ std::weak_ptr<SessionPool> pool = shared_from_this ();
162+ CreateMultiplexedSessionAsync (std::move (stub))
163+ .then ([pool](future<StatusOr<google::spanner::v1::Session>> response) {
164+ if (auto shared_pool = pool.lock ()) {
165+ shared_pool->HandleMultiplexedCreateSessionDone (
166+ std::move (response).get ());
167+ }
168+ });
169+ }
170+ }
171+
172+ void SessionPool::DoMultiplexedBackgroundWork () {
173+ ReplaceMultiplexedSession ();
174+ ScheduleMultiplexedBackgroundWork (multiplexed_session_background_interval_);
175+ }
176+
116177void SessionPool::ScheduleBackgroundWork (std::chrono::seconds relative_time) {
117178 std::weak_ptr<SessionPool> pool = shared_from_this ();
118179 current_timer_ =
@@ -136,7 +197,6 @@ void SessionPool::DoBackgroundWork() {
136197// Ensure the pool size conforms to what was specified in the `SessionOptions`,
137198// creating or deleting sessions as necessary.
138199void SessionPool::MaintainPoolSize () {
139- CreateMultiplexedSession ();
140200 auto const min_sessions = opts_.get <spanner::SessionPoolMinSessionsOption>();
141201 std::unique_lock<std::mutex> lk (mu_);
142202 if (create_calls_in_progress_ == 0 && total_sessions_ < min_sessions) {
@@ -205,22 +265,30 @@ void SessionPool::Erase(std::string const& session_name) {
205265 }
206266}
207267
208- Status SessionPool::CreateMultiplexedSession () {
268+ Status SessionPool::HandleMultiplexedCreateSessionDone (
269+ StatusOr<google::spanner::v1::Session> response) {
209270 std::unique_lock<std::mutex> lk (mu_);
210- if (!HasValidMultiplexedSession (lk)) {
271+ --create_calls_in_progress_;
272+ if (!response.ok ()) {
273+ multiplexed_session_ = response.status ();
274+ return response.status ();
275+ }
276+
277+ multiplexed_session_ = MakeMultiplexedSessionHolder (response->name (), clock_);
278+ return {};
279+ }
280+
281+ Status SessionPool::CreateMultiplexedSession (std::unique_lock<std::mutex>& lk) {
282+ if (create_calls_in_progress_ == 0 ) {
283+ create_calls_in_progress_++;
211284 auto stub = GetStub (std::move (lk));
212- auto name = CreateMultiplexedSession (std::move (stub));
213- if (!name) return name.status ();
214- auto session = std::make_shared<Session>(*std::move (name),
215- /* channel=*/ nullptr , clock_);
216- std::unique_lock<std::mutex> lk (mu_);
217- multiplexed_session_ = std::move (session);
285+ return CreateMultiplexedSessionSync (std::move (stub));
218286 }
219287 return Status{};
220288}
221289
222- StatusOr<std::string> SessionPool::CreateMultiplexedSession (
223- std::shared_ptr<SpannerStub> stub) const {
290+ Status SessionPool::CreateMultiplexedSessionSync (
291+ std::shared_ptr<SpannerStub> stub) {
224292 google::spanner::v1::CreateSessionRequest request;
225293 request.set_database (db_.FullName ());
226294 auto * session = request.mutable_session ();
@@ -241,13 +309,33 @@ StatusOr<std::string> SessionPool::CreateMultiplexedSession(
241309 return stub->CreateSession (context, options, request);
242310 },
243311 opts_, request, __func__);
244- if (!response) return std::move (response).status ();
245- return response->name ();
312+ return HandleMultiplexedCreateSessionDone (std::move (response));
246313}
247314
248- bool SessionPool::HasValidMultiplexedSession (
249- std::unique_lock<std::mutex> const &) const {
250- return multiplexed_session_ && !multiplexed_session_->is_bad ();
315+ future<StatusOr<google::spanner::v1::Session>>
316+ SessionPool::CreateMultiplexedSessionAsync (std::shared_ptr<SpannerStub> stub) {
317+ google::spanner::v1::CreateSessionRequest request;
318+ request.set_database (db_.FullName ());
319+ auto * session = request.mutable_session ();
320+ auto const & labels = opts_.get <spanner::SessionPoolLabelsOption>();
321+ if (!labels.empty ()) {
322+ session->mutable_labels ()->insert (labels.begin (), labels.end ());
323+ }
324+ auto const & role = opts_.get <spanner::SessionCreatorRoleOption>();
325+ if (!role.empty ()) session->set_creator_role (role);
326+ session->set_multiplexed (true );
327+
328+ return google::cloud::internal::AsyncRetryLoop (
329+ retry_policy_prototype_->clone (), backoff_policy_prototype_->clone (),
330+ google::cloud::Idempotency::kIdempotent , cq_,
331+ [&stub](CompletionQueue cq, std::shared_ptr<grpc::ClientContext> context,
332+ internal::ImmutableOptions options,
333+ google::spanner::v1::CreateSessionRequest const & request) {
334+ RouteToLeader (*context); // always for CreateSession()
335+ return stub->AsyncCreateSession (cq, std::move (context),
336+ std::move (options), request);
337+ },
338+ internal::SaveCurrentOptions (), std::move (request), __func__);
251339}
252340
253341/*
@@ -354,10 +442,12 @@ StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
354442}
355443
356444StatusOr<SessionHolder> SessionPool::Multiplexed () {
357- std::unique_lock<std::mutex> lk (mu_);
358- // If we don't have a multiplexed session (yet), use a regular one.
359- if (!HasValidMultiplexedSession (lk)) return Allocate (std::move (lk), false );
360- return multiplexed_session_;
445+ if (opts_.has <spanner_experimental::EnableMultiplexedSessionOption>()) {
446+ std::unique_lock<std::mutex> lk (mu_);
447+ return multiplexed_session_;
448+ }
449+ return internal::FailedPreconditionError (
450+ " multiplexed sessions are not enabled" , GCP_ERROR_INFO ());
361451}
362452
363453std::shared_ptr<SpannerStub> SessionPool::GetStub (Session const & session) {
0 commit comments