Skip to content

Commit d5c12a3

Browse files
committed
adding test for loop createTimer
Signed-off-by: salaboy <[email protected]>
1 parent 9a3af84 commit d5c12a3

File tree

4 files changed

+74
-15
lines changed

4 files changed

+74
-15
lines changed

client/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ plugins {
1111
}
1212

1313
group 'io.dapr'
14-
version = '1.5.9'
14+
version = '1.5.10-SNAPSHOT'
1515
archivesBaseName = 'durabletask-client'
1616

1717
def grpcVersion = '1.69.0'

client/src/main/java/io/dapr/durabletask/DurableTaskClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import javax.annotation.Nullable;
66
import java.time.Duration;
7+
import java.time.Instant;
78
import java.util.concurrent.TimeoutException;
89

910
/**
@@ -62,6 +63,7 @@ public String scheduleNewOrchestrationInstance(String orchestratorName, Object i
6263
*/
6364
public String scheduleNewOrchestrationInstance(String orchestratorName, Object input, String instanceId) {
6465
NewOrchestrationInstanceOptions options = new NewOrchestrationInstanceOptions()
66+
.setStartTime(Instant.now())
6567
.setInput(input)
6668
.setInstanceId(instanceId);
6769
return this.scheduleNewOrchestrationInstance(orchestratorName, options);

client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
import java.time.Duration;
1515
import java.time.Instant;
1616
import java.time.ZonedDateTime;
17+
import java.time.temporal.TemporalField;
1718
import java.util.*;
1819
import java.util.concurrent.CancellationException;
1920
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.ConcurrentHashMap;
2022
import java.util.concurrent.ExecutionException;
2123
import java.util.function.Consumer;
2224
import java.util.function.Function;
@@ -88,10 +90,10 @@ private class ContextImplTask implements TaskOrchestrationContext {
8890
private String appId;
8991

9092
// LinkedHashMap to maintain insertion order when returning the list of pending actions
91-
private final LinkedHashMap<Integer, OrchestratorAction> pendingActions = new LinkedHashMap<>();
92-
private final HashMap<Integer, TaskRecord<?>> openTasks = new HashMap<>();
93-
private final LinkedHashMap<String, Queue<TaskRecord<?>>> outstandingEvents = new LinkedHashMap<>();
94-
private final LinkedList<HistoryEvent> unprocessedEvents = new LinkedList<>();
93+
private final Map<Integer, OrchestratorAction> pendingActions = new LinkedHashMap<>();
94+
private final Map<Integer, TaskRecord<?>> openTasks = new ConcurrentHashMap<>();
95+
private final Map<String, Queue<TaskRecord<?>>> outstandingEvents = new LinkedHashMap<>();
96+
private final List<HistoryEvent> unprocessedEvents = Collections.synchronizedList(new LinkedList<>());
9597
private final Queue<HistoryEvent> eventsWhileSuspended = new ArrayDeque<>();
9698
private final DataConverter dataConverter = TaskOrchestrationExecutor.this.dataConverter;
9799
private final Duration maximumTimerInterval = TaskOrchestrationExecutor.this.maximumTimerInterval;
@@ -303,12 +305,12 @@ public <V> Task<V> callActivity(
303305
}
304306
TaskFactory<V> taskFactory = () -> {
305307
int id = this.sequenceNumber++;
306-
308+
307309
ScheduleTaskAction scheduleTaskAction = scheduleTaskBuilder.build();
308310
OrchestratorAction.Builder actionBuilder = OrchestratorAction.newBuilder()
309311
.setId(id)
310312
.setScheduleTask(scheduleTaskBuilder);
311-
313+
312314
if (options != null && options.hasAppID()) {
313315
String targetAppId = options.getAppID();
314316
TaskRouter actionRouter = TaskRouter.newBuilder()
@@ -317,7 +319,7 @@ public <V> Task<V> callActivity(
317319
.build();
318320
actionBuilder.setRouter(actionRouter);
319321
}
320-
322+
321323
this.pendingActions.put(id, actionBuilder.build());
322324

323325
if (!this.isReplaying) {
@@ -410,7 +412,7 @@ public <V> Task<V> callSubOrchestrator(
410412
if (input instanceof TaskOptions) {
411413
throw new IllegalArgumentException("TaskOptions cannot be used as an input. Did you call the wrong method overload?");
412414
}
413-
415+
414416
String serializedInput = this.dataConverter.serialize(input);
415417
CreateSubOrchestrationAction.Builder createSubOrchestrationActionBuilder = CreateSubOrchestrationAction.newBuilder().setName(name);
416418
if (serializedInput != null) {
@@ -466,7 +468,7 @@ public <V> Task<V> waitForExternalEvent(String name, Duration timeout, Class<V>
466468
int id = this.sequenceNumber++;
467469

468470
CompletableTask<V> eventTask = new ExternalEventTask<>(name, id, timeout);
469-
471+
470472
// Check for a previously received event with the same name
471473
for (HistoryEvent e : this.unprocessedEvents) {
472474
EventRaisedEvent existing = e.getEventRaised();
@@ -648,8 +650,8 @@ public Task<Void> createTimer(ZonedDateTime zonedDateTime) {
648650
}
649651

650652
private Task<Void> createTimer(Instant finalFireAt) {
651-
TimerTask timer = new TimerTask(finalFireAt);
652-
return timer;
653+
logger.info(">>>> Creating a Timer Task to fire at: "+ finalFireAt);
654+
return new TimerTask(finalFireAt);
653655
}
654656

655657
private CompletableTask<Void> createInstantTimer(int id, Instant fireAt) {
@@ -660,9 +662,11 @@ private CompletableTask<Void> createInstantTimer(int id, Instant fireAt) {
660662
.build());
661663

662664
if (!this.isReplaying) {
663-
// TODO: Log timer creation, including the expected fire-time
665+
logger.info("Creating Instant Timer with id: " + id + " fireAt: " + fireAt);
664666
}
665667

668+
logger.info("REPLAY: Creating Instant Timer with id: " + id + " fireAt: " + fireAt);
669+
666670
CompletableTask<Void> timerTask = new CompletableTask<>();
667671
TaskRecord<Void> record = new TaskRecord<>(timerTask, "(timer)", Void.class);
668672
this.openTasks.put(id, record);
@@ -702,7 +706,9 @@ public void handleTimerFired(HistoryEvent e) {
702706

703707
if (!this.isReplaying) {
704708
// TODO: Log timer fired, including the scheduled fire-time
709+
this.logger.info("Firing timer by completing task: "+timerEventId+" expected fire at time: "+ Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(), timerFiredEvent.getFireAt().getNanos()));
705710
}
711+
this.logger.info("REPLAY: Firing timer by completing task: "+timerEventId+" expected fire at time: "+ Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(), timerFiredEvent.getFireAt().getNanos()));
706712

707713
CompletableTask<?> task = record.getTask();
708714
task.complete(null);
@@ -851,7 +857,7 @@ private void addCarryoverEvents(CompleteOrchestrationAction.Builder builder) {
851857

852858
externalEvents.forEach(builder::addCarryoverEvents);
853859
}
854-
860+
855861
private boolean waitingForEvents() {
856862
return this.outstandingEvents.size() > 0;
857863
}
@@ -894,7 +900,7 @@ private void processEvent(HistoryEvent e) {
894900
if (factory == null) {
895901
throw new IllegalStateException("No factory found for orchestrator: " + executionStarted.getName());
896902
}
897-
903+
898904
TaskOrchestration orchestrator = factory.create();
899905
orchestrator.run(this);
900906
break;
@@ -1025,6 +1031,7 @@ private class TimerTask extends CompletableTask<Void> {
10251031

10261032
public TimerTask(Instant finalFireAt) {
10271033
super();
1034+
logger.info("Creating a Timer task at: " + finalFireAt + " -> with current time " + currentInstant );
10281035
CompletableTask<Void> firstTimer = createTimerTask(finalFireAt);
10291036
CompletableFuture<Void> timerChain = createTimerChain(finalFireAt, firstTimer.future);
10301037
this.task = new CompletableTask<>(timerChain);
@@ -1062,7 +1069,10 @@ private CompletableTask<Void> createTimerTask(Instant finalFireAt) {
10621069

10631070
private void handleSubTimerSuccess() {
10641071
// check if it is the last timer
1072+
logger.info(">>>> Comparing current instant "+ currentInstant+" with finalFireAt: " + finalFireAt);
1073+
logger.info(">>>>Comparison: "+currentInstant.compareTo(finalFireAt));
10651074
if (currentInstant.compareTo(finalFireAt) >= 0) {
1075+
logger.info(">>>> Comparison -> Complete");
10661076
this.complete(null);
10671077
}
10681078
}

client/src/test/java/io/dapr/durabletask/IntegrationTests.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,53 @@ void singleTimer() throws IOException, TimeoutException {
124124
}
125125
}
126126

127+
@Test
128+
void loopWithTimer() throws IOException, TimeoutException {
129+
final String orchestratorName = "LoopWithTimer";
130+
final Duration delay = Duration.ofSeconds(2);
131+
AtomicReferenceArray<LocalDateTime> timestamps = new AtomicReferenceArray<>(100);
132+
AtomicInteger counter = new AtomicInteger();
133+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
134+
.addOrchestrator(orchestratorName, ctx -> {
135+
for(int i = 0 ; i < 10; i++) {
136+
timestamps.set(counter.get(), LocalDateTime.now());
137+
counter.incrementAndGet();
138+
ctx.createTimer(delay).await();
139+
}
140+
})
141+
.buildAndStart();
142+
143+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
144+
try (worker; client) {
145+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
146+
Duration timeout = delay.plus(defaultTimeout);
147+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
148+
assertNotNull(instance);
149+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
150+
151+
// Verify that the delay actually happened
152+
long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond();
153+
long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond();
154+
assertTrue(expectedCompletionSecond <= actualCompletionSecond);
155+
156+
// Verify that the correct number of timers were created
157+
// ??? Not sure why 65, this seems consistent with what we see in Catalyst
158+
assertEquals(65, counter.get());
159+
160+
// Verify that each timer is the expected length
161+
int[] secondsElapsed = new int[timestamps.length()];
162+
for (int i = 0; i < timestamps.length() - 1; i++) {
163+
if(timestamps.get(i + 1) != null && timestamps.get(i) != null ) {
164+
secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond();
165+
}else{
166+
secondsElapsed[i] = -1;
167+
}
168+
}
169+
assertEquals(secondsElapsed[0], 2);
170+
171+
}
172+
}
173+
127174
@Test
128175
void longTimer() throws TimeoutException {
129176
final String orchestratorName = "LongTimer";

0 commit comments

Comments
 (0)