Skip to content
This repository was archived by the owner on Dec 8, 2021. It is now read-only.

Commit 245a482

Browse files
authored
feat!: implement SessionPool session refresh (#1425)
Call `GetSession()` during `SessionPool::DoBackgroundWork()` for any session whose last-use time is older than the keep-alive interval. Also change the type of `SessionPoolOptions::keep_alive_interval_` from `std::chrono::minutes` to `std::chrono::seconds`. This is API compatible for callers of `SessionPoolOptions::set_keep_alive_interval()`, but is a breakage if the result of `SessionPoolOptions::keep_alive_interval()` is assigned to a `std::chrono::minutes`. We believe this is unlikely to impact many/any users. Fixes #1171.
1 parent 5b49464 commit 245a482

File tree

5 files changed

+122
-14
lines changed

5 files changed

+122
-14
lines changed

google/cloud/spanner/internal/session.h

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class Session {
4040
: session_name_(std::move(session_name)),
4141
channel_(std::move(channel)),
4242
is_bad_(false),
43-
last_use_time_(std::chrono::steady_clock::now()) {}
43+
last_use_time_(Clock::now()) {}
4444

4545
// Not copyable or moveable.
4646
Session(Session const&) = delete;
@@ -55,23 +55,20 @@ class Session {
5555
bool is_bad() const { return is_bad_.load(std::memory_order_relaxed); }
5656

5757
private:
58-
// Give `SessionPool` access to the private methods below.
58+
// Give `SessionPool` access to the private types/methods below.
5959
friend class SessionPool;
60+
using Clock = std::chrono::steady_clock;
6061
std::shared_ptr<Channel> const& channel() const { return channel_; }
6162

6263
// The caller is responsible for ensuring these methods are used in a
6364
// thread-safe manner (i.e. using external locking).
64-
std::chrono::steady_clock::time_point last_use_time() const {
65-
return last_use_time_;
66-
}
67-
void update_last_use_time() {
68-
last_use_time_ = std::chrono::steady_clock::now();
69-
}
65+
Clock::time_point last_use_time() const { return last_use_time_; }
66+
void update_last_use_time() { last_use_time_ = Clock::now(); }
7067

7168
std::string const session_name_;
7269
std::shared_ptr<Channel> const channel_;
7370
std::atomic<bool> is_bad_;
74-
std::chrono::steady_clock::time_point last_use_time_;
71+
Clock::time_point last_use_time_;
7572
};
7673

7774
/**

google/cloud/spanner/internal/session_pool.cc

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <chrono>
2626
#include <random>
2727
#include <thread>
28+
#include <utility>
2829
#include <vector>
2930

3031
namespace google {
@@ -114,7 +115,7 @@ void SessionPool::ScheduleBackgroundWork(std::chrono::seconds relative_time) {
114115

115116
void SessionPool::DoBackgroundWork() {
116117
MaintainPoolSize();
117-
// TODO(#1171) Implement SessionPool session refresh
118+
RefreshExpiringSessions();
118119
ScheduleBackgroundWork(std::chrono::seconds(5));
119120
}
120121

@@ -129,6 +130,43 @@ void SessionPool::MaintainPoolSize() {
129130
}
130131
}
131132

133+
// Initiate an async GetSession() call on any session whose last-use time is
134+
// older than the keep-alive interval.
135+
void SessionPool::RefreshExpiringSessions() {
136+
std::vector<std::pair<std::shared_ptr<SpannerStub>, std::string>>
137+
sessions_to_refresh;
138+
Session::Clock::time_point now = Session::Clock::now();
139+
Session::Clock::time_point refresh_limit =
140+
now - options_.keep_alive_interval();
141+
{
142+
std::unique_lock<std::mutex> lk(mu_);
143+
if (last_use_time_lower_bound_ <= refresh_limit) {
144+
last_use_time_lower_bound_ = now;
145+
for (auto const& session : sessions_) {
146+
Session::Clock::time_point last_use_time = session->last_use_time();
147+
if (last_use_time <= refresh_limit) {
148+
sessions_to_refresh.emplace_back(session->channel()->stub,
149+
session->session_name());
150+
session->update_last_use_time();
151+
} else if (last_use_time < last_use_time_lower_bound_) {
152+
last_use_time_lower_bound_ = last_use_time;
153+
}
154+
}
155+
}
156+
}
157+
for (auto& refresh : sessions_to_refresh) {
158+
AsyncGetSession(cq_, std::move(refresh.first), std::move(refresh.second))
159+
.then([](future<StatusOr<spanner_proto::Session>> result) {
160+
// We simply discard the response as handling IsSessionNotFound()
161+
// by removing the session from the pool is problematic (and would
162+
// not eliminate the possibility of IsSessionNotFound() elsewhere).
163+
// The last-use time has already been updated to throttle attempts.
164+
// TODO(#1430): Re-evaluate these decisions.
165+
(void)result.get();
166+
});
167+
}
168+
}
169+
132170
/**
133171
* Grow the session pool by creating up to `sessions_to_create` sessions and
134172
* adding them to the pool. Note that `lk` may be released and reacquired in

google/cloud/spanner/internal/session_pool.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
156156
void ScheduleBackgroundWork(std::chrono::seconds relative_time);
157157
void DoBackgroundWork();
158158
void MaintainPoolSize();
159+
void RefreshExpiringSessions();
159160

160161
Database const db_;
161162
SessionPoolOptions const options_;
@@ -171,6 +172,11 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
171172
int total_sessions_ = 0; // GUARDED_BY(mu_)
172173
int create_calls_in_progress_ = 0; // GUARDED_BY(mu_)
173174
int num_waiting_for_session_ = 0; // GUARDED_BY(mu_)
175+
176+
// Lower bound on all `sessions_[i]->last_use_time()` values.
177+
Session::Clock::time_point last_use_time_lower_bound_ =
178+
Session::Clock::now(); // GUARDED_BY(mu_)
179+
174180
future<void> current_timer_;
175181

176182
// `channels_` is guaranteed to be non-empty and will not be resized after

google/cloud/spanner/internal/session_pool_test.cc

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
#include "google/cloud/spanner/internal/session_pool.h"
1616
#include "google/cloud/spanner/internal/session.h"
17+
#include "google/cloud/spanner/testing/mock_completion_queue.h"
18+
#include "google/cloud/spanner/testing/mock_response_reader.h"
1719
#include "google/cloud/spanner/testing/mock_spanner_stub.h"
1820
#include "google/cloud/internal/background_threads_impl.h"
1921
#include "google/cloud/internal/make_unique.h"
@@ -36,7 +38,9 @@ namespace {
3638
using ::testing::_;
3739
using ::testing::ByMove;
3840
using ::testing::HasSubstr;
41+
using ::testing::Invoke;
3942
using ::testing::Return;
43+
using ::testing::StrictMock;
4044
using ::testing::UnorderedElementsAre;
4145

4246
namespace spanner_proto = ::google::spanner::v1;
@@ -382,6 +386,69 @@ TEST(SessionPool, GetStubForStublessSession) {
382386
EXPECT_EQ(pool->GetStub(*session), mock);
383387
}
384388

389+
// TODO(#1428): This test runs in real time. SessionPool does not currently
390+
// provide any mechanism to inject a clock source, or to control its
391+
// background-work scheduling. This makes the test slower and more fragile
392+
// than desired.
393+
TEST(SessionPool, SessionRefresh) {
394+
auto mock = std::make_shared<StrictMock<spanner_testing::MockSpannerStub>>();
395+
EXPECT_CALL(*mock, BatchCreateSessions(_, _))
396+
.WillOnce(Return(ByMove(MakeSessionsResponse({"s1"}))))
397+
.WillOnce(Return(ByMove(MakeSessionsResponse({"s2"}))));
398+
399+
auto reader = google::cloud::internal::make_unique<
400+
StrictMock<google::cloud::spanner::testing::MockAsyncResponseReader<
401+
spanner_proto::Session>>>();
402+
EXPECT_CALL(*mock, AsyncGetSession(_, _, _))
403+
.WillOnce(Invoke([&reader](
404+
grpc::ClientContext&,
405+
spanner_proto::GetSessionRequest const& request,
406+
grpc::CompletionQueue*) {
407+
EXPECT_EQ("s2", request.name());
408+
// This is safe. See comments in MockAsyncResponseReader.
409+
return std::unique_ptr<
410+
grpc::ClientAsyncResponseReaderInterface<spanner_proto::Session>>(
411+
reader.get());
412+
}));
413+
EXPECT_CALL(*reader, Finish(_, _, _))
414+
.WillOnce(Invoke(
415+
[](spanner_proto::Session* session, grpc::Status* status, void*) {
416+
session->set_name("s2");
417+
*status = grpc::Status::OK;
418+
}));
419+
420+
auto db = Database("project", "instance", "database");
421+
SessionPoolOptions options;
422+
options.set_keep_alive_interval(std::chrono::seconds(10));
423+
auto impl = std::make_shared<testing::MockCompletionQueue>();
424+
auto pool = MakeSessionPool(db, {mock}, options, CompletionQueue(impl));
425+
426+
// now == t0: Allocate and release two sessions such that "s1" will expire
427+
// at t0 + 18s, and "s2" will expire at t0 + 10s, and after which both the
428+
// BatchCreateSessions() expectations have been satisfied.
429+
{
430+
auto s1 = pool->Allocate();
431+
ASSERT_STATUS_OK(s1);
432+
EXPECT_EQ("s1", (*s1)->session_name());
433+
{
434+
auto s2 = pool->Allocate();
435+
ASSERT_STATUS_OK(s2);
436+
EXPECT_EQ("s2", (*s2)->session_name());
437+
}
438+
std::this_thread::sleep_for(std::chrono::seconds(8));
439+
}
440+
std::this_thread::sleep_for(std::chrono::seconds(8));
441+
442+
// now == t0 + 16s: Session "s2" has expired, but "s1" has not, and
443+
// RefreshExpiringSessions() has run exactly once since "s2" expired.
444+
impl->SimulateCompletion(true); // make the async GetSession() RPC
445+
446+
// The AsyncGetSession() and Finish() expectations should now have been
447+
// satisfied. If anything goes wrong we'll get unsatisfied/uninteresting
448+
// gmock errors.
449+
impl->SimulateCompletion(true); // run the completion callback
450+
}
451+
385452
} // namespace
386453
} // namespace internal
387454
} // namespace SPANNER_CLIENT_NS

google/cloud/spanner/session_pool_options.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,13 @@ class SessionPoolOptions {
122122
* minutes, so any duration below that (less some slack to allow the calls
123123
* to be made to refresh the sessions) should suffice.
124124
*/
125-
SessionPoolOptions& set_keep_alive_interval(std::chrono::minutes minutes) {
126-
keep_alive_interval_ = minutes;
125+
SessionPoolOptions& set_keep_alive_interval(std::chrono::seconds interval) {
126+
keep_alive_interval_ = interval;
127127
return *this;
128128
}
129129

130130
/// Return the interval at which we refresh sessions to prevent GC.
131-
std::chrono::minutes keep_alive_interval() const {
131+
std::chrono::seconds keep_alive_interval() const {
132132
return keep_alive_interval_;
133133
}
134134

@@ -151,7 +151,7 @@ class SessionPoolOptions {
151151
int max_sessions_per_channel_ = 100;
152152
int max_idle_sessions_ = 0;
153153
ActionOnExhaustion action_on_exhaustion_ = ActionOnExhaustion::kBlock;
154-
std::chrono::minutes keep_alive_interval_ = std::chrono::minutes(55);
154+
std::chrono::seconds keep_alive_interval_ = std::chrono::minutes(55);
155155
std::map<std::string, std::string> labels_;
156156
};
157157

0 commit comments

Comments
 (0)