Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1043,4 +1043,14 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.spanner.v1.TransactionOptions$ReadWrite$ReadLockMode getReadLockMode()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setTransactionTimeout(java.time.Duration)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>java.time.Duration getTransactionTimeout()</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -358,22 +358,25 @@ <T> ApiFuture<T> executeStatementAsync(
statement, StatementExecutionStep.EXECUTE_STATEMENT, this);
}
Context context = Context.current();
if (statementTimeout.hasTimeout() && !applyStatementTimeoutToMethods.isEmpty()) {
Deadline deadline =
Deadline.after(
statementTimeout.getTimeoutValue(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
Deadline transactionDeadline = getTransactionDeadline();
Deadline statementDeadline =
statementTimeout.hasTimeout()
? Deadline.after(
statementTimeout.getTimeoutValue(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
: null;
Deadline effectiveDeadline = min(transactionDeadline, statementDeadline);
if (effectiveDeadline != null && !applyStatementTimeoutToMethods.isEmpty()) {
context =
context.withValue(
SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY,
new SpannerOptions.CallContextConfigurator() {
@Override
public <ReqT, RespT> ApiCallContext configure(
ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method) {
if (statementTimeout.hasTimeout()
&& applyStatementTimeoutToMethods.contains(method)) {
if (applyStatementTimeoutToMethods.contains(method)) {
// Calculate the remaining timeout. This method could be called multiple times
// if the transaction is retried.
long remainingTimeout = deadline.timeRemaining(TimeUnit.NANOSECONDS);
long remainingTimeout = effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS);
if (remainingTimeout <= 0) {
remainingTimeout = 1;
}
Expand Down Expand Up @@ -427,4 +430,23 @@ public void run() {
return future;
}
}

@Nullable
static Deadline min(@Nullable Deadline a, @Nullable Deadline b) {
if (a == null && b == null) {
return null;
}
if (a == null) {
return b;
}
if (b == null) {
return a;
}
return a.minimum(b);
}

@Nullable
Deadline getTransactionDeadline() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ public interface Connection extends AutoCloseable {
/** Returns the read lock mode for read/write transactions for this connection. */
ReadLockMode getReadLockMode();

/** Sets the timeout for read/write transactions. */
void setTransactionTimeout(Duration timeout);

/** Returns the timeout for read/write transactions. */
Duration getTransactionTimeout();

/**
* Sets the duration the connection should wait before automatically aborting the execution of a
* statement. The default is no timeout. Statement timeouts are applied all types of statements,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static com.google.cloud.spanner.connection.ConnectionProperties.RPC_PRIORITY;
import static com.google.cloud.spanner.connection.ConnectionProperties.SAVEPOINT_SUPPORT;
import static com.google.cloud.spanner.connection.ConnectionProperties.TRACING_PREFIX;
import static com.google.cloud.spanner.connection.ConnectionProperties.TRANSACTION_TIMEOUT;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
Expand Down Expand Up @@ -94,6 +95,8 @@
import com.google.spanner.v1.ResultSetStats;
import com.google.spanner.v1.TransactionOptions.IsolationLevel;
import com.google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode;
import io.grpc.Deadline;
import io.grpc.Deadline.Ticker;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
Expand Down Expand Up @@ -254,6 +257,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
}
}

private final Ticker ticker;
private StatementExecutor.StatementTimeout statementTimeout =
new StatementExecutor.StatementTimeout();
private boolean closed = false;
Expand Down Expand Up @@ -311,6 +315,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
? StatementExecutorType.VIRTUAL_THREAD
: StatementExecutorType.PLATFORM_THREAD;
}
this.ticker = options.getTicker();
this.statementExecutor =
new StatementExecutor(statementExecutorType, options.getStatementExecutionInterceptors());
this.spannerPool = SpannerPool.INSTANCE;
Expand Down Expand Up @@ -361,6 +366,7 @@ && getDialect() == Dialect.POSTGRESQL
? StatementExecutorType.VIRTUAL_THREAD
: StatementExecutorType.PLATFORM_THREAD,
Collections.emptyList());
this.ticker = options.getTicker();
this.spannerPool = Preconditions.checkNotNull(spannerPool);
this.options = Preconditions.checkNotNull(options);
this.spanner = spannerPool.getSpanner(options, this);
Expand Down Expand Up @@ -489,6 +495,7 @@ private void reset(Context context, boolean inTransaction) {
this.connectionState.resetValue(READONLY, context, inTransaction);
this.connectionState.resetValue(DEFAULT_ISOLATION_LEVEL, context, inTransaction);
this.connectionState.resetValue(READ_LOCK_MODE, context, inTransaction);
this.connectionState.resetValue(TRANSACTION_TIMEOUT, context, inTransaction);
this.connectionState.resetValue(READ_ONLY_STALENESS, context, inTransaction);
this.connectionState.resetValue(OPTIMIZER_VERSION, context, inTransaction);
this.connectionState.resetValue(OPTIMIZER_STATISTICS_PACKAGE, context, inTransaction);
Expand Down Expand Up @@ -683,6 +690,26 @@ public ReadLockMode getReadLockMode() {
return getConnectionPropertyValue(READ_LOCK_MODE);
}

@Override
public void setTransactionTimeout(Duration timeout) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
setConnectionPropertyValue(TRANSACTION_TIMEOUT, timeout);
}

@Override
public Duration getTransactionTimeout() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
return getConnectionPropertyValue(TRANSACTION_TIMEOUT);
}

@Nullable
Deadline getTransactionDeadline() {
Duration timeout = getTransactionTimeout();
return timeout == null
? null
: Deadline.after(timeout.toNanos(), TimeUnit.NANOSECONDS, this.ticker);
}

@Override
public void setAutocommitDmlMode(AutocommitDmlMode mode) {
Preconditions.checkNotNull(mode);
Expand Down Expand Up @@ -2271,6 +2298,7 @@ UnitOfWork createNewUnitOfWork(
.setDatabaseClient(dbClient)
.setIsolationLevel(transactionIsolationLevel)
.setReadLockMode(getConnectionPropertyValue(READ_LOCK_MODE))
.setDeadline(getTransactionDeadline())
.setDelayTransactionStartUntilFirstWrite(
getConnectionPropertyValue(DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE))
.setKeepTransactionAlive(getConnectionPropertyValue(KEEP_TRANSACTION_ALIVE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@
import com.google.common.base.Strings;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import io.grpc.Deadline;
import io.grpc.Deadline.Ticker;
import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -388,6 +390,7 @@ public static class Builder {
Collections.emptyList();
private SpannerOptionsConfigurator configurator;
private OpenTelemetry openTelemetry;
private Ticker ticker = Deadline.getSystemTicker();

private Builder() {}

Expand Down Expand Up @@ -559,6 +562,12 @@ Builder setCredentials(Credentials credentials) {
return this;
}

@VisibleForTesting
Builder setTicker(Ticker ticker) {
this.ticker = Preconditions.checkNotNull(ticker);
return this;
}

/**
* Sets the executor type to use for connections. See {@link StatementExecutorType} for more
* information on what the different options mean.
Expand Down Expand Up @@ -613,6 +622,7 @@ public static Builder newBuilder() {
private final OpenTelemetry openTelemetry;
private final List<StatementExecutionInterceptor> statementExecutionInterceptors;
private final SpannerOptionsConfigurator configurator;
private final Ticker ticker;

private ConnectionOptions(Builder builder) {
Matcher matcher;
Expand Down Expand Up @@ -641,6 +651,7 @@ private ConnectionOptions(Builder builder) {
this.statementExecutionInterceptors =
Collections.unmodifiableList(builder.statementExecutionInterceptors);
this.configurator = builder.configurator;
this.ticker = builder.ticker;

// Create the initial connection state from the parsed properties in the connection URL.
this.initialConnectionState = new ConnectionState(connectionPropertyValues);
Expand Down Expand Up @@ -813,6 +824,10 @@ SpannerOptionsConfigurator getConfigurator() {
return configurator;
}

Ticker getTicker() {
return ticker;
}

@VisibleForTesting
CredentialsService getCredentialsService() {
return CredentialsService.INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,14 @@ public class ConnectionProperties {
.toArray(new ReadLockMode[0]),
ReadLockModeConverter.INSTANCE,
Context.USER);
static final ConnectionProperty<Duration> TRANSACTION_TIMEOUT =
create(
"transaction_timeout",
"Timeout for read/write transactions.",
null,
null,
DurationConverter.INSTANCE,
Context.USER);
static final ConnectionProperty<AutocommitDmlMode> AUTOCOMMIT_DML_MODE =
create(
"autocommit_dml_mode",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ interface ConnectionStatementExecutor {

StatementResult statementShowStatementTimeout();

StatementResult statementSetTransactionTimeout(Duration duration);

StatementResult statementShowTransactionTimeout();

StatementResult statementShowReadTimestamp();

StatementResult statementShowCommitTimestamp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_STATEMENT_TIMEOUT;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_TRANSACTION_MODE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_TRANSACTION_TAG;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_TRANSACTION_TIMEOUT;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_AUTOCOMMIT;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_AUTOCOMMIT_DML_MODE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_AUTO_BATCH_DML;
Expand Down Expand Up @@ -85,6 +86,7 @@
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_STATEMENT_TIMEOUT;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_TRANSACTION_ISOLATION_LEVEL;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_TRANSACTION_TAG;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_TRANSACTION_TIMEOUT;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.START_BATCH_DDL;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.START_BATCH_DML;
import static com.google.cloud.spanner.connection.StatementResultImpl.noResult;
Expand Down Expand Up @@ -248,6 +250,24 @@ public StatementResult statementShowStatementTimeout() {
SHOW_STATEMENT_TIMEOUT);
}

@Override
public StatementResult statementSetTransactionTimeout(Duration duration) {
if (duration == null || duration.isZero()) {
getConnection().setTransactionTimeout(null);
} else {
getConnection().setTransactionTimeout(duration);
}
return noResult(SET_TRANSACTION_TIMEOUT);
}

@Override
public StatementResult statementShowTransactionTimeout() {
return resultSet(
String.format("%sTRANSACTION_TIMEOUT", getNamespace(connection.getDialect())),
String.valueOf(getConnection().getTransactionTimeout()),
SHOW_TRANSACTION_TIMEOUT);
}

@Override
public StatementResult statementShowReadTimestamp() {
return resultSet(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import com.google.spanner.v1.SpannerGrpc;
import com.google.spanner.v1.TransactionOptions.IsolationLevel;
import com.google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode;
import io.grpc.Deadline;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.context.Scope;
import java.time.Duration;
Expand All @@ -81,6 +82,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* Transaction that is used when a {@link Connection} is normal read/write mode (i.e. not autocommit
Expand Down Expand Up @@ -157,6 +159,7 @@ class ReadWriteTransaction extends AbstractMultiUseTransaction {
private final SavepointSupport savepointSupport;
@Nonnull private final IsolationLevel isolationLevel;
private final ReadLockMode readLockMode;
private final Deadline deadline;
private int transactionRetryAttempts;
private int successfulRetries;
private volatile ApiFuture<TransactionContext> txContextFuture;
Expand Down Expand Up @@ -210,6 +213,7 @@ static class Builder extends AbstractMultiUseTransaction.Builder<Builder, ReadWr
private SavepointSupport savepointSupport;
private IsolationLevel isolationLevel;
private ReadLockMode readLockMode = ReadLockMode.READ_LOCK_MODE_UNSPECIFIED;
private Deadline deadline;

private Builder() {}

Expand Down Expand Up @@ -269,6 +273,11 @@ Builder setReadLockMode(ReadLockMode readLockMode) {
return this;
}

Builder setDeadline(Deadline deadline) {
this.deadline = deadline;
return this;
}

@Override
ReadWriteTransaction build() {
Preconditions.checkState(dbClient != null, "No DatabaseClient client specified");
Expand Down Expand Up @@ -314,6 +323,7 @@ private ReadWriteTransaction(Builder builder) {
this.savepointSupport = builder.savepointSupport;
this.isolationLevel = Preconditions.checkNotNull(builder.isolationLevel);
this.readLockMode = Preconditions.checkNotNull(builder.readLockMode);
this.deadline = builder.deadline;
this.transactionOptions = extractOptions(builder);
}

Expand Down Expand Up @@ -1307,6 +1317,12 @@ String getUnitOfWorkName() {
return "read/write transaction";
}

@Nullable
@Override
Deadline getTransactionDeadline() {
return this.deadline;
}

static class ReadWriteSavepoint extends Savepoint {
private final int statementPosition;
private final int mutationPosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ enum ClientSideStatementType {
SET_AUTOCOMMIT_DML_MODE,
SHOW_STATEMENT_TIMEOUT,
SET_STATEMENT_TIMEOUT,
SHOW_TRANSACTION_TIMEOUT,
SET_TRANSACTION_TIMEOUT,
SHOW_READ_TIMESTAMP,
SHOW_COMMIT_TIMESTAMP,
SHOW_COMMIT_RESPONSE,
Expand Down
Loading
Loading