Skip to content

Commit 6156036

Browse files
authored
Fix premature triggering of eventLoop in case of activity cancellation (#1691)
Issue #1558
1 parent f37c999 commit 6156036

File tree

7 files changed

+234
-27
lines changed

7 files changed

+234
-27
lines changed

build.gradle

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,17 @@ ext {
3333
grpcVersion = '1.53.0' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
3434
jacksonVersion = '2.14.2' // [2.9.0,)
3535
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
36-
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.10.3' : '1.9.7' // [1.0.0,)
36+
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.10.5' : '1.9.9' // [1.0.0,)
3737

3838
// stay on 1.x for a while to don't use any APIs from 2.x which may break our users which still stay on 1.x
3939
// also slf4j 2.x is not compatible with spring boot 2.x
4040
slf4jVersion = project.hasProperty("edgeDepsTest") ? '2.0.6' : '1.7.36' // [1.4.0,)
41-
protoVersion = '3.22.0' // [3.12.0,) 3.12 is brought by min gRPC 1.38
41+
// [3.12.0,)
42+
// 3.12 is brought by min gRPC 1.38.
43+
// We can't move pass 3.22.0 because 3.22.2 deprecates some methods used by generated code produced by
44+
// the old protoc we keep for compatibility of our generated code with old protobuf-java versions.
45+
// Which leads to build failure because of -Werror.
46+
protoVersion = '3.22.0'
4247
annotationApiVersion = '1.3.2'
4348
guavaVersion = '31.1-jre' // [10.0,)
4449
tallyVersion = '0.13.0' // [0.4.0,)
@@ -51,12 +56,12 @@ ext {
5156

5257
// Spring Boot 3 requires Java 17, java-sdk builds against 2.x version because we support Java 8.
5358
// We do test compatibility with Spring Boot 3 in integration tests.
54-
springBootVersion = project.hasProperty("edgeDepsTest") ? '3.0.2' : '2.7.8'// [2.4.0,)
59+
springBootVersion = project.hasProperty("edgeDepsTest") ? '3.0.4' : '2.7.9'// [2.4.0,)
5560

5661
// test scoped
5762
// we don't upgrade to 1.3 and 1.4 because they require slf4j 2.x
5863
logbackVersion = project.hasProperty("edgeDepsTest") ? '1.3.5' : '1.2.11'
59-
mockitoVersion = '5.1.1'
64+
mockitoVersion = '5.2.0'
6065
junitVersion = '4.13.2'
6166
}
6267

temporal-sdk/src/main/java/io/temporal/internal/statemachines/ActivityStateMachine.java

Lines changed: 88 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import io.temporal.api.history.v1.ActivityTaskTimedOutEventAttributes;
3838
import io.temporal.workflow.Functions;
3939
import java.util.Optional;
40+
import javax.annotation.Nonnull;
4041

4142
final class ActivityStateMachine
4243
extends EntityStateMachineInitialCommand<
@@ -54,7 +55,7 @@ final class ActivityStateMachine
5455
private final ActivityType activityType;
5556
private final ActivityCancellationType cancellationType;
5657

57-
private final Functions.Proc2<Optional<Payloads>, Failure> completionCallback;
58+
private final Functions.Proc2<Optional<Payloads>, FailureResult> completionCallback;
5859

5960
private ExecuteActivityParameters parameters;
6061

@@ -107,7 +108,7 @@ enum State {
107108
State.SCHEDULE_COMMAND_CREATED,
108109
ExplicitEvent.CANCEL,
109110
State.CANCELED,
110-
ActivityStateMachine::cancelCommandNotifyCanceled)
111+
ActivityStateMachine::cancelCommandNotifyCanceledImmediately)
111112
.add(
112113
State.SCHEDULED_EVENT_RECORDED,
113114
EventType.EVENT_TYPE_ACTIVITY_TASK_STARTED,
@@ -147,7 +148,7 @@ enum State {
147148
State.SCHEDULED_ACTIVITY_CANCEL_COMMAND_CREATED,
148149
CommandType.COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK,
149150
State.SCHEDULED_ACTIVITY_CANCEL_COMMAND_CREATED,
150-
ActivityStateMachine::notifyCanceledIfTryCancel)
151+
ActivityStateMachine::notifyCanceledIfTryCancelImmediately)
151152
.add(
152153
State.SCHEDULED_ACTIVITY_CANCEL_COMMAND_CREATED,
153154
EventType.EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED,
@@ -175,7 +176,7 @@ applied to the state machine (as it is done for all command events before
175176
State.SCHEDULED_ACTIVITY_CANCEL_EVENT_RECORDED,
176177
EventType.EVENT_TYPE_ACTIVITY_TASK_CANCELED,
177178
State.CANCELED,
178-
ActivityStateMachine::notifyCanceled)
179+
ActivityStateMachine::notifyCanceledFromEvent)
179180
.add(
180181
State.SCHEDULED_ACTIVITY_CANCEL_EVENT_RECORDED,
181182
EventType.EVENT_TYPE_ACTIVITY_TASK_STARTED,
@@ -193,7 +194,7 @@ applied to the state machine (as it is done for all command events before
193194
State.STARTED_ACTIVITY_CANCEL_COMMAND_CREATED,
194195
EventType.EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED,
195196
State.STARTED_ACTIVITY_CANCEL_EVENT_RECORDED,
196-
ActivityStateMachine::notifyCanceledIfTryCancel)
197+
ActivityStateMachine::notifyCanceledIfTryCancelFromEvent)
197198
/*
198199
These state transitions are not possible.
199200
It looks like it is valid when an event, handling of which requests activity
@@ -247,15 +248,15 @@ applied to the state machine (as it is done for all command events before
247248
*/
248249
public static ActivityStateMachine newInstance(
249250
ExecuteActivityParameters parameters,
250-
Functions.Proc2<Optional<Payloads>, Failure> completionCallback,
251+
Functions.Proc2<Optional<Payloads>, FailureResult> completionCallback,
251252
Functions.Proc1<CancellableCommand> commandSink,
252253
Functions.Proc1<StateMachine> stateMachineSink) {
253254
return new ActivityStateMachine(parameters, completionCallback, commandSink, stateMachineSink);
254255
}
255256

256257
private ActivityStateMachine(
257258
ExecuteActivityParameters parameters,
258-
Functions.Proc2<Optional<Payloads>, Failure> completionCallback,
259+
Functions.Proc2<Optional<Payloads>, FailureResult> completionCallback,
259260
Functions.Proc1<CancellableCommand> commandSink,
260261
Functions.Proc1<StateMachine> stateMachineSink) {
261262
super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink);
@@ -276,32 +277,76 @@ public void createScheduleActivityTaskCommand() {
276277
.build());
277278
}
278279

