Skip to content

Commit 4556494

Browse files
authored
fix(spanner): update session bookkeeping for session NotFound (#15009)
1 parent a7a6715 commit 4556494

File tree

3 files changed

+33
-13
lines changed

3 files changed

+33
-13
lines changed

google/cloud/spanner/internal/session_pool.cc

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ void SessionPool::Erase(std::string const& session_name) {
199199
target = std::move(session); // deferred deletion
200200
session = std::move(sessions_.back());
201201
sessions_.pop_back();
202+
DecrementSessionCount(lk, *target);
202203
break;
203204
}
204205
}
@@ -369,6 +370,16 @@ std::shared_ptr<SpannerStub> SessionPool::GetStub(Session const& session) {
369370
return GetStub(std::unique_lock<std::mutex>(mu_));
370371
}
371372

373+
void SessionPool::DecrementSessionCount(
374+
std::unique_lock<std::mutex> const&,
375+
google::cloud::spanner_internal::Session& session) {
376+
--total_sessions_;
377+
auto const& channel = session.channel();
378+
if (channel) {
379+
--channel->session_count;
380+
}
381+
}
382+
372383
StatusOr<SessionHolder> SessionPool::Allocate(std::unique_lock<std::mutex> lk,
373384
bool dissociate_from_pool) {
374385
// We choose to ignore the internal::CurrentOptions() here as it is
@@ -381,11 +392,7 @@ StatusOr<SessionHolder> SessionPool::Allocate(std::unique_lock<std::mutex> lk,
381392
auto session = std::move(sessions_.back());
382393
sessions_.pop_back();
383394
if (dissociate_from_pool) {
384-
--total_sessions_;
385-
auto const& channel = session->channel();
386-
if (channel) {
387-
--channel->session_count;
388-
}
395+
DecrementSessionCount(lk, *session);
389396
}
390397
return {MakeSessionHolder(std::move(session), dissociate_from_pool)};
391398
}
@@ -442,11 +449,7 @@ void SessionPool::Release(std::unique_ptr<Session> session) {
442449
if (session->is_bad()) {
443450
// Once we have support for background processing, we may want to signal
444451
// that to replenish this bad session.
445-
--total_sessions_;
446-
auto const& channel = session->channel();
447-
if (channel) {
448-
--channel->session_count;
449-
}
452+
DecrementSessionCount(lk, *session);
450453
return;
451454
}
452455
session->update_last_use_time();

google/cloud/spanner/internal/session_pool.h

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
115115
*/
116116
std::shared_ptr<SpannerStub> GetStub(Session const& session);
117117

118+
/**
119+
* Returns the number of sessions in the session pool plus the number of
120+
* sessions allocated to running transactions.
121+
*/
122+
int total_sessions() const { return total_sessions_; }
123+
118124
private:
119125
friend std::shared_ptr<SessionPool> MakeSessionPool(
120126
spanner::Database, std::vector<std::shared_ptr<SpannerStub>>,
@@ -207,6 +213,10 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
207213
// Remove the named session from the pool (if it is present).
208214
void Erase(std::string const& session_name);
209215

216+
// Performs the necessary bookkeeping when a session is removed from use.
217+
void DecrementSessionCount(std::unique_lock<std::mutex> const& lk,
218+
Session& session);
219+
210220
spanner::Database const db_;
211221
google::cloud::CompletionQueue cq_;
212222
Options const opts_;
@@ -220,9 +230,11 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
220230
std::condition_variable cond_;
221231
SessionHolder multiplexed_session_; // GUARDED_BY(mu_)
222232
std::vector<std::unique_ptr<Session>> sessions_; // GUARDED_BY(mu_)
223-
int total_sessions_ = 0; // GUARDED_BY(mu_)
224-
int create_calls_in_progress_ = 0; // GUARDED_BY(mu_)
225-
int num_waiting_for_session_ = 0; // GUARDED_BY(mu_)
233+
// total_sessions_ tracks the number of sessions in the pool, a.k.a.
234+
// sessions_.size(), plus the sessions that have been allocated.
235+
int total_sessions_ = 0; // GUARDED_BY(mu_)
236+
int create_calls_in_progress_ = 0; // GUARDED_BY(mu_)
237+
int num_waiting_for_session_ = 0; // GUARDED_BY(mu_)
226238

227239
// Lower bound on all `sessions_[i]->last_use_time()` values.
228240
Session::Clock::time_point last_use_time_lower_bound_ =

google/cloud/spanner/internal/session_pool_test.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -856,21 +856,26 @@ TEST_F(SessionPoolTest, SessionRefreshNotFound) {
856856
// Wait for "s2" to need refreshing before releasing "s1".
857857
clock->AdvanceTime(keep_alive_interval * 2);
858858
}
859+
EXPECT_EQ(pool->total_sessions(), 2);
859860

860861
// Simulate completion of pending operations, which will result in
861862
// a call to RefreshExpiringSessions(). This should refresh "s2" and
862863
// satisfy the AsyncExecuteSql() expectation, which fails the call.
863864
impl->SimulateCompletion(true);
865+
EXPECT_EQ(pool->total_sessions(), 1);
864866

865867
// We should still be able to allocate session "s1".
866868
auto s1 = pool->Allocate();
867869
ASSERT_STATUS_OK(s1);
868870
EXPECT_EQ("s1", (*s1)->session_name());
871+
EXPECT_EQ(pool->total_sessions(), 1);
872+
869873
// However "s2" will be gone now, so a new allocation will produce
870874
// "s3", satisfying the final BatchCreateSessions() expectation.
871875
auto s3 = pool->Allocate();
872876
ASSERT_STATUS_OK(s3);
873877
EXPECT_EQ("s3", (*s3)->session_name());
878+
EXPECT_EQ(pool->total_sessions(), 2);
874879

875880
// Cancel all pending operations, satisfying any remaining futures. When
876881
// compiling with exceptions disabled the destructors eventually invoke

0 commit comments

Comments
 (0)