diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 977d9754e..ef59e2498 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -799,6 +799,10 @@ public ExecuteNexusOperationOutput executeNexusOperation( input.getHeaders().forEach((k, v) -> attributes.putNexusHeader(k.toLowerCase(), v)); attributes.setScheduleToCloseTimeout( ProtobufTimeUtils.toProtoDuration(input.getOptions().getScheduleToCloseTimeout())); + attributes.setScheduleToStartTimeout( + ProtobufTimeUtils.toProtoDuration(input.getOptions().getScheduleToStartTimeout())); + attributes.setStartToCloseTimeout( + ProtobufTimeUtils.toProtoDuration(input.getOptions().getStartToCloseTimeout())); @Nullable UserMetadata userMetadata = diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/NexusOperationOptions.java b/temporal-sdk/src/main/java/io/temporal/workflow/NexusOperationOptions.java index b22930eba..0952f6853 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/NexusOperationOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/NexusOperationOptions.java @@ -31,6 +31,8 @@ public static NexusOperationOptions getDefaultInstance() { public static final class Builder { private Duration scheduleToCloseTimeout; + private Duration scheduleToStartTimeout; + private Duration startToCloseTimeout; private NexusOperationCancellationType cancellationType; private String summary; @@ -46,6 +48,45 @@ public NexusOperationOptions.Builder setScheduleToCloseTimeout( return this; } + /** + * Sets the schedule to start timeout for the Nexus operation. + * + *

Maximum time to wait for the operation to be started (or completed if synchronous) by the + * handler. If the operation is not started within this timeout, it will fail with + * TIMEOUT_TYPE_SCHEDULE_TO_START. + * + *

Requires Temporal Server 1.31.0 or later. + * + * @param scheduleToStartTimeout the schedule to start timeout for the Nexus operation + * @return this + */ + @Experimental + public NexusOperationOptions.Builder setScheduleToStartTimeout( + Duration scheduleToStartTimeout) { + this.scheduleToStartTimeout = scheduleToStartTimeout; + return this; + } + + /** + * Sets the start to close timeout for the Nexus operation. + * + *

Maximum time to wait for an asynchronous operation to complete after it has been started. + * If the operation does not complete within this timeout after starting, it will fail with + * TIMEOUT_TYPE_START_TO_CLOSE. + * + *

Only applies to asynchronous operations. Synchronous operations ignore this timeout. + * + *

Requires Temporal Server 1.31.0 or later. + * + * @param startToCloseTimeout the start to close timeout for the Nexus operation + * @return this + */ + @Experimental + public NexusOperationOptions.Builder setStartToCloseTimeout(Duration startToCloseTimeout) { + this.startToCloseTimeout = startToCloseTimeout; + return this; + } + /** * Sets the cancellation type for the Nexus operation. Defaults to WAIT_COMPLETED. * @@ -78,12 +119,19 @@ private Builder(NexusOperationOptions options) { return; } this.scheduleToCloseTimeout = options.getScheduleToCloseTimeout(); + this.scheduleToStartTimeout = options.getScheduleToStartTimeout(); + this.startToCloseTimeout = options.getStartToCloseTimeout(); this.cancellationType = options.getCancellationType(); this.summary = options.getSummary(); } public NexusOperationOptions build() { - return new NexusOperationOptions(scheduleToCloseTimeout, cancellationType, summary); + return new NexusOperationOptions( + scheduleToCloseTimeout, + scheduleToStartTimeout, + startToCloseTimeout, + cancellationType, + summary); } public NexusOperationOptions.Builder mergeNexusOperationOptions( @@ -95,6 +143,14 @@ public NexusOperationOptions.Builder mergeNexusOperationOptions( (override.scheduleToCloseTimeout == null) ? this.scheduleToCloseTimeout : override.scheduleToCloseTimeout; + this.scheduleToStartTimeout = + (override.scheduleToStartTimeout == null) + ? this.scheduleToStartTimeout + : override.scheduleToStartTimeout; + this.startToCloseTimeout = + (override.startToCloseTimeout == null) + ? this.startToCloseTimeout + : override.startToCloseTimeout; this.cancellationType = (override.cancellationType == null) ? this.cancellationType : override.cancellationType; this.summary = (override.summary == null) ? this.summary : override.summary; @@ -104,9 +160,13 @@ public NexusOperationOptions.Builder mergeNexusOperationOptions( private NexusOperationOptions( Duration scheduleToCloseTimeout, + Duration scheduleToStartTimeout, + Duration startToCloseTimeout, NexusOperationCancellationType cancellationType, String summary) { this.scheduleToCloseTimeout = scheduleToCloseTimeout; + this.scheduleToStartTimeout = scheduleToStartTimeout; + this.startToCloseTimeout = startToCloseTimeout; this.cancellationType = cancellationType; this.summary = summary; } @@ -116,6 +176,8 @@ public NexusOperationOptions.Builder toBuilder() { } private final Duration scheduleToCloseTimeout; + private final Duration scheduleToStartTimeout; + private final Duration startToCloseTimeout; private final NexusOperationCancellationType cancellationType; private final String summary; @@ -123,6 +185,16 @@ public Duration getScheduleToCloseTimeout() { return scheduleToCloseTimeout; } + @Experimental + public Duration getScheduleToStartTimeout() { + return scheduleToStartTimeout; + } + + @Experimental + public Duration getStartToCloseTimeout() { + return startToCloseTimeout; + } + public NexusOperationCancellationType getCancellationType() { return cancellationType; } @@ -138,13 +210,20 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; NexusOperationOptions that = (NexusOperationOptions) o; return Objects.equals(scheduleToCloseTimeout, that.scheduleToCloseTimeout) + && Objects.equals(scheduleToStartTimeout, that.scheduleToStartTimeout) + && Objects.equals(startToCloseTimeout, that.startToCloseTimeout) && Objects.equals(cancellationType, that.cancellationType) && Objects.equals(summary, that.summary); } @Override public int hashCode() { - return Objects.hash(scheduleToCloseTimeout, cancellationType, summary); + return Objects.hash( + scheduleToCloseTimeout, + scheduleToStartTimeout, + startToCloseTimeout, + cancellationType, + summary); } @Override @@ -152,6 +231,10 @@ public String toString() { return "NexusOperationOptions{" + "scheduleToCloseTimeout=" + scheduleToCloseTimeout + + ", scheduleToStartTimeout=" + + scheduleToStartTimeout + + ", startToCloseTimeout=" + + startToCloseTimeout + ", cancellationType=" + cancellationType + ", summary='" diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java index fa2b0ca40..40b2313ae 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java @@ -645,6 +645,7 @@ public void failWhenUpdateNamesDoNotMatch() { } } + @SuppressWarnings("deprecation") // Backwards compatibility for WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING @Test public void failServerSideWhenStartIsInvalid() { WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); diff --git a/temporal-serviceclient/src/main/proto b/temporal-serviceclient/src/main/proto index 1ae5b673d..44dec06c6 160000 --- a/temporal-serviceclient/src/main/proto +++ b/temporal-serviceclient/src/main/proto @@ -1 +1 @@ -Subproject commit 1ae5b673d66b0a94f6131c3eb06bc7173ae2c326 +Subproject commit 44dec06c674f05b03c7855e1026a326880af2000 diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index ae20d2285..82761109b 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -660,6 +660,29 @@ private static void scheduleNexusOperation( : Timestamp.getDefaultInstance(); TestServiceRetryState retryState = new TestServiceRetryState(data.retryPolicy, expirationTime); + // Trim secondary timeouts to the primary timeout (scheduleToClose). + java.time.Duration scheduleToCloseTimeout = + ProtobufTimeUtils.toJavaDuration(attr.getScheduleToCloseTimeout()); + java.time.Duration scheduleToStartTimeout = + ProtobufTimeUtils.toJavaDuration(attr.getScheduleToStartTimeout()); + java.time.Duration startToCloseTimeout = + ProtobufTimeUtils.toJavaDuration(attr.getStartToCloseTimeout()); + + com.google.protobuf.Duration cappedScheduleToStartTimeout = attr.getScheduleToStartTimeout(); + com.google.protobuf.Duration cappedStartToCloseTimeout = attr.getStartToCloseTimeout(); + + if (!scheduleToCloseTimeout.isZero() + && !scheduleToStartTimeout.isZero() + && scheduleToStartTimeout.compareTo(scheduleToCloseTimeout) > 0) { + cappedScheduleToStartTimeout = attr.getScheduleToCloseTimeout(); + } + + if (!scheduleToCloseTimeout.isZero() + && !startToCloseTimeout.isZero() + && startToCloseTimeout.compareTo(scheduleToCloseTimeout) > 0) { + cappedStartToCloseTimeout = attr.getScheduleToCloseTimeout(); + } + NexusOperationScheduledEventAttributes.Builder a = NexusOperationScheduledEventAttributes.newBuilder() .setEndpoint(attr.getEndpoint()) @@ -668,6 +691,8 @@ private static void scheduleNexusOperation( .setOperation(attr.getOperation()) .setInput(attr.getInput()) .setScheduleToCloseTimeout(attr.getScheduleToCloseTimeout()) + .setScheduleToStartTimeout(cappedScheduleToStartTimeout) + .setStartToCloseTimeout(cappedStartToCloseTimeout) .putAllNexusHeader(attr.getNexusHeaderMap()) .setRequestId(UUID.randomUUID().toString()) .setWorkflowTaskCompletedEventId(workflowTaskCompletedId); @@ -704,9 +729,6 @@ private static void scheduleNexusOperation( io.temporal.api.nexus.v1.Request.newBuilder() .setScheduledTime(ctx.currentTime()) .putAllHeader(attr.getNexusHeaderMap()) - .putHeader( - io.nexusrpc.Header.OPERATION_TIMEOUT.toLowerCase(), - attr.getScheduleToCloseTimeout().toString()) .setStartOperation( StartOperationRequest.newBuilder() .setService(attr.getService()) @@ -778,11 +800,27 @@ private static void completeNexusOperation( private static void timeoutNexusOperation( RequestContext ctx, NexusOperationData data, TimeoutType timeoutType, long notUsed) { - if (timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE) { + if (timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE + && timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_START + && timeoutType != TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE) { throw new IllegalArgumentException( "Timeout type not supported for Nexus operations: " + timeoutType); } + String timeoutMessage; + switch (timeoutType) { + case TIMEOUT_TYPE_SCHEDULE_TO_START: + timeoutMessage = "operation timed out before starting"; + break; + case TIMEOUT_TYPE_START_TO_CLOSE: + timeoutMessage = "operation timed out after starting"; + break; + case TIMEOUT_TYPE_SCHEDULE_TO_CLOSE: + default: + timeoutMessage = "operation timed out"; + break; + } + Failure failure = Failure.newBuilder() .setMessage("nexus operation completed unsuccessfully") @@ -795,7 +833,7 @@ private static void timeoutNexusOperation( .setScheduledEventId(data.scheduledEventId)) .setCause( Failure.newBuilder() - .setMessage("operation timed out") + .setMessage(timeoutMessage) .setTimeoutFailureInfo( TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType))) .build(); diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java index 7c6343a1a..a1fab0a44 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java @@ -116,6 +116,12 @@ void completeAsyncNexusOperation( boolean validateOperationTaskToken(NexusTaskToken tt); + @Nullable + NexusOperationScheduledEventAttributes getNexusOperationScheduledEventAttributes( + long scheduledEventId); + + boolean isNexusOperationStarted(long scheduledEventId); + QueryWorkflowResponse query(QueryWorkflowRequest queryRequest, long deadline); TestWorkflowMutableStateImpl.UpdateHandle updateWorkflowExecution( diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index c667c47a2..9b238aa3b 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -852,6 +852,18 @@ private void processScheduleNexusOperation( operation.getData().getAttempt()), "NexusOperation ScheduleToCloseTimeout"); } + if (attr.hasScheduleToStartTimeout() + && Durations.toMillis(attr.getScheduleToStartTimeout()) > 0) { + // ScheduleToStartTimeout is the time from schedule to start (or completion if synchronous) + ctx.addTimer( + ProtobufTimeUtils.toJavaDuration(attr.getScheduleToStartTimeout()), + () -> + timeoutNexusOperation( + scheduleEventId, + TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_START, + operation.getData().getAttempt()), + "NexusOperation ScheduleToStartTimeout"); + } ctx.lockTimer("processScheduleNexusOperation"); } @@ -2309,6 +2321,23 @@ public void startNexusOperation( StateMachine operation = getPendingNexusOperation(scheduledEventId); operation.action(StateMachines.Action.START, ctx, resp, 0); operation.getData().identity = clientIdentity; + + // Add start-to-close timeout timer if configured + NexusOperationScheduledEventAttributes scheduledEvent = + operation.getData().scheduledEvent; + if (scheduledEvent.hasStartToCloseTimeout() + && Durations.toMillis(scheduledEvent.getStartToCloseTimeout()) > 0) { + // StartToCloseTimeout measures from when the operation started to when it completes + ctx.addTimer( + ProtobufTimeUtils.toJavaDuration(scheduledEvent.getStartToCloseTimeout()), + () -> + timeoutNexusOperation( + scheduledEventId, + TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE, + operation.getData().getAttempt()), + "NexusOperation StartToCloseTimeout"); + } + scheduleWorkflowTask(ctx); }); } @@ -3691,6 +3720,20 @@ public boolean validateOperationTaskToken(NexusTaskToken tt) { return true; } + @Override + public NexusOperationScheduledEventAttributes getNexusOperationScheduledEventAttributes( + long scheduledEventId) { + StateMachine operation = getPendingNexusOperation(scheduledEventId); + return operation.getData().scheduledEvent; + } + + @Override + public boolean isNexusOperationStarted(long scheduledEventId) { + StateMachine operation = getPendingNexusOperation(scheduledEventId); + // Operation is considered started if it has an operation token + return !operation.getData().operationToken.isEmpty(); + } + private boolean isTerminalState(State workflowState) { return workflowState == State.COMPLETED || workflowState == State.TIMED_OUT diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index 469fde42d..049689f14 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -26,6 +26,7 @@ import io.temporal.api.failure.v1.*; import io.temporal.api.failure.v1.Failure; import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.api.history.v1.NexusOperationScheduledEventAttributes; import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes; import io.temporal.api.namespace.v1.NamespaceInfo; import io.temporal.api.nexus.v1.*; @@ -285,6 +286,8 @@ private TestWorkflowMutableState getMutableState( return getMutableState(executionId, failNotExists); } + @SuppressWarnings( + "deprecation") // Backwards compatibility for WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING @Override public void startWorkflowExecution( StartWorkflowExecutionRequest request, @@ -310,6 +313,8 @@ public void startWorkflowExecution( } } + @SuppressWarnings( + "deprecation") // Backwards compatibility for WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING StartWorkflowExecutionResponse startWorkflowExecutionImpl( StartWorkflowExecutionRequest startRequest, Duration backoffStartInterval, @@ -325,7 +330,8 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl( validateWorkflowIdReusePolicy(reusePolicy, conflictPolicy); validateOnConflictOptions(startRequest); - // Backwards compatibility: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING is deprecated + // Backwards compatibility: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING + // is deprecated if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) { conflictPolicy = WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING; reusePolicy = WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE; @@ -475,6 +481,8 @@ private StartWorkflowExecutionResponse throwDuplicatedWorkflow( WorkflowExecutionAlreadyStartedFailure.getDescriptor()); } + @SuppressWarnings( + "deprecation") // Backwards compatibility for WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING private void validateWorkflowIdReusePolicy( WorkflowIdReusePolicy reusePolicy, WorkflowIdConflictPolicy conflictPolicy) { if (conflictPolicy != WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED @@ -955,6 +963,65 @@ public void pollNexusTaskQueue( task.getTask() .getRequestBuilder() .putHeader(Header.REQUEST_TIMEOUT.toLowerCase(), taskTimeout + "s"); + + // Calculate and set OPERATION_TIMEOUT header if not already present and operation has + // timeouts + if (req.hasStartOperation() + && !req.getHeaderMap().containsKey(Header.OPERATION_TIMEOUT.toLowerCase())) { + NexusTaskToken token = NexusTaskToken.fromBytes(task.getTask().getTaskToken()); + TestWorkflowMutableState mutableState = + getMutableState(token.getOperationRef().getExecutionId()); + long scheduledEventId = token.getOperationRef().getScheduledEventId(); + NexusOperationScheduledEventAttributes scheduledEvent = + mutableState.getNexusOperationScheduledEventAttributes(scheduledEventId); + boolean isStarted = mutableState.isNexusOperationStarted(scheduledEventId); + + Timestamp scheduledTime = req.getScheduledTime(); + Timestamp currentTime = store.currentTime(); + long elapsedSeconds = Timestamps.between(scheduledTime, currentTime).getSeconds(); + long elapsedMillis = elapsedSeconds * 1000; + + // Calculate minimum of all applicable timeouts + Long remainingMillis = null; + + if (!isStarted && scheduledEvent.hasScheduleToStartTimeout()) { + long scheduleToStartMillis = + com.google.protobuf.util.Durations.toMillis( + scheduledEvent.getScheduleToStartTimeout()); + if (scheduleToStartMillis > 0) { + long remaining = scheduleToStartMillis - elapsedMillis; + remainingMillis = + (remainingMillis == null) ? remaining : Math.min(remainingMillis, remaining); + } + } + + if (scheduledEvent.hasStartToCloseTimeout()) { + long startToCloseMillis = + com.google.protobuf.util.Durations.toMillis(scheduledEvent.getStartToCloseTimeout()); + if (startToCloseMillis > 0) { + long remaining = startToCloseMillis - elapsedMillis; + remainingMillis = + (remainingMillis == null) ? remaining : Math.min(remainingMillis, remaining); + } + } + + if (scheduledEvent.hasScheduleToCloseTimeout()) { + long scheduleToCloseMillis = + com.google.protobuf.util.Durations.toMillis( + scheduledEvent.getScheduleToCloseTimeout()); + if (scheduleToCloseMillis > 0) { + long remaining = scheduleToCloseMillis - elapsedMillis; + remainingMillis = + (remainingMillis == null) ? remaining : Math.min(remainingMillis, remaining); + } + } + + if (remainingMillis != null && remainingMillis > 0) { + req.putHeader( + Header.OPERATION_TIMEOUT.toLowerCase(), Long.toString(remainingMillis) + "ms"); + } + } + PollNexusTaskQueueResponse.Builder resp = task.getTask().setRequest(req); responseObserver.onNext(resp.build()); diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java index 445d5e033..ca68e8db7 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java @@ -712,6 +712,140 @@ public void testNexusOperationTimeout_AfterCancel() { } } + @Test + public void testNexusOperationScheduleToStartTimeout() { + WorkflowStub stub = newWorkflowStub("TestNexusOperationScheduleToStartTimeoutWorkflow"); + WorkflowExecution execution = stub.start(); + + // Get first WFT and respond with ScheduleNexusOperation command with schedule-to-start timeout + PollWorkflowTaskQueueResponse pollResp = pollWorkflowTask(); + completeWorkflowTask( + pollResp.getTaskToken(), + newScheduleOperationCommand( + defaultScheduleOperationAttributes() + .setScheduleToStartTimeout(Durations.fromSeconds(1)) + .setScheduleToCloseTimeout(Durations.fromSeconds(30)))); + testWorkflowRule.assertHistoryEvent( + execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); + + try { + // Poll for Nexus task but do not complete it - let it time out before starting + PollNexusTaskQueueResponse nexusPollResp = pollNexusTask().get(); + Assert.assertTrue(nexusPollResp.getRequest().hasStartOperation()); + + // Verify OPERATION_TIMEOUT header is set and valid + String operationTimeoutHeader = + nexusPollResp.getRequest().getHeaderMap().get("operation-timeout"); + Assert.assertNotNull("OPERATION_TIMEOUT header should be set", operationTimeoutHeader); + Assert.assertTrue( + "OPERATION_TIMEOUT should end with 'ms'", operationTimeoutHeader.endsWith("ms")); + long operationTimeoutMs = + Long.parseLong(operationTimeoutHeader.substring(0, operationTimeoutHeader.length() - 2)); + // Should be <= schedule-to-start timeout (1 second = 1000ms) + Assert.assertTrue( + "OPERATION_TIMEOUT should be <= schedule-to-start timeout", operationTimeoutMs <= 1000); + Assert.assertTrue("OPERATION_TIMEOUT should be positive", operationTimeoutMs > 0); + + // Sleep longer than schedule-to-start timeout to trigger the timeout + Thread.sleep(2000); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + + // Poll to wait for new task after operation times out + pollResp = pollWorkflowTask(); + completeWorkflow(pollResp.getTaskToken()); + + List events = + testWorkflowRule.getHistoryEvents( + execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT); + Assert.assertEquals(1, events.size()); + io.temporal.api.failure.v1.Failure failure = + events.get(0).getNexusOperationTimedOutEventAttributes().getFailure(); + assertOperationFailureInfo(failure.getNexusOperationExecutionFailureInfo()); + Assert.assertEquals("nexus operation completed unsuccessfully", failure.getMessage()); + io.temporal.api.failure.v1.Failure cause = failure.getCause(); + Assert.assertEquals("operation timed out before starting", cause.getMessage()); + Assert.assertTrue(cause.hasTimeoutFailureInfo()); + Assert.assertEquals( + TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_START, cause.getTimeoutFailureInfo().getTimeoutType()); + } + + @Test + public void testNexusOperationStartToCloseTimeout() { + String operationId = UUID.randomUUID().toString(); + CompletableFuture nexusPoller = + pollNexusTask() + .thenCompose( + task -> { + // Verify OPERATION_TIMEOUT header is set and valid + String operationTimeoutHeader = + task.getRequest().getHeaderMap().get("operation-timeout"); + Assert.assertNotNull( + "OPERATION_TIMEOUT header should be set", operationTimeoutHeader); + Assert.assertTrue( + "OPERATION_TIMEOUT should end with 'ms'", + operationTimeoutHeader.endsWith("ms")); + long operationTimeoutMs = + Long.parseLong( + operationTimeoutHeader.substring(0, operationTimeoutHeader.length() - 2)); + // Should be <= start-to-close timeout (1 second = 1000ms) + Assert.assertTrue( + "OPERATION_TIMEOUT should be <= start-to-close timeout", + operationTimeoutMs <= 1000); + Assert.assertTrue("OPERATION_TIMEOUT should be positive", operationTimeoutMs > 0); + + return completeNexusTask(task, operationId); + }); + + try { + WorkflowStub stub = newWorkflowStub("TestNexusOperationStartToCloseTimeoutWorkflow"); + WorkflowExecution execution = stub.start(); + + // Get first WFT and respond with ScheduleNexusOperation command with start-to-close timeout + PollWorkflowTaskQueueResponse pollResp = pollWorkflowTask(); + completeWorkflowTask( + pollResp.getTaskToken(), + newScheduleOperationCommand( + defaultScheduleOperationAttributes() + .setStartToCloseTimeout(Durations.fromSeconds(1)) + .setScheduleToCloseTimeout(Durations.fromSeconds(30)))); + testWorkflowRule.assertHistoryEvent( + execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); + + // Wait for operation to be started + nexusPoller.get(); + + // Poll and verify started event is recorded + pollResp = pollWorkflowTask(); + testWorkflowRule.assertHistoryEvent( + execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED); + completeWorkflowTask(pollResp.getTaskToken()); + + // Poll to wait for new task after operation times out (start-to-close timeout) + pollResp = pollWorkflowTask(); + completeWorkflow(pollResp.getTaskToken()); + + List events = + testWorkflowRule.getHistoryEvents( + execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT); + Assert.assertEquals(1, events.size()); + io.temporal.api.failure.v1.Failure failure = + events.get(0).getNexusOperationTimedOutEventAttributes().getFailure(); + assertOperationFailureInfo(operationId, failure.getNexusOperationExecutionFailureInfo()); + Assert.assertEquals("nexus operation completed unsuccessfully", failure.getMessage()); + io.temporal.api.failure.v1.Failure cause = failure.getCause(); + Assert.assertEquals("operation timed out after starting", cause.getMessage()); + Assert.assertTrue(cause.hasTimeoutFailureInfo()); + Assert.assertEquals( + TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE, cause.getTimeoutFailureInfo().getTimeoutType()); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } finally { + nexusPoller.cancel(true); + } + } + @Test public void testNexusOperationError() { Response unsuccessfulResp = diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/WorkflowIdReusePolicyTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/WorkflowIdReusePolicyTest.java index 989dd8150..3ee65540f 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/WorkflowIdReusePolicyTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/WorkflowIdReusePolicyTest.java @@ -80,6 +80,8 @@ public void alreadyRunningWorkflowBlocksSecondEvenWithAllowDuplicate() { } @Test + @SuppressWarnings( + "deprecation") // Test for deprecated WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING public void secondWorkflowTerminatesFirst() { String workflowId = "terminate-if-running-1"; WorkflowOptions options =