5454import com .google .cloud .spanner .SessionPoolOptions .InactiveTransactionRemovalOptions ;
5555import com .google .cloud .spanner .SpannerException .ResourceNotFoundException ;
5656import com .google .cloud .spanner .SpannerImpl .ClosedException ;
57+ import com .google .cloud .spanner .spi .v1 .SpannerRpc ;
5758import com .google .common .annotations .VisibleForTesting ;
5859import com .google .common .base .Function ;
5960import com .google .common .base .MoreObjects ;
@@ -1366,12 +1367,19 @@ PooledSession get(final boolean eligibleForLongRunning) {
13661367 }
13671368 }
13681369
1369- final class PooledSession implements Session {
1370+ class PooledSession implements Session {
13701371 @ VisibleForTesting SessionImpl delegate ;
13711372 private volatile Instant lastUseTime ;
13721373 private volatile SpannerException lastException ;
13731374 private volatile boolean allowReplacing = true ;
13741375
1376+ /**
1377+ * This ensures that the session is added at a random position in the pool the first time it is
1378+ * actually added to the pool.
1379+ */
1380+ @ GuardedBy ("lock" )
1381+ private Position releaseToPosition = initialReleasePosition ;
1382+
13751383 /**
13761384 * Property to mark if the session is eligible to be long-running. This can only be true if the
13771385 * session is executing certain types of transactions (for ex - Partitioned DML) which can be
@@ -1403,6 +1411,13 @@ private PooledSession(SessionImpl delegate) {
14031411 this .lastUseTime = clock .instant ();
14041412 }
14051413
1414+ int getChannel () {
1415+ Long channelHint = (Long ) delegate .getOptions ().get (SpannerRpc .Option .CHANNEL_HINT );
1416+ return channelHint == null
1417+ ? 0
1418+ : (int ) (channelHint % sessionClient .getSpanner ().getOptions ().getNumChannels ());
1419+ }
1420+
14061421 @ Override
14071422 public String toString () {
14081423 return getName ();
@@ -1536,7 +1551,7 @@ public void close() {
15361551 if (state != SessionState .CLOSING ) {
15371552 state = SessionState .AVAILABLE ;
15381553 }
1539- releaseSession (this , Position . FIRST );
1554+ releaseSession (this , false );
15401555 }
15411556 }
15421557
@@ -1576,7 +1591,7 @@ private void determineDialectAsync(final SettableFuture<Dialect> dialect) {
15761591 // in the database dialect, and there's nothing sensible that we can do with it here.
15771592 dialect .setException (t );
15781593 } finally {
1579- releaseSession (this , Position . FIRST );
1594+ releaseSession (this , false );
15801595 }
15811596 });
15821597 }
@@ -1830,7 +1845,7 @@ private void keepAliveSessions(Instant currTime) {
18301845 logger .log (Level .FINE , "Keeping alive session " + sessionToKeepAlive .getName ());
18311846 numSessionsToKeepAlive --;
18321847 sessionToKeepAlive .keepAlive ();
1833- releaseSession (sessionToKeepAlive , Position . FIRST );
1848+ releaseSession (sessionToKeepAlive , false );
18341849 } catch (SpannerException e ) {
18351850 handleException (e , sessionToKeepAlive );
18361851 }
@@ -1929,7 +1944,7 @@ private void removeLongRunningSessions(
19291944 }
19301945 }
19311946
1932- private enum Position {
1947+ enum Position {
19331948 FIRST ,
19341949 RANDOM
19351950 }
@@ -1962,6 +1977,15 @@ private enum Position {
19621977
19631978 final PoolMaintainer poolMaintainer ;
19641979 private final Clock clock ;
1980+ /**
1981+ * initialReleasePosition determines where in the pool sessions are added when they are released
1982+ * into the pool the first time. This is always RANDOM in production, but some tests use FIRST to
1983+ * be able to verify the order of sessions in the pool. Using RANDOM ensures that we do not get an
1984+ * unbalanced session pool where all sessions belonging to one gRPC channel are added to the same
1985+ * region in the pool.
1986+ */
1987+ private final Position initialReleasePosition ;
1988+
19651989 private final Object lock = new Object ();
19661990 private final Random random = new Random ();
19671991
@@ -2045,6 +2069,7 @@ static SessionPool createPool(
20452069 ((GrpcTransportOptions ) spannerOptions .getTransportOptions ()).getExecutorFactory (),
20462070 sessionClient ,
20472071 poolMaintainerClock == null ? new Clock () : poolMaintainerClock ,
2072+ Position .RANDOM ,
20482073 Metrics .getMetricRegistry (),
20492074 labelValues );
20502075 }
@@ -2053,20 +2078,22 @@ static SessionPool createPool(
20532078 SessionPoolOptions poolOptions ,
20542079 ExecutorFactory <ScheduledExecutorService > executorFactory ,
20552080 SessionClient sessionClient ) {
2056- return createPool (poolOptions , executorFactory , sessionClient , new Clock ());
2081+ return createPool (poolOptions , executorFactory , sessionClient , new Clock (), Position . RANDOM );
20572082 }
20582083
20592084 static SessionPool createPool (
20602085 SessionPoolOptions poolOptions ,
20612086 ExecutorFactory <ScheduledExecutorService > executorFactory ,
20622087 SessionClient sessionClient ,
2063- Clock clock ) {
2088+ Clock clock ,
2089+ Position initialReleasePosition ) {
20642090 return createPool (
20652091 poolOptions ,
20662092 null ,
20672093 executorFactory ,
20682094 sessionClient ,
20692095 clock ,
2096+ initialReleasePosition ,
20702097 Metrics .getMetricRegistry (),
20712098 SPANNER_DEFAULT_LABEL_VALUES );
20722099 }
@@ -2077,6 +2104,7 @@ static SessionPool createPool(
20772104 ExecutorFactory <ScheduledExecutorService > executorFactory ,
20782105 SessionClient sessionClient ,
20792106 Clock clock ,
2107+ Position initialReleasePosition ,
20802108 MetricRegistry metricRegistry ,
20812109 List <LabelValue > labelValues ) {
20822110 SessionPool pool =
@@ -2087,6 +2115,7 @@ static SessionPool createPool(
20872115 executorFactory .get (),
20882116 sessionClient ,
20892117 clock ,
2118+ initialReleasePosition ,
20902119 metricRegistry ,
20912120 labelValues );
20922121 pool .initPool ();
@@ -2100,6 +2129,7 @@ private SessionPool(
21002129 ScheduledExecutorService executor ,
21012130 SessionClient sessionClient ,
21022131 Clock clock ,
2132+ Position initialReleasePosition ,
21032133 MetricRegistry metricRegistry ,
21042134 List <LabelValue > labelValues ) {
21052135 this .options = options ;
@@ -2108,6 +2138,7 @@ private SessionPool(
21082138 this .executor = executor ;
21092139 this .sessionClient = sessionClient ;
21102140 this .clock = clock ;
2141+ this .initialReleasePosition = initialReleasePosition ;
21112142 this .poolMaintainer = new PoolMaintainer ();
21122143 this .initMetricsCollection (metricRegistry , labelValues );
21132144 this .waitOnMinSessionsLatch =
@@ -2233,7 +2264,7 @@ private void handleException(SpannerException e, PooledSession session) {
22332264 if (isSessionNotFound (e )) {
22342265 invalidateSession (session );
22352266 } else {
2236- releaseSession (session , Position . FIRST );
2267+ releaseSession (session , false );
22372268 }
22382269 }
22392270
@@ -2396,33 +2427,128 @@ private void maybeCreateSession() {
23962427 }
23972428 }
23982429 }
2430+
23992431 /** Releases a session back to the pool. This might cause one of the waiters to be unblocked. */
2400- private void releaseSession (PooledSession session , Position position ) {
2432+ private void releaseSession (PooledSession session , boolean isNewSession ) {
24012433 Preconditions .checkNotNull (session );
24022434 synchronized (lock ) {
24032435 if (closureFuture != null ) {
24042436 return ;
24052437 }
24062438 if (waiters .size () == 0 ) {
2407- // No pending waiters
2408- switch (position ) {
2409- case RANDOM :
2410- if (!sessions .isEmpty ()) {
2411- int pos = random .nextInt (sessions .size () + 1 );
2412- sessions .add (pos , session );
2413- break ;
2414- }
2415- // fallthrough
2416- case FIRST :
2417- default :
2418- sessions .addFirst (session );
2439+ // There are no pending waiters.
2440+ // Add to a random position if the head of the session pool already contains many sessions
2441+ // with the same channel as this one.
2442+ if (session .releaseToPosition == Position .FIRST && isUnbalanced (session )) {
2443+ session .releaseToPosition = Position .RANDOM ;
2444+ } else if (session .releaseToPosition == Position .RANDOM
2445+ && !isNewSession
2446+ && checkedOutSessions .size () <= 2 ) {
2447+ // Do not randomize if there are few other sessions checked out and this session has been
2448+ // used. This ensures that this session will be re-used for the next transaction, which is
2449+ // more efficient.
2450+ session .releaseToPosition = Position .FIRST ;
2451+ }
2452+ if (session .releaseToPosition == Position .RANDOM && !sessions .isEmpty ()) {
2453+ // A session should only be added at a random position the first time it is added to
2454+ // the pool or if the pool was deemed unbalanced. All following releases into the pool
2455+ // should normally happen at the front of the pool (unless the pool is again deemed to be
2456+ // unbalanced).
2457+ session .releaseToPosition = Position .FIRST ;
2458+ int pos = random .nextInt (sessions .size () + 1 );
2459+ sessions .add (pos , session );
2460+ } else {
2461+ sessions .addFirst (session );
24192462 }
24202463 } else {
24212464 waiters .poll ().put (session );
24222465 }
24232466 }
24242467 }
24252468
2469+ private boolean isUnbalanced (PooledSession session ) {
2470+ int channel = session .getChannel ();
2471+ int numChannels = sessionClient .getSpanner ().getOptions ().getNumChannels ();
2472+ return isUnbalanced (channel , this .sessions , this .checkedOutSessions , numChannels );
2473+ }
2474+
2475+ /**
2476+ * Returns true if the given list of sessions is considered unbalanced when compared to the
2477+ * sessionChannel that is about to be added to the pool.
2478+ *
2479+ * <p>The method returns true if all the following is true:
2480+ *
2481+ * <ol>
2482+ * <li>The list of sessions is not empty.
2483+ * <li>The number of checked out sessions is > 2.
2484+ * <li>The number of channels being used by the pool is > 1.
2485+ * <li>And at least one of the following is true:
2486+ * <ol>
2487+ * <li>The first numChannels sessions in the list of sessions contains more than 2
2488+ * sessions that use the same channel as the one being added.
2489+ * <li>The list of currently checked out sessions contains more than 2 times the the
2490+ * number of sessions with the same channel as the one being added than it should in
2491+ * order for it to be perfectly balanced. Perfectly balanced in this case means that
2492+ * the list should preferably contain size/numChannels sessions of each channel.
2493+ * </ol>
2494+ * </ol>
2495+ *
2496+ * @param channelOfSessionBeingAdded the channel number being used by the session that is about to
2497+ * be released into the pool
2498+ * @param sessions the list of all sessions in the pool
2499+ * @param checkedOutSessions the currently checked out sessions of the pool
2500+ * @param numChannels the number of channels in use
2501+ * @return true if the pool is considered unbalanced, and false otherwise
2502+ */
2503+ @ VisibleForTesting
2504+ static boolean isUnbalanced (
2505+ int channelOfSessionBeingAdded ,
2506+ List <PooledSession > sessions ,
2507+ Set <PooledSessionFuture > checkedOutSessions ,
2508+ int numChannels ) {
2509+ // Do not re-balance the pool if the number of checked out sessions is low, as it is
2510+ // better to re-use sessions as much as possible in a low-QPS scenario.
2511+ if (sessions .isEmpty () || checkedOutSessions .size () <= 2 ) {
2512+ return false ;
2513+ }
2514+ if (numChannels == 1 ) {
2515+ return false ;
2516+ }
2517+
2518+ // Ideally, the first numChannels sessions in the pool should contain exactly one session for
2519+ // each channel.
2520+ // Check if the first numChannels sessions at the head of the pool already contain more than 2
2521+ // sessions that use the same channel as this one. If so, we re-balance.
2522+ // We also re-balance the pool in the specific case that the pool uses 2 channels and the first
2523+ // two sessions use those two channels.
2524+ int maxSessionsAtHeadOfPool = Math .min (numChannels , 3 );
2525+ int count = 0 ;
2526+ for (int i = 0 ; i < Math .min (numChannels , sessions .size ()); i ++) {
2527+ PooledSession otherSession = sessions .get (i );
2528+ if (channelOfSessionBeingAdded == otherSession .getChannel ()) {
2529+ count ++;
2530+ if (count >= maxSessionsAtHeadOfPool ) {
2531+ return true ;
2532+ }
2533+ }
2534+ }
2535+ // Ideally, the use of a channel in the checked out sessions is exactly
2536+ // numCheckedOut / numChannels
2537+ // We check whether we are more than a factor two away from that perfect distribution.
2538+ // If we are, then we re-balance.
2539+ count = 0 ;
2540+ int checkedOutThreshold = Math .max (2 , 2 * checkedOutSessions .size () / numChannels );
2541+ for (PooledSessionFuture otherSession : checkedOutSessions ) {
2542+ if (otherSession .isDone () && channelOfSessionBeingAdded == otherSession .get ().getChannel ()) {
2543+ count ++;
2544+ if (count > checkedOutThreshold ) {
2545+ return true ;
2546+ }
2547+ }
2548+ }
2549+ return false ;
2550+ }
2551+
24262552 private void handleCreateSessionsFailure (SpannerException e , int count ) {
24272553 synchronized (lock ) {
24282554 for (int i = 0 ; i < count ; i ++) {
@@ -2622,7 +2748,7 @@ public void onSessionReady(SessionImpl session) {
26222748 // Release the session to a random position in the pool to prevent the case that a batch
26232749 // of sessions that are affiliated with the same channel are all placed sequentially in
26242750 // the pool.
2625- releaseSession (pooledSession , Position . RANDOM );
2751+ releaseSession (pooledSession , true );
26262752 }
26272753 }
26282754 }
0 commit comments