Skip to content

Commit f4271cc

Browse files
feat(spanner): Modified BatchClientImpl to store multiplexed session and create fresh session after expiration date.
1 parent 3a66903 commit f4271cc

File tree

3 files changed

+74
-3
lines changed

3 files changed

+74
-3
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,53 @@
3030
import com.google.spanner.v1.PartitionReadRequest;
3131
import com.google.spanner.v1.PartitionResponse;
3232
import com.google.spanner.v1.TransactionSelector;
33+
import java.time.Clock;
34+
import java.time.Duration;
35+
import java.time.Instant;
3336
import java.util.List;
3437
import java.util.Map;
38+
import java.util.concurrent.atomic.AtomicReference;
39+
import java.util.concurrent.locks.ReentrantLock;
3540
import javax.annotation.Nullable;
41+
import javax.annotation.concurrent.GuardedBy;
3642

3743
/** Default implementation for Batch Client interface. */
3844
public class BatchClientImpl implements BatchClient {
3945
private final SessionClient sessionClient;
46+
4047
private final boolean isMultiplexedSessionEnabled;
4148

49+
/** Lock to protect the multiplexed session. */
50+
private final ReentrantLock multiplexedSessionLock = new ReentrantLock();
51+
52+
/** The duration before we try to replace the multiplexed session. The default is 7 days. */
53+
private final Duration sessionExpirationDuration;
54+
55+
/** The expiration date/time of the current multiplexed session. */
56+
@GuardedBy("multiplexedSessionLock")
57+
private final AtomicReference<Instant> expirationDate;
58+
59+
@GuardedBy("multiplexedSessionLock")
60+
private final AtomicReference<SessionImpl> multiplexedSessionReference;
61+
62+
private final Clock clock;
63+
4264
BatchClientImpl(SessionClient sessionClient, boolean isMultiplexedSessionEnabled) {
4365
this.sessionClient = checkNotNull(sessionClient);
4466
this.isMultiplexedSessionEnabled = isMultiplexedSessionEnabled;
67+
this.sessionExpirationDuration =
68+
Duration.ofMillis(
69+
sessionClient
70+
.getSpanner()
71+
.getOptions()
72+
.getSessionPoolOptions()
73+
.getMultiplexedSessionMaintenanceDuration()
74+
.toMillis());
75+
// Initialize the expiration date to the start of time to avoid unnecessary null checks.
76+
// This also ensured that a new session is created on first request.
77+
this.expirationDate = new AtomicReference<>(Instant.MIN);
78+
this.multiplexedSessionReference = new AtomicReference<>();
79+
clock = Clock.systemUTC();
4580
}
4681

4782
@Override
@@ -54,7 +89,7 @@ public String getDatabaseRole() {
5489
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
5590
SessionImpl session;
5691
if (isMultiplexedSessionEnabled) {
57-
session = sessionClient.createMultiplexedSession();
92+
session = getMultiplexedSession();
5893
} else {
5994
session = sessionClient.createSession();
6095
}
@@ -99,6 +134,20 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
99134
batchTransactionId);
100135
}
101136

137+
private SessionImpl getMultiplexedSession() {
138+
this.multiplexedSessionLock.lock();
139+
try {
140+
if (this.clock.instant().isAfter(this.expirationDate.get())
141+
|| this.multiplexedSessionReference.get() == null) {
142+
this.multiplexedSessionReference.set(this.sessionClient.createMultiplexedSession());
143+
this.expirationDate.set(this.clock.instant().plus(this.sessionExpirationDuration));
144+
}
145+
return this.multiplexedSessionReference.get();
146+
} finally {
147+
this.multiplexedSessionLock.unlock();
148+
}
149+
}
150+
102151
private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransaction
103152
implements BatchReadOnlyTransaction {
104153
private final String sessionName;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.concurrent.TimeUnit;
5252
import java.util.concurrent.TimeoutException;
5353
import java.util.concurrent.atomic.AtomicLong;
54+
import java.util.concurrent.locks.ReentrantLock;
5455
import java.util.logging.Level;
5556
import java.util.logging.Logger;
5657
import javax.annotation.Nullable;
@@ -107,6 +108,11 @@ private static String nextDatabaseClientId(DatabaseId databaseId) {
107108
@GuardedBy("this")
108109
private final Map<DatabaseId, DatabaseClientImpl> dbClients = new HashMap<>();
109110

111+
@GuardedBy("dbBatchClientLock")
112+
private final Map<DatabaseId, BatchClientImpl> dbBatchClients = new HashMap<>();
113+
114+
private final ReentrantLock dbBatchClientLock = new ReentrantLock();
115+
110116
private final CloseableExecutorProvider asyncExecutorProvider;
111117

112118
@GuardedBy("this")
@@ -336,9 +342,23 @@ DatabaseClientImpl createDatabaseClient(
336342

337343
@Override
338344
public BatchClient getBatchClient(DatabaseId db) {
345+
if (getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps()) {
346+
this.dbBatchClientLock.lock();
347+
try {
348+
if (this.dbBatchClients.containsKey(db)) {
349+
return this.dbBatchClients.get(db);
350+
}
351+
BatchClientImpl batchClient =
352+
new BatchClientImpl(
353+
getSessionClient(db), /*useMultiplexedSessionPartitionedOps=*/ true);
354+
this.dbBatchClients.put(db, batchClient);
355+
return batchClient;
356+
} finally {
357+
this.dbBatchClientLock.unlock();
358+
}
359+
}
339360
return new BatchClientImpl(
340-
getSessionClient(db),
341-
getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps());
361+
getSessionClient(db), /*useMultiplexedSessionPartitionedOps=*/ false);
342362
}
343363

344364
@Override

google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.mockito.Captor;
4848
import org.mockito.Mock;
4949
import org.mockito.Mockito;
50+
import org.threeten.bp.Duration;
5051

5152
/** Unit tests for {@link com.google.cloud.spanner.BatchClientImpl}. */
5253
@RunWith(JUnit4.class)
@@ -96,6 +97,7 @@ public void setUp() {
9697
when(sessionPoolOptions.getPoolMaintainerClock()).thenReturn(Clock.INSTANCE);
9798
when(sessionPoolOptions.getUseMultiplexedSessionPartitionedOps())
9899
.thenReturn(isMultiplexedSession);
100+
when(sessionPoolOptions.getMultiplexedSessionMaintenanceDuration()).thenReturn(Duration.ZERO);
99101
when(spannerOptions.getSessionPoolOptions()).thenReturn(sessionPoolOptions);
100102
@SuppressWarnings("resource")
101103
SpannerImpl spanner = new SpannerImpl(gapicRpc, spannerOptions);

0 commit comments

Comments
 (0)