diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java index e6edc6caacc..b481c67bb45 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java @@ -1921,6 +1921,7 @@ UnitOfWork createNewUnitOfWork( .build(); case READ_WRITE_TRANSACTION: return ReadWriteTransaction.newBuilder() + .setUsesEmulator(options.usesEmulator()) .setUseAutoSavepointsForEmulator(options.useAutoSavepointsForEmulator()) .setDatabaseClient(dbClient) .setDelayTransactionStartUntilFirstWrite(delayTransactionStartUntilFirstWrite) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java index f08a9522ea8..7710a1dee58 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java @@ -37,6 +37,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.base.Suppliers; import com.google.common.collect.Sets; import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions; import io.opentelemetry.api.OpenTelemetry; @@ -382,6 +383,10 @@ private static String generateGuardedConnectionPropertyError( + "The instance and database in the connection string will automatically be created if these do not yet exist on the emulator. " + "Add dialect=postgresql to the connection string to make sure that the database that is created uses the PostgreSQL dialect.", false), + ConnectionProperty.createBooleanProperty( + "useAutoSavepointsForEmulator", + "Automatically creates savepoints for each statement in a read/write transaction when using the Emulator. This is no longer needed when using Emulator version 1.5.23 or higher.", + false), ConnectionProperty.createBooleanProperty( LENIENT_PROPERTY_NAME, "Silently ignore unknown properties in the connection string/properties (true/false)", @@ -740,6 +745,7 @@ public static Builder newBuilder() { private final boolean returnCommitStats; private final Long maxCommitDelay; private final boolean autoConfigEmulator; + private final boolean useAutoSavepointsForEmulator; private final Dialect dialect; private final RpcPriority rpcPriority; private final DdlInTransactionMode ddlInTransactionMode; @@ -801,6 +807,7 @@ private ConnectionOptions(Builder builder) { this.returnCommitStats = parseReturnCommitStats(this.uri); this.maxCommitDelay = parseMaxCommitDelay(this.uri); this.autoConfigEmulator = parseAutoConfigEmulator(this.uri); + this.useAutoSavepointsForEmulator = parseUseAutoSavepointsForEmulator(this.uri); this.dialect = parseDialect(this.uri); this.usePlainText = this.autoConfigEmulator || parseUsePlainText(this.uri); this.host = @@ -1170,6 +1177,11 @@ static boolean parseAutoConfigEmulator(String uri) { return Boolean.parseBoolean(value); } + static boolean parseUseAutoSavepointsForEmulator(String uri) { + String value = parseUriProperty(uri, "useAutoSavepointsForEmulator"); + return Boolean.parseBoolean(value); + } + @VisibleForTesting static Dialect parseDialect(String uri) { String value = parseUriProperty(uri, DIALECT_PROPERTY_NAME); @@ -1535,6 +1547,14 @@ public Duration getMaxCommitDelay() { return maxCommitDelay == null ? null : Duration.ofMillis(maxCommitDelay); } + boolean usesEmulator() { + return Suppliers.memoize( + () -> + this.autoConfigEmulator + || !Strings.isNullOrEmpty(System.getenv("SPANNER_EMULATOR_HOST"))) + .get(); + } + /** * Whether connections created by this {@link ConnectionOptions} will automatically try to connect * to the emulator using the default host/port of the emulator, and automatically create the @@ -1548,11 +1568,11 @@ public boolean isAutoConfigEmulator() { /** * Returns true if a connection should generate auto-savepoints for retrying transactions on the * emulator. This allows some more concurrent transactions on the emulator. + * + *

This is no longer needed since version 1.5.23 of the emulator. */ boolean useAutoSavepointsForEmulator() { - // For now, this option is directly linked to the option autoConfigEmulator=true, which is the - // recommended way to configure the emulator for the Connection API. - return autoConfigEmulator; + return useAutoSavepointsForEmulator; } public Dialect getDialect() { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java index 0362ffc2050..4c4e7b8e00a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java @@ -121,6 +121,8 @@ class ReadWriteTransaction extends AbstractMultiUseTransaction { */ private static final String AUTO_SAVEPOINT_NAME = "_auto_savepoint"; + private final boolean usesEmulator; + /** * Indicates whether an automatic savepoint should be generated after each statement, so the * transaction can be manually aborted and retried by the Connection API when connected to the @@ -191,6 +193,7 @@ public void onSuccess(V result) { } static class Builder extends AbstractMultiUseTransaction.Builder { + private boolean usesEmulator; private boolean useAutoSavepointsForEmulator; private DatabaseClient dbClient; private Boolean retryAbortsInternally; @@ -203,6 +206,11 @@ static class Builder extends AbstractMultiUseTransaction.Builder> futures = new ArrayList<>(numThreads); for (int thread = 0; thread < numThreads; thread++) { - executor.submit(() -> runRandomTransactions(numRowsInserted)); + futures.add(executor.submit(() -> runRandomTransactions(numRowsInserted))); } executor.shutdown(); assertTrue(executor.awaitTermination(60L, TimeUnit.SECONDS)); + // Get the results of each transaction so the test case fails with a logical error message if + // any of the transactions failed. + for (Future future : futures) { + assertNull(future.get()); + } verifyRowCount(numRowsInserted.get()); } @@ -141,7 +160,7 @@ private void runRandomTransactions(AtomicInteger numRowsInserted) { while (!connections.isEmpty()) { int index = ThreadLocalRandom.current().nextInt(connections.size()); Connection connection = connections.get(index); - if (ThreadLocalRandom.current().nextInt(10) < 3) { + if (ThreadLocalRandom.current().nextInt(10) < 5) { connection.commit(); connection.close(); assertEquals(connection, connections.remove(index)); @@ -155,6 +174,12 @@ private void runRandomTransactions(AtomicInteger numRowsInserted) { .build())); numRowsInserted.incrementAndGet(); } + try { + // Make sure to have a small wait between statements. + Thread.sleep(ThreadLocalRandom.current().nextInt(1, 5)); + } catch (InterruptedException interruptedException) { + throw SpannerExceptionFactory.propagateInterrupt(interruptedException); + } } } finally { for (Connection connection : connections) {