Skip to content

Commit b78297b

Browse files
authored
Make timer task non-blocking (#161)
* Make timer task async * Update CHANGELOG.md * Create the timer tasks as they complete instead of upfront * Update CHANGELOG.md * Ensure that task does not complete unless all subtimers complete * Enhance integration test to verify length of subtimers * Add comment and refactor integration test
1 parent 52c44f5 commit b78297b

File tree

3 files changed

+150
-9
lines changed

3 files changed

+150
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
## v1.3.0
22
* Refactor `RetriableTask` and add new `CompoundTask`, fixing Fan-out/Fan-in stuck when using `RetriableTask` ([#157](https://github.com/microsoft/durabletask-java/pull/157))
3+
* Refactor `createTimer` to be non-blocking ([#161](https://github.com/microsoft/durabletask-java/pull/161))
34

45
## v1.2.0
56

@@ -20,7 +21,6 @@
2021
* Fix the potential NPE issue of `DurableTaskClient#terminate` method ([#104](https://github.com/microsoft/durabletask-java/issues/104))
2122
* Add waitForCompletionOrCreateCheckStatusResponse client API ([#115](https://github.com/microsoft/durabletask-java/pull/115))
2223
* Support long timers by breaking up into smaller timers ([#114](https://github.com/microsoft/durabletask-java/issues/114))
23-
* Support restartInstance and pass restartPostUri in HttpManagementPayload ([#108](https://github.com/microsoft/durabletask-java/issues/108))
2424

2525
## v1.0.0
2626

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -586,16 +586,11 @@ public Task<Void> createTimer(ZonedDateTime zonedDateTime) {
586586
}
587587

588588
private Task<Void> createTimer(Instant finalFireAt) {
589-
Duration remainingTime = Duration.between(this.currentInstant, finalFireAt);
590-
while (remainingTime.compareTo(this.maximumTimerInterval) > 0) {
591-
Instant nextFireAt = this.currentInstant.plus(this.maximumTimerInterval);
592-
createInstantTimer(this.sequenceNumber++, nextFireAt).await();
593-
remainingTime = Duration.between(this.currentInstant, finalFireAt);
594-
}
595-
return createInstantTimer(this.sequenceNumber++, finalFireAt);
589+
TimerTask timer = new TimerTask(finalFireAt);
590+
return timer;
596591
}
597592

598-
private Task<Void> createInstantTimer(int id, Instant fireAt) {
593+
private CompletableTask<Void> createInstantTimer(int id, Instant fireAt) {
599594
Timestamp ts = DataConverter.getTimestampFromInstant(fireAt);
600595
this.pendingActions.put(id, OrchestratorAction.newBuilder()
601596
.setId(id)
@@ -941,6 +936,61 @@ List<HistoryEvent> getNewEvents() {
941936
}
942937
}
943938

939+
private class TimerTask extends CompletableTask<Void> {
940+
private Instant finalFireAt;
941+
CompletableTask<Void> task;
942+
943+
public TimerTask(Instant finalFireAt) {
944+
super();
945+
CompletableTask<Void> firstTimer = createTimerTask(finalFireAt);
946+
CompletableFuture<Void> timerChain = createTimerChain(finalFireAt, firstTimer.future);
947+
this.task = new CompletableTask<>(timerChain);
948+
this.finalFireAt = finalFireAt;
949+
}
950+
951+
// For a short timer (less than maximumTimerInterval), once the currentFuture completes, we must have reached finalFireAt,
952+
// so we return and no more sub-timers are created. For a long timer (more than maximumTimerInterval), once a given
953+
// currentFuture completes, we check if we have not yet reached finalFireAt. If that is the case, we create a new sub-timer
954+
// task and make a recursive call on that new sub-timer task so that once it completes, another sub-timer task is created
955+
// if necessary. Otherwise, we return and no more sub-timers are created.
956+
private CompletableFuture<Void> createTimerChain(Instant finalFireAt, CompletableFuture<Void> currentFuture) {
957+
return currentFuture.thenRun(() -> {
958+
if (currentInstant.compareTo(finalFireAt) > 0) {
959+
return;
960+
}
961+
Task<Void> nextTimer = createTimerTask(finalFireAt);
962+
963+
createTimerChain(finalFireAt, nextTimer.future);
964+
});
965+
}
966+
967+
private CompletableTask<Void> createTimerTask(Instant finalFireAt) {
968+
CompletableTask<Void> nextTimer;
969+
Duration remainingTime = Duration.between(currentInstant, finalFireAt);
970+
if (remainingTime.compareTo(maximumTimerInterval) > 0) {
971+
Instant nextFireAt = currentInstant.plus(maximumTimerInterval);
972+
nextTimer = createInstantTimer(sequenceNumber++, nextFireAt);
973+
} else {
974+
nextTimer = createInstantTimer(sequenceNumber++, finalFireAt);
975+
}
976+
nextTimer.setParentTask(this);
977+
return nextTimer;
978+
}
979+
980+
private void handleSubTimerSuccess() {
981+
// check if it is the last timer
982+
if (currentInstant.compareTo(finalFireAt) >= 0) {
983+
this.complete(null);
984+
}
985+
}
986+
987+
@Override
988+
public Void await() {
989+
return this.task.await();
990+
}
991+
992+
}
993+
944994
private class ExternalEventTask<V> extends CompletableTask<V> {
945995
private final String eventName;
946996
private final Duration timeout;
@@ -1257,6 +1307,10 @@ public boolean complete(V value) {
12571307
// notify parent task
12581308
((RetriableTask<V>) parentTask).handleChildSuccess(value);
12591309
}
1310+
if (parentTask instanceof TimerTask) {
1311+
// notify parent task
1312+
((TimerTask) parentTask).handleSubTimerSuccess();
1313+
}
12601314
return result;
12611315
}
12621316

client/src/test/java/com/microsoft/durabletask/IntegrationTests.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.concurrent.TimeoutException;
1010
import java.util.concurrent.atomic.AtomicBoolean;
1111
import java.util.concurrent.atomic.AtomicInteger;
12+
import java.util.concurrent.atomic.AtomicReferenceArray;
1213
import java.util.stream.Collectors;
1314
import java.util.stream.IntStream;
1415
import java.util.stream.Stream;
@@ -93,8 +94,10 @@ void longTimer() throws TimeoutException {
9394
final String orchestratorName = "LongTimer";
9495
final Duration delay = Duration.ofSeconds(7);
9596
AtomicInteger counter = new AtomicInteger();
97+
AtomicReferenceArray<LocalDateTime> timestamps = new AtomicReferenceArray<>(4);
9698
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
9799
.addOrchestrator(orchestratorName, ctx -> {
100+
timestamps.set(counter.get(), LocalDateTime.now());
98101
counter.incrementAndGet();
99102
ctx.createTimer(delay).await();
100103
})
@@ -117,9 +120,93 @@ void longTimer() throws TimeoutException {
117120
// Verify that the correct number of timers were created
118121
// This should yield 4 (first invocation + replay invocations for internal timers 3s + 3s + 1s)
119122
assertEquals(4, counter.get());
123+
124+
// Verify that each timer is the expected length
125+
int[] secondsElapsed = new int[3];
126+
for (int i = 0; i < timestamps.length() - 1; i++) {
127+
secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond();
128+
}
129+
assertEquals(secondsElapsed[0], 3);
130+
assertEquals(secondsElapsed[1], 3);
131+
assertEquals(secondsElapsed[2], 1);
132+
}
133+
}
134+
135+
@Test
136+
void longTimerNonblocking() throws TimeoutException {
137+
final String orchestratorName = "ActivityAnyOf";
138+
final String externalEventActivityName = "externalEvent";
139+
final String externalEventWinner = "The external event completed first";
140+
final String timerEventWinner = "The timer event completed first";
141+
final Duration timerDuration = Duration.ofSeconds(20);
142+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
143+
.addOrchestrator(orchestratorName, ctx -> {
144+
Task<String> externalEvent = ctx.waitForExternalEvent(externalEventActivityName, String.class);
145+
Task<Void> longTimer = ctx.createTimer(timerDuration);
146+
Task<?> winnerEvent = ctx.anyOf(externalEvent, longTimer).await();
147+
if (winnerEvent == externalEvent) {
148+
ctx.complete(externalEventWinner);
149+
} else {
150+
ctx.complete(timerEventWinner);
151+
}
152+
}).setMaximumTimerInterval(Duration.ofSeconds(3)).buildAndStart();
153+
154+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
155+
try (worker; client) {
156+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
157+
client.raiseEvent(instanceId, externalEventActivityName, "Hello world");
158+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
159+
assertNotNull(instance);
160+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
161+
162+
String output = instance.readOutputAs(String.class);
163+
assertNotNull(output);
164+
assertTrue(output.equals(externalEventWinner));
165+
166+
long createdTime = instance.getCreatedAt().getEpochSecond();
167+
long completedTime = instance.getLastUpdatedAt().getEpochSecond();
168+
// Timer did not block execution
169+
assertTrue(completedTime - createdTime < 5);
120170
}
121171
}
122172

173+
@Test
174+
void longTimerNonblockingNoExternal() throws TimeoutException {
175+
final String orchestratorName = "ActivityAnyOf";
176+
final String externalEventActivityName = "externalEvent";
177+
final String externalEventWinner = "The external event completed first";
178+
final String timerEventWinner = "The timer event completed first";
179+
final Duration timerDuration = Duration.ofSeconds(20);
180+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
181+
.addOrchestrator(orchestratorName, ctx -> {
182+
Task<String> externalEvent = ctx.waitForExternalEvent(externalEventActivityName, String.class);
183+
Task<Void> longTimer = ctx.createTimer(timerDuration);
184+
Task<?> winnerEvent = ctx.anyOf(externalEvent, longTimer).await();
185+
if (winnerEvent == externalEvent) {
186+
ctx.complete(externalEventWinner);
187+
} else {
188+
ctx.complete(timerEventWinner);
189+
}
190+
}).setMaximumTimerInterval(Duration.ofSeconds(3)).buildAndStart();
191+
192+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
193+
try (worker; client) {
194+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
195+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
196+
assertNotNull(instance);
197+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
198+
199+
String output = instance.readOutputAs(String.class);
200+
assertNotNull(output);
201+
assertTrue(output.equals(timerEventWinner));
202+
203+
long expectedCompletionSecond = instance.getCreatedAt().plus(timerDuration).getEpochSecond();
204+
long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond();
205+
assertTrue(expectedCompletionSecond <= actualCompletionSecond);
206+
}
207+
}
208+
209+
123210
@Test
124211
void longTimeStampTimer() throws TimeoutException {
125212
final String orchestratorName = "LongTimeStampTimer";

0 commit comments

Comments
 (0)