Skip to content

Commit d713971

Browse files
authored
Added WorkflowTestEnvironment.sleep (#130)
1 parent 4cd383a commit d713971

12 files changed

+204
-7
lines changed

src/main/java/com/uber/cadence/client/WorkflowClientInterceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ UntypedWorkflowStub newUntypedWorkflowStub(
2626
String workflowType, WorkflowOptions options, UntypedWorkflowStub next);
2727

2828
UntypedWorkflowStub newUntypedWorkflowStub(
29-
WorkflowExecution execution, Optional<String> workflowType, UntypedWorkflowStub result);
29+
WorkflowExecution execution, Optional<String> workflowType, UntypedWorkflowStub next);
3030

3131
ActivityCompletionClient newActivityCompletionClient(ActivityCompletionClient next);
3232
}

src/main/java/com/uber/cadence/internal/replay/ActivityDecisionContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ private ActivityCancellationHandler(
4949

5050
@Override
5151
public void accept(Exception cause) {
52+
if (!scheduledActivities.containsKey(activityId)) {
53+
// Cancellation handlers are not deregistered. So they fire after an activity completion.
54+
return;
55+
}
5256
decisions.requestCancelActivityTask(
5357
activityId,
5458
() -> {

src/main/java/com/uber/cadence/internal/replay/WorkflowDecisionContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ private ChildWorkflowCancellationHandler(
6262

6363
@Override
6464
public void accept(Exception cause) {
65+
if (!scheduledExternalWorkflows.containsKey(workflowId)) {
66+
// Cancellation handlers are not deregistered. So they fire after a child completion.
67+
return;
68+
}
6569
RequestCancelExternalWorkflowExecutionDecisionAttributes cancelAttributes =
6670
new RequestCancelExternalWorkflowExecutionDecisionAttributes();
6771
cancelAttributes.setWorkflowId(workflowId);
@@ -175,6 +179,10 @@ Consumer<Exception> signalWorkflowExecution(
175179
final String finalSignalId = new String(attributes.getControl(), StandardCharsets.UTF_8);
176180
scheduledSignals.put(finalSignalId, context);
177181
return (e) -> {
182+
if (!scheduledSignals.containsKey(finalSignalId)) {
183+
// Cancellation handlers are not deregistered. So they fire after a signal completion.
184+
return;
185+
}
178186
decisions.cancelSignalExternalWorkflowExecution(finalSignalId, null);
179187
OpenRequestInfo<Void, Void> scheduled = scheduledSignals.remove(finalSignalId);
180188
if (scheduled == null) {

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ private Promise<byte[]> executeActivityOnce(String name, ActivityOptions options
121121
() -> result.completeExceptionally(mapActivityException(failure)));
122122
} else {
123123
runner.executeInWorkflowThread(
124-
"activity failure callback", () -> result.complete(output));
124+
"activity completion callback", () -> result.complete(output));
125125
}
126126
});
127127
CancellationScope.current()

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 138 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,14 @@
6262
import com.uber.cadence.TerminateWorkflowExecutionRequest;
6363
import com.uber.cadence.UpdateDomainRequest;
6464
import com.uber.cadence.UpdateDomainResponse;
65+
import com.uber.cadence.WorkflowExecution;
6566
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
67+
import com.uber.cadence.client.ActivityCompletionClient;
68+
import com.uber.cadence.client.UntypedWorkflowStub;
6669
import com.uber.cadence.client.WorkflowClient;
70+
import com.uber.cadence.client.WorkflowClientInterceptor;
6771
import com.uber.cadence.client.WorkflowClientOptions;
68-
import com.uber.cadence.client.WorkflowClientOptions.Builder;
72+
import com.uber.cadence.client.WorkflowOptions;
6973
import com.uber.cadence.internal.testservice.TestWorkflowService;
7074
import com.uber.cadence.serviceclient.IWorkflowService;
7175
import com.uber.cadence.testing.TestEnvironmentOptions;
@@ -76,6 +80,10 @@
7680
import java.util.ArrayList;
7781
import java.util.Collections;
7882
import java.util.List;
83+
import java.util.Optional;
84+
import java.util.concurrent.CompletableFuture;
85+
import java.util.concurrent.TimeUnit;
86+
import java.util.concurrent.TimeoutException;
7987
import org.apache.thrift.TException;
8088
import org.apache.thrift.async.AsyncMethodCallback;
8189
import org.slf4j.Logger;
@@ -96,6 +104,7 @@ class TestWorkflowEnvironmentInternal implements TestWorkflowEnvironment {
96104
this.testEnvironmentOptions = options;
97105
}
98106
service = new WorkflowServiceWrapper();
107+
service.lockTimeSkipping();
99108
}
100109

101110
@Override
@@ -113,7 +122,10 @@ public Worker newWorker(String taskList) {
113122
@Override
114123
public WorkflowClient newWorkflowClient() {
115124
WorkflowClientOptions options =
116-
new Builder().setDataConverter(testEnvironmentOptions.getDataConverter()).build();
125+
new WorkflowClientOptions.Builder()
126+
.setDataConverter(testEnvironmentOptions.getDataConverter())
127+
.setInterceptors(new TimeLockingInterceptor(service))
128+
.build();
117129
return WorkflowClientInternal.newInstance(service, testEnvironmentOptions.getDomain(), options);
118130
}
119131

@@ -127,6 +139,11 @@ public long currentTimeMillis() {
127139
return service.currentTimeMillis();
128140
}
129141

142+
@Override
143+
public void sleep(Duration duration) {
144+
service.sleep(duration);
145+
}
146+
130147
@Override
131148
public void registerDelayedCallback(Duration delay, Runnable r) {
132149
service.registerDelayedCallback(delay, r);
@@ -523,5 +540,124 @@ public void close() {
523540
public void registerDelayedCallback(Duration delay, Runnable r) {
524541
impl.registerDelayedCallback(delay, r);
525542
}
543+
544+
public void lockTimeSkipping() {
545+
impl.lockTimeSkipping();
546+
}
547+
548+
public void unlockTimeSkipping() {
549+
impl.unlockTimeSkipping();
550+
}
551+
552+
public void sleep(Duration duration) {
553+
impl.sleep(duration);
554+
}
555+
}
556+
557+
private static class TimeLockingInterceptor implements WorkflowClientInterceptor {
558+
559+
private final WorkflowServiceWrapper service;
560+
561+
TimeLockingInterceptor(WorkflowServiceWrapper service) {
562+
this.service = service;
563+
}
564+
565+
@Override
566+
public UntypedWorkflowStub newUntypedWorkflowStub(
567+
String workflowType, WorkflowOptions options, UntypedWorkflowStub next) {
568+
return new TimeLockingWorkflowStub(service, next);
569+
}
570+
571+
@Override
572+
public UntypedWorkflowStub newUntypedWorkflowStub(
573+
WorkflowExecution execution, Optional<String> workflowType, UntypedWorkflowStub next) {
574+
return new TimeLockingWorkflowStub(service, next);
575+
}
576+
577+
@Override
578+
public ActivityCompletionClient newActivityCompletionClient(ActivityCompletionClient next) {
579+
return next;
580+
}
581+
582+
private class TimeLockingWorkflowStub implements UntypedWorkflowStub {
583+
584+
private final WorkflowServiceWrapper service;
585+
private final UntypedWorkflowStub next;
586+
587+
TimeLockingWorkflowStub(WorkflowServiceWrapper service, UntypedWorkflowStub next) {
588+
this.service = service;
589+
this.next = next;
590+
}
591+
592+
@Override
593+
public void signal(String signalName, Object... args) {
594+
next.signal(signalName, args);
595+
}
596+
597+
@Override
598+
public WorkflowExecution start(Object... args) {
599+
return next.start(args);
600+
}
601+
602+
@Override
603+
public Optional<String> getWorkflowType() {
604+
return next.getWorkflowType();
605+
}
606+
607+
@Override
608+
public WorkflowExecution getExecution() {
609+
return next.getExecution();
610+
}
611+
612+
@Override
613+
public <R> R getResult(Class<R> returnType) {
614+
service.unlockTimeSkipping();
615+
try {
616+
return next.getResult(returnType);
617+
} finally {
618+
service.lockTimeSkipping();
619+
}
620+
}
621+
622+
@Override
623+
public <R> CompletableFuture<R> getResultAsync(Class<R> returnType) {
624+
service.unlockTimeSkipping();
625+
return next.getResultAsync(returnType).whenComplete((r, e) -> service.lockTimeSkipping());
626+
}
627+
628+
@Override
629+
public <R> R getResult(long timeout, TimeUnit unit, Class<R> returnType)
630+
throws TimeoutException {
631+
service.unlockTimeSkipping();
632+
try {
633+
return next.getResult(timeout, unit, returnType);
634+
} finally {
635+
service.lockTimeSkipping();
636+
}
637+
}
638+
639+
@Override
640+
public <R> CompletableFuture<R> getResultAsync(
641+
long timeout, TimeUnit unit, Class<R> returnType) {
642+
service.unlockTimeSkipping();
643+
return next.getResultAsync(timeout, unit, returnType)
644+
.whenComplete((r, e) -> service.lockTimeSkipping());
645+
}
646+
647+
@Override
648+
public <R> R query(String queryType, Class<R> returnType, Object... args) {
649+
return next.query(queryType, returnType, args);
650+
}
651+
652+
@Override
653+
public void cancel() {
654+
next.cancel();
655+
}
656+
657+
@Override
658+
public Optional<WorkflowOptions> getOptions() {
659+
return next.getOptions();
660+
}
661+
}
526662
}
527663
}

src/main/java/com/uber/cadence/internal/testservice/RequestContext.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import java.util.List;
2929
import java.util.Objects;
3030
import java.util.function.LongSupplier;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
3133

3234
final class RequestContext {
3335

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,7 @@ public RecordActivityTaskHeartbeatResponse heartbeatActivityTask(
974974
}
975975

976976
private void timeoutActivity(String activityId, TimeoutType timeoutType) {
977+
boolean unlockTimer = true;
977978
try {
978979
update(
979980
ctx -> {
@@ -996,9 +997,14 @@ private void timeoutActivity(String activityId, TimeoutType timeoutType) {
996997
});
997998
} catch (EntityNotExistsError e) {
998999
// Expected as timers are not removed
1000+
unlockTimer = false;
9991001
} catch (Exception e) {
10001002
// Cannot fail to timer threads
10011003
log.error("Failure trying to timeout an activity", e);
1004+
} finally {
1005+
if (unlockTimer) {
1006+
selfAdvancingTimer.unlockTimeSkipping();
1007+
}
10021008
}
10031009
}
10041010

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@
7373
import java.util.HashMap;
7474
import java.util.Map;
7575
import java.util.Optional;
76+
import java.util.concurrent.CompletableFuture;
77+
import java.util.concurrent.ExecutionException;
7678
import java.util.concurrent.ForkJoinPool;
7779
import java.util.concurrent.locks.Lock;
7880
import java.util.concurrent.locks.ReentrantLock;
@@ -681,4 +683,33 @@ public long currentTimeMillis() {
681683
public void registerDelayedCallback(Duration delay, Runnable r) {
682684
store.registerDelayedCallback(delay, r);
683685
}
686+
687+
public void lockTimeSkipping() {
688+
store.getTimer().lockTimeSkipping();
689+
}
690+
691+
public void unlockTimeSkipping() {
692+
store.getTimer().unlockTimeSkipping();
693+
}
694+
695+
public void sleep(Duration duration) {
696+
long start = store.currentTimeMillis();
697+
CompletableFuture<Void> result = new CompletableFuture<>();
698+
store
699+
.getTimer()
700+
.schedule(
701+
duration,
702+
() -> {
703+
store.getTimer().lockTimeSkipping();
704+
result.complete(null);
705+
});
706+
store.getTimer().unlockTimeSkipping();
707+
try {
708+
result.get();
709+
} catch (InterruptedException e) {
710+
throw new RuntimeException(e);
711+
} catch (ExecutionException e) {
712+
throw new RuntimeException(e);
713+
}
714+
}
684715
}

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ public PollForActivityTaskResponse getTask() {
125125

126126
SelfAdvancingTimer getTimer();
127127

128+
long currentTimeMillis();
129+
128130
long save(RequestContext requestContext) throws InternalServiceError, EntityNotExistsError;
129131

130132
void registerDelayedCallback(Duration delay, Runnable r);

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,11 @@ public SelfAdvancingTimer getTimer() {
150150
return timerService;
151151
}
152152

153+
@Override
154+
public long currentTimeMillis() {
155+
return timerService.getClock().getAsLong();
156+
}
157+
153158
@Override
154159
public long save(RequestContext ctx) throws InternalServiceError, EntityNotExistsError {
155160
long result;

0 commit comments

Comments
 (0)