Skip to content

Commit 4eaeb9e

Browse files
authored
Fix shutdown behavior of unstarted LA slot queue (#2196)
1 parent 0c135d6 commit 4eaeb9e

File tree

4 files changed

+213
-17
lines changed

4 files changed

+213
-17
lines changed

temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,12 @@
2323
import io.temporal.worker.tuning.LocalActivitySlotInfo;
2424
import io.temporal.worker.tuning.SlotPermit;
2525
import io.temporal.workflow.Functions;
26-
import java.util.concurrent.PriorityBlockingQueue;
27-
import java.util.concurrent.Semaphore;
28-
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.*;
2927
import javax.annotation.Nullable;
3028
import org.slf4j.Logger;
3129
import org.slf4j.LoggerFactory;
3230

33-
class LocalActivitySlotSupplierQueue {
31+
class LocalActivitySlotSupplierQueue implements Shutdownable {
3432
static final class QueuedLARequest {
3533
final boolean isRetry;
3634
final SlotReservationData data;
@@ -47,10 +45,11 @@ static final class QueuedLARequest {
4745
private final Semaphore newExecutionsBackpressureSemaphore;
4846
private final TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier;
4947
private final Functions.Proc1<LocalActivityAttemptTask> afterReservedCallback;
50-
private final Thread queueThread;
48+
private final ExecutorService queueThreadService;
5149
private static final Logger log =
5250
LoggerFactory.getLogger(LocalActivitySlotSupplierQueue.class.getName());
5351
private volatile boolean running = true;
52+
private volatile boolean wasEverStarted = false;
5453

5554
LocalActivitySlotSupplierQueue(
5655
TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier,
@@ -73,13 +72,13 @@ static final class QueuedLARequest {
7372
return 0;
7473
});
7574
this.slotSupplier = slotSupplier;
76-
this.queueThread = new Thread(this::processQueue, "LocalActivitySlotSupplierQueue");
77-
this.queueThread.start();
75+
this.queueThreadService =
76+
Executors.newSingleThreadExecutor(r -> new Thread(r, "LocalActivitySlotSupplierQueue"));
7877
}
7978

8079
private void processQueue() {
8180
try {
82-
while (running) {
81+
while (running || !requestQueue.isEmpty()) {
8382
QueuedLARequest request = requestQueue.take();
8483
SlotPermit slotPermit;
8584
try {
@@ -102,9 +101,9 @@ private void processQueue() {
102101
}
103102
}
104103

105-
void shutdown() {
106-
running = false;
107-
queueThread.interrupt();
104+
void start() {
105+
wasEverStarted = true;
106+
this.queueThreadService.submit(this::processQueue);
108107
}
109108

110109
boolean waitOnBackpressure(@Nullable Long acceptanceTimeoutMs) throws InterruptedException {
@@ -134,4 +133,40 @@ void submitAttempt(SlotReservationData data, boolean isRetry, LocalActivityAttem
134133
newExecutionsBackpressureSemaphore.release();
135134
}
136135
}
136+
137+
@Override
138+
public boolean isShutdown() {
139+
return queueThreadService.isShutdown();
140+
}
141+
142+
@Override
143+
public boolean isTerminated() {
144+
return queueThreadService.isTerminated();
145+
}
146+
147+
@Override
148+
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
149+
running = false;
150+
if (requestQueue.isEmpty()) {
151+
// Just interrupt the thread, so that if we're waiting on blocking take the thread will
152+
// be interrupted and exit. Otherwise the loop will exit once the queue is empty.
153+
queueThreadService.shutdownNow();
154+
}
155+
156+
return interruptTasks
157+
? shutdownManager.shutdownExecutorNowUntimed(
158+
queueThreadService, "LocalActivitySlotSupplierQueue")
159+
: shutdownManager.shutdownExecutorUntimed(
160+
queueThreadService, "LocalActivitySlotSupplierQueue");
161+
}
162+
163+
@Override
164+
public void awaitTermination(long timeout, TimeUnit unit) {
165+
if (!wasEverStarted) {
166+
// Not entirely clear why this is necessary, but await termination will hang the whole
167+
// timeout duration if no task was ever submitted.
168+
return;
169+
}
170+
ShutdownManager.awaitTermination(queueThreadService, unit.toMillis(timeout));
171+
}
137172
}

temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,7 @@ public boolean start() {
689689
false);
690690

691691
this.workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
692+
this.slotQueue.start();
692693
return true;
693694
} else {
694695
return false;
@@ -698,9 +699,9 @@ public boolean start() {
698699
@Override
699700
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
700701
if (activityAttemptTaskExecutor != null && !activityAttemptTaskExecutor.isShutdown()) {
701-
slotQueue.shutdown();
702-
return activityAttemptTaskExecutor
702+
return slotQueue
703703
.shutdown(shutdownManager, interruptTasks)
704+
.thenCompose(r -> activityAttemptTaskExecutor.shutdown(shutdownManager, interruptTasks))
704705
.thenCompose(
705706
r ->
706707
shutdownManager.shutdownExecutor(
@@ -717,21 +718,24 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
717718

718719
@Override
719720
public void awaitTermination(long timeout, TimeUnit unit) {
720-
slotQueue.shutdown();
721721
long timeoutMillis = unit.toMillis(timeout);
722-
ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis);
722+
long remainingTimeout = ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis);
723+
ShutdownManager.awaitTermination(slotQueue, remainingTimeout);
723724
}
724725

725726
@Override
726727
public boolean isShutdown() {
727-
return activityAttemptTaskExecutor != null && activityAttemptTaskExecutor.isShutdown();
728+
return activityAttemptTaskExecutor != null
729+
&& activityAttemptTaskExecutor.isShutdown()
730+
&& slotQueue.isShutdown();
728731
}
729732

730733
@Override
731734
public boolean isTerminated() {
732735
return activityAttemptTaskExecutor != null
733736
&& activityAttemptTaskExecutor.isTerminated()
734-
&& scheduledExecutor.isTerminated();
737+
&& scheduledExecutor.isTerminated()
738+
&& slotQueue.isTerminated();
735739
}
736740

737741
@Override
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.worker;
22+
23+
import static org.junit.Assert.assertTrue;
24+
25+
import io.temporal.testing.internal.SDKTestWorkflowRule;
26+
import io.temporal.workflow.Workflow;
27+
import io.temporal.workflow.WorkflowInterface;
28+
import io.temporal.workflow.WorkflowMethod;
29+
import java.time.Duration;
30+
import java.time.Instant;
31+
import java.util.Set;
32+
import org.junit.Rule;
33+
import org.junit.Test;
34+
35+
public class LocalActivityWorkerNoneRegisteredNotStartedTest {
36+
37+
@Rule
38+
public SDKTestWorkflowRule testWorkflowRule =
39+
SDKTestWorkflowRule.newBuilder()
40+
.setWorkflowTypes(NothingWorkflowImpl.class)
41+
.setWorkerOptions(WorkerOptions.newBuilder().setLocalActivityWorkerOnly(true).build())
42+
// Don't start the worker
43+
.setDoNotStart(true)
44+
.build();
45+
46+
@Test
47+
public void canShutDownProperlyWhenNotStarted() {
48+
// Shut down the (never started) worker
49+
Instant shutdownTime = Instant.now();
50+
testWorkflowRule.getTestEnvironment().getWorkerFactory().shutdown();
51+
testWorkflowRule.getWorker().awaitTermination(2, java.util.concurrent.TimeUnit.SECONDS);
52+
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
53+
for (Thread thread : threadSet) {
54+
if (thread.getName().contains("LocalActivitySlotSupplierQueue")) {
55+
throw new RuntimeException("Thread should be terminated");
56+
}
57+
}
58+
Duration elapsed = Duration.between(shutdownTime, Instant.now());
59+
// Shutdown should not have taken long
60+
assertTrue(elapsed.getSeconds() < 2);
61+
}
62+
63+
@WorkflowInterface
64+
public interface NothingWorkflow {
65+
@WorkflowMethod
66+
void execute();
67+
}
68+
69+
public static class NothingWorkflowImpl implements NothingWorkflow {
70+
@Override
71+
public void execute() {
72+
Workflow.sleep(500);
73+
}
74+
}
75+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.worker;
22+
23+
import static org.junit.Assert.assertTrue;
24+
25+
import io.temporal.testing.internal.SDKTestWorkflowRule;
26+
import io.temporal.workflow.Workflow;
27+
import io.temporal.workflow.WorkflowInterface;
28+
import io.temporal.workflow.WorkflowMethod;
29+
import io.temporal.workflow.shared.TestActivities;
30+
import java.time.Duration;
31+
import java.time.Instant;
32+
import java.util.Set;
33+
import org.junit.Rule;
34+
import org.junit.Test;
35+
36+
public class LocalActivityWorkerNotStartedTest {
37+
38+
@Rule
39+
public SDKTestWorkflowRule testWorkflowRule =
40+
SDKTestWorkflowRule.newBuilder()
41+
.setWorkflowTypes(NothingWorkflowImpl.class)
42+
.setActivityImplementations(new NothingActivityImpl())
43+
.setWorkerOptions(WorkerOptions.newBuilder().setLocalActivityWorkerOnly(true).build())
44+
// Don't start the worker
45+
.setDoNotStart(true)
46+
.build();
47+
48+
@Test
49+
public void canShutDownProperlyWhenNotStarted() {
50+
// Shut down the (never started) worker
51+
Instant shutdownTime = Instant.now();
52+
testWorkflowRule.getTestEnvironment().getWorkerFactory().shutdown();
53+
testWorkflowRule.getWorker().awaitTermination(2, java.util.concurrent.TimeUnit.SECONDS);
54+
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
55+
for (Thread thread : threadSet) {
56+
if (thread.getName().contains("LocalActivitySlotSupplierQueue")) {
57+
throw new RuntimeException("Thread should be terminated");
58+
}
59+
}
60+
Duration elapsed = Duration.between(shutdownTime, Instant.now());
61+
// Shutdown should not have taken long
62+
assertTrue(elapsed.getSeconds() < 2);
63+
}
64+
65+
@WorkflowInterface
66+
public interface NothingWorkflow {
67+
@WorkflowMethod
68+
void execute();
69+
}
70+
71+
public static class NothingWorkflowImpl implements NothingWorkflow {
72+
@Override
73+
public void execute() {
74+
Workflow.sleep(500);
75+
}
76+
}
77+
78+
public static class NothingActivityImpl implements TestActivities.NoArgsActivity {
79+
@Override
80+
public void execute() {}
81+
}
82+
}

0 commit comments

Comments
 (0)