Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,10 @@ public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
input.getHeaders().forEach((k, v) -> attributes.putNexusHeader(k.toLowerCase(), v));
attributes.setScheduleToCloseTimeout(
ProtobufTimeUtils.toProtoDuration(input.getOptions().getScheduleToCloseTimeout()));
attributes.setScheduleToStartTimeout(
ProtobufTimeUtils.toProtoDuration(input.getOptions().getScheduleToStartTimeout()));
attributes.setStartToCloseTimeout(
ProtobufTimeUtils.toProtoDuration(input.getOptions().getStartToCloseTimeout()));

@Nullable
UserMetadata userMetadata =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public static NexusOperationOptions getDefaultInstance() {

public static final class Builder {
private Duration scheduleToCloseTimeout;
private Duration scheduleToStartTimeout;
private Duration startToCloseTimeout;
private NexusOperationCancellationType cancellationType;
private String summary;

Expand All @@ -46,6 +48,45 @@ public NexusOperationOptions.Builder setScheduleToCloseTimeout(
return this;
}

/**
* Sets the schedule to start timeout for the Nexus operation.
*
* <p>Maximum time to wait for the operation to be started (or completed if synchronous) by the
* handler. If the operation is not started within this timeout, it will fail with
* TIMEOUT_TYPE_SCHEDULE_TO_START.
*
* <p>Requires Temporal Server 1.31.0 or later.
*
* @param scheduleToStartTimeout the schedule to start timeout for the Nexus operation
* @return this
*/
@Experimental
public NexusOperationOptions.Builder setScheduleToStartTimeout(
Duration scheduleToStartTimeout) {
this.scheduleToStartTimeout = scheduleToStartTimeout;
return this;
}

/**
* Sets the start to close timeout for the Nexus operation.
*
* <p>Maximum time to wait for an asynchronous operation to complete after it has been started.
* If the operation does not complete within this timeout after starting, it will fail with
* TIMEOUT_TYPE_START_TO_CLOSE.
*
* <p>Only applies to asynchronous operations. Synchronous operations ignore this timeout.
*
* <p>Requires Temporal Server 1.31.0 or later.
*
* @param startToCloseTimeout the start to close timeout for the Nexus operation
* @return this
*/
@Experimental
public NexusOperationOptions.Builder setStartToCloseTimeout(Duration startToCloseTimeout) {
this.startToCloseTimeout = startToCloseTimeout;
return this;
}

/**
* Sets the cancellation type for the Nexus operation. Defaults to WAIT_COMPLETED.
*
Expand Down Expand Up @@ -78,12 +119,19 @@ private Builder(NexusOperationOptions options) {
return;
}
this.scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
this.scheduleToStartTimeout = options.getScheduleToStartTimeout();
this.startToCloseTimeout = options.getStartToCloseTimeout();
this.cancellationType = options.getCancellationType();
this.summary = options.getSummary();
}

public NexusOperationOptions build() {
return new NexusOperationOptions(scheduleToCloseTimeout, cancellationType, summary);
return new NexusOperationOptions(
scheduleToCloseTimeout,
scheduleToStartTimeout,
startToCloseTimeout,
cancellationType,
summary);
}

public NexusOperationOptions.Builder mergeNexusOperationOptions(
Expand All @@ -95,6 +143,14 @@ public NexusOperationOptions.Builder mergeNexusOperationOptions(
(override.scheduleToCloseTimeout == null)
? this.scheduleToCloseTimeout
: override.scheduleToCloseTimeout;
this.scheduleToStartTimeout =
(override.scheduleToStartTimeout == null)
? this.scheduleToStartTimeout
: override.scheduleToStartTimeout;
this.startToCloseTimeout =
(override.startToCloseTimeout == null)
? this.startToCloseTimeout
: override.startToCloseTimeout;
this.cancellationType =
(override.cancellationType == null) ? this.cancellationType : override.cancellationType;
this.summary = (override.summary == null) ? this.summary : override.summary;
Expand All @@ -104,9 +160,13 @@ public NexusOperationOptions.Builder mergeNexusOperationOptions(

private NexusOperationOptions(
Duration scheduleToCloseTimeout,
Duration scheduleToStartTimeout,
Duration startToCloseTimeout,
NexusOperationCancellationType cancellationType,
String summary) {
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
this.scheduleToStartTimeout = scheduleToStartTimeout;
this.startToCloseTimeout = startToCloseTimeout;
this.cancellationType = cancellationType;
this.summary = summary;
}
Expand All @@ -116,13 +176,25 @@ public NexusOperationOptions.Builder toBuilder() {
}

private final Duration scheduleToCloseTimeout;
private final Duration scheduleToStartTimeout;
private final Duration startToCloseTimeout;
private final NexusOperationCancellationType cancellationType;
private final String summary;

public Duration getScheduleToCloseTimeout() {
return scheduleToCloseTimeout;
}

@Experimental
public Duration getScheduleToStartTimeout() {
return scheduleToStartTimeout;
}

@Experimental
public Duration getStartToCloseTimeout() {
return startToCloseTimeout;
}

public NexusOperationCancellationType getCancellationType() {
return cancellationType;
}
Expand All @@ -138,20 +210,31 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
NexusOperationOptions that = (NexusOperationOptions) o;
return Objects.equals(scheduleToCloseTimeout, that.scheduleToCloseTimeout)
&& Objects.equals(scheduleToStartTimeout, that.scheduleToStartTimeout)
&& Objects.equals(startToCloseTimeout, that.startToCloseTimeout)
&& Objects.equals(cancellationType, that.cancellationType)
&& Objects.equals(summary, that.summary);
}

@Override
public int hashCode() {
return Objects.hash(scheduleToCloseTimeout, cancellationType, summary);
return Objects.hash(
scheduleToCloseTimeout,
scheduleToStartTimeout,
startToCloseTimeout,
cancellationType,
summary);
}

@Override
public String toString() {
return "NexusOperationOptions{"
+ "scheduleToCloseTimeout="
+ scheduleToCloseTimeout
+ ", scheduleToStartTimeout="
+ scheduleToStartTimeout
+ ", startToCloseTimeout="
+ startToCloseTimeout
+ ", cancellationType="
+ cancellationType
+ ", summary='"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ public void failWhenUpdateNamesDoNotMatch() {
}
}

@SuppressWarnings("deprecation") // Backwards compatibility for WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
@Test
public void failServerSideWhenStartIsInvalid() {
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,29 @@ private static void scheduleNexusOperation(
: Timestamp.getDefaultInstance();
TestServiceRetryState retryState = new TestServiceRetryState(data.retryPolicy, expirationTime);

// Trim secondary timeouts to the primary timeout (scheduleToClose).
java.time.Duration scheduleToCloseTimeout =
ProtobufTimeUtils.toJavaDuration(attr.getScheduleToCloseTimeout());
java.time.Duration scheduleToStartTimeout =
ProtobufTimeUtils.toJavaDuration(attr.getScheduleToStartTimeout());
java.time.Duration startToCloseTimeout =
ProtobufTimeUtils.toJavaDuration(attr.getStartToCloseTimeout());

com.google.protobuf.Duration cappedScheduleToStartTimeout = attr.getScheduleToStartTimeout();
com.google.protobuf.Duration cappedStartToCloseTimeout = attr.getStartToCloseTimeout();

if (!scheduleToCloseTimeout.isZero()
&& !scheduleToStartTimeout.isZero()
&& scheduleToStartTimeout.compareTo(scheduleToCloseTimeout) > 0) {
cappedScheduleToStartTimeout = attr.getScheduleToCloseTimeout();
}

if (!scheduleToCloseTimeout.isZero()
&& !startToCloseTimeout.isZero()
&& startToCloseTimeout.compareTo(scheduleToCloseTimeout) > 0) {
cappedStartToCloseTimeout = attr.getScheduleToCloseTimeout();
}

NexusOperationScheduledEventAttributes.Builder a =
NexusOperationScheduledEventAttributes.newBuilder()
.setEndpoint(attr.getEndpoint())
Expand All @@ -668,6 +691,8 @@ private static void scheduleNexusOperation(
.setOperation(attr.getOperation())
.setInput(attr.getInput())
.setScheduleToCloseTimeout(attr.getScheduleToCloseTimeout())
.setScheduleToStartTimeout(cappedScheduleToStartTimeout)
.setStartToCloseTimeout(cappedStartToCloseTimeout)
.putAllNexusHeader(attr.getNexusHeaderMap())
.setRequestId(UUID.randomUUID().toString())
.setWorkflowTaskCompletedEventId(workflowTaskCompletedId);
Expand Down Expand Up @@ -704,9 +729,6 @@ private static void scheduleNexusOperation(
io.temporal.api.nexus.v1.Request.newBuilder()
.setScheduledTime(ctx.currentTime())
.putAllHeader(attr.getNexusHeaderMap())
.putHeader(
io.nexusrpc.Header.OPERATION_TIMEOUT.toLowerCase(),
attr.getScheduleToCloseTimeout().toString())
.setStartOperation(
StartOperationRequest.newBuilder()
.setService(attr.getService())
Expand Down Expand Up @@ -778,11 +800,27 @@ private static void completeNexusOperation(

private static void timeoutNexusOperation(
RequestContext ctx, NexusOperationData data, TimeoutType timeoutType, long notUsed) {
if (timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE) {
if (timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE
&& timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_START
&& timeoutType != TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE) {
throw new IllegalArgumentException(
"Timeout type not supported for Nexus operations: " + timeoutType);
}

String timeoutMessage;
switch (timeoutType) {
case TIMEOUT_TYPE_SCHEDULE_TO_START:
timeoutMessage = "operation timed out before starting";
break;
case TIMEOUT_TYPE_START_TO_CLOSE:
timeoutMessage = "operation timed out after starting";
break;
case TIMEOUT_TYPE_SCHEDULE_TO_CLOSE:
default:
timeoutMessage = "operation timed out";
break;
}

Failure failure =
Failure.newBuilder()
.setMessage("nexus operation completed unsuccessfully")
Expand All @@ -795,7 +833,7 @@ private static void timeoutNexusOperation(
.setScheduledEventId(data.scheduledEventId))
.setCause(
Failure.newBuilder()
.setMessage("operation timed out")
.setMessage(timeoutMessage)
.setTimeoutFailureInfo(
TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType)))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ void completeAsyncNexusOperation(

boolean validateOperationTaskToken(NexusTaskToken tt);

@Nullable
NexusOperationScheduledEventAttributes getNexusOperationScheduledEventAttributes(
long scheduledEventId);

boolean isNexusOperationStarted(long scheduledEventId);

QueryWorkflowResponse query(QueryWorkflowRequest queryRequest, long deadline);

TestWorkflowMutableStateImpl.UpdateHandle updateWorkflowExecution(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,18 @@ private void processScheduleNexusOperation(
operation.getData().getAttempt()),
"NexusOperation ScheduleToCloseTimeout");
}
if (attr.hasScheduleToStartTimeout()
&& Durations.toMillis(attr.getScheduleToStartTimeout()) > 0) {
// ScheduleToStartTimeout is the time from schedule to start (or completion if synchronous)
ctx.addTimer(
ProtobufTimeUtils.toJavaDuration(attr.getScheduleToStartTimeout()),
() ->
timeoutNexusOperation(
scheduleEventId,
TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_START,
operation.getData().getAttempt()),
"NexusOperation ScheduleToStartTimeout");
}
ctx.lockTimer("processScheduleNexusOperation");
}

Expand Down Expand Up @@ -2309,6 +2321,23 @@ public void startNexusOperation(
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
operation.action(StateMachines.Action.START, ctx, resp, 0);
operation.getData().identity = clientIdentity;

// Add start-to-close timeout timer if configured
NexusOperationScheduledEventAttributes scheduledEvent =
operation.getData().scheduledEvent;
if (scheduledEvent.hasStartToCloseTimeout()
&& Durations.toMillis(scheduledEvent.getStartToCloseTimeout()) > 0) {
// StartToCloseTimeout measures from when the operation started to when it completes
ctx.addTimer(
ProtobufTimeUtils.toJavaDuration(scheduledEvent.getStartToCloseTimeout()),
() ->
timeoutNexusOperation(
scheduledEventId,
TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE,
operation.getData().getAttempt()),
"NexusOperation StartToCloseTimeout");
}

scheduleWorkflowTask(ctx);
});
}
Expand Down Expand Up @@ -3691,6 +3720,20 @@ public boolean validateOperationTaskToken(NexusTaskToken tt) {
return true;
}

@Override
public NexusOperationScheduledEventAttributes getNexusOperationScheduledEventAttributes(
long scheduledEventId) {
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
return operation.getData().scheduledEvent;
}

@Override
public boolean isNexusOperationStarted(long scheduledEventId) {
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
// Operation is considered started if it has an operation token
return !operation.getData().operationToken.isEmpty();
}

private boolean isTerminalState(State workflowState) {
return workflowState == State.COMPLETED
|| workflowState == State.TIMED_OUT
Expand Down
Loading
Loading