Skip to content

Commit a444d95

Browse files
authored
Break long timers into smaller timers (#122)
* Break long timers into smaller timers * Update CHANGELOG.md * Break up long timer for zonedDateTime and verify timer count in tests * Move down new createTimer function
1 parent b635d06 commit a444d95

File tree

7 files changed

+114
-8
lines changed

7 files changed

+114
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
* Support Suspend and Resume Client APIs ([#104](https://github.com/microsoft/durabletask-java/issues/104))
55
* Fix the potential NPE issue of `DurableTaskClient terminate` method ([#104](https://github.com/microsoft/durabletask-java/issues/104))
66
* Add waitForCompletionOrCreateCheckStatusResponse client API ([#115](https://github.com/microsoft/durabletask-java/pull/115))
7-
7+
* Support long timers by breaking up into smaller timers ([#114](https://github.com/microsoft/durabletask-java/issues/114))
88

99
## v1.0.0
1010

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import io.grpc.*;
1313

14+
import java.time.Duration;
1415
import java.util.*;
1516
import java.util.concurrent.TimeUnit;
1617
import java.util.logging.Level;
@@ -22,12 +23,14 @@
2223
public final class DurableTaskGrpcWorker implements AutoCloseable {
2324
private static final int DEFAULT_PORT = 4001;
2425
private static final Logger logger = Logger.getLogger(DurableTaskGrpcWorker.class.getPackage().getName());
26+
private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration.ofDays(3);
2527

2628
private final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
2729
private final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();
2830

2931
private final ManagedChannel managedSidecarChannel;
3032
private final DataConverter dataConverter;
33+
private final Duration maximumTimerInterval;
3134

3235
private final TaskHubSidecarServiceBlockingStub sidecarClient;
3336

@@ -57,6 +60,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
5760

5861
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
5962
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
63+
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
6064
}
6165

6266
/**
@@ -108,6 +112,7 @@ public void startAndBlock() {
108112
TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
109113
this.orchestrationFactories,
110114
this.dataConverter,
115+
this.maximumTimerInterval,
111116
logger);
112117
TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor(
113118
this.activityFactories,

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import io.grpc.Channel;
66

7+
import java.time.Duration;
78
import java.util.HashMap;
89

910
/**
@@ -15,6 +16,7 @@ public final class DurableTaskGrpcWorkerBuilder {
1516
int port;
1617
Channel channel;
1718
DataConverter dataConverter;
19+
Duration maximumTimerInterval;
1820

1921
/**
2022
* Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}.
@@ -99,6 +101,18 @@ public DurableTaskGrpcWorkerBuilder dataConverter(DataConverter dataConverter) {
99101
return this;
100102
}
101103

104+
/**
105+
* Sets the maximum timer interval. If not specified, the default maximum timer interval duration will be used.
106+
* The default maximum timer interval duration is 3 days.
107+
*
108+
* @param maximumTimerInterval the maximum timer interval
109+
* @return this builder object
110+
*/
111+
public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerInterval) {
112+
this.maximumTimerInterval = maximumTimerInterval;
113+
return this;
114+
}
115+
102116
/**
103117
* Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object.
104118
* @return a new {@link DurableTaskGrpcWorker} object

client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.google.protobuf.StringValue;
77
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService;
88

9+
import java.time.Duration;
910
import java.util.Base64;
1011
import java.util.HashMap;
1112
import java.util.logging.Logger;
@@ -18,6 +19,7 @@
1819
*/
1920
public final class OrchestrationRunner {
2021
private static final Logger logger = Logger.getLogger(OrchestrationRunner.class.getPackage().getName());
22+
private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration.ofDays(3);
2123

2224
private OrchestrationRunner() {
2325
}
@@ -126,6 +128,7 @@ public TaskOrchestration create() {
126128
TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
127129
orchestrationFactories,
128130
new JacksonDataConverter(),
131+
DEFAULT_MAXIMUM_TIMER_INTERVAL,
129132
logger);
130133

131134
// TODO: Error handling

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@ final class TaskOrchestrationExecutor {
2424
private final HashMap<String, TaskOrchestrationFactory> orchestrationFactories;
2525
private final DataConverter dataConverter;
2626
private final Logger logger;
27+
private final Duration maximumTimerInterval;
2728

2829
public TaskOrchestrationExecutor(
2930
HashMap<String, TaskOrchestrationFactory> orchestrationFactories,
3031
DataConverter dataConverter,
32+
Duration maximumTimerInterval,
3133
Logger logger) {
3234
this.orchestrationFactories = orchestrationFactories;
3335
this.dataConverter = dataConverter;
36+
this.maximumTimerInterval = maximumTimerInterval;
3437
this.logger = logger;
3538
}
3639

@@ -77,6 +80,7 @@ private class ContextImplTask implements TaskOrchestrationContext {
7780
private final LinkedList<HistoryEvent> unprocessedEvents = new LinkedList<>();
7881
private final Queue<HistoryEvent> eventsWhileSuspended = new ArrayDeque<>();
7982
private final DataConverter dataConverter = TaskOrchestrationExecutor.this.dataConverter;
83+
private final Duration maximumTimerInterval = TaskOrchestrationExecutor.this.maximumTimerInterval;
8084
private final Logger logger = TaskOrchestrationExecutor.this.logger;
8185
private final OrchestrationHistoryIterator historyEventPlayer;
8286
private int sequenceNumber;
@@ -547,19 +551,27 @@ public Task<Void> createTimer(Duration duration) {
547551
Helpers.throwIfOrchestratorComplete(this.isComplete);
548552
Helpers.throwIfArgumentNull(duration, "duration");
549553

550-
int id = this.sequenceNumber++;
551-
Instant fireAt = this.currentInstant.plus(duration);
552-
return createInstantTimer(id, fireAt);
554+
Instant finalFireAt = this.currentInstant.plus(duration);
555+
return createTimer(finalFireAt);
553556
}
554557

555558
@Override
556559
public Task<Void> createTimer(ZonedDateTime zonedDateTime) {
557560
Helpers.throwIfOrchestratorComplete(this.isComplete);
558561
Helpers.throwIfArgumentNull(zonedDateTime, "zonedDateTime");
559562

560-
int id = this.sequenceNumber++;
561-
Instant fireAt = zonedDateTime.toInstant();
562-
return createInstantTimer(id, fireAt);
563+
Instant finalFireAt = zonedDateTime.toInstant();
564+
return createTimer(finalFireAt);
565+
}
566+
567+
private Task<Void> createTimer(Instant finalFireAt) {
568+
Duration remainingTime = Duration.between(this.currentInstant, finalFireAt);
569+
while (remainingTime.compareTo(this.maximumTimerInterval) > 0) {
570+
Instant nextFireAt = this.currentInstant.plus(this.maximumTimerInterval);
571+
createInstantTimer(this.sequenceNumber++, nextFireAt).await();
572+
remainingTime = Duration.between(this.currentInstant, finalFireAt);
573+
}
574+
return createInstantTimer(this.sequenceNumber++, finalFireAt);
563575
}
564576

565577
private Task<Void> createInstantTimer(int id, Instant fireAt) {

client/src/test/java/com/microsoft/durabletask/IntegrationTestBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public DurableTaskGrpcWorker buildAndStart() {
3838
return server;
3939
}
4040

41+
public TestDurableTaskWorkerBuilder setMaximumTimerInterval(Duration maximumTimerInterval) {
42+
this.innerBuilder.maximumTimerInterval(maximumTimerInterval);
43+
return this;
44+
}
45+
4146
public TestDurableTaskWorkerBuilder addOrchestrator(
4247
String name,
4348
TaskOrchestration implementation) {

client/src/test/java/com/microsoft/durabletask/IntegrationTests.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.*;
88
import java.util.concurrent.TimeUnit;
99
import java.util.concurrent.TimeoutException;
10+
import java.util.concurrent.atomic.AtomicInteger;
1011
import java.util.stream.Collectors;
1112
import java.util.stream.IntStream;
1213
import java.util.stream.Stream;
@@ -55,7 +56,7 @@ void emptyOrchestration() throws TimeoutException {
5556
instanceId,
5657
defaultTimeout,
5758
true);
58-
59+
5960
assertNotNull(instance);
6061
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
6162
assertEquals(input, instance.readInputAs(String.class));
@@ -86,6 +87,72 @@ void singleTimer() throws IOException, TimeoutException {
8687
}
8788
}
8889

90+
@Test
91+
void longTimer() throws TimeoutException {
92+
final String orchestratorName = "LongTimer";
93+
final Duration delay = Duration.ofSeconds(7);
94+
AtomicInteger counter = new AtomicInteger();
95+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
96+
.addOrchestrator(orchestratorName, ctx -> {
97+
counter.incrementAndGet();
98+
ctx.createTimer(delay).await();
99+
})
100+
.setMaximumTimerInterval(Duration.ofSeconds(3))
101+
.buildAndStart();
102+
103+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
104+
try (worker; client) {
105+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
106+
Duration timeout = delay.plus(defaultTimeout);
107+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
108+
assertNotNull(instance);
109+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
110+
111+
// Verify that the delay actually happened
112+
long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond();
113+
long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond();
114+
assertTrue(expectedCompletionSecond <= actualCompletionSecond);
115+
116+
// Verify that the correct number of timers were created
117+
// This should yield 4 (first invocation + replay invocations for internal timers 3s + 3s + 1s)
118+
assertEquals(4, counter.get());
119+
}
120+
}
121+
122+
@Test
123+
void longTimeStampTimer() throws TimeoutException {
124+
final String orchestratorName = "LongTimeStampTimer";
125+
final Duration delay = Duration.ofSeconds(7);
126+
final ZonedDateTime zonedDateTime = ZonedDateTime.of(LocalDateTime.now().plusSeconds(delay.getSeconds()), ZoneId.systemDefault());
127+
128+
AtomicInteger counter = new AtomicInteger();
129+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
130+
.addOrchestrator(orchestratorName, ctx -> {
131+
counter.incrementAndGet();
132+
ctx.createTimer(zonedDateTime).await();
133+
})
134+
.setMaximumTimerInterval(Duration.ofSeconds(3))
135+
.buildAndStart();
136+
137+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
138+
try (worker; client) {
139+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
140+
Duration timeout = delay.plus(defaultTimeout);
141+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
142+
assertNotNull(instance);
143+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
144+
145+
// Verify that the delay actually happened
146+
long expectedCompletionSecond = zonedDateTime.toInstant().getEpochSecond();
147+
long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond();
148+
assertTrue(expectedCompletionSecond <= actualCompletionSecond);
149+
150+
// Verify that the correct number of timers were created
151+
// This should yield 4 (first invocation + replay invocations for internal timers 3s + 3s + 1s)
152+
assertEquals(4, counter.get());
153+
}
154+
}
155+
89156
@Test
90157
void singleTimeStampTimer() throws IOException, TimeoutException {
91158
final String orchestratorName = "SingleTimeStampTimer";

0 commit comments

Comments
 (0)