Skip to content

Commit abfd3ab

Browse files
committed
updating timer comparison
Signed-off-by: salaboy <[email protected]>
1 parent d5c12a3 commit abfd3ab

File tree

4 files changed

+90
-38
lines changed

4 files changed

+90
-38
lines changed

client/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ plugins {
1111
}
1212

1313
group 'io.dapr'
14-
version = '1.5.10-SNAPSHOT'
14+
version = '1.5.9'
1515
archivesBaseName = 'durabletask-client'
1616

1717
def grpcVersion = '1.69.0'

client/src/main/java/io/dapr/durabletask/DurableTaskClient.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import javax.annotation.Nullable;
66
import java.time.Duration;
7-
import java.time.Instant;
87
import java.util.concurrent.TimeoutException;
98

109
/**
@@ -63,7 +62,6 @@ public String scheduleNewOrchestrationInstance(String orchestratorName, Object i
6362
*/
6463
public String scheduleNewOrchestrationInstance(String orchestratorName, Object input, String instanceId) {
6564
NewOrchestrationInstanceOptions options = new NewOrchestrationInstanceOptions()
66-
.setStartTime(Instant.now())
6765
.setInput(input)
6866
.setInstanceId(instanceId);
6967
return this.scheduleNewOrchestrationInstance(orchestratorName, options);

client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.time.Duration;
1515
import java.time.Instant;
1616
import java.time.ZonedDateTime;
17+
import java.time.temporal.ChronoUnit;
1718
import java.time.temporal.TemporalField;
1819
import java.util.*;
1920
import java.util.concurrent.CancellationException;
@@ -91,9 +92,9 @@ private class ContextImplTask implements TaskOrchestrationContext {
9192

9293
// LinkedHashMap to maintain insertion order when returning the list of pending actions
9394
private final Map<Integer, OrchestratorAction> pendingActions = new LinkedHashMap<>();
94-
private final Map<Integer, TaskRecord<?>> openTasks = new ConcurrentHashMap<>();
95+
private final Map<Integer, TaskRecord<?>> openTasks = new HashMap<>();
9596
private final Map<String, Queue<TaskRecord<?>>> outstandingEvents = new LinkedHashMap<>();
96-
private final List<HistoryEvent> unprocessedEvents = Collections.synchronizedList(new LinkedList<>());
97+
private final List<HistoryEvent> unprocessedEvents = new LinkedList<>();
9798
private final Queue<HistoryEvent> eventsWhileSuspended = new ArrayDeque<>();
9899
private final DataConverter dataConverter = TaskOrchestrationExecutor.this.dataConverter;
99100
private final Duration maximumTimerInterval = TaskOrchestrationExecutor.this.maximumTimerInterval;
@@ -305,12 +306,10 @@ public <V> Task<V> callActivity(
305306
}
306307
TaskFactory<V> taskFactory = () -> {
307308
int id = this.sequenceNumber++;
308-
309309
ScheduleTaskAction scheduleTaskAction = scheduleTaskBuilder.build();
310310
OrchestratorAction.Builder actionBuilder = OrchestratorAction.newBuilder()
311311
.setId(id)
312312
.setScheduleTask(scheduleTaskBuilder);
313-
314313
if (options != null && options.hasAppID()) {
315314
String targetAppId = options.getAppID();
316315
TaskRouter actionRouter = TaskRouter.newBuilder()
@@ -319,7 +318,6 @@ public <V> Task<V> callActivity(
319318
.build();
320319
actionBuilder.setRouter(actionRouter);
321320
}
322-
323321
this.pendingActions.put(id, actionBuilder.build());
324322

325323
if (!this.isReplaying) {
@@ -650,7 +648,6 @@ public Task<Void> createTimer(ZonedDateTime zonedDateTime) {
650648
}
651649

652650
private Task<Void> createTimer(Instant finalFireAt) {
653-
logger.info(">>>> Creating a Timer Task to fire at: "+ finalFireAt);
654651
return new TimerTask(finalFireAt);
655652
}
656653

@@ -662,11 +659,9 @@ private CompletableTask<Void> createInstantTimer(int id, Instant fireAt) {
662659
.build());
663660

664661
if (!this.isReplaying) {
665-
logger.info("Creating Instant Timer with id: " + id + " fireAt: " + fireAt);
662+
logger.finer("Creating Instant Timer with id: " + id + " fireAt: " + fireAt);
666663
}
667664

668-
logger.info("REPLAY: Creating Instant Timer with id: " + id + " fireAt: " + fireAt);
669-
670665
CompletableTask<Void> timerTask = new CompletableTask<>();
671666
TaskRecord<Void> record = new TaskRecord<>(timerTask, "(timer)", Void.class);
672667
this.openTasks.put(id, record);
@@ -706,9 +701,8 @@ public void handleTimerFired(HistoryEvent e) {
706701

707702
if (!this.isReplaying) {
708703
// TODO: Log timer fired, including the scheduled fire-time
709-
this.logger.info("Firing timer by completing task: "+timerEventId+" expected fire at time: "+ Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(), timerFiredEvent.getFireAt().getNanos()));
704+
this.logger.finer("Firing timer by completing task: "+timerEventId+" expected fire at time: "+ Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(), timerFiredEvent.getFireAt().getNanos()));
710705
}
711-
this.logger.info("REPLAY: Firing timer by completing task: "+timerEventId+" expected fire at time: "+ Instant.ofEpochSecond(timerFiredEvent.getFireAt().getSeconds(), timerFiredEvent.getFireAt().getNanos()));
712706

713707
CompletableTask<?> task = record.getTask();
714708
task.complete(null);
@@ -1031,7 +1025,6 @@ private class TimerTask extends CompletableTask<Void> {
10311025

10321026
public TimerTask(Instant finalFireAt) {
10331027
super();
1034-
logger.info("Creating a Timer task at: " + finalFireAt + " -> with current time " + currentInstant );
10351028
CompletableTask<Void> firstTimer = createTimerTask(finalFireAt);
10361029
CompletableFuture<Void> timerChain = createTimerChain(finalFireAt, firstTimer.future);
10371030
this.task = new CompletableTask<>(timerChain);
@@ -1045,11 +1038,12 @@ public TimerTask(Instant finalFireAt) {
10451038
// if necessary. Otherwise, we return and no more sub-timers are created.
10461039
private CompletableFuture<Void> createTimerChain(Instant finalFireAt, CompletableFuture<Void> currentFuture) {
10471040
return currentFuture.thenRun(() -> {
1048-
if (currentInstant.compareTo(finalFireAt) >= 0) {
1041+
Instant currentInstsanceMinusNanos = currentInstant.minusNanos(currentInstant.getNano());
1042+
Instant finalFireAtMinusNanos = finalFireAt.minusNanos(finalFireAt.getNano());
1043+
if (currentInstsanceMinusNanos.compareTo(finalFireAtMinusNanos) >= 0) {
10491044
return;
10501045
}
10511046
Task<Void> nextTimer = createTimerTask(finalFireAt);
1052-
10531047
createTimerChain(finalFireAt, nextTimer.future);
10541048
});
10551049
}
@@ -1069,10 +1063,9 @@ private CompletableTask<Void> createTimerTask(Instant finalFireAt) {
10691063

10701064
private void handleSubTimerSuccess() {
10711065
// check if it is the last timer
1072-
logger.info(">>>> Comparing current instant "+ currentInstant+" with finalFireAt: " + finalFireAt);
1073-
logger.info(">>>>Comparison: "+currentInstant.compareTo(finalFireAt));
1074-
if (currentInstant.compareTo(finalFireAt) >= 0) {
1075-
logger.info(">>>> Comparison -> Complete");
1066+
Instant currentInstantMinusNanos = currentInstant.minusNanos(currentInstant.getNano());
1067+
Instant finalFireAtMinusNanos = finalFireAt.minusNanos(finalFireAt.getNano());
1068+
if (currentInstantMinusNanos.compareTo(finalFireAtMinusNanos) >= 0) {
10761069
this.complete(null);
10771070
}
10781071
}

client/src/test/java/io/dapr/durabletask/IntegrationTests.java

Lines changed: 78 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -119,22 +119,25 @@ void singleTimer() throws IOException, TimeoutException {
119119
for (int i = 0; i < timestamps.length() - 1; i++) {
120120
secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond();
121121
}
122-
assertEquals(secondsElapsed[0], 3);
122+
assertEquals(3, secondsElapsed[0]);
123123

124124
}
125125
}
126126

127+
127128
@Test
128129
void loopWithTimer() throws IOException, TimeoutException {
129130
final String orchestratorName = "LoopWithTimer";
130131
final Duration delay = Duration.ofSeconds(2);
131-
AtomicReferenceArray<LocalDateTime> timestamps = new AtomicReferenceArray<>(100);
132+
AtomicReferenceArray<LocalDateTime> timestamps = new AtomicReferenceArray<>(4);
132133
AtomicInteger counter = new AtomicInteger();
133134
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
134135
.addOrchestrator(orchestratorName, ctx -> {
135-
for(int i = 0 ; i < 10; i++) {
136-
timestamps.set(counter.get(), LocalDateTime.now());
137-
counter.incrementAndGet();
136+
for(int i = 0 ; i < 3; i++) {
137+
if(!ctx.getIsReplaying()) {
138+
timestamps.set(counter.get(), LocalDateTime.now());
139+
counter.incrementAndGet();
140+
}
138141
ctx.createTimer(delay).await();
139142
}
140143
})
@@ -154,8 +157,7 @@ void loopWithTimer() throws IOException, TimeoutException {
154157
assertTrue(expectedCompletionSecond <= actualCompletionSecond);
155158

156159
// Verify that the correct number of timers were created
157-
// ??? Not sure why 65, this seems consistent with what we see in Catalyst
158-
assertEquals(65, counter.get());
160+
assertEquals(3, counter.get());
159161

160162
// Verify that each timer is the expected length
161163
int[] secondsElapsed = new int[timestamps.length()];
@@ -166,7 +168,66 @@ void loopWithTimer() throws IOException, TimeoutException {
166168
secondsElapsed[i] = -1;
167169
}
168170
}
169-
assertEquals(secondsElapsed[0], 2);
171+
assertEquals(2, secondsElapsed[0]);
172+
assertEquals(2, secondsElapsed[1]);
173+
assertEquals(-1, secondsElapsed[2]);
174+
175+
176+
}
177+
}
178+
179+
@Test
180+
void loopWithWaitForEvent() throws IOException, TimeoutException {
181+
final String orchestratorName = "LoopWithTimer";
182+
final Duration delay = Duration.ofSeconds(2);
183+
AtomicReferenceArray<LocalDateTime> timestamps = new AtomicReferenceArray<>(4);
184+
AtomicInteger counter = new AtomicInteger();
185+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
186+
.addOrchestrator(orchestratorName, ctx -> {
187+
for(int i = 0 ; i < 4; i++) {
188+
try{
189+
ctx.waitForExternalEvent("HELLO", delay).await();
190+
}catch(TaskCanceledException tce ){
191+
if(!ctx.getIsReplaying()){
192+
timestamps.set(counter.get(), LocalDateTime.now());
193+
counter.incrementAndGet();
194+
}
195+
196+
}
197+
}
198+
})
199+
.buildAndStart();
200+
201+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
202+
try (worker; client) {
203+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
204+
Duration timeout = delay.plus(defaultTimeout);
205+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
206+
assertNotNull(instance);
207+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
208+
209+
// Verify that the delay actually happened
210+
long expectedCompletionSecond = instance.getCreatedAt().plus(delay).getEpochSecond();
211+
long actualCompletionSecond = instance.getLastUpdatedAt().getEpochSecond();
212+
assertTrue(expectedCompletionSecond <= actualCompletionSecond);
213+
214+
// Verify that the correct number of timers were created
215+
assertEquals(4, counter.get());
216+
217+
// Verify that each timer is the expected length
218+
int[] secondsElapsed = new int[timestamps.length()];
219+
for (int i = 0; i < timestamps.length() - 1; i++) {
220+
if(timestamps.get(i + 1) != null && timestamps.get(i) != null ) {
221+
secondsElapsed[i] = timestamps.get(i + 1).getSecond() - timestamps.get(i).getSecond();
222+
}else{
223+
secondsElapsed[i] = -1;
224+
}
225+
}
226+
assertEquals(2, secondsElapsed[0]);
227+
assertEquals(2, secondsElapsed[1]);
228+
assertEquals(2, secondsElapsed[2]);
229+
assertEquals(0, secondsElapsed[3]);
230+
170231

171232
}
172233
}
@@ -192,7 +253,7 @@ void longTimer() throws TimeoutException {
192253
Duration timeout = delay.plus(defaultTimeout);
193254
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
194255
assertNotNull(instance);
195-
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus(),
256+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus(),
196257
String.format("Orchestration failed with error: %s", instance.getFailureDetails().getErrorMessage()));
197258

198259
// Verify that the delay actually happened
@@ -973,13 +1034,13 @@ void multiInstanceQuery() throws TimeoutException{
9731034
// Test CreatedTimeTo filter
9741035
query.setCreatedTimeTo(startTime.minus(Duration.ofSeconds(1)));
9751036
result = client.queryInstances(query);
976-
assertTrue(result.getOrchestrationState().isEmpty(),
977-
"Result should be empty but found " + result.getOrchestrationState().size() + " instances: " +
1037+
assertTrue(result.getOrchestrationState().isEmpty(),
1038+
"Result should be empty but found " + result.getOrchestrationState().size() + " instances: " +
9781039
"Start time: " + startTime + ", " +
9791040
result.getOrchestrationState().stream()
980-
.map(state -> String.format("\nID: %s, Status: %s, Created: %s",
981-
state.getInstanceId(),
982-
state.getRuntimeStatus(),
1041+
.map(state -> String.format("\nID: %s, Status: %s, Created: %s",
1042+
state.getInstanceId(),
1043+
state.getRuntimeStatus(),
9831044
state.getCreatedAt()))
9841045
.collect(Collectors.joining(", ")));
9851046

@@ -1292,7 +1353,7 @@ void waitForInstanceStartThrowsException() {
12921353
client.scheduleNewOrchestrationInstance(orchestratorName, null, instanceId);
12931354
});
12941355
thread.start();
1295-
1356+
12961357
assertThrows(TimeoutException.class, () -> client.waitForInstanceStart(instanceId, Duration.ofSeconds(2)) );
12971358
}
12981359
}
@@ -1680,8 +1741,8 @@ public void taskExecutionIdTest() {
16801741

16811742
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
16821743
.addOrchestrator(orchestratorName, ctx -> {
1683-
ctx.callActivity(retryActivityName,null,taskOptions).await();
1684-
ctx.callActivity(retryActivityName,null,taskOptions).await();
1744+
ctx.callActivity(retryActivityName,null,taskOptions).await();
1745+
ctx.callActivity(retryActivityName,null,taskOptions).await();
16851746
ctx.complete(true);
16861747
})
16871748
.addActivity(retryActivityName, ctx -> {

0 commit comments

Comments
 (0)