Skip to content
This repository was archived by the owner on Dec 8, 2021. It is now read-only.

Commit 51e4f55

Browse files
authored
feat: split ConnectionImpl and SessionPool cleanly (#1022)
* feat: split `ConnectionImpl` and `SessionPool` cleanly Remove the circular dependency between `ConnectionImpl` and `SessionPool`. The latter can now make RPCs directly instead of having to call back through the `SessionManager` interface. The `SessionHolder` deleter also now calls directly into `SessionPool` so we don't need to forward those calls from `ConnectionImpl`. I also removed the tests where `BatchCreateSessions` returned `RESOURCE_EXHAUSTED` since we learned from spanner team that will never happen (since there are no longer server-side session limits), and the tests conflicted with our retry policy. Part of #307
1 parent e006f3d commit 51e4f55

File tree

5 files changed

+225
-216
lines changed

5 files changed

+225
-216
lines changed

google/cloud/spanner/internal/connection_impl.cc

Lines changed: 10 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@
2121
#include "google/cloud/spanner/read_partition.h"
2222
#include "google/cloud/grpc_utils/grpc_error_delegate.h"
2323
#include "google/cloud/internal/make_unique.h"
24-
#include <google/spanner/v1/spanner.pb.h>
2524
#include <limits>
26-
#include <memory>
2725

2826
namespace google {
2927
namespace cloud {
@@ -111,7 +109,8 @@ ConnectionImpl::ConnectionImpl(Database db, std::shared_ptr<SpannerStub> stub,
111109
stub_(std::move(stub)),
112110
retry_policy_(std::move(retry_policy)),
113111
backoff_policy_(std::move(backoff_policy)),
114-
session_pool_(this) {}
112+
session_pool_(std::make_shared<SessionPool>(
113+
db_, stub_, retry_policy_->clone(), backoff_policy_->clone())) {}
115114

116115
RowStream ConnectionImpl::Read(ReadParams params) {
117116
return internal::Visit(
@@ -286,7 +285,7 @@ RowStream ConnectionImpl::ReadImpl(SessionHolder& session,
286285
spanner_proto::TransactionSelector& s,
287286
ReadParams params) {
288287
if (!session) {
289-
auto session_or = AllocateSession();
288+
auto session_or = session_pool_->Allocate();
290289
if (!session_or) {
291290
return RowStream(
292291
google::cloud::internal::make_unique<StatusOnlyResultSetSource>(
@@ -348,7 +347,7 @@ StatusOr<std::vector<ReadPartition>> ConnectionImpl::PartitionReadImpl(
348347
if (!session) {
349348
// Since the session may be sent to other machines, it should not be
350349
// returned to the pool when the Transaction is destroyed.
351-
auto session_or = AllocateSession(/*dissociate_from_pool=*/true);
350+
auto session_or = session_pool_->Allocate(/*dissociate_from_pool=*/true);
352351
if (!session_or) {
353352
return std::move(session_or).status();
354353
}
@@ -401,7 +400,7 @@ StatusOr<ResultType> ConnectionImpl::ExecuteSqlImpl(
401400
google::spanner::v1 ::ExecuteSqlRequest& request)> const&
402401
retry_resume_fn) {
403402
if (!session) {
404-
auto session_or = AllocateSession();
403+
auto session_or = session_pool_->Allocate();
405404
if (!session_or) {
406405
return std::move(session_or).status();
407406
}
@@ -560,7 +559,7 @@ StatusOr<std::vector<QueryPartition>> ConnectionImpl::PartitionQueryImpl(
560559
if (!session) {
561560
// Since the session may be sent to other machines, it should not be
562561
// returned to the pool when the Transaction is destroyed.
563-
auto session_or = AllocateSession(/*dissociate_from_pool=*/true);
562+
auto session_or = session_pool_->Allocate(/*dissociate_from_pool=*/true);
564563
if (!session_or) {
565564
return std::move(session_or).status();
566565
}
@@ -606,7 +605,7 @@ StatusOr<BatchDmlResult> ConnectionImpl::ExecuteBatchDmlImpl(
606605
SessionHolder& session, spanner_proto::TransactionSelector& s,
607606
std::int64_t seqno, ExecuteBatchDmlParams params) {
608607
if (!session) {
609-
auto session_or = AllocateSession();
608+
auto session_or = session_pool_->Allocate();
610609
if (!session_or) {
611610
return std::move(session_or).status();
612611
}
@@ -649,7 +648,7 @@ StatusOr<PartitionedDmlResult> ConnectionImpl::ExecutePartitionedDmlImpl(
649648
SessionHolder& session, spanner_proto::TransactionSelector& s,
650649
std::int64_t seqno, ExecutePartitionedDmlParams params) {
651650
if (!session) {
652-
auto session_or = AllocateSession();
651+
auto session_or = session_pool_->Allocate();
653652
if (!session_or) {
654653
return std::move(session_or).status();
655654
}
@@ -703,7 +702,7 @@ StatusOr<CommitResult> ConnectionImpl::CommitImpl(
703702
SessionHolder& session, spanner_proto::TransactionSelector& s,
704703
CommitParams params) {
705704
if (!session) {
706-
auto session_or = AllocateSession();
705+
auto session_or = session_pool_->Allocate();
707706
if (!session_or) {
708707
return std::move(session_or).status();
709708
}
@@ -743,7 +742,7 @@ StatusOr<CommitResult> ConnectionImpl::CommitImpl(
743742
Status ConnectionImpl::RollbackImpl(SessionHolder& session,
744743
spanner_proto::TransactionSelector& s) {
745744
if (!session) {
746-
auto session_or = AllocateSession();
745+
auto session_or = session_pool_->Allocate();
747746
if (!session_or) {
748747
return std::move(session_or).status();
749748
}
@@ -771,55 +770,6 @@ Status ConnectionImpl::RollbackImpl(SessionHolder& session,
771770
request, __func__);
772771
}
773772

774-
StatusOr<SessionHolder> ConnectionImpl::AllocateSession(
775-
bool dissociate_from_pool) {
776-
auto session = session_pool_.Allocate(dissociate_from_pool);
777-
if (!session.ok()) {
778-
return std::move(session).status();
779-
}
780-
781-
if (dissociate_from_pool) {
782-
// Uses the default deleter; the Session is not returned to the pool.
783-
return {*std::move(session)};
784-
}
785-
786-
std::weak_ptr<ConnectionImpl> connection = shared_from_this();
787-
return SessionHolder(session->release(), [connection](Session* session) {
788-
auto shared_connection = connection.lock();
789-
// If `connection` is still alive, release the `Session` to its pool;
790-
// otherwise just delete the `Session`.
791-
if (shared_connection) {
792-
shared_connection->ReleaseSession(session);
793-
} else {
794-
delete session;
795-
}
796-
});
797-
}
798-
799-
StatusOr<std::vector<std::unique_ptr<Session>>> ConnectionImpl::CreateSessions(
800-
int num_sessions) {
801-
spanner_proto::BatchCreateSessionsRequest request;
802-
request.set_database(db_.FullName());
803-
request.set_session_count(std::int32_t{num_sessions});
804-
auto response = RetryLoop(
805-
retry_policy_->clone(), backoff_policy_->clone(), true,
806-
[this](grpc::ClientContext& context,
807-
spanner_proto::BatchCreateSessionsRequest const& request) {
808-
return stub_->BatchCreateSessions(context, request);
809-
},
810-
request, __func__);
811-
if (!response) {
812-
return response.status();
813-
}
814-
std::vector<std::unique_ptr<Session>> sessions;
815-
sessions.reserve(response->session_size());
816-
for (auto& session : *response->mutable_session()) {
817-
sessions.push_back(google::cloud::internal::make_unique<Session>(
818-
std::move(*session.mutable_name())));
819-
}
820-
return {std::move(sessions)};
821-
}
822-
823773
} // namespace internal
824774
} // namespace SPANNER_CLIENT_NS
825775
} // namespace spanner

google/cloud/spanner/internal/connection_impl.h

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "google/cloud/spanner/version.h"
2626
#include "google/cloud/status.h"
2727
#include "google/cloud/status_or.h"
28+
#include <google/spanner/v1/spanner.pb.h>
2829
#include <cstdint>
2930
#include <memory>
3031
#include <mutex>
@@ -46,9 +47,6 @@ std::unique_ptr<BackoffPolicy> DefaultConnectionBackoffPolicy();
4647
/**
4748
* Factory method to construct a `ConnectionImpl`.
4849
*
49-
* @note `ConnectionImpl` relies on `std::enable_shared_from_this`; the
50-
* factory method ensures it can only be constructed using a `std::shared_ptr`
51-
*
5250
* @note In tests we can use a mock stub and custom (or mock) policies.
5351
*/
5452
class ConnectionImpl;
@@ -63,9 +61,7 @@ std::shared_ptr<ConnectionImpl> MakeConnection(
6361
* Spanner instance. See `MakeConnection()` for a factory function that creates
6462
* and returns instances of this class.
6563
*/
66-
class ConnectionImpl : public Connection,
67-
public SessionManager,
68-
public std::enable_shared_from_this<ConnectionImpl> {
64+
class ConnectionImpl : public Connection {
6965
public:
7066
RowStream Read(ReadParams) override;
7167
StatusOr<std::vector<ReadPartition>> PartitionRead(
@@ -139,29 +135,6 @@ class ConnectionImpl : public Connection,
139135
Status RollbackImpl(SessionHolder& session,
140136
google::spanner::v1::TransactionSelector& s);
141137

142-
/**
143-
* Get a session from the pool, or create one if the pool is empty.
144-
* @returns an error if session creation fails; always returns a valid
145-
* `SessionHolder` (never `nullptr`) on success.
146-
*
147-
* The `SessionHolder` usually returns the session to the pool when it is
148-
* destroyed. However, if `dissociate_from_pool` is true the session will not
149-
* be returned to the session pool. This is used in partitioned operations,
150-
* since we don't know when all parties are done using the session.
151-
*/
152-
StatusOr<SessionHolder> AllocateSession(bool dissociate_from_pool = false);
153-
154-
// Forwards calls for the `SessionPool`; used in the `SessionHolder` deleter
155-
// so it can hold a `weak_ptr` to `ConnectionImpl` (it's already reference
156-
// counted, and manages the lifetime of `SessionPool`).
157-
void ReleaseSession(Session* session) {
158-
session_pool_.Release(std::unique_ptr<Session>(session));
159-
}
160-
161-
// `SessionManager` methods; used by the `SessionPool`
162-
StatusOr<std::vector<std::unique_ptr<Session>>> CreateSessions(
163-
int num_sessions) override;
164-
165138
template <typename ResultType>
166139
StatusOr<ResultType> ExecuteSqlImpl(
167140
SessionHolder& session, google::spanner::v1::TransactionSelector& s,
@@ -176,7 +149,6 @@ class ConnectionImpl : public Connection,
176149
SessionHolder& session, google::spanner::v1::TransactionSelector& s,
177150
std::int64_t seqno, SqlParams params,
178151
google::spanner::v1::ExecuteSqlRequest::QueryMode query_mode);
179-
180152
template <typename ResultType>
181153
StatusOr<ResultType> CommonDmlImpl(
182154
SessionHolder& session, google::spanner::v1::TransactionSelector& s,
@@ -187,7 +159,7 @@ class ConnectionImpl : public Connection,
187159
std::shared_ptr<SpannerStub> stub_;
188160
std::shared_ptr<RetryPolicy> retry_policy_;
189161
std::shared_ptr<BackoffPolicy> backoff_policy_;
190-
SessionPool session_pool_;
162+
std::shared_ptr<SessionPool> session_pool_;
191163
};
192164

193165
} // namespace internal

google/cloud/spanner/internal/session_pool.cc

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,29 @@
1414

1515
#include "google/cloud/spanner/internal/session_pool.h"
1616
#include "google/cloud/spanner/internal/connection_impl.h"
17+
#include "google/cloud/spanner/internal/retry_loop.h"
1718
#include "google/cloud/spanner/internal/session.h"
19+
#include "google/cloud/internal/make_unique.h"
1820
#include "google/cloud/status.h"
1921
#include <algorithm>
20-
#include <memory>
2122

2223
namespace google {
2324
namespace cloud {
2425
namespace spanner {
2526
inline namespace SPANNER_CLIENT_NS {
2627
namespace internal {
2728

28-
SessionPool::SessionPool(SessionManager* manager, SessionPoolOptions options)
29-
: manager_(manager), options_(options) {
29+
namespace spanner_proto = ::google::spanner::v1;
30+
31+
SessionPool::SessionPool(Database db, std::shared_ptr<SpannerStub> stub,
32+
std::unique_ptr<RetryPolicy> retry_policy,
33+
std::unique_ptr<BackoffPolicy> backoff_policy,
34+
SessionPoolOptions options)
35+
: db_(std::move(db)),
36+
stub_(std::move(stub)),
37+
retry_policy_(std::move(retry_policy)),
38+
backoff_policy_(std::move(backoff_policy)),
39+
options_(options) {
3040
// Ensure the options have sensible values.
3141
options_.min_sessions = (std::max)(options_.min_sessions, 0);
3242
options_.max_sessions = (std::max)(options_.max_sessions, 1);
@@ -43,7 +53,7 @@ SessionPool::SessionPool(SessionManager* manager, SessionPoolOptions options)
4353
if (options_.min_sessions == 0) {
4454
return;
4555
}
46-
auto sessions = manager_->CreateSessions(options_.min_sessions);
56+
auto sessions = CreateSessions(options_.min_sessions);
4757
if (sessions.ok()) {
4858
std::unique_lock<std::mutex> lk(mu_);
4959
total_sessions_ += static_cast<int>(sessions->size());
@@ -53,8 +63,7 @@ SessionPool::SessionPool(SessionManager* manager, SessionPoolOptions options)
5363
}
5464
}
5565

56-
StatusOr<std::unique_ptr<Session>> SessionPool::Allocate(
57-
bool dissociate_from_pool) {
66+
StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
5867
std::unique_lock<std::mutex> lk(mu_);
5968
for (;;) {
6069
if (!sessions_.empty()) {
@@ -64,7 +73,7 @@ StatusOr<std::unique_ptr<Session>> SessionPool::Allocate(
6473
if (dissociate_from_pool) {
6574
--total_sessions_;
6675
}
67-
return {std::move(session)};
76+
return {MakeSessionHolder(std::move(session), dissociate_from_pool)};
6877
}
6978

7079
// If the pool is at its max size, fail or wait until someone returns a
@@ -98,7 +107,7 @@ StatusOr<std::unique_ptr<Session>> SessionPool::Allocate(
98107
create_in_progress_ = true;
99108
lk.unlock();
100109
// TODO(#307) do we need to limit the call rate here?
101-
auto sessions = manager_->CreateSessions(sessions_to_create);
110+
auto sessions = CreateSessions(sessions_to_create);
102111
lk.lock();
103112
create_in_progress_ = false;
104113
if (!sessions.ok() || sessions->empty()) {
@@ -128,21 +137,64 @@ StatusOr<std::unique_ptr<Session>> SessionPool::Allocate(
128137
// Wake up everyone that was waiting for a session.
129138
cond_.notify_all();
130139
}
131-
return {std::move(session)};
140+
return {MakeSessionHolder(std::move(session), dissociate_from_pool)};
132141
}
133142
}
134143

135-
void SessionPool::Release(std::unique_ptr<Session> session) {
144+
void SessionPool::Release(Session* session) {
136145
std::unique_lock<std::mutex> lk(mu_);
137146
bool notify = sessions_.empty();
138-
sessions_.push_back(std::move(session));
147+
sessions_.emplace_back(session);
139148
// If sessions_ was empty, wake up someone who was waiting for a session.
140149
if (notify) {
141150
lk.unlock();
142151
cond_.notify_one();
143152
}
144153
}
145154

155+
StatusOr<std::vector<std::unique_ptr<Session>>> SessionPool::CreateSessions(
156+
int num_sessions) {
157+
spanner_proto::BatchCreateSessionsRequest request;
158+
request.set_database(db_.FullName());
159+
request.set_session_count(std::int32_t{num_sessions});
160+
auto response = RetryLoop(
161+
retry_policy_->clone(), backoff_policy_->clone(), true,
162+
[this](grpc::ClientContext& context,
163+
spanner_proto::BatchCreateSessionsRequest const& request) {
164+
return stub_->BatchCreateSessions(context, request);
165+
},
166+
request, __func__);
167+
if (!response) {
168+
return response.status();
169+
}
170+
std::vector<std::unique_ptr<Session>> sessions;
171+
sessions.reserve(response->session_size());
172+
for (auto& session : *response->mutable_session()) {
173+
sessions.push_back(google::cloud::internal::make_unique<Session>(
174+
std::move(*session.mutable_name())));
175+
}
176+
return {std::move(sessions)};
177+
}
178+
179+
SessionHolder SessionPool::MakeSessionHolder(std::unique_ptr<Session> session,
180+
bool dissociate_from_pool) {
181+
if (dissociate_from_pool) {
182+
// Uses the default deleter; the `Session` is not returned to the pool.
183+
return {std::move(session)};
184+
}
185+
std::weak_ptr<SessionPool> pool = shared_from_this();
186+
return SessionHolder(session.release(), [pool](Session* session) {
187+
auto shared_pool = pool.lock();
188+
// If `pool` is still alive, release the `Session` to it; otherwise just
189+
// delete the `Session`.
190+
if (shared_pool) {
191+
shared_pool->Release(session);
192+
} else {
193+
delete session;
194+
}
195+
});
196+
}
197+
146198
} // namespace internal
147199
} // namespace SPANNER_CLIENT_NS
148200
} // namespace spanner

0 commit comments

Comments
 (0)