Skip to content

CSOT refactoring #1781

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 150 additions & 106 deletions driver-core/src/main/com/mongodb/internal/TimeoutContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,37 @@

import com.mongodb.MongoClientException;
import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.internal.async.AsyncRunnable;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.CommandMessage;
import com.mongodb.internal.time.StartTime;
import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;
import com.mongodb.session.ClientSession;

import java.util.Objects;
import java.util.Optional;
import java.util.function.LongConsumer;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.assertNull;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.time.Timeout.ZeroSemantics.ZERO_DURATION_MEANS_INFINITE;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
* Timeout Context.
*
* <p>The context for handling timeouts in relation to the Client Side Operation Timeout specification.</p>
*/
public class TimeoutContext {

private final boolean isMaintenanceContext;
private final TimeoutSettings timeoutSettings;

@Nullable
private Timeout timeout;
private final Timeout timeout;
@Nullable
private Timeout computedServerSelectionTimeout;
private long minRoundTripTimeMS = 0;

private final Timeout computedServerSelectionTimeout;
@Nullable
private MaxTimeSupplier maxTimeSupplier = null;
private final MaxTimeSupplier maxTimeSupplier;
private final boolean isMaintenanceContext;
private final long minRoundTripTimeMS;

public static MongoOperationTimeoutException createMongoRoundTripTimeoutException() {
return createMongoTimeoutException("Remaining timeoutMS is less than or equal to the server's minimum round trip time.");
Expand Down Expand Up @@ -116,11 +106,6 @@ public static TimeoutContext createTimeoutContext(final ClientSession session, f
return new TimeoutContext(timeoutSettings);
}

// Creates a copy of the timeout context that can be reset without resetting the original.
public TimeoutContext copyTimeoutContext() {
return new TimeoutContext(getTimeoutSettings(), getTimeout());
}

public TimeoutContext(final TimeoutSettings timeoutSettings) {
this(false, timeoutSettings, startTimeout(timeoutSettings.getTimeoutMS()));
}
Expand All @@ -129,9 +114,41 @@ private TimeoutContext(final TimeoutSettings timeoutSettings, @Nullable final Ti
this(false, timeoutSettings, timeout);
}

private TimeoutContext(final boolean isMaintenanceContext, final TimeoutSettings timeoutSettings, @Nullable final Timeout timeout) {
private TimeoutContext(final boolean isMaintenanceContext,
final TimeoutSettings timeoutSettings,
@Nullable final Timeout timeout) {
this(isMaintenanceContext,
null,
0,
timeoutSettings,
null,
timeout);
}

private TimeoutContext(final boolean isMaintenanceContext,
@Nullable final Timeout computedServerSelectionTimeout,
final long minRoundTripTimeMS,
final TimeoutSettings timeoutSettings,
@Nullable final MaxTimeSupplier maxTimeSupplier) {
this(isMaintenanceContext,
computedServerSelectionTimeout,
minRoundTripTimeMS,
timeoutSettings,
maxTimeSupplier,
startTimeout(timeoutSettings.getTimeoutMS()));
}

private TimeoutContext(final boolean isMaintenanceContext,
@Nullable final Timeout computedServerSelectionTimeout,
final long minRoundTripTimeMS,
final TimeoutSettings timeoutSettings,
@Nullable final MaxTimeSupplier maxTimeSupplier,
@Nullable final Timeout timeout) {
this.isMaintenanceContext = isMaintenanceContext;
this.timeoutSettings = timeoutSettings;
this.computedServerSelectionTimeout = computedServerSelectionTimeout;
this.minRoundTripTimeMS = minRoundTripTimeMS;
this.maxTimeSupplier = maxTimeSupplier;
this.timeout = timeout;
}

Expand All @@ -152,17 +169,6 @@ public void onExpired(final Runnable onExpired) {
Timeout.nullAsInfinite(timeout).onExpired(onExpired);
}

/**
* Sets the recent min round trip time
* @param minRoundTripTimeMS the min round trip time
* @return this
*/
public TimeoutContext minRoundTripTimeMS(final long minRoundTripTimeMS) {
isTrue("'minRoundTripTimeMS' must be a positive number", minRoundTripTimeMS >= 0);
this.minRoundTripTimeMS = minRoundTripTimeMS;
return this;
}

@Nullable
public Timeout timeoutIncludingRoundTrip() {
return timeout == null ? null : timeout.shortenBy(minRoundTripTimeMS, MILLISECONDS);
Expand Down Expand Up @@ -237,8 +243,19 @@ private static void runWithFixedTimeout(final long ms, final LongConsumer onRema
}
}

public void resetToDefaultMaxTime() {
this.maxTimeSupplier = null;
/**
* Creates a new {@link TimeoutContext} with the same settings, but with the
* {@link TimeoutSettings#getMaxAwaitTimeMS()} as the maxTimeMS override which will be used
* in {@link #runMaxTimeMS(LongConsumer)}.
*/
public TimeoutContext withMaxTimeAsMaxAwaitTimeOverride() {
return new TimeoutContext(
isMaintenanceContext,
computedServerSelectionTimeout,
minRoundTripTimeMS,
timeoutSettings,
timeoutSettings::getMaxAwaitTimeMS,
timeout);
}

/**
Expand All @@ -253,26 +270,77 @@ public void resetToDefaultMaxTime() {
* If remaining CSOT timeout is less than this static timeout, then CSOT timeout will be used.
*
*/
public void setMaxTimeOverride(final long maxTimeMS) {
this.maxTimeSupplier = () -> maxTimeMS;
public TimeoutContext withMaxTimeOverride(final long maxTimeMS) {
return new TimeoutContext(
isMaintenanceContext,
computedServerSelectionTimeout,
minRoundTripTimeMS,
timeoutSettings,
() -> maxTimeMS,
timeout);
}

/**
* Creates {@link TimeoutContext} with the default maxTimeMS behaviour in {@link #runMaxTimeMS(LongConsumer)}:
* - if timeoutMS is set, the remaining timeoutMS will be used as the maxTimeMS.
* - if timeoutMS is not set, the {@link TimeoutSettings#getMaxTimeMS()} will be used.
*/
public TimeoutContext withDefaultMaxTime() {
return new TimeoutContext(
isMaintenanceContext,
computedServerSelectionTimeout,
minRoundTripTimeMS,
timeoutSettings,
null,
timeout);
}

/**
* Disable the maxTimeMS override. This way the maxTimeMS will not
* be appended to the command in the {@link CommandMessage}.
*/
public void disableMaxTimeOverride() {
this.maxTimeSupplier = () -> 0;
public TimeoutContext withDisabledMaxTimeOverride() {
return new TimeoutContext(
isMaintenanceContext,
computedServerSelectionTimeout,
minRoundTripTimeMS,
timeoutSettings,
() -> 0,
timeout);
}

/**
* The override will be provided as the remaining value in
* {@link #runMaxTimeMS}, where 0 is ignored.
*/
public void setMaxTimeOverrideToMaxCommitTime() {
this.maxTimeSupplier = () -> getMaxCommitTimeMS();
public TimeoutContext withMaxTimeOverrideAsMaxCommitTime() {
return new TimeoutContext(
isMaintenanceContext,
computedServerSelectionTimeout,
minRoundTripTimeMS,
timeoutSettings,
() -> getMaxCommitTimeMS(),
timeout);
}


/**
* Creates {@link TimeoutContext} with the recent min round trip time.
*
* @param minRoundTripTimeMS the min round trip time
* @return this
*/
public TimeoutContext withMinRoundTripTime(final long minRoundTripTimeMS) {
return new TimeoutContext(
isMaintenanceContext,
computedServerSelectionTimeout,
minRoundTripTimeMS,
timeoutSettings,
maxTimeSupplier,
timeout);
}


@VisibleForTesting(otherwise = PRIVATE)
public long getMaxCommitTimeMS() {
Long maxCommitTimeMS = timeoutSettings.getMaxCommitTimeMS();
Expand All @@ -296,65 +364,44 @@ public int getConnectTimeoutMs() {
}

/**
* @see #hasTimeoutMS()
* @see #doWithResetTimeout(Runnable)
* @see #doWithResetTimeout(AsyncRunnable, SingleResultCallback)
*/
public void resetTimeoutIfPresent() {
getAndResetTimeoutIfPresent();
}

/**
* @see #hasTimeoutMS()
* @return A {@linkplain Optional#isPresent() non-empty} previous {@linkplain Timeout} iff {@link #hasTimeoutMS()},
* i.e., iff it was reset.
* Resets the timeout if this timeout context is being used by pool maintenance
*/
private Optional<Timeout> getAndResetTimeoutIfPresent() {
Timeout result = timeout;
if (hasTimeoutMS()) {
timeout = startTimeout(timeoutSettings.getTimeoutMS());
return ofNullable(result);
public TimeoutContext withNewlyStartedTimeoutMaintenanceTimeout() {
if (!isMaintenanceContext) {
return this;
}
return empty();

return new TimeoutContext(
true,
computedServerSelectionTimeout,
minRoundTripTimeMS,
timeoutSettings,
maxTimeSupplier);
}

/**
* @see #resetTimeoutIfPresent()
*/
public void doWithResetTimeout(final Runnable action) {
Optional<Timeout> originalTimeout = getAndResetTimeoutIfPresent();
try {
action.run();
} finally {
originalTimeout.ifPresent(original -> timeout = original);
}

public TimeoutContext withMinRoundTripTimeMS(final long minRoundTripTimeMS) {
isTrue("'minRoundTripTimeMS' must be a positive number", minRoundTripTimeMS >= 0);
return new TimeoutContext(isMaintenanceContext,
computedServerSelectionTimeout,
minRoundTripTimeMS,
timeoutSettings,
maxTimeSupplier,
timeout);
}

/**
* @see #resetTimeoutIfPresent()
*/
public void doWithResetTimeout(final AsyncRunnable action, final SingleResultCallback<Void> callback) {
beginAsync().thenRun(c -> {
Optional<Timeout> originalTimeout = getAndResetTimeoutIfPresent();
beginAsync().thenRun(c2 -> {
action.finish(c2);
}).thenAlwaysRunAndFinish(() -> {
originalTimeout.ifPresent(original -> timeout = original);
}, c);
}).finish(callback);
// Creates a copy of the timeout context that can be reset without resetting the original.
public TimeoutContext copyTimeoutContext() {
return new TimeoutContext(getTimeoutSettings(), getTimeout());
}

/**
* Resets the timeout if this timeout context is being used by pool maintenance
*/
public void resetMaintenanceTimeout() {
if (!isMaintenanceContext) {
return;
}
timeout = Timeout.nullAsInfinite(timeout).call(NANOSECONDS,
() -> timeout,
(ms) -> startTimeout(timeoutSettings.getTimeoutMS()),
() -> startTimeout(timeoutSettings.getTimeoutMS()));
public TimeoutContext withNewlyStartedTimeout() {
return new TimeoutContext(
isMaintenanceContext,
computedServerSelectionTimeout,
minRoundTripTimeMS,
timeoutSettings,
maxTimeSupplier);
}

public TimeoutContext withAdditionalReadTimeout(final int additionalReadTimeout) {
Expand Down Expand Up @@ -421,31 +468,28 @@ public static Timeout startTimeout(@Nullable final Long timeoutMS) {
* @return the timeout context
*/
public Timeout computeServerSelectionTimeout() {
Timeout serverSelectionTimeout = StartTime.now()
.timeoutAfterOrInfiniteIfNegative(getTimeoutSettings().getServerSelectionTimeoutMS(), MILLISECONDS);


if (isMaintenanceContext || !hasTimeoutMS()) {
return serverSelectionTimeout;
}

if (timeout != null && Timeout.earliest(serverSelectionTimeout, timeout) == timeout) {
return timeout;
if (hasTimeoutMS()) {
return assertNotNull(timeout);
}

computedServerSelectionTimeout = serverSelectionTimeout;
return computedServerSelectionTimeout;
return StartTime.now().timeoutAfterOrInfiniteIfNegative(getTimeoutSettings().getServerSelectionTimeoutMS(), MILLISECONDS);
}

/**
* Returns the timeout context to use for the handshake process
*
* @return a new timeout context with the cached computed server selection timeout if available or this
*/
public TimeoutContext withComputedServerSelectionTimeoutContext() {
if (this.hasTimeoutMS() && computedServerSelectionTimeout != null) {
return new TimeoutContext(false, timeoutSettings, computedServerSelectionTimeout);
public TimeoutContext withComputedServerSelectionTimeoutContextNew() {
if (this.hasTimeoutMS()) {
Timeout serverSelectionTimeout = StartTime.now()
.timeoutAfterOrInfiniteIfNegative(getTimeoutSettings().getServerSelectionTimeoutMS(), MILLISECONDS);
if (isMaintenanceContext) {
return new TimeoutContext(false, timeoutSettings, serverSelectionTimeout);
}
return new TimeoutContext(false, timeoutSettings, Timeout.earliest(serverSelectionTimeout, timeout));
}

return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
*<p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface SingleResultCallback<T> {
SingleResultCallback<Void> THEN_DO_NOTHING = (r, t) -> {};

/**
* Called when the function completes. This method must not complete abruptly, see {@link AsyncCallbackFunction} for more details.
*
Expand Down
Loading