From 9a3af8488ebf9045e9f08131e07486ec426ca4df Mon Sep 17 00:00:00 2001 From: salaboy Date: Wed, 20 Aug 2025 09:50:31 +0100 Subject: [PATCH 1/4] createTimerChain was comparing the same timestamp for future validation Signed-off-by: salaboy --- .../TaskOrchestrationExecutor.java | 2 +- .../io/dapr/durabletask/IntegrationTests.java | 46 ++++++++++++++++++- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index b3822db..4ac67db 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -1038,7 +1038,7 @@ public TimerTask(Instant finalFireAt) { // if necessary. Otherwise, we return and no more sub-timers are created. private CompletableFuture createTimerChain(Instant finalFireAt, CompletableFuture currentFuture) { return currentFuture.thenRun(() -> { - if (currentInstant.compareTo(finalFireAt) > 0) { + if (currentInstant.compareTo(finalFireAt) >= 0) { return; } Task nextTimer = createTimerTask(finalFireAt); diff --git a/client/src/test/java/io/dapr/durabletask/IntegrationTests.java b/client/src/test/java/io/dapr/durabletask/IntegrationTests.java index 8043bc6..005c8b3 100644 --- a/client/src/test/java/io/dapr/durabletask/IntegrationTests.java +++ b/client/src/test/java/io/dapr/durabletask/IntegrationTests.java @@ -87,8 +87,14 @@ void emptyOrchestration() throws TimeoutException { void singleTimer() throws IOException, TimeoutException { final String orchestratorName = "SingleTimer"; final Duration delay = Duration.ofSeconds(3); + AtomicReferenceArray timestamps = new AtomicReferenceArray<>(2); + AtomicInteger counter = new AtomicInteger(); DurableTaskGrpcWorker worker = this.createWorkerBuilder() - .addOrchestrator(orchestratorName, ctx -> ctx.createTimer(delay).await()) + .addOrchestrator(orchestratorName, ctx -> { + timestamps.set(counter.get(), LocalDateTime.now()); + counter.incrementAndGet(); + ctx.createTimer(delay).await(); + }) .buildAndStart(); DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); @@ -103,11 +109,22 @@ void singleTimer() throws IOException, TimeoutException { long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond(); long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond(); assertTrue(expectedCompletionSecond <= actualCompletionSecond); + + // Verify that the correct number of timers were created + // This should yield 2 (first invocation + replay invocations for internal timers) + assertEquals(2, counter.get()); + + // Verify that each timer is the expected length + int[] secondsElapsed = new int[1]; + for (int i = 0; i < timestamps.length() - 1; i++) { + secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond(); + } + assertEquals(secondsElapsed[0], 3); + } } @Test - @Disabled("Test is disabled for investigation, fixing the test retry pattern exposed the failure (could be timer creation issue)") void longTimer() throws TimeoutException { final String orchestratorName = "LongTimer"; final Duration delay = Duration.ofSeconds(7); @@ -285,6 +302,31 @@ void singleTimeStampTimer() throws IOException, TimeoutException { } } + + @Test + void singleTimeStampCreateTimer() throws IOException, TimeoutException { + final String orchestratorName = "SingleTimeStampTimer"; + final Duration delay = Duration.ofSeconds(3); + final ZonedDateTime zonedDateTime = ZonedDateTime.of(LocalDateTime.now().plusSeconds(delay.getSeconds()), ZoneId.systemDefault()); + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> ctx.createTimer(zonedDateTime).await()) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + Duration timeout = delay.plus(defaultTimeout); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + + // Verify that the delay actually happened + long expectedCompletionSecond = zonedDateTime.toInstant().getEpochSecond(); + long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond(); + assertTrue(expectedCompletionSecond <= actualCompletionSecond); + } + } + @Test void isReplaying() throws IOException, InterruptedException, TimeoutException { final String orchestratorName = "SingleTimer"; From d5c12a3b1fd68f3ad1a0f138f144d2be6c0e55f1 Mon Sep 17 00:00:00 2001 From: salaboy Date: Thu, 21 Aug 2025 14:52:19 +0100 Subject: [PATCH 2/4] adding test for loop createTimer Signed-off-by: salaboy --- client/build.gradle | 2 +- .../dapr/durabletask/DurableTaskClient.java | 2 + .../TaskOrchestrationExecutor.java | 38 +++++++++------ .../io/dapr/durabletask/IntegrationTests.java | 47 +++++++++++++++++++ 4 files changed, 74 insertions(+), 15 deletions(-) diff --git a/client/build.gradle b/client/build.gradle index 07885db..baa5785 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -11,7 +11,7 @@ plugins { } group 'io.dapr' -version = '1.5.9' +version = '1.5.10-SNAPSHOT' archivesBaseName = 'durabletask-client' def grpcVersion = '1.69.0' diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskClient.java b/client/src/main/java/io/dapr/durabletask/DurableTaskClient.java index 3136d80..efd3957 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskClient.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskClient.java @@ -4,6 +4,7 @@ import javax.annotation.Nullable; import java.time.Duration; +import java.time.Instant; import java.util.concurrent.TimeoutException; /** @@ -62,6 +63,7 @@ public String scheduleNewOrchestrationInstance(String orchestratorName, Object i */ public String scheduleNewOrchestrationInstance(String orchestratorName, Object input, String instanceId) { NewOrchestrationInstanceOptions options = new NewOrchestrationInstanceOptions() + .setStartTime(Instant.now()) .setInput(input) .setInstanceId(instanceId); return this.scheduleNewOrchestrationInstance(orchestratorName, options); diff --git a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index 4ac67db..68d1bdc 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -14,9 +14,11 @@ import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; +import java.time.temporal.TemporalField; import java.util.*; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.function.Function; @@ -88,10 +90,10 @@ private class ContextImplTask implements TaskOrchestrationContext { private String appId; // LinkedHashMap to maintain insertion order when returning the list of pending actions - private final LinkedHashMap pendingActions = new LinkedHashMap<>(); - private final HashMap> openTasks = new HashMap<>(); - private final LinkedHashMap>> outstandingEvents = new LinkedHashMap<>(); - private final LinkedList unprocessedEvents = new LinkedList<>(); + private final Map pendingActions = new LinkedHashMap<>(); + private final Map> openTasks = new ConcurrentHashMap<>(); + private final Map>> outstandingEvents = new LinkedHashMap<>(); + private final List unprocessedEvents = Collections.synchronizedList(new LinkedList<>()); private final Queue eventsWhileSuspended = new ArrayDeque<>(); private final DataConverter dataConverter = TaskOrchestrationExecutor.this.dataConverter; private final Duration maximumTimerInterval = TaskOrchestrationExecutor.this.maximumTimerInterval; @@ -303,12 +305,12 @@ public Task callActivity( } TaskFactory taskFactory = () -> { int id = this.sequenceNumber++; - + ScheduleTaskAction scheduleTaskAction = scheduleTaskBuilder.build(); OrchestratorAction.Builder actionBuilder = OrchestratorAction.newBuilder() .setId(id) .setScheduleTask(scheduleTaskBuilder); - + if (options != null && options.hasAppID()) { String targetAppId = options.getAppID(); TaskRouter actionRouter = TaskRouter.newBuilder() @@ -317,7 +319,7 @@ public Task callActivity( .build(); actionBuilder.setRouter(actionRouter); } - + this.pendingActions.put(id, actionBuilder.build()); if (!this.isReplaying) { @@ -410,7 +412,7 @@ public Task callSubOrchestrator( if (input instanceof TaskOptions) { throw new IllegalArgumentException("TaskOptions cannot be used as an input. Did you call the wrong method overload?"); } - + String serializedInput = this.dataConverter.serialize(input); CreateSubOrchestrationAction.Builder createSubOrchestrationActionBuilder = CreateSubOrchestrationAction.newBuilder().setName(name); if (serializedInput != null) { @@ -466,7 +468,7 @@ public Task waitForExternalEvent(String name, Duration timeout, Class int id = this.sequenceNumber++; CompletableTask eventTask = new ExternalEventTask<>(name, id, timeout); - + // Check for a previously received event with the same name for (HistoryEvent e : this.unprocessedEvents) { EventRaisedEvent existing = e.getEventRaised(); @@ -648,8 +650,8 @@ public Task createTimer(ZonedDateTime zonedDateTime) { } private Task createTimer(Instant finalFireAt) { - TimerTask timer = new TimerTask(finalFireAt); - return timer; + logger.info(">>>> Creating a Timer Task to fire at: "+ finalFireAt); + return new TimerTask(finalFireAt); } private CompletableTask createInstantTimer(int id, Instant fireAt) { @@ -660,9 +662,11 @@ private CompletableTask createInstantTimer(int id, Instant fireAt) { .build()); if (!this.isReplaying) { - // TODO: Log timer creation, including the expected fire-time + logger.info("Creating Instant Timer with id: " + id + " fireAt: " + fireAt); } + logger.info("REPLAY: Creating Instant Timer with id: " + id + " fireAt: " + fireAt); + CompletableTask timerTask = new CompletableTask<>(); TaskRecord record = new TaskRecord<>(timerTask, "(timer)", Void.class); this.openTasks.put(id, record); @@ -702,7 +706,9 @@ public void handleTimerFired(HistoryEvent e) { if (!this.isReplaying) { // TODO: Log timer fired, including the scheduled fire-time + this.logger.info("Firing timer by completing task: "+timerEventId+" expected fire at time: "+ Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(), timerFiredEvent.getFireAt().getNanos())); } + this.logger.info("REPLAY: Firing timer by completing task: "+timerEventId+" expected fire at time: "+ Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(), timerFiredEvent.getFireAt().getNanos())); CompletableTask task = record.getTask(); task.complete(null); @@ -851,7 +857,7 @@ private void addCarryoverEvents(CompleteOrchestrationAction.Builder builder) { externalEvents.forEach(builder::addCarryoverEvents); } - + private boolean waitingForEvents() { return this.outstandingEvents.size() > 0; } @@ -894,7 +900,7 @@ private void processEvent(HistoryEvent e) { if (factory == null) { throw new IllegalStateException("No factory found for orchestrator: " + executionStarted.getName()); } - + TaskOrchestration orchestrator = factory.create(); orchestrator.run(this); break; @@ -1025,6 +1031,7 @@ private class TimerTask extends CompletableTask { public TimerTask(Instant finalFireAt) { super(); + logger.info("Creating a Timer task at: " + finalFireAt + " -> with current time " + currentInstant ); CompletableTask firstTimer = createTimerTask(finalFireAt); CompletableFuture timerChain = createTimerChain(finalFireAt, firstTimer.future); this.task = new CompletableTask<>(timerChain); @@ -1062,7 +1069,10 @@ private CompletableTask createTimerTask(Instant finalFireAt) { private void handleSubTimerSuccess() { // check if it is the last timer + logger.info(">>>> Comparing current instant "+ currentInstant+" with finalFireAt: " + finalFireAt); + logger.info(">>>>Comparison: "+currentInstant.compareTo(finalFireAt)); if (currentInstant.compareTo(finalFireAt) >= 0) { + logger.info(">>>> Comparison -> Complete"); this.complete(null); } } diff --git a/client/src/test/java/io/dapr/durabletask/IntegrationTests.java b/client/src/test/java/io/dapr/durabletask/IntegrationTests.java index 005c8b3..06e3f59 100644 --- a/client/src/test/java/io/dapr/durabletask/IntegrationTests.java +++ b/client/src/test/java/io/dapr/durabletask/IntegrationTests.java @@ -124,6 +124,53 @@ void singleTimer() throws IOException, TimeoutException { } } + @Test + void loopWithTimer() throws IOException, TimeoutException { + final String orchestratorName = "LoopWithTimer"; + final Duration delay = Duration.ofSeconds(2); + AtomicReferenceArray timestamps = new AtomicReferenceArray<>(100); + AtomicInteger counter = new AtomicInteger(); + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + for(int i = 0 ; i < 10; i++) { + timestamps.set(counter.get(), LocalDateTime.now()); + counter.incrementAndGet(); + ctx.createTimer(delay).await(); + } + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + Duration timeout = delay.plus(defaultTimeout); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + + // Verify that the delay actually happened + long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond(); + long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond(); + assertTrue(expectedCompletionSecond <= actualCompletionSecond); + + // Verify that the correct number of timers were created + // ??? Not sure why 65, this seems consistent with what we see in Catalyst + assertEquals(65, counter.get()); + + // Verify that each timer is the expected length + int[] secondsElapsed = new int[timestamps.length()]; + for (int i = 0; i < timestamps.length() - 1; i++) { + if(timestamps.get(i + 1) != null && timestamps.get(i) != null ) { + secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond(); + }else{ + secondsElapsed[i] = -1; + } + } + assertEquals(secondsElapsed[0], 2); + + } + } + @Test void longTimer() throws TimeoutException { final String orchestratorName = "LongTimer"; From abfd3ab99e6e3b77ad85aa9935b5632bec9cc261 Mon Sep 17 00:00:00 2001 From: salaboy Date: Fri, 22 Aug 2025 10:20:45 +0100 Subject: [PATCH 3/4] updating timer comparison Signed-off-by: salaboy --- client/build.gradle | 2 +- .../dapr/durabletask/DurableTaskClient.java | 2 - .../TaskOrchestrationExecutor.java | 29 +++--- .../io/dapr/durabletask/IntegrationTests.java | 95 +++++++++++++++---- 4 files changed, 90 insertions(+), 38 deletions(-) diff --git a/client/build.gradle b/client/build.gradle index baa5785..07885db 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -11,7 +11,7 @@ plugins { } group 'io.dapr' -version = '1.5.10-SNAPSHOT' +version = '1.5.9' archivesBaseName = 'durabletask-client' def grpcVersion = '1.69.0' diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskClient.java b/client/src/main/java/io/dapr/durabletask/DurableTaskClient.java index efd3957..3136d80 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskClient.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskClient.java @@ -4,7 +4,6 @@ import javax.annotation.Nullable; import java.time.Duration; -import java.time.Instant; import java.util.concurrent.TimeoutException; /** @@ -63,7 +62,6 @@ public String scheduleNewOrchestrationInstance(String orchestratorName, Object i */ public String scheduleNewOrchestrationInstance(String orchestratorName, Object input, String instanceId) { NewOrchestrationInstanceOptions options = new NewOrchestrationInstanceOptions() - .setStartTime(Instant.now()) .setInput(input) .setInstanceId(instanceId); return this.scheduleNewOrchestrationInstance(orchestratorName, options); diff --git a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index 68d1bdc..c6d5958 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -14,6 +14,7 @@ import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalField; import java.util.*; import java.util.concurrent.CancellationException; @@ -91,9 +92,9 @@ private class ContextImplTask implements TaskOrchestrationContext { // LinkedHashMap to maintain insertion order when returning the list of pending actions private final Map pendingActions = new LinkedHashMap<>(); - private final Map> openTasks = new ConcurrentHashMap<>(); + private final Map> openTasks = new HashMap<>(); private final Map>> outstandingEvents = new LinkedHashMap<>(); - private final List unprocessedEvents = Collections.synchronizedList(new LinkedList<>()); + private final List unprocessedEvents = new LinkedList<>(); private final Queue eventsWhileSuspended = new ArrayDeque<>(); private final DataConverter dataConverter = TaskOrchestrationExecutor.this.dataConverter; private final Duration maximumTimerInterval = TaskOrchestrationExecutor.this.maximumTimerInterval; @@ -305,12 +306,10 @@ public Task callActivity( } TaskFactory taskFactory = () -> { int id = this.sequenceNumber++; - ScheduleTaskAction scheduleTaskAction = scheduleTaskBuilder.build(); OrchestratorAction.Builder actionBuilder = OrchestratorAction.newBuilder() .setId(id) .setScheduleTask(scheduleTaskBuilder); - if (options != null && options.hasAppID()) { String targetAppId = options.getAppID(); TaskRouter actionRouter = TaskRouter.newBuilder() @@ -319,7 +318,6 @@ public Task callActivity( .build(); actionBuilder.setRouter(actionRouter); } - this.pendingActions.put(id, actionBuilder.build()); if (!this.isReplaying) { @@ -650,7 +648,6 @@ public Task createTimer(ZonedDateTime zonedDateTime) { } private Task createTimer(Instant finalFireAt) { - logger.info(">>>> Creating a Timer Task to fire at: "+ finalFireAt); return new TimerTask(finalFireAt); } @@ -662,11 +659,9 @@ private CompletableTask createInstantTimer(int id, Instant fireAt) { .build()); if (!this.isReplaying) { - logger.info("Creating Instant Timer with id: " + id + " fireAt: " + fireAt); + logger.finer("Creating Instant Timer with id: " + id + " fireAt: " + fireAt); } - logger.info("REPLAY: Creating Instant Timer with id: " + id + " fireAt: " + fireAt); - CompletableTask timerTask = new CompletableTask<>(); TaskRecord record = new TaskRecord<>(timerTask, "(timer)", Void.class); this.openTasks.put(id, record); @@ -706,9 +701,8 @@ public void handleTimerFired(HistoryEvent e) { if (!this.isReplaying) { // TODO: Log timer fired, including the scheduled fire-time - this.logger.info("Firing timer by completing task: "+timerEventId+" expected fire at time: "+ Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(), timerFiredEvent.getFireAt().getNanos())); + this.logger.finer("Firing timer by completing task: "+timerEventId+" expected fire at time: "+ Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(), timerFiredEvent.getFireAt().getNanos())); } - this.logger.info("REPLAY: Firing timer by completing task: "+timerEventId+" expected fire at time: "+ Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(), timerFiredEvent.getFireAt().getNanos())); CompletableTask task = record.getTask(); task.complete(null); @@ -1031,7 +1025,6 @@ private class TimerTask extends CompletableTask { public TimerTask(Instant finalFireAt) { super(); - logger.info("Creating a Timer task at: " + finalFireAt + " -> with current time " + currentInstant ); CompletableTask firstTimer = createTimerTask(finalFireAt); CompletableFuture timerChain = createTimerChain(finalFireAt, firstTimer.future); this.task = new CompletableTask<>(timerChain); @@ -1045,11 +1038,12 @@ public TimerTask(Instant finalFireAt) { // if necessary. Otherwise, we return and no more sub-timers are created. private CompletableFuture createTimerChain(Instant finalFireAt, CompletableFuture currentFuture) { return currentFuture.thenRun(() -> { - if (currentInstant.compareTo(finalFireAt) >= 0) { + Instant currentInstsanceMinusNanos = currentInstant.minusNanos(currentInstant.getNano()); + Instant finalFireAtMinusNanos = finalFireAt.minusNanos(finalFireAt.getNano()); + if (currentInstsanceMinusNanos.compareTo(finalFireAtMinusNanos) >= 0) { return; } Task nextTimer = createTimerTask(finalFireAt); - createTimerChain(finalFireAt, nextTimer.future); }); } @@ -1069,10 +1063,9 @@ private CompletableTask createTimerTask(Instant finalFireAt) { private void handleSubTimerSuccess() { // check if it is the last timer - logger.info(">>>> Comparing current instant "+ currentInstant+" with finalFireAt: " + finalFireAt); - logger.info(">>>>Comparison: "+currentInstant.compareTo(finalFireAt)); - if (currentInstant.compareTo(finalFireAt) >= 0) { - logger.info(">>>> Comparison -> Complete"); + Instant currentInstantMinusNanos = currentInstant.minusNanos(currentInstant.getNano()); + Instant finalFireAtMinusNanos = finalFireAt.minusNanos(finalFireAt.getNano()); + if (currentInstantMinusNanos.compareTo(finalFireAtMinusNanos) >= 0) { this.complete(null); } } diff --git a/client/src/test/java/io/dapr/durabletask/IntegrationTests.java b/client/src/test/java/io/dapr/durabletask/IntegrationTests.java index 06e3f59..5d64153 100644 --- a/client/src/test/java/io/dapr/durabletask/IntegrationTests.java +++ b/client/src/test/java/io/dapr/durabletask/IntegrationTests.java @@ -119,22 +119,25 @@ void singleTimer() throws IOException, TimeoutException { for (int i = 0; i < timestamps.length() - 1; i++) { secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond(); } - assertEquals(secondsElapsed[0], 3); + assertEquals(3, secondsElapsed[0]); } } + @Test void loopWithTimer() throws IOException, TimeoutException { final String orchestratorName = "LoopWithTimer"; final Duration delay = Duration.ofSeconds(2); - AtomicReferenceArray timestamps = new AtomicReferenceArray<>(100); + AtomicReferenceArray timestamps = new AtomicReferenceArray<>(4); AtomicInteger counter = new AtomicInteger(); DurableTaskGrpcWorker worker = this.createWorkerBuilder() .addOrchestrator(orchestratorName, ctx -> { - for(int i = 0 ; i < 10; i++) { - timestamps.set(counter.get(), LocalDateTime.now()); - counter.incrementAndGet(); + for(int i = 0 ; i < 3; i++) { + if(!ctx.getIsReplaying()) { + timestamps.set(counter.get(), LocalDateTime.now()); + counter.incrementAndGet(); + } ctx.createTimer(delay).await(); } }) @@ -154,8 +157,7 @@ void loopWithTimer() throws IOException, TimeoutException { assertTrue(expectedCompletionSecond <= actualCompletionSecond); // Verify that the correct number of timers were created - // ??? Not sure why 65, this seems consistent with what we see in Catalyst - assertEquals(65, counter.get()); + assertEquals(3, counter.get()); // Verify that each timer is the expected length int[] secondsElapsed = new int[timestamps.length()]; @@ -166,7 +168,66 @@ void loopWithTimer() throws IOException, TimeoutException { secondsElapsed[i] = -1; } } - assertEquals(secondsElapsed[0], 2); + assertEquals(2, secondsElapsed[0]); + assertEquals(2, secondsElapsed[1]); + assertEquals(-1, secondsElapsed[2]); + + + } + } + + @Test + void loopWithWaitForEvent() throws IOException, TimeoutException { + final String orchestratorName = "LoopWithTimer"; + final Duration delay = Duration.ofSeconds(2); + AtomicReferenceArray timestamps = new AtomicReferenceArray<>(4); + AtomicInteger counter = new AtomicInteger(); + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + for(int i = 0 ; i < 4; i++) { + try{ + ctx.waitForExternalEvent("HELLO", delay).await(); + }catch(TaskCanceledException tce ){ + if(!ctx.getIsReplaying()){ + timestamps.set(counter.get(), LocalDateTime.now()); + counter.incrementAndGet(); + } + + } + } + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + Duration timeout = delay.plus(defaultTimeout); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + + // Verify that the delay actually happened + long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond(); + long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond(); + assertTrue(expectedCompletionSecond <= actualCompletionSecond); + + // Verify that the correct number of timers were created + assertEquals(4, counter.get()); + + // Verify that each timer is the expected length + int[] secondsElapsed = new int[timestamps.length()]; + for (int i = 0; i < timestamps.length() - 1; i++) { + if(timestamps.get(i + 1) != null && timestamps.get(i) != null ) { + secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond(); + }else{ + secondsElapsed[i] = -1; + } + } + assertEquals(2, secondsElapsed[0]); + assertEquals(2, secondsElapsed[1]); + assertEquals(2, secondsElapsed[2]); + assertEquals(0, secondsElapsed[3]); + } } @@ -192,7 +253,7 @@ void longTimer() throws TimeoutException { Duration timeout = delay.plus(defaultTimeout); OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false); assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus(), + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus(), String.format("Orchestration failed with error: %s", instance.getFailureDetails().getErrorMessage())); // Verify that the delay actually happened @@ -973,13 +1034,13 @@ void multiInstanceQuery() throws TimeoutException{ // Test CreatedTimeTo filter query.setCreatedTimeTo(startTime.minus(Duration.ofSeconds(1))); result = client.queryInstances(query); - assertTrue(result.getOrchestrationState().isEmpty(), - "Result should be empty but found " + result.getOrchestrationState().size() + " instances: " + + assertTrue(result.getOrchestrationState().isEmpty(), + "Result should be empty but found " + result.getOrchestrationState().size() + " instances: " + "Start time: " + startTime + ", " + result.getOrchestrationState().stream() - .map(state -> String.format("\nID: %s, Status: %s, Created: %s", - state.getInstanceId(), - state.getRuntimeStatus(), + .map(state -> String.format("\nID: %s, Status: %s, Created: %s", + state.getInstanceId(), + state.getRuntimeStatus(), state.getCreatedAt())) .collect(Collectors.joining(", "))); @@ -1292,7 +1353,7 @@ void waitForInstanceStartThrowsException() { client.scheduleNewOrchestrationInstance(orchestratorName, null, instanceId); }); thread.start(); - + assertThrows(TimeoutException.class, () -> client.waitForInstanceStart(instanceId, Duration.ofSeconds(2)) ); } } @@ -1680,8 +1741,8 @@ public void taskExecutionIdTest() { DurableTaskGrpcWorker worker = this.createWorkerBuilder() .addOrchestrator(orchestratorName, ctx -> { - ctx.callActivity(retryActivityName,null,taskOptions).await(); - ctx.callActivity(retryActivityName,null,taskOptions).await(); + ctx.callActivity(retryActivityName,null,taskOptions).await(); + ctx.callActivity(retryActivityName,null,taskOptions).await(); ctx.complete(true); }) .addActivity(retryActivityName, ctx -> { From 075f3466669dad8971769e1c2a80601106e834e2 Mon Sep 17 00:00:00 2001 From: salaboy Date: Fri, 22 Aug 2025 10:27:13 +0100 Subject: [PATCH 4/4] formatting logger and remove unused imports Signed-off-by: salaboy --- .../dapr/durabletask/TaskOrchestrationExecutor.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index c6d5958..8ef9ffb 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -14,12 +14,9 @@ import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; -import java.time.temporal.ChronoUnit; -import java.time.temporal.TemporalField; import java.util.*; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.function.Function; @@ -659,7 +656,7 @@ private CompletableTask createInstantTimer(int id, Instant fireAt) { .build()); if (!this.isReplaying) { - logger.finer("Creating Instant Timer with id: " + id + " fireAt: " + fireAt); + logger.finer(() -> String.format("Creating Instant Timer with id: %s, fireAt: %s ", id, fireAt)); } CompletableTask timerTask = new CompletableTask<>(); @@ -700,8 +697,10 @@ public void handleTimerFired(HistoryEvent e) { } if (!this.isReplaying) { - // TODO: Log timer fired, including the scheduled fire-time - this.logger.finer("Firing timer by completing task: "+timerEventId+" expected fire at time: "+ Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(), timerFiredEvent.getFireAt().getNanos())); + this.logger.finer(() -> + String.format("Firing timer by completing task: %s expected fire at time: %s", timerEventId, + Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(), + timerFiredEvent.getFireAt().getNanos()))); } CompletableTask task = record.getTask();