Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,7 @@ private static Boolean getUseMultiplexedSessionFromEnvVariable() {
@VisibleForTesting
@InternalApi
protected static Boolean getUseMultiplexedSessionFromEnvVariablePartitionedOps() {
// Checks the value of env, GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS
// This returns null until Partitioned Operations is supported.
return null;
return parseBooleanEnvVariable("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS");
}

private static Boolean parseBooleanEnvVariable(String variableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void setUp() {
@SuppressWarnings("resource")
SpannerImpl spanner = new SpannerImpl(gapicRpc, spannerOptions);
client = new BatchClientImpl(spanner.getSessionClient(db), isMultiplexedSession);
BatchClientImpl.unimplementedForPartitionedOps.set(false);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,14 @@ boolean isMultiplexedSessionsEnabled(Spanner spanner) {
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession();
}

boolean isMultiplexedSessionsEnabledForPartitionedOps(Spanner spanner) {
if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) {
return false;
}
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps()
&& spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession();
}

boolean isMultiplexedSessionsEnabledForRW(Spanner spanner) {
if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,7 @@ public void testPartitionQuery() {
assertFalse(resultSet.next());
}
}
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
assertEquals(3, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else {
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
}
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
}
assertEquals(2, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
assertEquals(2, mockSpanner.countRequestsOfType(PartitionQueryRequest.class));
Expand Down Expand Up @@ -155,11 +151,7 @@ public void testMixNormalAndPartitionQueryInReadOnlyTransaction() {
readTimestamps.add(connection.getReadTimestamp());
connection.commit();
}
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
assertEquals(3, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else {
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
}
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
}
// The above will start two transactions:
// 1. The initial 'normal' read-only transaction.
Expand Down Expand Up @@ -228,6 +220,10 @@ public void testRunPartition() {
if (createSessionRequestCounts == expectedCreateSessionsRPC + 1) {
isMultiplexedSessionCreated = true;
}
} else if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())
&& isMultiplexedSessionCreated) {
// When multiplexed session will be reused for each iteration.
assertEquals(0, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else {
assertEquals(
expectedCreateSessionsRPC,
Expand Down Expand Up @@ -261,6 +257,7 @@ public void testRunPartitionUsingSql() {
String prefix = dialect == Dialect.POSTGRESQL ? "spanner." : "";

int maxPartitions = 5;
boolean isMultiplexedSessionCreated = false;
try (Connection connection = createConnection()) {
connection.execute(Statement.of("set autocommit=true"));
assertTrue(connection.isAutocommit());
Expand All @@ -284,7 +281,6 @@ public void testRunPartitionUsingSql() {
assertFalse(resultSet.next());
}

boolean isMultiplexedSessionCreated = false;
for (boolean useLiteral : new boolean[] {true, false}) {
try (ResultSet partitions =
connection.executeQuery(
Expand Down Expand Up @@ -328,6 +324,10 @@ public void testRunPartitionUsingSql() {
if (createSessionRequestCounts == expectedCreateSessionsRPC + 1) {
isMultiplexedSessionCreated = true;
}
} else if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())
&& isMultiplexedSessionCreated) {
// When multiplexed session will be reused for each iteration.
assertEquals(0, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else {
assertEquals(
expectedCreateSessionsRPC,
Expand Down Expand Up @@ -570,11 +570,7 @@ public void testRunPartitionedQueryUsingSql() {
assertEquals(maxPartitions * generatedRowCount, rowCount);
}
}
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
assertEquals(3, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else {
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
}
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
}
// We have 2 requests of each, as we run the query with data boost both enabled and disabled.
assertEquals(2, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
Expand Down Expand Up @@ -679,8 +675,8 @@ public void testRunPartitionedQueryWithMaxParallelism() {
assertEquals(maxPartitions * generatedRowCount, rowCount);
}
}
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
assertEquals(6, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
if (isMultiplexedSessionsEnabledForPartitionedOps(connection.getSpanner())) {
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else {
assertEquals(5, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
}
Expand Down Expand Up @@ -758,11 +754,8 @@ public void testAutoPartitionMode() {
exception
.getMessage()
.contains("Partition query is not supported for read/write transaction"));
if (isMultiplexedSessionsEnabled(connection.getSpanner())) {
assertEquals(3, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
} else {
assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
}

assertEquals(2, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
}
assertEquals(2, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
assertEquals(2, mockSpanner.countRequestsOfType(PartitionQueryRequest.class));
Expand Down
Loading