Skip to content
This repository was archived by the owner on Dec 8, 2021. It is now read-only.

Commit 5481d0e

Browse files
authored
feat: add SessionPoolOptions with labels to MakeConnection() (#1109)
Moves `SessionPoolOptions` out of the `internal` namespace, and adds a `std::map<std::string, std::string> labels` field. Plumbs those labels through to the `BatchCreateSessions()` call. Fixes #421.
1 parent 56682fb commit 5481d0e

File tree

13 files changed

+159
-73
lines changed

13 files changed

+159
-73
lines changed

google/cloud/spanner/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ add_library(
193193
retry_policy.h
194194
row.cc
195195
row.h
196+
session_pool_options.h
196197
sql_statement.cc
197198
sql_statement.h
198199
timestamp.h

google/cloud/spanner/client.cc

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -219,24 +219,29 @@ StatusOr<PartitionedDmlResult> Client::ExecutePartitionedDml(
219219
return conn_->ExecutePartitionedDml({std::move(statement)});
220220
}
221221

222-
std::shared_ptr<Connection> MakeConnection(Database const& db,
223-
ConnectionOptions const& options) {
224-
return MakeConnection(db, options, internal::DefaultConnectionRetryPolicy(),
222+
std::shared_ptr<Connection> MakeConnection(
223+
Database const& db, ConnectionOptions const& connection_options,
224+
SessionPoolOptions session_pool_options) {
225+
return MakeConnection(db, connection_options, std::move(session_pool_options),
226+
internal::DefaultConnectionRetryPolicy(),
225227
internal::DefaultConnectionBackoffPolicy());
226228
}
227229

228230
std::shared_ptr<Connection> MakeConnection(
229-
Database const& db, ConnectionOptions const& options,
231+
Database const& db, ConnectionOptions const& connection_options,
232+
SessionPoolOptions session_pool_options,
230233
std::unique_ptr<RetryPolicy> retry_policy,
231234
std::unique_ptr<BackoffPolicy> backoff_policy) {
232235
std::vector<std::shared_ptr<internal::SpannerStub>> stubs;
233-
int num_channels = std::min(options.num_channels(), 1);
236+
int num_channels = std::min(connection_options.num_channels(), 1);
234237
stubs.reserve(num_channels);
235238
for (int channel_id = 0; channel_id < num_channels; ++channel_id) {
236-
stubs.push_back(internal::CreateDefaultSpannerStub(options, channel_id));
239+
stubs.push_back(
240+
internal::CreateDefaultSpannerStub(connection_options, channel_id));
237241
}
238-
return internal::MakeConnection(db, std::move(stubs), std::move(retry_policy),
239-
std::move(backoff_policy));
242+
return internal::MakeConnection(
243+
db, std::move(stubs), std::move(session_pool_options),
244+
std::move(retry_policy), std::move(backoff_policy));
240245
}
241246

242247
} // namespace SPANNER_CLIENT_NS

google/cloud/spanner/client.h

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "google/cloud/spanner/read_partition.h"
3030
#include "google/cloud/spanner/results.h"
3131
#include "google/cloud/spanner/retry_policy.h"
32+
#include "google/cloud/spanner/session_pool_options.h"
3233
#include "google/cloud/spanner/sql_statement.h"
3334
#include "google/cloud/spanner/transaction.h"
3435
#include "google/cloud/optional.h"
@@ -534,23 +535,28 @@ class Client {
534535
* @see `Connection`
535536
*
536537
* @param db See `Database`.
537-
* @param options (optional) configure the `Connection` created by this
538-
* function.
538+
* @param connection_options (optional) configure the `Connection` created by
539+
* this function.
540+
* @param session_pool_options (optional) configure the `SessionPool` created
541+
* by the `Connection`.
539542
*/
540543
std::shared_ptr<Connection> MakeConnection(
541-
Database const& db, ConnectionOptions const& options = ConnectionOptions());
544+
Database const& db,
545+
ConnectionOptions const& connection_options = ConnectionOptions(),
546+
SessionPoolOptions session_pool_options = SessionPoolOptions());
542547

543548
/**
544-
* @copydoc MakeConnection(Database const&, ConnectionOptions const&)
549+
* @copydoc MakeConnection(Database const&, ConnectionOptions const&, SessionPoolOptions)
545550
*
546-
* @param retry_policy override the default `RetryPolicy`, controls for how long
547-
* does the returned `Connection` object retry requests on transient
551+
* @param retry_policy override the default `RetryPolicy`, controls how long
552+
* the returned `Connection` object retries requests on transient
548553
* failures.
549-
* @param backoff_policy override the default `BackoffPolicy`, controls for how
550-
* long does the `Connection` object waits before retrying a failed request.
554+
* @param backoff_policy override the default `BackoffPolicy`, controls how
555+
* long the `Connection` object waits before retrying a failed request.
551556
*/
552557
std::shared_ptr<Connection> MakeConnection(
553-
Database const& db, ConnectionOptions const& options,
558+
Database const& db, ConnectionOptions const& connection_options,
559+
SessionPoolOptions session_pool_options,
554560
std::unique_ptr<RetryPolicy> retry_policy,
555561
std::unique_ptr<BackoffPolicy> backoff_policy);
556562

google/cloud/spanner/client_test.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,9 @@ TEST(ClientTest, MakeConnectionOptionalArguments) {
389389

390390
conn = MakeConnection(db, ConnectionOptions());
391391
EXPECT_NE(conn, nullptr);
392+
393+
conn = MakeConnection(db, ConnectionOptions(), SessionPoolOptions());
394+
EXPECT_NE(conn, nullptr);
392395
}
393396

394397
TEST(ClientTest, CommitMutatorSuccess) {

google/cloud/spanner/internal/connection_impl.cc

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,22 +78,25 @@ std::unique_ptr<BackoffPolicy> DefaultConnectionBackoffPolicy() {
7878

7979
std::shared_ptr<ConnectionImpl> MakeConnection(
8080
Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
81+
SessionPoolOptions session_pool_options,
8182
std::unique_ptr<RetryPolicy> retry_policy,
8283
std::unique_ptr<BackoffPolicy> backoff_policy) {
83-
return std::shared_ptr<ConnectionImpl>(
84-
new ConnectionImpl(std::move(db), std::move(stubs),
85-
std::move(retry_policy), std::move(backoff_policy)));
84+
return std::shared_ptr<ConnectionImpl>(new ConnectionImpl(
85+
std::move(db), std::move(stubs), std::move(session_pool_options),
86+
std::move(retry_policy), std::move(backoff_policy)));
8687
}
8788

8889
ConnectionImpl::ConnectionImpl(Database db,
8990
std::vector<std::shared_ptr<SpannerStub>> stubs,
91+
SessionPoolOptions session_pool_options,
9092
std::unique_ptr<RetryPolicy> retry_policy,
9193
std::unique_ptr<BackoffPolicy> backoff_policy)
9294
: db_(std::move(db)),
9395
retry_policy_prototype_(std::move(retry_policy)),
9496
backoff_policy_prototype_(std::move(backoff_policy)),
9597
session_pool_(std::make_shared<SessionPool>(
96-
db_, std::move(stubs), retry_policy_prototype_->clone(),
98+
db_, std::move(stubs), std::move(session_pool_options),
99+
retry_policy_prototype_->clone(),
97100
backoff_policy_prototype_->clone())) {}
98101

99102
RowStream ConnectionImpl::Read(ReadParams params) {

google/cloud/spanner/internal/connection_impl.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ std::unique_ptr<BackoffPolicy> DefaultConnectionBackoffPolicy();
5252
class ConnectionImpl;
5353
std::shared_ptr<ConnectionImpl> MakeConnection(
5454
Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
55+
SessionPoolOptions session_pool_options = SessionPoolOptions{},
5556
std::unique_ptr<RetryPolicy> retry_policy = DefaultConnectionRetryPolicy(),
5657
std::unique_ptr<BackoffPolicy> backoff_policy =
5758
DefaultConnectionBackoffPolicy());
@@ -82,9 +83,10 @@ class ConnectionImpl : public Connection {
8283
private:
8384
// Only the factory method can construct instances of this class.
8485
friend std::shared_ptr<ConnectionImpl> MakeConnection(
85-
Database, std::vector<std::shared_ptr<SpannerStub>>,
86+
Database, std::vector<std::shared_ptr<SpannerStub>>, SessionPoolOptions,
8687
std::unique_ptr<RetryPolicy>, std::unique_ptr<BackoffPolicy>);
8788
ConnectionImpl(Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
89+
SessionPoolOptions session_pool_options,
8890
std::unique_ptr<RetryPolicy> retry_policy,
8991
std::unique_ptr<BackoffPolicy> backoff_policy);
9092

google/cloud/spanner/internal/connection_impl_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ std::shared_ptr<Connection> MakeLimitedRetryConnection(
9292
Database const& db,
9393
std::shared_ptr<spanner_testing::MockSpannerStub> mock) {
9494
return MakeConnection(
95-
db, {std::move(mock)},
95+
db, {std::move(mock)}, SessionPoolOptions{},
9696
LimitedErrorCountRetryPolicy(/*maximum_failures=*/2).clone(),
9797
ExponentialBackoffPolicy(/*initial_delay=*/std::chrono::microseconds(1),
9898
/*maximum_delay=*/std::chrono::microseconds(1),

google/cloud/spanner/internal/session_pool.cc

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,14 @@ SessionPoolOptions SanitizeOptions(SessionPoolOptions options,
4747

4848
SessionPool::SessionPool(Database db,
4949
std::vector<std::shared_ptr<SpannerStub>> stubs,
50+
SessionPoolOptions options,
5051
std::unique_ptr<RetryPolicy> retry_policy,
51-
std::unique_ptr<BackoffPolicy> backoff_policy,
52-
SessionPoolOptions options)
52+
std::unique_ptr<BackoffPolicy> backoff_policy)
5353
: db_(std::move(db)),
54+
options_(
55+
SanitizeOptions(std::move(options), static_cast<int>(stubs.size()))),
5456
retry_policy_prototype_(std::move(retry_policy)),
5557
backoff_policy_prototype_(std::move(backoff_policy)),
56-
options_(SanitizeOptions(options, static_cast<int>(stubs.size()))),
5758
max_pool_size_(options_.max_sessions_per_channel *
5859
static_cast<int>(stubs.size())) {
5960
if (stubs.empty()) {
@@ -88,7 +89,7 @@ SessionPool::SessionPool(Database db,
8889
}
8990
// Just ignore failures; we'll try again when the caller requests a
9091
// session, and we'll be in a position to return an error at that time.
91-
(void)CreateSessions(lk, channel, num_sessions);
92+
(void)CreateSessions(lk, channel, options_.labels, num_sessions);
9293
}
9394
}
9495

@@ -135,7 +136,8 @@ StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
135136
int sessions_to_create =
136137
(std::min)(options_.min_sessions + 1,
137138
options_.max_sessions_per_channel - channel.session_count);
138-
auto create_status = CreateSessions(lk, channel, sessions_to_create);
139+
auto create_status =
140+
CreateSessions(lk, channel, options_.labels, sessions_to_create);
139141
if (!create_status.ok()) {
140142
return create_status;
141143
}
@@ -173,12 +175,15 @@ void SessionPool::Release(Session* session) {
173175
//
174176
// Requires `lk` has locked `mu_` prior to this call. `lk` will be dropped
175177
// while the RPC is in progress and then reacquired.
176-
Status SessionPool::CreateSessions(std::unique_lock<std::mutex>& lk,
177-
ChannelInfo& channel, int num_sessions) {
178+
Status SessionPool::CreateSessions(
179+
std::unique_lock<std::mutex>& lk, ChannelInfo& channel,
180+
std::map<std::string, std::string> const& labels, int num_sessions) {
178181
create_in_progress_ = true;
179182
lk.unlock();
180183
spanner_proto::BatchCreateSessionsRequest request;
181184
request.set_database(db_.FullName());
185+
request.mutable_session_template()->mutable_labels()->insert(labels.begin(),
186+
labels.end());
182187
request.set_session_count(std::int32_t{num_sessions});
183188
auto const& stub = channel.stub;
184189
auto response = RetryLoop(

google/cloud/spanner/internal/session_pool.h

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
#include "google/cloud/spanner/internal/session.h"
2121
#include "google/cloud/spanner/internal/spanner_stub.h"
2222
#include "google/cloud/spanner/retry_policy.h"
23+
#include "google/cloud/spanner/session_pool_options.h"
2324
#include "google/cloud/spanner/version.h"
2425
#include "google/cloud/status_or.h"
2526
#include <google/spanner/v1/spanner.pb.h>
26-
#include <chrono>
2727
#include <condition_variable>
2828
#include <cstddef>
2929
#include <map>
@@ -39,34 +39,6 @@ namespace spanner {
3939
inline namespace SPANNER_CLIENT_NS {
4040
namespace internal {
4141

42-
// What action to take if the session pool is exhausted.
43-
enum class ActionOnExhaustion { BLOCK, FAIL };
44-
45-
struct SessionPoolOptions {
46-
// The minimum number of sessions to keep in the pool.
47-
// Values <= 0 are treated as 0.
48-
// This value will be reduced if it exceeds the overall limit on the number
49-
// of sessions (`max_sessions_per_channel` * number of channels).
50-
int min_sessions = 0;
51-
52-
// The maximum number of sessions to create on each channel.
53-
// Values <= 1 are treated as 1.
54-
int max_sessions_per_channel = 100;
55-
56-
// The maximum number of sessions that can be in the pool in an idle state.
57-
// Values <= 0 are treated as 0.
58-
int max_idle_sessions = 0;
59-
60-
// Decide whether to block or fail on pool exhaustion.
61-
ActionOnExhaustion action_on_exhaustion = ActionOnExhaustion::BLOCK;
62-
63-
// This is the interval at which we refresh sessions so they don't get
64-
// collected by the backend GC. The GC collects objects older than 60
65-
// minutes, so any duration below that (less some slack to allow the calls
66-
// to be made to refresh the sessions) should suffice.
67-
std::chrono::minutes keep_alive_interval = std::chrono::minutes(55);
68-
};
69-
7042
/**
7143
* Maintains a pool of `Session` objects.
7244
*
@@ -90,9 +62,9 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
9062
* create them. `stubs` must not be empty.
9163
*/
9264
SessionPool(Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
65+
SessionPoolOptions options,
9366
std::unique_ptr<RetryPolicy> retry_policy,
94-
std::unique_ptr<BackoffPolicy> backoff_policy,
95-
SessionPoolOptions options = SessionPoolOptions());
67+
std::unique_ptr<BackoffPolicy> backoff_policy);
9668

9769
/**
9870
* Allocate a `Session` from the pool, creating a new one if necessary.
@@ -130,6 +102,7 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
130102
void Release(Session* session);
131103

132104
Status CreateSessions(std::unique_lock<std::mutex>& lk, ChannelInfo& channel,
105+
std::map<std::string, std::string> const& labels,
133106
int num_sessions); // EXCLUSIVE_LOCKS_REQUIRED(mu_)
134107

135108
SessionHolder MakeSessionHolder(std::unique_ptr<Session> session,
@@ -138,9 +111,9 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
138111
void UpdateNextChannelForCreateSessions(); // EXCLUSIVE_LOCKS_REQUIRED(mu_)
139112

140113
Database const db_;
114+
SessionPoolOptions const options_;
141115
std::unique_ptr<RetryPolicy const> retry_policy_prototype_;
142116
std::unique_ptr<BackoffPolicy const> backoff_policy_prototype_;
143-
SessionPoolOptions const options_;
144117
int const max_pool_size_;
145118

146119
std::mutex mu_;

0 commit comments

Comments
 (0)