Skip to content

Commit 8d1c881

Browse files
authored
refactor timer locks to provide more diagnostic information. (#165)
Also fix couple of bugs in the related logic
1 parent 1defe40 commit 8d1c881

File tree

9 files changed

+229
-76
lines changed

9 files changed

+229
-76
lines changed

src/main/java/com/uber/cadence/client/WorkflowClientOptions.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,17 @@ public final class WorkflowClientOptions {
2929
public static final class Builder {
3030

3131
private DataConverter dataConverter = JsonDataConverter.getInstance();
32-
3332
private WorkflowClientInterceptor[] interceptors = EMPTY_INTERCEPTOR_ARRAY;
34-
3533
private Scope metricsScope;
3634

35+
public Builder() {}
36+
37+
public Builder(WorkflowClientOptions options) {
38+
dataConverter = options.getDataConverter();
39+
interceptors = options.getInterceptors();
40+
metricsScope = options.getMetricsScope();
41+
}
42+
3743
/**
3844
* Used to override default (JSON) data converter implementation.
3945
*

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public TestWorkflowEnvironmentInternal(TestEnvironmentOptions options) {
105105
this.testEnvironmentOptions = options;
106106
}
107107
service = new WorkflowServiceWrapper();
108-
service.lockTimeSkipping();
108+
service.lockTimeSkipping("TestWorkflowEnvironmentInternal constructor");
109109
}
110110

111111
@Override
@@ -137,7 +137,15 @@ public WorkflowClient newWorkflowClient() {
137137

138138
@Override
139139
public WorkflowClient newWorkflowClient(WorkflowClientOptions options) {
140-
return WorkflowClientInternal.newInstance(service, testEnvironmentOptions.getDomain(), options);
140+
WorkflowClientInterceptor[] existingInterceptors = options.getInterceptors();
141+
WorkflowClientInterceptor[] interceptors =
142+
new WorkflowClientInterceptor[existingInterceptors.length + 1];
143+
System.arraycopy(existingInterceptors, 0, interceptors, 0, existingInterceptors.length);
144+
interceptors[interceptors.length - 1] = new TimeLockingInterceptor(service);
145+
WorkflowClientOptions newOptions =
146+
new WorkflowClientOptions.Builder(options).setInterceptors(interceptors).build();
147+
return WorkflowClientInternal.newInstance(
148+
service, testEnvironmentOptions.getDomain(), newOptions);
141149
}
142150

143151
@Override
@@ -559,12 +567,12 @@ public void registerDelayedCallback(Duration delay, Runnable r) {
559567
impl.registerDelayedCallback(delay, r);
560568
}
561569

562-
public void lockTimeSkipping() {
563-
impl.lockTimeSkipping();
570+
public void lockTimeSkipping(String caller) {
571+
impl.lockTimeSkipping(caller);
564572
}
565573

566-
public void unlockTimeSkipping() {
567-
impl.unlockTimeSkipping();
574+
public void unlockTimeSkipping(String caller) {
575+
impl.unlockTimeSkipping(caller);
568576
}
569577

570578
public void sleep(Duration duration) {
@@ -629,11 +637,11 @@ public WorkflowExecution getExecution() {
629637

630638
@Override
631639
public <R> R getResult(Class<R> returnType) {
632-
service.unlockTimeSkipping();
640+
service.unlockTimeSkipping("TimeLockingWorkflowStub getResult");
633641
try {
634642
return next.getResult(returnType);
635643
} finally {
636-
service.lockTimeSkipping();
644+
service.lockTimeSkipping("TimeLockingWorkflowStub getResult");
637645
}
638646
}
639647

@@ -645,11 +653,11 @@ public <R> CompletableFuture<R> getResultAsync(Class<R> returnType) {
645653
@Override
646654
public <R> R getResult(long timeout, TimeUnit unit, Class<R> returnType)
647655
throws TimeoutException {
648-
service.unlockTimeSkipping();
656+
service.unlockTimeSkipping("TimeLockingWorkflowStub getResult");
649657
try {
650658
return next.getResult(timeout, unit, returnType);
651659
} finally {
652-
service.lockTimeSkipping();
660+
service.lockTimeSkipping("TimeLockingWorkflowStub getResult");
653661
}
654662
}
655663

@@ -681,7 +689,8 @@ public TimeLockingFuture(CompletableFuture<R> resultAsync) {
681689
CompletableFuture<R> ignored =
682690
resultAsync.whenComplete(
683691
(r, e) -> {
684-
service.lockTimeSkipping();
692+
service.lockTimeSkipping(
693+
"TimeLockingWorkflowStub TimeLockingFuture constructor");
685694
if (e == null) {
686695
this.complete(r);
687696
} else {
@@ -692,25 +701,28 @@ public TimeLockingFuture(CompletableFuture<R> resultAsync) {
692701

693702
@Override
694703
public R get() throws InterruptedException, ExecutionException {
695-
service.unlockTimeSkipping();
696-
return super.get();
704+
service.unlockTimeSkipping("TimeLockingWorkflowStub TimeLockingFuture get");
705+
try {
706+
return super.get();
707+
} finally {
708+
service.lockTimeSkipping("TimeLockingWorkflowStub TimeLockingFuture get");
709+
}
697710
}
698711

699712
@Override
700713
public R get(long timeout, TimeUnit unit)
701714
throws InterruptedException, ExecutionException, TimeoutException {
702-
service.unlockTimeSkipping();
715+
service.unlockTimeSkipping("TimeLockingWorkflowStub TimeLockingFuture get");
703716
try {
704717
return super.get(timeout, unit);
705-
} catch (TimeoutException | InterruptedException e) {
706-
service.lockTimeSkipping();
707-
throw e;
718+
} finally {
719+
service.lockTimeSkipping("TimeLockingWorkflowStub TimeLockingFuture get");
708720
}
709721
}
710722

711723
@Override
712724
public R join() {
713-
service.unlockTimeSkipping();
725+
service.unlockTimeSkipping("TimeLockingWorkflowStub TimeLockingFuture join");
714726
return super.join();
715727
}
716728
}

src/main/java/com/uber/cadence/internal/testservice/RequestContext.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,12 @@ static final class Timer {
4141

4242
private final long delaySeconds;
4343
private final Runnable callback;
44+
private final String taskInfo;
4445

45-
Timer(long delaySeconds, Runnable callback) {
46+
Timer(long delaySeconds, Runnable callback, String taskInfo) {
4647
this.delaySeconds = delaySeconds;
4748
this.callback = callback;
49+
this.taskInfo = taskInfo;
4850
}
4951

5052
long getDelaySeconds() {
@@ -54,6 +56,10 @@ long getDelaySeconds() {
5456
Runnable getCallback() {
5557
return callback;
5658
}
59+
60+
String getTaskInfo() {
61+
return taskInfo;
62+
}
5763
}
5864

5965
private static final long NANOS_PER_MILLIS = 1_000_000;
@@ -173,8 +179,8 @@ void addActivityTask(ActivityTask activityTask) {
173179
this.activityTasks.add(activityTask);
174180
}
175181

176-
void addTimer(long delaySeconds, Runnable callback) {
177-
Timer timer = new Timer(delaySeconds, callback);
182+
void addTimer(long delaySeconds, Runnable callback, String name) {
183+
Timer timer = new Timer(delaySeconds, callback, name);
178184
this.timers.add(timer);
179185
}
180186

src/main/java/com/uber/cadence/internal/testservice/SelfAdvancingTimer.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
/**
2424
* Timer service that automatically forwards current time to the next task time when is not locked
25-
* through {@link #lockTimeSkipping()}.
25+
* through {@link #lockTimeSkipping(String)}.
2626
*/
2727
interface SelfAdvancingTimer {
2828

@@ -32,24 +32,32 @@ interface SelfAdvancingTimer {
3232
*/
3333
void schedule(Duration delay, Runnable task);
3434

35+
void schedule(Duration delay, Runnable task, String taskInfo);
36+
3537
/** Supplier that returns current time of the timer when called. */
3638
LongSupplier getClock();
3739

3840
/**
39-
* Prohibit automatic time skipping until {@link #unlockTimeSkipping()} is called. Locks and
41+
* Prohibit automatic time skipping until {@link #unlockTimeSkipping(String)} is called. Locks and
4042
* unlocks are counted.
4143
*/
42-
void lockTimeSkipping();
44+
LockHandle lockTimeSkipping(String caller);
4345

44-
void unlockTimeSkipping();
46+
void unlockTimeSkipping(String caller);
4547

4648
/**
4749
* Update lock count. The same as calling lockTimeSkipping count number of times for positive
4850
* count and unlockTimeSkipping for negative count.
4951
*
5052
* @param count
5153
*/
52-
void updateLocks(int count);
54+
void updateLocks(int count, String caller);
55+
56+
void getDiagnostics(StringBuilder result);
5357

5458
void shutdown();
5559
}
60+
61+
interface LockHandle {
62+
void unlock();
63+
}

0 commit comments

Comments
 (0)