diff --git a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index b3822db..8ef9ffb 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -88,10 +88,10 @@ private class ContextImplTask implements TaskOrchestrationContext { private String appId; // LinkedHashMap to maintain insertion order when returning the list of pending actions - private final LinkedHashMap pendingActions = new LinkedHashMap<>(); - private final HashMap> openTasks = new HashMap<>(); - private final LinkedHashMap>> outstandingEvents = new LinkedHashMap<>(); - private final LinkedList unprocessedEvents = new LinkedList<>(); + private final Map pendingActions = new LinkedHashMap<>(); + private final Map> openTasks = new HashMap<>(); + private final Map>> outstandingEvents = new LinkedHashMap<>(); + private final List unprocessedEvents = new LinkedList<>(); private final Queue eventsWhileSuspended = new ArrayDeque<>(); private final DataConverter dataConverter = TaskOrchestrationExecutor.this.dataConverter; private final Duration maximumTimerInterval = TaskOrchestrationExecutor.this.maximumTimerInterval; @@ -303,12 +303,10 @@ public Task callActivity( } TaskFactory taskFactory = () -> { int id = this.sequenceNumber++; - ScheduleTaskAction scheduleTaskAction = scheduleTaskBuilder.build(); OrchestratorAction.Builder actionBuilder = OrchestratorAction.newBuilder() .setId(id) .setScheduleTask(scheduleTaskBuilder); - if (options != null && options.hasAppID()) { String targetAppId = options.getAppID(); TaskRouter actionRouter = TaskRouter.newBuilder() @@ -317,7 +315,6 @@ public Task callActivity( .build(); actionBuilder.setRouter(actionRouter); } - this.pendingActions.put(id, actionBuilder.build()); if (!this.isReplaying) { @@ -410,7 +407,7 @@ public Task callSubOrchestrator( if (input instanceof TaskOptions) { throw new IllegalArgumentException("TaskOptions cannot be used as an input. Did you call the wrong method overload?"); } - + String serializedInput = this.dataConverter.serialize(input); CreateSubOrchestrationAction.Builder createSubOrchestrationActionBuilder = CreateSubOrchestrationAction.newBuilder().setName(name); if (serializedInput != null) { @@ -466,7 +463,7 @@ public Task waitForExternalEvent(String name, Duration timeout, Class int id = this.sequenceNumber++; CompletableTask eventTask = new ExternalEventTask<>(name, id, timeout); - + // Check for a previously received event with the same name for (HistoryEvent e : this.unprocessedEvents) { EventRaisedEvent existing = e.getEventRaised(); @@ -648,8 +645,7 @@ public Task createTimer(ZonedDateTime zonedDateTime) { } private Task createTimer(Instant finalFireAt) { - TimerTask timer = new TimerTask(finalFireAt); - return timer; + return new TimerTask(finalFireAt); } private CompletableTask createInstantTimer(int id, Instant fireAt) { @@ -660,7 +656,7 @@ private CompletableTask createInstantTimer(int id, Instant fireAt) { .build()); if (!this.isReplaying) { - // TODO: Log timer creation, including the expected fire-time + logger.finer(() -> String.format("Creating Instant Timer with id: %s, fireAt: %s ", id, fireAt)); } CompletableTask timerTask = new CompletableTask<>(); @@ -701,7 +697,10 @@ public void handleTimerFired(HistoryEvent e) { } if (!this.isReplaying) { - // TODO: Log timer fired, including the scheduled fire-time + this.logger.finer(() -> + String.format("Firing timer by completing task: %s expected fire at time: %s", timerEventId, + Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(), + timerFiredEvent.getFireAt().getNanos()))); } CompletableTask task = record.getTask(); @@ -851,7 +850,7 @@ private void addCarryoverEvents(CompleteOrchestrationAction.Builder builder) { externalEvents.forEach(builder::addCarryoverEvents); } - + private boolean waitingForEvents() { return this.outstandingEvents.size() > 0; } @@ -894,7 +893,7 @@ private void processEvent(HistoryEvent e) { if (factory == null) { throw new IllegalStateException("No factory found for orchestrator: " + executionStarted.getName()); } - + TaskOrchestration orchestrator = factory.create(); orchestrator.run(this); break; @@ -1038,11 +1037,12 @@ public TimerTask(Instant finalFireAt) { // if necessary. Otherwise, we return and no more sub-timers are created. private CompletableFuture createTimerChain(Instant finalFireAt, CompletableFuture currentFuture) { return currentFuture.thenRun(() -> { - if (currentInstant.compareTo(finalFireAt) > 0) { + Instant currentInstsanceMinusNanos = currentInstant.minusNanos(currentInstant.getNano()); + Instant finalFireAtMinusNanos = finalFireAt.minusNanos(finalFireAt.getNano()); + if (currentInstsanceMinusNanos.compareTo(finalFireAtMinusNanos) >= 0) { return; } Task nextTimer = createTimerTask(finalFireAt); - createTimerChain(finalFireAt, nextTimer.future); }); } @@ -1062,7 +1062,9 @@ private CompletableTask createTimerTask(Instant finalFireAt) { private void handleSubTimerSuccess() { // check if it is the last timer - if (currentInstant.compareTo(finalFireAt) >= 0) { + Instant currentInstantMinusNanos = currentInstant.minusNanos(currentInstant.getNano()); + Instant finalFireAtMinusNanos = finalFireAt.minusNanos(finalFireAt.getNano()); + if (currentInstantMinusNanos.compareTo(finalFireAtMinusNanos) >= 0) { this.complete(null); } } diff --git a/client/src/test/java/io/dapr/durabletask/IntegrationTests.java b/client/src/test/java/io/dapr/durabletask/IntegrationTests.java index 8043bc6..5d64153 100644 --- a/client/src/test/java/io/dapr/durabletask/IntegrationTests.java +++ b/client/src/test/java/io/dapr/durabletask/IntegrationTests.java @@ -87,8 +87,14 @@ void emptyOrchestration() throws TimeoutException { void singleTimer() throws IOException, TimeoutException { final String orchestratorName = "SingleTimer"; final Duration delay = Duration.ofSeconds(3); + AtomicReferenceArray timestamps = new AtomicReferenceArray<>(2); + AtomicInteger counter = new AtomicInteger(); DurableTaskGrpcWorker worker = this.createWorkerBuilder() - .addOrchestrator(orchestratorName, ctx -> ctx.createTimer(delay).await()) + .addOrchestrator(orchestratorName, ctx -> { + timestamps.set(counter.get(), LocalDateTime.now()); + counter.incrementAndGet(); + ctx.createTimer(delay).await(); + }) .buildAndStart(); DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); @@ -103,11 +109,130 @@ void singleTimer() throws IOException, TimeoutException { long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond(); long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond(); assertTrue(expectedCompletionSecond <= actualCompletionSecond); + + // Verify that the correct number of timers were created + // This should yield 2 (first invocation + replay invocations for internal timers) + assertEquals(2, counter.get()); + + // Verify that each timer is the expected length + int[] secondsElapsed = new int[1]; + for (int i = 0; i < timestamps.length() - 1; i++) { + secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond(); + } + assertEquals(3, secondsElapsed[0]); + + } + } + + + @Test + void loopWithTimer() throws IOException, TimeoutException { + final String orchestratorName = "LoopWithTimer"; + final Duration delay = Duration.ofSeconds(2); + AtomicReferenceArray timestamps = new AtomicReferenceArray<>(4); + AtomicInteger counter = new AtomicInteger(); + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + for(int i = 0 ; i < 3; i++) { + if(!ctx.getIsReplaying()) { + timestamps.set(counter.get(), LocalDateTime.now()); + counter.incrementAndGet(); + } + ctx.createTimer(delay).await(); + } + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + Duration timeout = delay.plus(defaultTimeout); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + + // Verify that the delay actually happened + long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond(); + long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond(); + assertTrue(expectedCompletionSecond <= actualCompletionSecond); + + // Verify that the correct number of timers were created + assertEquals(3, counter.get()); + + // Verify that each timer is the expected length + int[] secondsElapsed = new int[timestamps.length()]; + for (int i = 0; i < timestamps.length() - 1; i++) { + if(timestamps.get(i + 1) != null && timestamps.get(i) != null ) { + secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond(); + }else{ + secondsElapsed[i] = -1; + } + } + assertEquals(2, secondsElapsed[0]); + assertEquals(2, secondsElapsed[1]); + assertEquals(-1, secondsElapsed[2]); + + + } + } + + @Test + void loopWithWaitForEvent() throws IOException, TimeoutException { + final String orchestratorName = "LoopWithTimer"; + final Duration delay = Duration.ofSeconds(2); + AtomicReferenceArray timestamps = new AtomicReferenceArray<>(4); + AtomicInteger counter = new AtomicInteger(); + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + for(int i = 0 ; i < 4; i++) { + try{ + ctx.waitForExternalEvent("HELLO", delay).await(); + }catch(TaskCanceledException tce ){ + if(!ctx.getIsReplaying()){ + timestamps.set(counter.get(), LocalDateTime.now()); + counter.incrementAndGet(); + } + + } + } + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + Duration timeout = delay.plus(defaultTimeout); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + + // Verify that the delay actually happened + long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond(); + long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond(); + assertTrue(expectedCompletionSecond <= actualCompletionSecond); + + // Verify that the correct number of timers were created + assertEquals(4, counter.get()); + + // Verify that each timer is the expected length + int[] secondsElapsed = new int[timestamps.length()]; + for (int i = 0; i < timestamps.length() - 1; i++) { + if(timestamps.get(i + 1) != null && timestamps.get(i) != null ) { + secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond(); + }else{ + secondsElapsed[i] = -1; + } + } + assertEquals(2, secondsElapsed[0]); + assertEquals(2, secondsElapsed[1]); + assertEquals(2, secondsElapsed[2]); + assertEquals(0, secondsElapsed[3]); + + } } @Test - @Disabled("Test is disabled for investigation, fixing the test retry pattern exposed the failure (could be timer creation issue)") void longTimer() throws TimeoutException { final String orchestratorName = "LongTimer"; final Duration delay = Duration.ofSeconds(7); @@ -128,7 +253,7 @@ void longTimer() throws TimeoutException { Duration timeout = delay.plus(defaultTimeout); OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false); assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus(), + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus(), String.format("Orchestration failed with error: %s", instance.getFailureDetails().getErrorMessage())); // Verify that the delay actually happened @@ -285,6 +410,31 @@ void singleTimeStampTimer() throws IOException, TimeoutException { } } + + @Test + void singleTimeStampCreateTimer() throws IOException, TimeoutException { + final String orchestratorName = "SingleTimeStampTimer"; + final Duration delay = Duration.ofSeconds(3); + final ZonedDateTime zonedDateTime = ZonedDateTime.of(LocalDateTime.now().plusSeconds(delay.getSeconds()), ZoneId.systemDefault()); + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> ctx.createTimer(zonedDateTime).await()) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + Duration timeout = delay.plus(defaultTimeout); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + + // Verify that the delay actually happened + long expectedCompletionSecond = zonedDateTime.toInstant().getEpochSecond(); + long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond(); + assertTrue(expectedCompletionSecond <= actualCompletionSecond); + } + } + @Test void isReplaying() throws IOException, InterruptedException, TimeoutException { final String orchestratorName = "SingleTimer"; @@ -884,13 +1034,13 @@ void multiInstanceQuery() throws TimeoutException{ // Test CreatedTimeTo filter query.setCreatedTimeTo(startTime.minus(Duration.ofSeconds(1))); result = client.queryInstances(query); - assertTrue(result.getOrchestrationState().isEmpty(), - "Result should be empty but found " + result.getOrchestrationState().size() + " instances: " + + assertTrue(result.getOrchestrationState().isEmpty(), + "Result should be empty but found " + result.getOrchestrationState().size() + " instances: " + "Start time: " + startTime + ", " + result.getOrchestrationState().stream() - .map(state -> String.format("\nID: %s, Status: %s, Created: %s", - state.getInstanceId(), - state.getRuntimeStatus(), + .map(state -> String.format("\nID: %s, Status: %s, Created: %s", + state.getInstanceId(), + state.getRuntimeStatus(), state.getCreatedAt())) .collect(Collectors.joining(", "))); @@ -1203,7 +1353,7 @@ void waitForInstanceStartThrowsException() { client.scheduleNewOrchestrationInstance(orchestratorName, null, instanceId); }); thread.start(); - + assertThrows(TimeoutException.class, () -> client.waitForInstanceStart(instanceId, Duration.ofSeconds(2)) ); } } @@ -1591,8 +1741,8 @@ public void taskExecutionIdTest() { DurableTaskGrpcWorker worker = this.createWorkerBuilder() .addOrchestrator(orchestratorName, ctx -> { - ctx.callActivity(retryActivityName,null,taskOptions).await(); - ctx.callActivity(retryActivityName,null,taskOptions).await(); + ctx.callActivity(retryActivityName,null,taskOptions).await(); + ctx.callActivity(retryActivityName,null,taskOptions).await(); ctx.complete(true); }) .addActivity(retryActivityName, ctx -> {