Skip to content

Commit 418ea38

Browse files
authored
TestWorkflowEnvironment implemented (#132)
* Fixed TestWorkflowEnvironment time skipping * Added Javadoc to TestWorkflowEnvironment
1 parent 6dce3aa commit 418ea38

18 files changed

+470
-321
lines changed

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,14 @@
8787
import org.apache.thrift.TException;
8888
import org.apache.thrift.async.AsyncMethodCallback;
8989

90-
class TestActivityEnvironmentInternal implements TestActivityEnvironment {
90+
public final class TestActivityEnvironmentInternal implements TestActivityEnvironment {
9191

9292
private final POJOActivityTaskHandler activityTaskHandler;
9393
private final TestEnvironmentOptions testEnvironmentOptions;
9494
private final AtomicInteger idSequencer = new AtomicInteger();
9595
private ClassConsumerPair<Object> activityHeartbetListener;
9696

97-
TestActivityEnvironmentInternal(TestEnvironmentOptions options) {
97+
public TestActivityEnvironmentInternal(TestEnvironmentOptions options) {
9898
if (options == null) {
9999
this.testEnvironmentOptions = new TestEnvironmentOptions.Builder().build();
100100
} else {

src/main/java/com/uber/cadence/internal/sync/TestEnvironmentInternal.java

Lines changed: 0 additions & 66 deletions
This file was deleted.

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,22 +82,23 @@
8282
import java.util.List;
8383
import java.util.Optional;
8484
import java.util.concurrent.CompletableFuture;
85+
import java.util.concurrent.ExecutionException;
8586
import java.util.concurrent.TimeUnit;
8687
import java.util.concurrent.TimeoutException;
8788
import org.apache.thrift.TException;
8889
import org.apache.thrift.async.AsyncMethodCallback;
8990
import org.slf4j.Logger;
9091
import org.slf4j.LoggerFactory;
9192

92-
class TestWorkflowEnvironmentInternal implements TestWorkflowEnvironment {
93+
public final class TestWorkflowEnvironmentInternal implements TestWorkflowEnvironment {
9394

9495
private static final Logger log = LoggerFactory.getLogger(TestWorkflowEnvironmentInternal.class);
9596

9697
private final TestEnvironmentOptions testEnvironmentOptions;
9798
private final WorkflowServiceWrapper service;
9899
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
99100

100-
TestWorkflowEnvironmentInternal(TestEnvironmentOptions options) {
101+
public TestWorkflowEnvironmentInternal(TestEnvironmentOptions options) {
101102
if (options == null) {
102103
this.testEnvironmentOptions = new TestEnvironmentOptions.Builder().build();
103104
} else {
@@ -149,19 +150,22 @@ public void registerDelayedCallback(Duration delay, Runnable r) {
149150
service.registerDelayedCallback(delay, r);
150151
}
151152

153+
@Override
152154
public IWorkflowService getWorkflowService() {
153155
return service;
154156
}
155157

158+
@Override
156159
public String getDiagnostics() {
157160
StringBuilder result = new StringBuilder();
158161
service.getDiagnostics(result);
159162
return result.toString();
160163
}
161164

162-
public void shutdown() {
165+
@Override
166+
public void close() {
163167
for (Worker w : workers) {
164-
w.shutdown(Duration.ofMillis(1));
168+
w.shutdown(Duration.ofMillis(10));
165169
}
166170
service.close();
167171
}
@@ -621,8 +625,7 @@ public <R> R getResult(Class<R> returnType) {
621625

622626
@Override
623627
public <R> CompletableFuture<R> getResultAsync(Class<R> returnType) {
624-
service.unlockTimeSkipping();
625-
return next.getResultAsync(returnType).whenComplete((r, e) -> service.lockTimeSkipping());
628+
return new TimeLockingFuture<>(next.getResultAsync(returnType));
626629
}
627630

628631
@Override
@@ -639,9 +642,7 @@ public <R> R getResult(long timeout, TimeUnit unit, Class<R> returnType)
639642
@Override
640643
public <R> CompletableFuture<R> getResultAsync(
641644
long timeout, TimeUnit unit, Class<R> returnType) {
642-
service.unlockTimeSkipping();
643-
return next.getResultAsync(timeout, unit, returnType)
644-
.whenComplete((r, e) -> service.lockTimeSkipping());
645+
return new TimeLockingFuture<>(next.getResultAsync(timeout, unit, returnType));
645646
}
646647

647648
@Override
@@ -658,6 +659,47 @@ public void cancel() {
658659
public Optional<WorkflowOptions> getOptions() {
659660
return next.getOptions();
660661
}
662+
663+
/** Unlocks time skipping before blocking calls and locks back after completion. */
664+
private class TimeLockingFuture<R> extends CompletableFuture<R> {
665+
666+
public TimeLockingFuture(CompletableFuture<R> resultAsync) {
667+
CompletableFuture<R> ignored =
668+
resultAsync.whenComplete(
669+
(r, e) -> {
670+
service.lockTimeSkipping();
671+
if (e == null) {
672+
this.complete(r);
673+
} else {
674+
this.completeExceptionally(e);
675+
}
676+
});
677+
}
678+
679+
@Override
680+
public R get() throws InterruptedException, ExecutionException {
681+
service.unlockTimeSkipping();
682+
return super.get();
683+
}
684+
685+
@Override
686+
public R get(long timeout, TimeUnit unit)
687+
throws InterruptedException, ExecutionException, TimeoutException {
688+
service.unlockTimeSkipping();
689+
try {
690+
return super.get(timeout, unit);
691+
} catch (TimeoutException | InterruptedException e) {
692+
service.lockTimeSkipping();
693+
throw e;
694+
}
695+
}
696+
697+
@Override
698+
public R join() {
699+
service.unlockTimeSkipping();
700+
return super.join();
701+
}
702+
}
661703
}
662704
}
663705
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.testservice;
19+
20+
import com.google.common.base.Throwables;
21+
import com.uber.cadence.InternalServiceError;
22+
import java.io.ByteArrayInputStream;
23+
import java.io.ByteArrayOutputStream;
24+
import java.io.DataInputStream;
25+
import java.io.DataOutputStream;
26+
import java.io.IOException;
27+
import java.util.Objects;
28+
29+
final class DecisionTaskToken {
30+
31+
private final ExecutionId executionId;
32+
private final int historySize;
33+
34+
DecisionTaskToken(ExecutionId executionId, int historySize) {
35+
this.executionId = Objects.requireNonNull(executionId);
36+
this.historySize = Objects.requireNonNull(historySize);
37+
}
38+
39+
ExecutionId getExecutionId() {
40+
return executionId;
41+
}
42+
43+
int getHistorySize() {
44+
return historySize;
45+
}
46+
47+
/** Used for task tokens. */
48+
byte[] toBytes() throws InternalServiceError {
49+
ByteArrayOutputStream bout = new ByteArrayOutputStream();
50+
DataOutputStream out = new DataOutputStream(bout);
51+
try {
52+
addBytes(out);
53+
} catch (IOException e) {
54+
throw new InternalServiceError(Throwables.getStackTraceAsString(e));
55+
}
56+
return bout.toByteArray();
57+
}
58+
59+
private void addBytes(DataOutputStream out) throws IOException {
60+
executionId.addBytes(out);
61+
out.writeInt(historySize);
62+
}
63+
64+
static DecisionTaskToken fromBytes(byte[] serialized) throws InternalServiceError {
65+
ByteArrayInputStream bin = new ByteArrayInputStream(serialized);
66+
DataInputStream in = new DataInputStream(bin);
67+
try {
68+
ExecutionId executionId = ExecutionId.readFromBytes(in);
69+
int historySize = in.readInt();
70+
return new DecisionTaskToken(executionId, historySize);
71+
} catch (IOException e) {
72+
throw new InternalServiceError(Throwables.getStackTraceAsString(e));
73+
}
74+
}
75+
}

src/main/java/com/uber/cadence/internal/testservice/RequestContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ final class RequestContext {
3434
@FunctionalInterface
3535
interface CommitCallback {
3636

37-
void apply() throws InternalServiceError;
37+
void apply(int historySize) throws InternalServiceError;
3838
}
3939

4040
static final class Timer {
@@ -199,9 +199,9 @@ long commitChanges(TestWorkflowStore store) throws InternalServiceError, EntityN
199199
}
200200

201201
/** Called by {@link TestWorkflowStore#save(RequestContext)} */
202-
void fireCallbacks() throws InternalServiceError {
202+
void fireCallbacks(int historySize) throws InternalServiceError {
203203
for (CommitCallback callback : commitCallbacks) {
204-
callback.apply();
204+
callback.apply(historySize);
205205
}
206206
}
207207

src/main/java/com/uber/cadence/internal/testservice/SelfAdvancingTimer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
* Timer service that automatically forwards current time to the next task time when is not locked
2525
* through {@link #lockTimeSkipping()}.
2626
*/
27-
public interface SelfAdvancingTimer {
27+
interface SelfAdvancingTimer {
2828

2929
/**
3030
* Schedule a task with a specified delay. The actual wait time is defined by the internal clock

0 commit comments

Comments
 (0)