Skip to content

Commit 6f81ae9

Browse files
authored
Clean worker shutdown (#237)
Refactoring of the worker shutdown API and logic. After the refactoring, the worker shutdown is similar to the shutdown of a ThreadPoolExecutor. It is done through shutdown/shutdownNow and then awaitTermination. An activity heartbeat after shutdown/shutdownNow starts failing with ActivityWorkerShutdownException. shutdownNow also sends an interrupt to all currently executing activities. Bumped the library version to 2.2.0
1 parent dd22cb1 commit 6f81ae9

39 files changed

+1088
-343
lines changed

CHANGELOG.md

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,23 @@
11
# Changelog
22

3-
## v1.0.0 (06-04-2018)
3+
## v2.2.0
4+
- Added support for workflow and activity server side retries.
5+
- Clean worker shutdown. Replaced Worker shutdown(Duration) with Worker shutdown, shutdownNow and awaitTermination.
6+
- Fixed thread exhaustion with a large number of parallel async activities.
7+
8+
## v2.1.3
9+
- Added RPC headers needed to enable sticky queries. Before this change
10+
queries did not use cached workflows.
11+
12+
## v2.1.2
13+
- Requires minimum server release v0.4.0
14+
- Introduced WorkerFactory and FactoryOptions
15+
- Added sticky workflow execution, which is caching of a workflow object between decisions. It is enabled by default,
16+
to disable use FactoryOptions.disableStickyExecution property.
17+
- Updated Thrift to expose new types of service exceptions: ServiceBusyError, DomainNotActiveError, LimitExceededError
18+
- Added metric for corrupted signal as well as metrics related to caching and evictions.
419

5-
### Features
20+
## v1.0.0 (06-04-2018)
621
- POJO workflow, child workflow, activity execution.
722
- Sync and Async workflow execution.
823
- Query and Signal workflow execution.
@@ -12,14 +27,4 @@
1227
- Activity heartbeat throttling.
1328
- Deterministic retry of failed operation.
1429

15-
## v2.1.2
16-
- Requires minimum server release v0.4.0
17-
- Introduced WorkerFactory and FactoryOptions
18-
- Added sticky workflow execution, which is caching of a workflow object between decisions. It is enabled by default,
19-
to disable use FactoryOptions.disableStickyExecution property.
20-
- Updated Thrift to expose new types of service exceptions: ServiceBusyError, DomainNotActiveError, LimitExceededError
21-
- Added metric for corrupted signal as well as metrics related to caching and evictions.
2230

23-
## v2.1.3
24-
- Added RPC headers needed to enable sticky queries. Before this change
25-
queries did not use cached workflows.

build.gradle

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,20 @@ googleJavaFormat {
3737
}
3838

3939
group = 'com.uber.cadence'
40-
version = '2.1.3'
40+
version = '2.2.0'
4141

4242
description = """Uber Cadence Java Client"""
4343

4444
sourceCompatibility = 1.8
4545
targetCompatibility = 1.8
4646

4747
dependencies {
48-
compile group: 'com.uber.tchannel', name: 'tchannel-core', version: '0.7.6'
48+
compile group: 'com.uber.tchannel', name: 'tchannel-core', version: '0.8.5'
4949
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'
5050
compile group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3'
51-
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.2'
51+
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
5252
compile group: 'com.uber.m3', name: 'tally-core', version: '0.2.3'
53-
compile group: 'com.google.guava', name: 'guava', version: '25.1-jre'
53+
compile group: 'com.google.guava', name: 'guava', version: '27.0.1-jre'
5454
testCompile group: 'junit', name: 'junit', version: '4.12'
5555
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
5656
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'

src/main/java/com/uber/cadence/activity/Activity.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717

1818
package com.uber.cadence.activity;
1919

20+
import com.uber.cadence.client.ActivityCompletionException;
2021
import com.uber.cadence.internal.sync.ActivityInternal;
2122
import com.uber.cadence.internal.sync.WorkflowInternal;
2223
import com.uber.cadence.serviceclient.IWorkflowService;
2324
import com.uber.cadence.workflow.ActivityException;
2425
import com.uber.cadence.workflow.ActivityTimeoutException;
2526
import java.lang.reflect.Type;
2627
import java.util.Optional;
27-
import java.util.concurrent.CancellationException;
2828

2929
/**
3030
* An activity is the implementation of a particular task in the business logic.
@@ -232,11 +232,11 @@ public static ActivityTask getTask() {
232232
*
233233
* @param details In case of activity timeout can be accessed through {@link
234234
* ActivityTimeoutException#getDetails(Class)} method.
235-
* @throws CancellationException Indicates that activity cancellation was requested by the
236-
* workflow.Should be rethrown from activity implementation to indicate successful
237-
* cancellation.
235+
* @throws ActivityCompletionException Indicates that activity execution is expected to be
236+
* interrupted. The reason for the interruption is indicated by the concrete subclass of the
237+
* exception.
238238
*/
239-
public static void heartbeat(Object details) throws CancellationException {
239+
public static <V> void heartbeat(V details) throws ActivityCompletionException {
240240
ActivityInternal.recordActivityHeartbeat(details);
241241
}
242242

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.client;
19+
20+
import com.uber.cadence.activity.ActivityTask;
21+
import com.uber.cadence.worker.Worker;
22+
import java.util.concurrent.TimeUnit;
23+
24+
/**
25+
* Indicates that {@link Worker.Factory#shutdown()} or {@link Worker.Factory#shutdownNow()} was
26+
* called. It is OK to ignore the exception to let the activity complete. It assumes that {@link
27+
* com.uber.cadence.worker.Worker.Factory#awaitTermination(long, TimeUnit)} is called with a timeout
28+
* larger than the activity execution time.
29+
*/
30+
public final class ActivityWorkerShutdownException extends ActivityCompletionException {
31+
32+
public ActivityWorkerShutdownException(ActivityTask task) {
33+
super(task);
34+
}
35+
36+
public ActivityWorkerShutdownException() {
37+
super();
38+
}
39+
}

src/main/java/com/uber/cadence/internal/common/InternalUtils.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919

2020
import com.uber.cadence.TaskList;
2121
import com.uber.cadence.TaskListKind;
22+
import com.uber.cadence.internal.worker.Shutdownable;
2223
import com.uber.cadence.workflow.WorkflowMethod;
2324
import java.lang.reflect.Method;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.TimeUnit;
2427

2528
/** Utility functions shared by the implementation code. */
2629
public final class InternalUtils {
@@ -80,6 +83,41 @@ public static TaskList createNormalTaskList(String taskListName) {
8083
return tl;
8184
}
8285

86+
public static long awaitTermination(Shutdownable s, long timeoutMillis) {
87+
if (s == null) {
88+
return timeoutMillis;
89+
}
90+
return awaitTermination(
91+
timeoutMillis,
92+
() -> {
93+
s.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
94+
});
95+
}
96+
97+
public static long awaitTermination(ExecutorService s, long timeoutMillis) {
98+
if (s == null) {
99+
return timeoutMillis;
100+
}
101+
return awaitTermination(
102+
timeoutMillis,
103+
() -> {
104+
try {
105+
s.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
106+
} catch (InterruptedException e) {
107+
}
108+
});
109+
}
110+
111+
public static long awaitTermination(long timeoutMillis, Runnable toTerminate) {
112+
long started = System.currentTimeMillis();
113+
toTerminate.run();
114+
long remainingTimeout = timeoutMillis - (System.currentTimeMillis() - started);
115+
if (remainingTimeout < 0) {
116+
remainingTimeout = 0;
117+
}
118+
return remainingTimeout;
119+
}
120+
83121
/** Prohibit instantiation */
84122
private InternalUtils() {}
85123
}

src/main/java/com/uber/cadence/internal/sync/ActivityExecutionContextImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.uber.cadence.client.ActivityCompletionException;
2828
import com.uber.cadence.client.ActivityCompletionFailureException;
2929
import com.uber.cadence.client.ActivityNotExistsException;
30+
import com.uber.cadence.client.ActivityWorkerShutdownException;
3031
import com.uber.cadence.converter.DataConverter;
3132
import com.uber.cadence.serviceclient.IWorkflowService;
3233
import java.lang.reflect.Type;
@@ -85,6 +86,9 @@ class ActivityExecutionContextImpl implements ActivityExecutionContext {
8586
/** @see ActivityExecutionContext#recordActivityHeartbeat(Object) */
8687
@Override
8788
public <V> void recordActivityHeartbeat(V details) throws ActivityCompletionException {
89+
if (heartbeatExecutor.isShutdown()) {
90+
throw new ActivityWorkerShutdownException(task);
91+
}
8892
lock.lock();
8993
try {
9094
// always set lastDetail. Successful heartbeat will clear it.

src/main/java/com/uber/cadence/internal/sync/ActivityInternal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ static ActivityExecutionContext getContext() {
3030
return CurrentActivityExecutionContext.get();
3131
}
3232

33-
public static void recordActivityHeartbeat(Object details) {
33+
public static <V> void recordActivityHeartbeat(V details) {
3434
getContext().recordActivityHeartbeat(details);
3535
}
3636

src/main/java/com/uber/cadence/internal/sync/SyncActivityWorker.java

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@
1717

1818
package com.uber.cadence.internal.sync;
1919

20+
import com.uber.cadence.internal.common.InternalUtils;
2021
import com.uber.cadence.internal.worker.ActivityWorker;
22+
import com.uber.cadence.internal.worker.Lifecycle;
2123
import com.uber.cadence.internal.worker.SingleWorkerOptions;
2224
import com.uber.cadence.serviceclient.IWorkflowService;
2325
import java.util.concurrent.Executors;
2426
import java.util.concurrent.ScheduledExecutorService;
2527
import java.util.concurrent.TimeUnit;
2628

2729
/** Activity worker that supports POJO activity implementations. */
28-
public class SyncActivityWorker {
30+
public class SyncActivityWorker implements Lifecycle {
2931

3032
private final ActivityWorker worker;
3133
private final POJOActivityTaskHandler taskHandler;
@@ -41,34 +43,43 @@ public void setActivitiesImplementation(Object... activitiesImplementation) {
4143
taskHandler.setActivitiesImplementation(activitiesImplementation);
4244
}
4345

46+
@Override
4447
public void start() {
4548
worker.start();
4649
}
4750

51+
@Override
52+
public boolean isStarted() {
53+
return worker.isStarted();
54+
}
55+
56+
@Override
57+
public boolean isShutdown() {
58+
return worker.isShutdown();
59+
}
60+
61+
@Override
62+
public boolean isTerminated() {
63+
return worker.isTerminated() && heartbeatExecutor.isTerminated();
64+
}
65+
66+
@Override
4867
public void shutdown() {
4968
worker.shutdown();
5069
heartbeatExecutor.shutdown();
5170
}
5271

72+
@Override
5373
public void shutdownNow() {
5474
worker.shutdownNow();
5575
heartbeatExecutor.shutdownNow();
5676
}
5777

58-
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
59-
return worker.awaitTermination(timeout, unit)
60-
&& heartbeatExecutor.awaitTermination(timeout, unit);
61-
}
62-
63-
public boolean shutdownAndAwaitTermination(long timeout, TimeUnit unit)
64-
throws InterruptedException {
65-
heartbeatExecutor.shutdownNow();
66-
return worker.shutdownAndAwaitTermination(timeout, unit)
67-
&& heartbeatExecutor.awaitTermination(timeout, unit);
68-
}
69-
70-
public boolean isRunning() {
71-
return worker.isRunning();
78+
@Override
79+
public void awaitTermination(long timeout, TimeUnit unit) {
80+
long timeoutMillis = unit.toMillis(timeout);
81+
timeoutMillis = InternalUtils.awaitTermination(worker, timeoutMillis);
82+
InternalUtils.awaitTermination(heartbeatExecutor, timeoutMillis);
7283
}
7384

7485
public void suspendPolling() {

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.uber.cadence.internal.replay.ReplayDecisionTaskHandler;
2626
import com.uber.cadence.internal.worker.DecisionTaskHandler;
2727
import com.uber.cadence.internal.worker.SingleWorkerOptions;
28+
import com.uber.cadence.internal.worker.SuspendableWorker;
2829
import com.uber.cadence.internal.worker.WorkflowWorker;
2930
import com.uber.cadence.serviceclient.IWorkflowService;
3031
import com.uber.cadence.worker.WorkflowImplementationOptions;
@@ -39,7 +40,8 @@
3940
import java.util.function.Function;
4041

4142
/** Workflow worker that supports POJO workflow implementations. */
42-
public class SyncWorkflowWorker implements Consumer<PollForDecisionTaskResponse> {
43+
public class SyncWorkflowWorker
44+
implements SuspendableWorker, Consumer<PollForDecisionTaskResponse> {
4345

4446
private final WorkflowWorker worker;
4547
private final POJOWorkflowImplementationFactory factory;
@@ -84,39 +86,56 @@ public <R> void addWorkflowImplementationFactory(Class<R> clazz, Func<R> factory
8486
this.factory.addWorkflowImplementationFactory(clazz, factory);
8587
}
8688

89+
@Override
8790
public void start() {
8891
worker.start();
8992
}
9093

91-
public void shutdown() {
92-
worker.shutdown();
94+
@Override
95+
public boolean isStarted() {
96+
return worker.isStarted();
9397
}
9498

95-
public void shutdownNow() {
96-
worker.shutdownNow();
99+
@Override
100+
public boolean isShutdown() {
101+
return worker.isShutdown();
97102
}
98103

99-
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
100-
return worker.awaitTermination(timeout, unit);
104+
@Override
105+
public boolean isTerminated() {
106+
return worker.isTerminated();
107+
}
108+
109+
@Override
110+
public void shutdown() {
111+
worker.shutdown();
101112
}
102113

103-
public boolean shutdownAndAwaitTermination(long timeout, TimeUnit unit)
104-
throws InterruptedException {
105-
return worker.shutdownAndAwaitTermination(timeout, unit);
114+
@Override
115+
public void shutdownNow() {
116+
worker.shutdownNow();
106117
}
107118

108-
public boolean isRunning() {
109-
return worker.isRunning();
119+
@Override
120+
public void awaitTermination(long timeout, TimeUnit unit) {
121+
worker.awaitTermination(timeout, unit);
110122
}
111123

124+
@Override
112125
public void suspendPolling() {
113126
worker.suspendPolling();
114127
}
115128

129+
@Override
116130
public void resumePolling() {
117131
worker.resumePolling();
118132
}
119133

134+
@Override
135+
public boolean isSuspended() {
136+
return worker.isSuspended();
137+
}
138+
120139
public <R> R queryWorkflowExecution(
121140
WorkflowExecution execution,
122141
String queryType,

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -791,5 +791,10 @@ public PollForActivityTaskResponse PollForActivityTask(PollForActivityTaskReques
791791
throws BadRequestError, InternalServiceError, ServiceBusyError, TException {
792792
return impl.PollForActivityTask(pollRequest);
793793
}
794+
795+
@Override
796+
public void close() {
797+
impl.close();
798+
}
794799
}
795800
}

0 commit comments

Comments
 (0)