Skip to content

Commit 1640939

Browse files
authored
Implement set timer by time stamp using Java.time.ZonedDateTime (#106)
* Implement set timer by time stamp * remove unnecessary check for timer * add default implementation to avoid breaking change * update exception message
1 parent c303a12 commit 1640939

File tree

3 files changed

+55
-2
lines changed

3 files changed

+55
-2
lines changed

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationContext.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import javax.annotation.Nullable;
66
import java.time.Duration;
77
import java.time.Instant;
8+
import java.time.ZonedDateTime;
89
import java.util.Arrays;
910
import java.util.List;
1011

@@ -155,6 +156,20 @@ default Task<Task<?>> anyOf(Task<?>... tasks) {
155156
*/
156157
Task<Void> createTimer(Duration delay);
157158

159+
/**
160+
* Creates a durable timer that expires after the specified timestamp with specific zone.
161+
* <p>
162+
* Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple,
163+
* internally-managed durable timers. The orchestration code doesn't need to be aware of this behavior. However,
164+
* it may be visible in framework logs and the stored history state.
165+
*
166+
* @param zonedDateTime timestamp with specific zone when the timer should expire
167+
* @return a new {@code Task} that completes after the specified delay
168+
*/
169+
default Task<Void> createTimer(ZonedDateTime zonedDateTime) {
170+
throw new UnsupportedOperationException("This method is not implemented. Please use an orchestration context class that has an implementation for this method.");
171+
}
172+
158173
/**
159174
* Transitions the orchestration into the {@link OrchestrationRuntimeStatus#COMPLETED} state with the given output.
160175
*

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import javax.annotation.Nullable;
1111
import java.time.Duration;
1212
import java.time.Instant;
13+
import java.time.ZonedDateTime;
1314
import java.util.*;
1415
import java.util.concurrent.CancellationException;
1516
import java.util.concurrent.CompletableFuture;
@@ -548,6 +549,20 @@ public Task<Void> createTimer(Duration duration) {
548549

549550
int id = this.sequenceNumber++;
550551
Instant fireAt = this.currentInstant.plus(duration);
552+
return createInstantTimer(id, fireAt);
553+
}
554+
555+
@Override
556+
public Task<Void> createTimer(ZonedDateTime zonedDateTime) {
557+
Helpers.throwIfOrchestratorComplete(this.isComplete);
558+
Helpers.throwIfArgumentNull(zonedDateTime, "zonedDateTime");
559+
560+
int id = this.sequenceNumber++;
561+
Instant fireAt = zonedDateTime.toInstant();
562+
return createInstantTimer(id, fireAt);
563+
}
564+
565+
private Task<Void> createInstantTimer(int id, Instant fireAt) {
551566
Timestamp ts = DataConverter.getTimestampFromInstant(fireAt);
552567
this.pendingActions.put(id, OrchestratorAction.newBuilder()
553568
.setId(id)

client/src/test/java/com/microsoft/durabletask/IntegrationTests.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
package com.microsoft.durabletask;
44

55
import java.io.IOException;
6-
import java.time.Duration;
7-
import java.time.Instant;
6+
import java.time.*;
87
import java.util.*;
98
import java.util.concurrent.TimeUnit;
109
import java.util.concurrent.TimeoutException;
@@ -88,6 +87,30 @@ void singleTimer() throws IOException, TimeoutException {
8887
}
8988
}
9089

90+
@Test
91+
void singleTimeStampTimer() throws IOException, TimeoutException {
92+
final String orchestratorName = "SingleTimeStampTimer";
93+
final Duration delay = Duration.ofSeconds(3);
94+
final ZonedDateTime zonedDateTime = ZonedDateTime.of(LocalDateTime.now().plusSeconds(delay.getSeconds()), ZoneId.systemDefault());
95+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
96+
.addOrchestrator(orchestratorName, ctx -> ctx.createTimer(zonedDateTime).await())
97+
.buildAndStart();
98+
99+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
100+
try (worker; client) {
101+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
102+
Duration timeout = delay.plus(defaultTimeout);
103+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
104+
assertNotNull(instance);
105+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
106+
107+
// Verify that the delay actually happened
108+
long expectedCompletionSecond = zonedDateTime.toInstant().getEpochSecond();
109+
long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond();
110+
assertTrue(expectedCompletionSecond <= actualCompletionSecond);
111+
}
112+
}
113+
91114
@Test
92115
void isReplaying() throws IOException, InterruptedException, TimeoutException {
93116
final String orchestratorName = "SingleTimer";

0 commit comments

Comments
 (0)