280+
private void setStartedCommandEventId() {
281+
startedCommandEventId = currentEvent.getEventId();
282+
}
283+
279284
public void cancel() {
280285
if (cancellationType == ActivityCancellationType.ABANDON) {
281-
notifyCanceled();
286+
notifyCanceled(false);
282287
} else if (!isFinalState()) {
283288
explicitEvent(ExplicitEvent.CANCEL);
284289
}
285290
}
286291

287-
private void setStartedCommandEventId() {
288-
startedCommandEventId = currentEvent.getEventId();
289-
}
292+
// *Immediately versions don't wait for a matching command, underlying callback will trigger an
293+
// event loop so the workflow can make progress, because promise gets filled.
290294

291-
private void cancelCommandNotifyCanceled() {
295+
/**
296+
* {@link CommandType#COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK} command is not yet left to the server.
297+
* Cancel it in place and immediately notify the workflow code.
298+
*/
299+
private void cancelCommandNotifyCanceledImmediately() {
292300
cancelCommand();
301+
// TODO With {@link ActivityCancellationType#ABANDON} we shouldn't even get here as it gets
302+
// handled in #cancel.
303+
// It's a code path for TRY_CANCEL and WAIT_CANCELLATION_COMPLETED only.
304+
// Was the original design to cancel a not-yet-sent
305+
// {@link CommandType#COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK} in case of ABANDON too?
293306
if (cancellationType != ActivityCancellationType.ABANDON) {
294-
notifyCanceled();
307+
notifyCanceled(false);
308+
}
309+
}
310+
311+
/**
312+
* Workflow code doesn't need to wait for the cancellation event if {@link
313+
* ActivityCancellationType#TRY_CANCEL}, immediately notify the workflow code.
314+
*/
315+
private void notifyCanceledIfTryCancelImmediately() {
316+
if (cancellationType == ActivityCancellationType.TRY_CANCEL) {
317+
notifyCanceled(false);
295318
}
296319
}
297320

298-
private void notifyCanceledIfTryCancel() {
321+
// *FromEvent versions will not trigger event loop as they need to wait for all events to be
322+
// applied before and there will be WorkflowTaskStarted to trigger the event loop.
323+
324+
/**
325+
* if {@link EventType#EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED} is observed, notify the workflow
326+
* code if {@link ActivityCancellationType#TRY_CANCEL}, this mode doesn't need a confirmation of
327+
* cancellation.
328+
*/
329+
private void notifyCanceledIfTryCancelFromEvent() {
299330
if (cancellationType == ActivityCancellationType.TRY_CANCEL) {
300-
notifyCanceled();
331+
notifyCanceled(true);
301332
}
302333
}
303334

304-
private void notifyCanceled() {
335+
/**
336+
* Notify workflow code of the cancellation from the {@link
337+
* EventType#EVENT_TYPE_ACTIVITY_TASK_CANCELED} event.
338+
*
339+
* <p>There is no harm in notifying {@link ActivityCancellationType#TRY_CANCEL} again, but it
340+
* should not be needed as it should be already done by {@link
341+
* #notifyCanceledIfTryCancelFromEvent} as there should be no {@link
342+
* EventType#EVENT_TYPE_ACTIVITY_TASK_CANCELED} without {@link
343+
* EventType#EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED}.
344+
*/
345+
private void notifyCanceledFromEvent() {
346+
notifyCanceled(true);
347+
}
348+
349+
private void notifyCanceled(boolean fromEvent) {
305350
Failure canceledFailure =
306351
Failure.newBuilder()
307352
.setSource(JAVA_SDK)
@@ -321,7 +366,7 @@ private void notifyCanceled() {
321366
.setCause(canceledFailure)
322367
.setMessage(ACTIVITY_CANCELED_MESSAGE)
323368
.build();
324-
completionCallback.apply(Optional.empty(), failure);
369+
completionCallback.apply(Optional.empty(), new FailureResult(failure, fromEvent));
325370
}
326371

327372
private void notifyCompleted() {
@@ -349,7 +394,7 @@ private void notifyFailed() {
349394
.setCause(failed.getFailure())
350395
.setMessage(ACTIVITY_FAILED_MESSAGE)
351396
.build();
352-
completionCallback.apply(Optional.empty(), failure);
397+
completionCallback.apply(Optional.empty(), new FailureResult(failure, true));
353398
}
354399

355400
private void notifyTimedOut() {
@@ -370,7 +415,7 @@ private void notifyTimedOut() {
370415
.setCause(timedOut.getFailure())
371416
.setMessage(ACTIVITY_TIMED_OUT_MESSAGE)
372417
.build();
373-
completionCallback.apply(Optional.empty(), failure);
418+
completionCallback.apply(Optional.empty(), new FailureResult(failure, true));
374419
}
375420

376421
private void notifyCancellationFromEvent() {
@@ -398,7 +443,7 @@ private void notifyCancellationFromEvent() {
398443
.setMessage(ACTIVITY_CANCELED_MESSAGE)
399444
.build();
400445

401-
completionCallback.apply(Optional.empty(), failure);
446+
completionCallback.apply(Optional.empty(), new FailureResult(failure, true));
402447
}
403448
}
404449

@@ -412,4 +457,27 @@ private void createRequestCancelActivityTaskCommand() {
412457
.build());
413458
parameters = null; // avoid retaining large input for the duration of the activity
414459
}
460+
461+
public static class FailureResult {
462+
private final @Nonnull Failure failure;
463+
private final boolean fromEvent;
464+
465+
public FailureResult(@Nonnull Failure failure, boolean fromEvent) {
466+
this.failure = failure;
467+
this.fromEvent = fromEvent;
468+
}
469+
470+
@Nonnull
471+
public Failure getFailure() {
472+
return failure;
473+
}
474+
475+
/**
476+
* @return true if this failure is created from the event during event <-> command matching
477+
* phase.
478+
*/
479+
public boolean isFromEvent() {
480+
return fromEvent;
481+
}
482+
}
415483
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -545,8 +545,16 @@ public Functions.Proc scheduleActivityTask(
545545
ActivityStateMachine.newInstance(
546546
attributes,
547547
(p, f) -> {
548-
callback.apply(p, f);
549-
if (f != null && f.hasCause() && f.getCause().hasCanceledFailureInfo()) {
548+
Failure failure = f != null ? f.getFailure() : null;
549+
callback.apply(p, failure);
550+
551+
if (f != null
552+
&& !f.isFromEvent()
553+
&& failure.hasCause()
554+
&& failure.getCause().hasCanceledFailureInfo()) {
555+
// If !f.isFromEvent(), we want to unblock the event loop as the promise got filled
556+
// and the workflow may make progress. If f.isFromEvent(), we need to delay event
557+
// loop triggering until WorkflowTaskStarted.
550558
eventLoop();
551559
}
552560
},

temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -663,7 +663,7 @@ public static Random newRandom() {
663663
*
664664
* <p>This method always returns false if called from a non workflow thread.
665665
*
666-
* @deprecated use {{@link WorkflowUnsafe#isReplaying()}}
666+
* @deprecated use {@link WorkflowUnsafe#isReplaying()}
667667
*/
668668
@Deprecated
669669
public static boolean isReplaying() {

0 commit comments

Comments
 (0)