Skip to content

Commit e2d2608

Browse files
Warn on dangling handlers and add method to help await on all handlers. (#2144)
Warn on dangling handlers and add method to help await on all handlers
1 parent 59c485e commit e2d2608

23 files changed

+852
-41
lines changed

temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -371,16 +371,32 @@ public Header getHeader() {
371371

372372
final class SignalRegistrationRequest {
373373
private final String signalType;
374+
private final HandlerUnfinishedPolicy unfinishedPolicy;
374375
private final Class<?>[] argTypes;
375376
private final Type[] genericArgTypes;
376377
private final Functions.Proc1<Object[]> callback;
377378

379+
// Kept for backward compatibility
378380
public SignalRegistrationRequest(
379381
String signalType,
380382
Class<?>[] argTypes,
381383
Type[] genericArgTypes,
382384
Functions.Proc1<Object[]> callback) {
383385
this.signalType = signalType;
386+
this.unfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON;
387+
this.argTypes = argTypes;
388+
this.genericArgTypes = genericArgTypes;
389+
this.callback = callback;
390+
}
391+
392+
public SignalRegistrationRequest(
393+
String signalType,
394+
HandlerUnfinishedPolicy unfinishedPolicy,
395+
Class<?>[] argTypes,
396+
Type[] genericArgTypes,
397+
Functions.Proc1<Object[]> callback) {
398+
this.signalType = signalType;
399+
this.unfinishedPolicy = unfinishedPolicy;
384400
this.argTypes = argTypes;
385401
this.genericArgTypes = genericArgTypes;
386402
this.callback = callback;
@@ -390,6 +406,10 @@ public String getSignalType() {
390406
return signalType;
391407
}
392408

409+
public HandlerUnfinishedPolicy getUnfinishedPolicy() {
410+
return unfinishedPolicy;
411+
}
412+
393413
public Class<?>[] getArgTypes() {
394414
return argTypes;
395415
}
@@ -417,19 +437,22 @@ public List<SignalRegistrationRequest> getRequests() {
417437

418438
@Experimental
419439
final class UpdateRegistrationRequest {
420-
private final Functions.Func1<Object[], Object> executeCallback;
421-
private final Functions.Proc1<Object[]> validateCallback;
422440
private final String updateName;
441+
private final HandlerUnfinishedPolicy unfinishedPolicy;
423442
private final Class<?>[] argTypes;
424443
private final Type[] genericArgTypes;
444+
private final Functions.Func1<Object[], Object> executeCallback;
445+
private final Functions.Proc1<Object[]> validateCallback;
425446

426447
public UpdateRegistrationRequest(
427448
String updateName,
449+
HandlerUnfinishedPolicy unfinishedPolicy,
428450
Class<?>[] argTypes,
429451
Type[] genericArgTypes,
430452
Functions.Proc1<Object[]> validateCallback,
431453
Functions.Func1<Object[], Object> executeCallback) {
432454
this.updateName = updateName;
455+
this.unfinishedPolicy = unfinishedPolicy;
433456
this.argTypes = argTypes;
434457
this.genericArgTypes = genericArgTypes;
435458
this.validateCallback = validateCallback;
@@ -440,12 +463,8 @@ public String getUpdateName() {
440463
return updateName;
441464
}
442465

443-
public Functions.Proc1<Object[]> getValidateCallback() {
444-
return validateCallback;
445-
}
446-
447-
public Functions.Func1<Object[], Object> getExecuteCallback() {
448-
return executeCallback;
466+
public HandlerUnfinishedPolicy getUnfinishedPolicy() {
467+
return unfinishedPolicy;
449468
}
450469

451470
public Class<?>[] getArgTypes() {
@@ -455,6 +474,14 @@ public Class<?>[] getArgTypes() {
455474
public Type[] getGenericArgTypes() {
456475
return genericArgTypes;
457476
}
477+
478+
public Functions.Proc1<Object[]> getValidateCallback() {
479+
return validateCallback;
480+
}
481+
482+
public Functions.Func1<Object[], Object> getExecuteCallback() {
483+
return executeCallback;
484+
}
458485
}
459486

460487
@Experimental

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package io.temporal.internal.replay;
2222

23+
import com.google.common.annotations.VisibleForTesting;
2324
import com.google.protobuf.InvalidProtocolBufferException;
2425
import com.google.protobuf.util.Timestamps;
2526
import com.uber.m3.tally.Scope;
@@ -36,13 +37,44 @@
3637
import io.temporal.internal.common.ProtobufTimeUtils;
3738
import io.temporal.internal.common.UpdateMessage;
3839
import io.temporal.internal.statemachines.WorkflowStateMachines;
40+
import io.temporal.internal.sync.SignalHandlerInfo;
41+
import io.temporal.internal.sync.UpdateHandlerInfo;
3942
import io.temporal.internal.worker.WorkflowExecutionException;
4043
import io.temporal.worker.MetricsType;
4144
import io.temporal.worker.NonDeterministicException;
45+
import io.temporal.workflow.HandlerUnfinishedPolicy;
46+
import java.util.List;
47+
import java.util.Map;
4248
import java.util.Optional;
49+
import java.util.stream.Collectors;
4350
import javax.annotation.Nullable;
51+
import org.slf4j.Logger;
52+
import org.slf4j.LoggerFactory;
53+
import org.slf4j.MDC;
4454

4555
final class ReplayWorkflowExecutor {
56+
@VisibleForTesting
57+
public static final String unfinishedUpdateHandlesWarnMessage =
58+
"[TMPRL1102] Workflow finished while update handlers are still running. This may "
59+
+ "have interrupted work that the update handler was doing, and the client "
60+
+ "that sent the update will receive a 'workflow execution already completed' "
61+
+ "Exception instead of the update result. You can wait for all update and "
62+
+ "signal handlers to complete by using `await workflow.Await(() -> workflow.isEveryHandlerFinished())`. "
63+
+ "Alternatively, if both you and the clients sending the update are okay with "
64+
+ "interrupting running handlers when the workflow finishes, and causing "
65+
+ "clients to receive errors, then you can disable this warning via the update "
66+
+ "handler annotations: `@UpdateMethod(unfinishedPolicy = HandlerUnfinishedPolicy.ABANDON)`.";
67+
68+
@VisibleForTesting
69+
public static final String unfinishedSignalHandlesWarnMessage =
70+
"[TMPRL1102] Workflow finished while signal handlers are still running. This may "
71+
+ "have interrupted work that the signal handler was doing. You can wait for all update and "
72+
+ "signal handlers to complete by using `await workflow.Await(() -> workflow.isEveryHandlerFinished())`. "
73+
+ "Alternatively, if both you and the clients sending the signal are okay with "
74+
+ "interrupting running handlers when the workflow finishes you can disable this warning via the signal "
75+
+ "handler annotations: `@SignalMethod(unfinishedPolicy = HandlerUnfinishedPolicy.ABANDON)`.";
76+
77+
private static final Logger log = LoggerFactory.getLogger(ReplayWorkflowExecutor.class);
4678

4779
private final ReplayWorkflow workflow;
4880

@@ -89,6 +121,33 @@ public void eventLoop() {
89121
}
90122

91123
private void completeWorkflow(@Nullable WorkflowExecutionException failure) {
124+
// If the workflow is failed we do not log any warnings about unfinished handlers.
125+
if (log.isWarnEnabled() && (failure == null || context.isCancelRequested())) {
126+
Map<Long, SignalHandlerInfo> runningSignalHandlers =
127+
workflow.getWorkflowContext().getRunningSignalHandlers();
128+
List<SignalHandlerInfo> unfinishedSignalHandlers =
129+
runningSignalHandlers.values().stream()
130+
.filter(a -> a.getPolicy() == HandlerUnfinishedPolicy.WARN_AND_ABANDON)
131+
.collect(Collectors.toList());
132+
if (!unfinishedSignalHandlers.isEmpty()) {
133+
MDC.put("Signals", unfinishedSignalHandlers.toString());
134+
log.warn(unfinishedSignalHandlesWarnMessage);
135+
MDC.remove("Signals");
136+
}
137+
138+
Map<String, UpdateHandlerInfo> runningUpdateHandlers =
139+
workflow.getWorkflowContext().getRunningUpdateHandlers();
140+
List<UpdateHandlerInfo> unfinishedUpdateHandlers =
141+
runningUpdateHandlers.values().stream()
142+
.filter(a -> a.getPolicy() == HandlerUnfinishedPolicy.WARN_AND_ABANDON)
143+
.collect(Collectors.toList());
144+
if (!unfinishedUpdateHandlers.isEmpty()) {
145+
MDC.put("Updates", unfinishedUpdateHandlers.toString());
146+
log.warn(unfinishedUpdateHandlesWarnMessage);
147+
MDC.remove("Updates");
148+
}
149+
}
150+
92151
if (context.isCancelRequested()) {
93152
workflowStateMachines.cancelWorkflow();
94153
metricsScope.counter(MetricsType.WORKFLOW_CANCELED_COUNTER).inc(1);
@@ -161,7 +220,7 @@ public void handleWorkflowExecutionUpdated(UpdateMessage updateMessage) {
161220
Optional<Payloads> args = Optional.ofNullable(input.getArgs());
162221
this.workflow.handleUpdate(
163222
input.getName(),
164-
protocolMessage.getProtocolInstanceId(),
223+
update.getMeta().getUpdateId(),
165224
args,
166225
protocolMessage.getEventId(),
167226
input.getHeader(),

temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
import io.temporal.api.failure.v1.Failure;
2424
import io.temporal.common.context.ContextPropagator;
25+
import io.temporal.internal.sync.SignalHandlerInfo;
26+
import io.temporal.internal.sync.UpdateHandlerInfo;
2527
import io.temporal.worker.WorkflowImplementationOptions;
2628
import java.lang.reflect.Type;
2729
import java.util.List;
@@ -66,4 +68,8 @@ public interface WorkflowContext {
6668
* ContextPropagator#getCurrentContext()}
6769
*/
6870
Map<String, Object> getPropagatedContexts();
71+
72+
Map<Long, SignalHandlerInfo> getRunningSignalHandlers();
73+
74+
Map<String, UpdateHandlerInfo> getRunningUpdateHandlers();
6975
}

temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,9 @@
2929
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
3030
import io.temporal.worker.MetricsType;
3131
import io.temporal.workflow.DynamicSignalHandler;
32+
import io.temporal.workflow.HandlerUnfinishedPolicy;
3233
import io.temporal.workflow.Workflow;
33-
import java.util.ArrayDeque;
34-
import java.util.HashMap;
35-
import java.util.Map;
36-
import java.util.Objects;
37-
import java.util.Optional;
38-
import java.util.Queue;
34+
import java.util.*;
3935
import org.slf4j.Logger;
4036
import org.slf4j.LoggerFactory;
4137

@@ -52,6 +48,8 @@ class SignalDispatcher {
5248
/** Buffers signals which don't have a registered listener. */
5349
private final Queue<SignalData> signalBuffer = new ArrayDeque<>();
5450

51+
private Map<Long, SignalHandlerInfo> runningSignalHandlers = new LinkedHashMap<>();
52+
5553
public SignalDispatcher(DataConverter dataConverterWithWorkflowContext) {
5654
this.dataConverterWithWorkflowContext = dataConverterWithWorkflowContext;
5755
}
@@ -77,17 +75,23 @@ public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput
7775
}
7876
}
7977

78+
public Map<Long, SignalHandlerInfo> getRunningSignalHandlers() {
79+
return runningSignalHandlers;
80+
}
81+
8082
public void handleSignal(
8183
String signalName, Optional<Payloads> input, long eventId, Header header) {
8284
WorkflowOutboundCallsInterceptor.SignalRegistrationRequest handler =
8385
signalCallbacks.get(signalName);
8486
Object[] args;
87+
HandlerUnfinishedPolicy policy;
8588
if (handler == null) {
8689
if (dynamicSignalHandler == null) {
8790
signalBuffer.add(new SignalData(signalName, input, eventId, header));
8891
return;
8992
}
9093
args = new Object[] {new EncodedValues(input, dataConverterWithWorkflowContext)};
94+
policy = dynamicSignalHandler.getUnfinishedPolicy(signalName);
9195
} else {
9296
try {
9397
args =
@@ -97,9 +101,23 @@ public void handleSignal(
97101
logSerializationException(signalName, eventId, e);
98102
return;
99103
}
104+
policy = handler.getUnfinishedPolicy();
105+
}
106+
// Track the signal handler
107+
boolean threadDestroyed = false;
108+
runningSignalHandlers.put(eventId, new SignalHandlerInfo(eventId, signalName, policy));
109+
try {
110+
inboundCallsInterceptor.handleSignal(
111+
new WorkflowInboundCallsInterceptor.SignalInput(signalName, args, eventId, header));
112+
} catch (DestroyWorkflowThreadError e) {
113+
threadDestroyed = true;
114+
throw e;
115+
} finally {
116+
// If the thread was destroyed the user did not finish the handler
117+
if (!threadDestroyed) {
118+
runningSignalHandlers.remove(eventId);
119+
}
100120
}
101-
inboundCallsInterceptor.handleSignal(
102-
new WorkflowInboundCallsInterceptor.SignalInput(signalName, args, eventId, header));
103121
}
104122

105123
public void registerSignalHandlers(
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.sync;
22+
23+
import io.temporal.workflow.HandlerUnfinishedPolicy;
24+
25+
public class SignalHandlerInfo {
26+
private final long eventId;
27+
private final String name;
28+
private final HandlerUnfinishedPolicy policy;
29+
30+
public SignalHandlerInfo(long eventId, String name, HandlerUnfinishedPolicy policy) {
31+
this.eventId = eventId;
32+
this.name = name;
33+
this.policy = policy;
34+
}
35+
36+
public String getName() {
37+
return name;
38+
}
39+
40+
public HandlerUnfinishedPolicy getPolicy() {
41+
return policy;
42+
}
43+
44+
@Override
45+
public String toString() {
46+
return "SignalHandlerInfo{"
47+
+ "eventId="
48+
+ eventId
49+
+ ", name='"
50+
+ name
51+
+ '\''
52+
+ ", policy="
53+
+ policy
54+
+ '}';
55+
}
56+
}

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,12 @@ public void start(HistoryEvent event, ReplayWorkflowContext context) {
144144
@Override
145145
public void handleSignal(
146146
String signalName, Optional<Payloads> input, long eventId, Header header) {
147+
// Signals can trigger completion
147148
runner.executeInWorkflowThread(
148149
"signal " + signalName,
149-
() -> workflowProc.handleSignal(signalName, input, eventId, header));
150+
() -> {
151+
workflowProc.handleSignal(signalName, input, eventId, header);
152+
});
150153
}
151154

152155
@Override
@@ -167,7 +170,7 @@ public void handleUpdate(
167170
if (!callbacks.isReplaying()) {
168171
try {
169172
workflowContext.setReadOnly(true);
170-
workflowProc.handleValidateUpdate(updateName, input, eventId, header);
173+
workflowProc.handleValidateUpdate(updateName, updateId, input, eventId, header);
171174
} catch (ReadOnlyException r) {
172175
// Rethrow instead on rejecting the update to fail the WFT
173176
throw r;
@@ -184,7 +187,7 @@ public void handleUpdate(
184187
callbacks.accept();
185188
try {
186189
Optional<Payloads> result =
187-
workflowProc.handleExecuteUpdate(updateName, input, eventId, header);
190+
workflowProc.handleExecuteUpdate(updateName, updateId, input, eventId, header);
188191
callbacks.complete(result, null);
189192
} catch (WorkflowExecutionException e) {
190193
callbacks.complete(Optional.empty(), e.getFailure());

0 commit comments

Comments
 (0)