Skip to content

Commit 3e46cbd

Browse files
authored
Fix activity cancelation check (#2069)
1 parent 0968a50 commit 3e46cbd

File tree

3 files changed

+54
-2
lines changed

3 files changed

+54
-2
lines changed

internal/internal_task_handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2308,7 +2308,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
23082308

23092309
// Cancels that don't originate from the server will have separate cancel reasons, like
23102310
// ErrWorkerShutdown or ErrActivityPaused
2311-
isActivityCanceled := ctx.Err() == context.Canceled && errors.Is(context.Cause(ctx), &CanceledError{})
2311+
isActivityCanceled := ctx.Err() == context.Canceled && IsCanceledError(context.Cause(ctx))
23122312

23132313
dlCancelFunc()
23142314
if <-ctx.Done(); ctx.Err() == context.DeadlineExceeded {

internal/internal_task_handlers_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2167,6 +2167,58 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionWorkerStop() {
21672167
t.NotNil(r)
21682168
}
21692169

2170+
func (t *TaskHandlersTestSuite) TestActivityCancellationUsesIsCanceledError() {
2171+
activityName := "activityCancellationIsCanceledError"
2172+
cancelContextActivity := func(ctx context.Context) error {
2173+
env := getActivityEnv(ctx)
2174+
invoker, ok := env.serviceInvoker.(*temporalInvoker)
2175+
t.Require().True(ok, "expected temporalInvoker")
2176+
invoker.cancelHandler(NewCanceledError())
2177+
<-ctx.Done()
2178+
return ctx.Err()
2179+
}
2180+
2181+
t.registry.RegisterActivityWithOptions(
2182+
cancelContextActivity,
2183+
RegisterActivityOptions{Name: activityName, DisableAlreadyRegisteredCheck: true},
2184+
)
2185+
2186+
mockCtrl := gomock.NewController(t.T())
2187+
mockService := workflowservicemock.NewMockWorkflowServiceClient(mockCtrl)
2188+
client := WorkflowClient{workflowService: mockService}
2189+
wep := t.getTestWorkerExecutionParams()
2190+
activityHandler := newActivityTaskHandler(&client, wep, t.registry)
2191+
now := time.Now()
2192+
pats := &workflowservice.PollActivityTaskQueueResponse{
2193+
Attempt: 1,
2194+
TaskToken: []byte("token"),
2195+
WorkflowExecution: &commonpb.WorkflowExecution{
2196+
WorkflowId: "wID",
2197+
RunId: "rID",
2198+
},
2199+
ActivityType: &commonpb.ActivityType{Name: activityName},
2200+
ActivityId: uuid.NewString(),
2201+
ScheduledTime: timestamppb.New(now),
2202+
ScheduleToCloseTimeout: durationpb.New(time.Second),
2203+
StartedTime: timestamppb.New(now),
2204+
StartToCloseTimeout: durationpb.New(time.Second),
2205+
HeartbeatTimeout: durationpb.New(time.Second),
2206+
WorkflowType: &commonpb.WorkflowType{
2207+
Name: "wType",
2208+
},
2209+
WorkflowNamespace: wep.Namespace,
2210+
}
2211+
2212+
result, err := activityHandler.Execute(taskqueue, pats)
2213+
t.Require().NoError(err)
2214+
2215+
canceledReq, ok := result.(*workflowservice.RespondActivityTaskCanceledRequest)
2216+
t.Require().True(ok, "expected cancel response")
2217+
t.Equal(pats.TaskToken, canceledReq.TaskToken)
2218+
t.Equal(wep.Identity, canceledReq.Identity)
2219+
t.Equal(wep.Namespace, canceledReq.Namespace)
2220+
}
2221+
21702222
func Test_NonDeterministicCheck(t *testing.T) {
21712223
unimplementedCommands := []int32{
21722224
int32(enumspb.COMMAND_TYPE_UNSPECIFIED),

internal/session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ func sessionCreationActivity(ctx context.Context, sessionID string) error {
419419
sessionEnv.CompleteSession(sessionID)
420420
// Because of how session creation configures retryPolicy, we need to wrap context cancels that don't
421421
// originate from the server as non-retryable errors. See retrypolicy in createSession() above.
422-
if !(ctx.Err() == context.Canceled && errors.Is(context.Cause(ctx), &CanceledError{})) {
422+
if !(ctx.Err() == context.Canceled && IsCanceledError(context.Cause(ctx))) {
423423
return NewApplicationErrorWithOptions(
424424
"session failed due to worker shutdown", "SessionWorkerShutdown",
425425
ApplicationErrorOptions{NonRetryable: true, Cause: ctx.Err()})

0 commit comments

Comments
 (0)