Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,7 @@ public CommitResponse writeWithOptions(Iterable<Mutation> mutations, Transaction

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
throw new UnsupportedOperationException();
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
throw new UnsupportedOperationException();
return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,38 @@ class DatabaseClientImpl implements DatabaseClient {
@VisibleForTesting final SessionPool pool;
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;

final boolean useMultiplexedSessionForBlindWrite;

@VisibleForTesting
DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) {
this("", pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
this(
"",
pool,
/* multiplexedSessionDatabaseClient = */ null,
/*useMultiplexedSessionForBlindWrite = */ false,
tracer);
}

@VisibleForTesting
DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) {
this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
this(
clientId,
pool,
/* multiplexedSessionDatabaseClient = */ null,
/*useMultiplexedSessionForBlindWrite = */ false,
tracer);
}

DatabaseClientImpl(
String clientId,
SessionPool pool,
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
boolean useMultiplexedSessionForBlindWrite,
TraceWrapper tracer) {
this.clientId = clientId;
this.pool = pool;
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
this.useMultiplexedSessionForBlindWrite = useMultiplexedSessionForBlindWrite;
this.tracer = tracer;
}

Expand Down Expand Up @@ -112,6 +126,16 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
public CommitResponse writeAtLeastOnceWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
if (useMultiplexedSessionForBlindWrite) {
return getMultiplexedSession().writeAtLeastOnceWithOptions(mutations, options);
} else {
return writeAtLeastOnceWithSession(mutations, options);
}
}

