Skip to content

Commit 3d44b45

Browse files
Supporting graceful shutdown based on SIGTERM handler. (#749)
* Supporting graceful shutdown based on SIGTERM handler.
1 parent 5109b0c commit 3d44b45

File tree

4 files changed

+139
-2
lines changed

4 files changed

+139
-2
lines changed

src/main/java/com/uber/cadence/internal/sync/WorkflowClientInternal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ public static <A1, A2, A3, A4, A5, A6, R> CompletableFuture<R> execute(
408408
return execute(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6));
409409
}
410410

411-
private synchronized static void emitClientVersion(WorkflowClientOptions options) {
411+
private static synchronized void emitClientVersion(WorkflowClientOptions options) {
412412
if (emittingClientVersion) {
413413
return;
414414
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Modifications Copyright (c) 2020-2022 Uber Technologies Inc.
3+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
6+
* use this file except in compliance with the License. A copy of the License is
7+
* located at
8+
*
9+
* http://aws.amazon.com/apache2.0
10+
*
11+
* or in the "license" file accompanying this file. This file is distributed on
12+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13+
* express or implied. See the License for the specific language governing
14+
* permissions and limitations under the License.
15+
*/
16+
17+
package com.uber.cadence.internal.worker;
18+
19+
import com.uber.cadence.internal.common.InternalUtils;
20+
import com.uber.cadence.worker.WorkerFactory;
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.concurrent.TimeUnit;
24+
25+
public class WorkerShutDownHandler {
26+
27+
private static final List<WorkerFactory> workerFactories = new ArrayList<>();
28+
private static Thread registeredHandler;
29+
30+
public static void registerHandler() {
31+
if (registeredHandler != null) {
32+
return;
33+
}
34+
35+
registeredHandler = new Thread("SHUTDOWN_WORKERS") {
36+
@Override
37+
public void run() {
38+
for (WorkerFactory workerFactory : workerFactories) {
39+
workerFactory.suspendPolling();
40+
}
41+
42+
for (WorkerFactory workerFactory : workerFactories) {
43+
workerFactory.shutdownNow();
44+
}
45+
46+
long remainingTimeout = 10000;
47+
for (WorkerFactory workerFactory : workerFactories) {
48+
final long timeoutMillis = remainingTimeout;
49+
remainingTimeout = InternalUtils.awaitTermination(timeoutMillis,
50+
() -> workerFactory.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS));
51+
}
52+
}
53+
};
54+
55+
Runtime.getRuntime()
56+
.addShutdownHook(registeredHandler);
57+
}
58+
59+
public static synchronized void registerWorkerFactory(WorkerFactory workerFactory) {
60+
if (workerFactory != null) {
61+
workerFactories.add(workerFactory);
62+
}
63+
}
64+
65+
// Only for tests
66+
protected static void execute() {
67+
registeredHandler.run();
68+
}
69+
}

src/main/java/com/uber/cadence/worker/WorkerFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.uber.cadence.internal.worker.PollDecisionTaskDispatcher;
3131
import com.uber.cadence.internal.worker.Poller;
3232
import com.uber.cadence.internal.worker.PollerOptions;
33+
import com.uber.cadence.internal.worker.WorkerShutDownHandler;
3334
import com.uber.cadence.internal.worker.WorkflowPollTaskFactory;
3435
import com.uber.m3.tally.Scope;
3536
import com.uber.m3.util.ImmutableMap;
@@ -57,7 +58,10 @@ public static WorkerFactory newInstance(WorkflowClient workflowClient) {
5758

5859
public static WorkerFactory newInstance(
5960
WorkflowClient workflowClient, WorkerFactoryOptions options) {
60-
return new WorkerFactory(workflowClient, options);
61+
WorkerShutDownHandler.registerHandler();
62+
WorkerFactory workerFactory = new WorkerFactory(workflowClient, options);
63+
WorkerShutDownHandler.registerWorkerFactory(workerFactory);
64+
return workerFactory;
6165
}
6266

6367
private final List<Worker> workers = new ArrayList<>();
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.worker;
19+
20+
import static org.junit.Assert.assertTrue;
21+
import static org.mockito.Mockito.when;
22+
23+
import com.uber.cadence.client.WorkflowClient;
24+
import com.uber.cadence.client.WorkflowClientOptions;
25+
import com.uber.cadence.serviceclient.IWorkflowService;
26+
import com.uber.cadence.worker.WorkerFactory;
27+
import com.uber.m3.tally.NoopScope;
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
import org.junit.runner.RunWith;
31+
import org.mockito.Mock;
32+
import org.mockito.runners.MockitoJUnitRunner;
33+
34+
@RunWith(MockitoJUnitRunner.class)
35+
public class WorkerShutDownHandlerTest {
36+
37+
@Mock private WorkflowClient mockClient;
38+
39+
@Mock private IWorkflowService mockService;
40+
41+
@Before
42+
public void setup() {
43+
WorkflowClientOptions clientOptions =
44+
WorkflowClientOptions.newBuilder().setMetricsScope(new NoopScope()).build();
45+
when(mockClient.getOptions()).thenReturn(clientOptions);
46+
when(mockClient.getService()).thenReturn(mockService);
47+
}
48+
49+
@Test
50+
public void shutDownHookShutsDownFactories() {
51+
52+
WorkerFactory workerFactory = WorkerFactory.newInstance(mockClient);
53+
workerFactory.newWorker("TL1");
54+
workerFactory.newWorker("TL2");
55+
56+
WorkerFactory workerFactory2 = WorkerFactory.newInstance(mockClient);
57+
workerFactory2.newWorker("TL3");
58+
59+
WorkerShutDownHandler.execute();
60+
61+
assertTrue(workerFactory.isShutdown());
62+
assertTrue(workerFactory2.isShutdown());
63+
}
64+
}

0 commit comments

Comments
 (0)