Skip to content

Commit 1d6385c

Browse files
authored
Added List[Open|Closed]WorkflowExecutions to the TestWorkflowService (#145)
* Eliminate timer failure message on closed workflows * Added TestWorkflowEnvironment.getDomain * Implemented TestWorkflowService.listWorkflows * Workflow.retry for procedure * Fixed TestWorkflowEnvironemnt.close
1 parent 5669b01 commit 1d6385c

File tree

11 files changed

+199
-33
lines changed

11 files changed

+199
-33
lines changed

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ public static WorkflowExecutionCloseStatus waitForWorkflowInstanceCompletion(
452452
return getCloseStatus(closeEvent);
453453
}
454454

455-
private static WorkflowExecutionCloseStatus getCloseStatus(HistoryEvent event) {
455+
public static WorkflowExecutionCloseStatus getCloseStatus(HistoryEvent event) {
456456
switch (event.getEventType()) {
457457
case WorkflowExecutionCanceled:
458458
return WorkflowExecutionCloseStatus.CANCELED;
@@ -485,7 +485,7 @@ public static WorkflowExecutionCloseStatus waitForWorkflowInstanceCompletionAcro
485485
WorkflowExecution workflowExecution,
486486
long timeout,
487487
TimeUnit unit)
488-
throws InterruptedException, TimeoutException, EntityNotExistsError {
488+
throws TimeoutException, EntityNotExistsError {
489489

490490
WorkflowExecution lastExecutionToRun = workflowExecution;
491491
long millisecondsAtFirstWait = System.currentTimeMillis();

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,11 @@ public IWorkflowService getWorkflowService() {
157157
return service;
158158
}
159159

160+
@Override
161+
public String getDomain() {
162+
return testEnvironmentOptions.getDomain();
163+
}
164+
160165
@Override
161166
public String getDiagnostics() {
162167
StringBuilder result = new StringBuilder();
@@ -167,7 +172,11 @@ public String getDiagnostics() {
167172
@Override
168173
public void close() {
169174
for (Worker w : workers) {
170-
w.shutdown(Duration.ofMillis(10));
175+
if (w.isStarted()) {
176+
w.shutdown(Duration.ofMillis(10));
177+
} else {
178+
log.warn("Worker was created, but never started for taskList: " + w.getTaskList());
179+
}
171180
}
172181
service.close();
173182
}

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -265,17 +265,6 @@ public void completeDecisionTask(int historySize, RespondDecisionTaskCompletedRe
265265
this.concurrentToDecision.clear();
266266
ctx.unlockTimer();
267267
});
268-
lock.lock();
269-
try {
270-
{
271-
if (decision != null && decision.getState() != StateMachines.State.INITIATED) {
272-
throw new InternalServiceError(
273-
"non null decision after the completion: " + decision.getState());
274-
}
275-
}
276-
} finally {
277-
lock.unlock();
278-
}
279268
}
280269

281270
private boolean hasCompleteDecision(List<Decision> decisions) {
@@ -650,9 +639,17 @@ private void processStartTimer(
650639
}
651640

652641
private void fireTimer(String timerId) {
653-
StateMachine<TimerData> timer = timers.get(timerId);
654-
if (timer == null) {
655-
return; // cancelled already
642+
StateMachine<TimerData> timer;
643+
lock.lock();
644+
try {
645+
{
646+
timer = timers.get(timerId);
647+
if (timer == null || workflow.getState() != State.STARTED) {
648+
return; // cancelled already
649+
}
650+
}
651+
} finally {
652+
lock.unlock();
656653
}
657654
try {
658655
update(

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,14 @@
6767
import com.uber.cadence.WorkflowExecution;
6868
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
6969
import com.uber.cadence.WorkflowExecutionContinuedAsNewEventAttributes;
70+
import com.uber.cadence.WorkflowExecutionFilter;
71+
import com.uber.cadence.WorkflowExecutionInfo;
7072
import com.uber.cadence.internal.testservice.TestWorkflowMutableStateImpl.QueryId;
73+
import com.uber.cadence.internal.testservice.TestWorkflowStore.WorkflowState;
7174
import com.uber.cadence.serviceclient.IWorkflowService;
7275
import java.time.Duration;
7376
import java.util.HashMap;
77+
import java.util.List;
7478
import java.util.Map;
7579
import java.util.Optional;
7680
import java.util.concurrent.CompletableFuture;
@@ -441,15 +445,36 @@ public ListOpenWorkflowExecutionsResponse ListOpenWorkflowExecutions(
441445
ListOpenWorkflowExecutionsRequest listRequest)
442446
throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError,
443447
TException {
444-
throw new UnsupportedOperationException("not implemented");
448+
Optional<String> workflowIdFilter;
449+
WorkflowExecutionFilter executionFilter = listRequest.getExecutionFilter();
450+
if (executionFilter != null
451+
&& executionFilter.isSetWorkflowId()
452+
&& !executionFilter.getWorkflowId().isEmpty()) {
453+
workflowIdFilter = Optional.of(executionFilter.getWorkflowId());
454+
} else {
455+
workflowIdFilter = Optional.empty();
456+
}
457+
List<WorkflowExecutionInfo> result = store.listWorkflows(WorkflowState.OPEN, workflowIdFilter);
458+
return new ListOpenWorkflowExecutionsResponse().setExecutions(result);
445459
}
446460

447461
@Override
448462
public ListClosedWorkflowExecutionsResponse ListClosedWorkflowExecutions(
449463
ListClosedWorkflowExecutionsRequest listRequest)
450464
throws BadRequestError, InternalServiceError, EntityNotExistsError, ServiceBusyError,
451465
TException {
452-
throw new UnsupportedOperationException("not implemented");
466+
Optional<String> workflowIdFilter;
467+
WorkflowExecutionFilter executionFilter = listRequest.getExecutionFilter();
468+
if (executionFilter != null
469+
&& executionFilter.isSetWorkflowId()
470+
&& !executionFilter.getWorkflowId().isEmpty()) {
471+
workflowIdFilter = Optional.of(executionFilter.getWorkflowId());
472+
} else {
473+
workflowIdFilter = Optional.empty();
474+
}
475+
List<WorkflowExecutionInfo> result =
476+
store.listWorkflows(WorkflowState.CLOSED, workflowIdFilter);
477+
return new ListClosedWorkflowExecutionsResponse().setExecutions(result);
453478
}
454479

455480
@Override

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStore.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,19 @@
2525
import com.uber.cadence.PollForActivityTaskResponse;
2626
import com.uber.cadence.PollForDecisionTaskRequest;
2727
import com.uber.cadence.PollForDecisionTaskResponse;
28+
import com.uber.cadence.WorkflowExecutionInfo;
2829
import java.time.Duration;
30+
import java.util.List;
2931
import java.util.Objects;
32+
import java.util.Optional;
3033

3134
interface TestWorkflowStore {
3235

36+
enum WorkflowState {
37+
OPEN,
38+
CLOSED
39+
}
40+
3341
class TaskListId {
3442

3543
private final String domain;
@@ -147,5 +155,7 @@ GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
147155

148156
void getDiagnostics(StringBuilder result);
149157

158+
List<WorkflowExecutionInfo> listWorkflows(WorkflowState state, Optional<String> workflowId);
159+
150160
void close();
151161
}

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.uber.cadence.PollForDecisionTaskRequest;
3131
import com.uber.cadence.PollForDecisionTaskResponse;
3232
import com.uber.cadence.WorkflowExecution;
33+
import com.uber.cadence.WorkflowExecutionInfo;
3334
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
3435
import com.uber.cadence.internal.testservice.RequestContext.Timer;
3536
import java.time.Duration;
@@ -38,6 +39,7 @@
3839
import java.util.List;
3940
import java.util.Map;
4041
import java.util.Map.Entry;
42+
import java.util.Optional;
4143
import java.util.concurrent.BlockingQueue;
4244
import java.util.concurrent.LinkedBlockingQueue;
4345
import java.util.concurrent.locks.Condition;
@@ -64,6 +66,14 @@ private HistoryStore(ExecutionId id, Lock lock) {
6466
this.newEventsCondition = lock.newCondition();
6567
}
6668

69+
public boolean isCompleted() {
70+
return completed;
71+
}
72+
73+
public List<HistoryEvent> getHistory() {
74+
return history;
75+
}
76+
6777
private void checkNextEventId(long nextEventId) {
6878
if (nextEventId != history.size()) {
6979
throw new IllegalStateException(
@@ -346,6 +356,54 @@ public void getDiagnostics(StringBuilder result) {
346356
}
347357
}
348358

359+
@Override
360+
public List<WorkflowExecutionInfo> listWorkflows(
361+
WorkflowState state, Optional<String> filterWorkflowId) {
362+
List<WorkflowExecutionInfo> result = new ArrayList<>();
363+
for (Entry<ExecutionId, HistoryStore> entry : this.histories.entrySet()) {
364+
if (state == WorkflowState.OPEN) {
365+
if (entry.getValue().isCompleted()) {
366+
continue;
367+
}
368+
ExecutionId executionId = entry.getKey();
369+
String workflowId = executionId.getWorkflowId().getWorkflowId();
370+
if (filterWorkflowId.isPresent() && !workflowId.equals(filterWorkflowId.get())) {
371+
continue;
372+
}
373+
List<HistoryEvent> history = entry.getValue().getHistory();
374+
WorkflowExecutionInfo info =
375+
new WorkflowExecutionInfo()
376+
.setExecution(executionId.getExecution())
377+
.setHistoryLength(history.size())
378+
.setStartTime(history.get(0).getTimestamp())
379+
.setType(
380+
history.get(0).getWorkflowExecutionStartedEventAttributes().getWorkflowType());
381+
result.add(info);
382+
} else {
383+
if (!entry.getValue().isCompleted()) {
384+
continue;
385+
}
386+
ExecutionId executionId = entry.getKey();
387+
String workflowId = executionId.getWorkflowId().getWorkflowId();
388+
if (filterWorkflowId.isPresent() && !workflowId.equals(filterWorkflowId.get())) {
389+
continue;
390+
}
391+
List<HistoryEvent> history = entry.getValue().getHistory();
392+
WorkflowExecutionInfo info =
393+
new WorkflowExecutionInfo()
394+
.setExecution(executionId.getExecution())
395+
.setHistoryLength(history.size())
396+
.setStartTime(history.get(0).getTimestamp())
397+
.setType(
398+
history.get(0).getWorkflowExecutionStartedEventAttributes().getWorkflowType())
399+
.setCloseStatus(
400+
WorkflowExecutionUtils.getCloseStatus(history.get(history.size() - 1)));
401+
result.add(info);
402+
}
403+
}
404+
return result;
405+
}
406+
349407
@Override
350408
public void close() {
351409
timerService.shutdown();

src/main/java/com/uber/cadence/testing/TestWorkflowEnvironment.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ static TestWorkflowEnvironment newInstance(TestEnvironmentOptions options) {
134134
/** Returns the in-memory test Cadence service that is owned by this. */
135135
IWorkflowService getWorkflowService();
136136

137+
String getDomain();
138+
137139
/**
138140
* Returns diagnostic data about the internal service state to the provided {@link StringBuilder}.
139141
* Currently prints histories of all workflow instances stored in the service. This is useful to

src/main/java/com/uber/cadence/worker/Worker.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
import com.google.common.annotations.VisibleForTesting;
2121
import com.uber.cadence.WorkflowExecution;
2222
import com.uber.cadence.client.WorkflowClient;
23+
import com.uber.cadence.converter.DataConverter;
2324
import com.uber.cadence.internal.sync.SyncActivityWorker;
2425
import com.uber.cadence.internal.sync.SyncWorkflowWorker;
2526
import com.uber.cadence.internal.worker.SingleWorkerOptions;
2627
import com.uber.cadence.serviceclient.IWorkflowService;
2728
import com.uber.cadence.serviceclient.WorkflowServiceTChannel;
29+
import com.uber.cadence.worker.WorkerOptions.Builder;
2830
import com.uber.cadence.workflow.Functions.Func;
31+
import com.uber.cadence.workflow.WorkflowMethod;
2932
import java.time.Duration;
3033
import java.util.Objects;
3134
import java.util.concurrent.TimeUnit;
@@ -37,10 +40,11 @@
3740
*/
3841
public final class Worker {
3942

40-
private final AtomicBoolean started = new AtomicBoolean();
4143
private final WorkerOptions options;
44+
private final String taskList;
4245
private final SyncWorkflowWorker workflowWorker;
4346
private final SyncActivityWorker activityWorker;
47+
private final AtomicBoolean started = new AtomicBoolean();
4448

4549
/**
4650
* Creates worker that connects to the local instance of the Cadence Service that listens on a
@@ -61,8 +65,7 @@ public Worker(String domain, String taskList) {
6165
* @param domain domain that worker uses to poll.
6266
* @param taskList task list name worker uses to poll. It uses this name for both decision and
6367
* activity task list polls.
64-
* @param options Options (like {@link com.uber.cadence.converter.DataConverter} override) for
65-
* configuring worker.
68+
* @param options Options (like {@link DataConverter} override) for configuring worker.
6669
*/
6770
public Worker(String domain, String taskList, WorkerOptions options) {
6871
this(new WorkflowServiceTChannel(), domain, taskList, options);
@@ -89,8 +92,7 @@ public Worker(String host, int port, String domain, String taskList) {
8992
* @param domain domain that worker uses to poll.
9093
* @param taskList task list name worker uses to poll. It uses this name for both decision and
9194
* activity task list polls.
92-
* @param options Options (like {@link com.uber.cadence.converter.DataConverter} override) for
93-
* configuring worker.
95+
* @param options Options (like {@link DataConverter} override) for configuring worker.
9496
*/
9597
public Worker(String host, int port, String domain, String taskList, WorkerOptions options) {
9698
this(new WorkflowServiceTChannel(host, port), domain, taskList, options);
@@ -103,15 +105,14 @@ public Worker(String host, int port, String domain, String taskList, WorkerOptio
103105
* @param domain domain that worker uses to poll.
104106
* @param taskList task list name worker uses to poll. It uses this name for both decision and
105107
* activity task list polls.
106-
* @param options Options (like {@link com.uber.cadence.converter.DataConverter} override) for
107-
* configuring worker.
108+
* @param options Options (like {@link DataConverter} override) for configuring worker.
108109
*/
109110
public Worker(IWorkflowService service, String domain, String taskList, WorkerOptions options) {
110111
Objects.requireNonNull(service, "service");
111112
Objects.requireNonNull(domain, "domain");
112-
Objects.requireNonNull(taskList, "taskList");
113+
this.taskList = Objects.requireNonNull(taskList, "taskList");
113114
if (options == null) {
114-
options = new WorkerOptions.Builder().build();
115+
options = new Builder().build();
115116
}
116117
this.options = options;
117118
SingleWorkerOptions activityOptions = toActivityOptions(options);
@@ -160,8 +161,7 @@ private SingleWorkerOptions toWorkflowOptions(WorkerOptions options) {
160161
/**
161162
* Register workflow implementation classes with a worker. Overwrites previously registered types.
162163
* A workflow implementation class must implement at least one interface with a method annotated
163-
* with {@link com.uber.cadence.workflow.WorkflowMethod}. That method becomes a workflow type that
164-
* this worker supports.
164+
* with {@link WorkflowMethod}. That method becomes a workflow type that this worker supports.
165165
*
166166
* <p>Implementations that share a worker must implement different interfaces as a workflow type
167167
* is identified by the workflow interface, not by the implementation.
@@ -239,6 +239,10 @@ public void start() {
239239
}
240240
}
241241

242+
public boolean isStarted() {
243+
return started.get();
244+
}
245+
242246
/**
243247
* Shutdown a worker, waiting for activities to complete execution up to the specified timeout.
244248
*/
@@ -287,4 +291,8 @@ public <R> R queryWorkflowExecution(
287291
}
288292
return workflowWorker.queryWorkflowExecution(execution, queryType, returnType, args);
289293
}
294+
295+
public String getTaskList() {
296+
return taskList;
297+
}
290298
}

src/main/java/com/uber/cadence/workflow/Workflow.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,22 @@ public static <R> R retry(RetryOptions options, Functions.Func<R> fn) {
307307
return WorkflowInternal.retry(options, fn);
308308
}
309309

310+
/**
311+
* Invokes function retrying in case of failures according to retry options. Synchronous variant.
312+
* Use {@link Async#retry(RetryOptions, Functions.Func)} for asynchronous functions.
313+
*
314+
* @param options retry options that specify retry policy
315+
* @param proc procedure to invoke and retry
316+
*/
317+
public static void retry(RetryOptions options, Functions.Proc proc) {
318+
WorkflowInternal.retry(
319+
options,
320+
() -> {
321+
proc.apply();
322+
return null;
323+
});
324+
}
325+
310326
/**
311327
* If there is a need to return a checked exception from a workflow implementation do not add the
312328
* exception to a method signature but wrap it using this method before rethrowing. The library

0 commit comments

Comments
 (0)