public CommitResponse writeAtLeastOnceWithSession(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction;
import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;

/**
* Represents a delayed execution of a transaction on a multiplexed session. The execution is
Expand Down Expand Up @@ -119,4 +121,28 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
.readOnlyTransaction(bound),
MoreExecutors.directExecutor()));
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
try {
return ApiFutures.transform(
this.sessionFuture,
sessionReference ->
new MultiplexedSessionTransaction(
client, span, sessionReference, NO_CHANNEL_HINT, false)
.writeAtLeastOnceWithOptions(mutations, options),
MoreExecutors.directExecutor())
.get();
} catch (ExecutionException executionException) {
// Propagate the underlying exception as a RuntimeException (SpannerException is also a
// RuntimeException).
if (executionException.getCause() instanceof RuntimeException) {
throw (RuntimeException) executionException.getCause();
}
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
} catch (InterruptedException interruptedException) {
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -358,6 +359,13 @@ private int getSingleUseChannelHint() {
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
return createMultiplexedSessionTransaction(true)
.writeAtLeastOnceWithOptions(mutations, options);
}

@Override
public ReadContext singleUse() {
return createMultiplexedSessionTransaction(true).singleUse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class SessionPoolOptions {

private final boolean useMultiplexedSession;

private final boolean useMultiplexedSessionForBlindWrite;

// TODO: Change to use java.time.Duration.
private final Duration multiplexedSessionMaintenanceDuration;

Expand Down Expand Up @@ -108,6 +110,14 @@ private SessionPoolOptions(Builder builder) {
(useMultiplexedSessionFromEnvVariable != null)
? useMultiplexedSessionFromEnvVariable
: builder.useMultiplexedSession;
// useMultiplexedSessionForBlindWrite priority => Environment var > private setter > client
// default
Boolean useMultiplexedSessionBlindWriteFromEnvVariable =
getUseMultiplexedSessionBlindWriteFromEnvVariable();
this.useMultiplexedSessionForBlindWrite =
(useMultiplexedSessionBlindWriteFromEnvVariable != null)
? useMultiplexedSessionBlindWriteFromEnvVariable
: builder.useMultiplexedSessionForBlindWrite;
this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration;
}

Expand Down Expand Up @@ -307,6 +317,28 @@ public boolean getUseMultiplexedSession() {
return useMultiplexedSession;
}

@VisibleForTesting
@InternalApi
public boolean getUseMultiplexedSessionForBlindWrite() {
return useMultiplexedSessionForBlindWrite;
}

private static Boolean getUseMultiplexedSessionBlindWriteFromEnvVariable() {
String useMultiplexedSessionFromEnvVariable =
System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_BLIND_WRITES");
if (useMultiplexedSessionFromEnvVariable != null
&& useMultiplexedSessionFromEnvVariable.length() > 0) {
if ("true".equalsIgnoreCase(useMultiplexedSessionFromEnvVariable)
|| "false".equalsIgnoreCase(useMultiplexedSessionFromEnvVariable)) {
return Boolean.parseBoolean(useMultiplexedSessionFromEnvVariable);
} else {
throw new IllegalArgumentException(
"GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_BLIND_WRITES should be either true or false.");
}
}
return null;
}

private static Boolean getUseMultiplexedSessionFromEnvVariable() {
String useMultiplexedSessionFromEnvVariable =
System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS");
Expand Down Expand Up @@ -529,6 +561,11 @@ public static class Builder {
// Set useMultiplexedSession to true to make multiplexed session the default.
private boolean useMultiplexedSession = false;

// This field controls the default behavior of session management in Java client.
// Set useMultiplexedSessionForBlindWrite to true to make multiplexed session the default for
// blind writes.
private boolean useMultiplexedSessionForBlindWrite = false;

private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7);
private Clock poolMaintainerClock = Clock.INSTANCE;

Expand Down Expand Up @@ -570,6 +607,7 @@ private Builder(SessionPoolOptions options) {
this.randomizePositionQPSThreshold = options.randomizePositionQPSThreshold;
this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions;
this.useMultiplexedSession = options.useMultiplexedSession;
this.useMultiplexedSessionForBlindWrite = options.useMultiplexedSessionForBlindWrite;
this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration;
this.poolMaintainerClock = options.poolMaintainerClock;
}
Expand Down Expand Up @@ -757,6 +795,22 @@ Builder setUseMultiplexedSession(boolean useMultiplexedSession) {
return this;
}

/**
* Sets whether the client should use multiplexed session or not for writeAtLeastOnce. If set to
* true, the client optimises and runs multiple applicable requests concurrently on a single
* session. A single multiplexed session is sufficient to handle all concurrent traffic.
*
* <p>When set to false, the client uses the regular session cached in the session pool for
* running 1 concurrent transaction per session. We require to provision sufficient sessions by
* making use of {@link SessionPoolOptions#minSessions} and {@link
* SessionPoolOptions#maxSessions} based on the traffic load. Failing to do so will result in
* higher latencies.
*/
Builder setUseMultiplexedSessionForBlindWrite(boolean useMultiplexedSessionForBlindWrite) {
this.useMultiplexedSessionForBlindWrite = useMultiplexedSessionForBlindWrite;
return this;
}

@VisibleForTesting
Builder setMultiplexedSessionMaintenanceDuration(
Duration multiplexedSessionMaintenanceDuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {

boolean useMultiplexedSession =
getOptions().getSessionPoolOptions().getUseMultiplexedSession();
boolean useMultiplexedSessionForBlindWrite =
getOptions().getSessionPoolOptions().getUseMultiplexedSessionForBlindWrite();
MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient =
useMultiplexedSession
? new MultiplexedSessionDatabaseClient(SpannerImpl.this.getSessionClient(db))
Expand All @@ -300,7 +302,11 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
numMultiplexedSessionsReleased);
pool.maybeWaitOnMinSessions();
DatabaseClientImpl dbClient =
createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient);
createDatabaseClient(
clientId,
pool,
multiplexedSessionDatabaseClient,
useMultiplexedSessionForBlindWrite);
dbClients.put(db, dbClient);
return dbClient;
}
Expand All @@ -311,8 +317,10 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
DatabaseClientImpl createDatabaseClient(
String clientId,
SessionPool pool,
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient) {
return new DatabaseClientImpl(clientId, pool, multiplexedSessionClient, tracer);
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient,
boolean multiplexedSessionForBlindWrite) {
return new DatabaseClientImpl(
clientId, pool, multiplexedSessionClient, multiplexedSessionForBlindWrite, tracer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ private static class SpannerWithClosedSessionsImpl extends SpannerImpl {

@Override
DatabaseClientImpl createDatabaseClient(
String clientId, SessionPool pool, MultiplexedSessionDatabaseClient ignore) {
String clientId,
SessionPool pool,
MultiplexedSessionDatabaseClient ignore,
boolean multiplexedSessionForBlindWrite) {
return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer);
}
}
Expand Down
Loading