Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions service/frontend/workflow_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3872,6 +3872,92 @@ func (s *WorkflowHandlerSuite) TestShutdownWorkerWithEagerPollCancellation() {
}
}

func (s *WorkflowHandlerSuite) TestShutdownWorkerWithCancellationError() {
// Verifies graceful degradation: ShutdownWorker succeeds even when poll cancellation fails.
// This ensures backward compatibility during rolling upgrades.
config := s.newConfig()
config.EnableCancelWorkerPollsOnShutdown = dc.GetBoolPropertyFnFilteredByNamespace(true)
config.NumTaskQueueReadPartitions = dc.GetIntPropertyFnFilteredByTaskQueue(1)
wh := s.getWorkflowHandler(config)
ctx := context.Background()

stickyTaskQueue := "sticky-task-queue"
taskQueue := "my-task-queue"
workerInstanceKey := "worker-instance-123"

// CancelOutstandingWorkerPolls returns an error (simulates old matching node)
s.mockMatchingClient.EXPECT().CancelOutstandingWorkerPolls(gomock.Any(), gomock.Any()).
Return(nil, serviceerror.NewUnimplemented("method not implemented")).
Times(2) // 1 partition x 2 task types

s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Eq(s.testNamespace)).Return(s.testNamespaceID, nil).AnyTimes()

expectedForceUnloadRequest := &matchingservice.ForceUnloadTaskQueuePartitionRequest{
NamespaceId: s.testNamespaceID.String(),
TaskQueuePartition: &taskqueuespb.TaskQueuePartition{
TaskQueue: stickyTaskQueue,
TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW,
},
}

s.mockMatchingClient.EXPECT().ForceUnloadTaskQueuePartition(gomock.Any(), gomock.Eq(expectedForceUnloadRequest)).Return(&matchingservice.ForceUnloadTaskQueuePartitionResponse{}, nil)

// ShutdownWorker should succeed despite cancellation errors
_, err := wh.ShutdownWorker(ctx, &workflowservice.ShutdownWorkerRequest{
Namespace: s.testNamespace.String(),
StickyTaskQueue: stickyTaskQueue,
Identity: "worker",
Reason: "graceful shutdown",
WorkerInstanceKey: workerInstanceKey,
TaskQueue: taskQueue,
})
s.NoError(err, "ShutdownWorker should succeed even when poll cancellation fails")
}

func (s *WorkflowHandlerSuite) TestShutdownWorkerWithPartialCancellationFailure() {
// Verifies ShutdownWorker succeeds when some cancellation calls succeed and others fail.
config := s.newConfig()
config.EnableCancelWorkerPollsOnShutdown = dc.GetBoolPropertyFnFilteredByNamespace(true)
config.NumTaskQueueReadPartitions = dc.GetIntPropertyFnFilteredByTaskQueue(2) // 2 partitions
wh := s.getWorkflowHandler(config)
ctx := context.Background()

stickyTaskQueue := "sticky-task-queue"
taskQueue := "my-task-queue"
workerInstanceKey := "worker-instance-123"

// Mixed results: some succeed, some fail (2 partitions x 2 task types = 4 calls)
s.mockMatchingClient.EXPECT().CancelOutstandingWorkerPolls(gomock.Any(), gomock.Any()).
Return(&matchingservice.CancelOutstandingWorkerPollsResponse{CancelledCount: 1}, nil).
Times(2)
s.mockMatchingClient.EXPECT().CancelOutstandingWorkerPolls(gomock.Any(), gomock.Any()).
Return(nil, serviceerror.NewUnavailable("temporary error")).
Times(2)

s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Eq(s.testNamespace)).Return(s.testNamespaceID, nil).AnyTimes()

expectedForceUnloadRequest := &matchingservice.ForceUnloadTaskQueuePartitionRequest{
NamespaceId: s.testNamespaceID.String(),
TaskQueuePartition: &taskqueuespb.TaskQueuePartition{
TaskQueue: stickyTaskQueue,
TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW,
},
}

s.mockMatchingClient.EXPECT().ForceUnloadTaskQueuePartition(gomock.Any(), gomock.Eq(expectedForceUnloadRequest)).Return(&matchingservice.ForceUnloadTaskQueuePartitionResponse{}, nil)

// ShutdownWorker should succeed despite partial failures
_, err := wh.ShutdownWorker(ctx, &workflowservice.ShutdownWorkerRequest{
Namespace: s.testNamespace.String(),
StickyTaskQueue: stickyTaskQueue,
Identity: "worker",
Reason: "graceful shutdown",
WorkerInstanceKey: workerInstanceKey,
TaskQueue: taskQueue,
})
s.NoError(err, "ShutdownWorker should succeed even with partial cancellation failures")
}

func (s *WorkflowHandlerSuite) TestPatchSchedule_TriggerImmediatelyScheduledTime() {
config := s.newConfig()
config.EnableSchedules = dc.GetBoolPropertyFnFilteredByNamespace(true)
Expand Down
Loading