[DO_NOT_MERGE] Add new Nexus outbound task for cancelling activities#9232
[DO_NOT_MERGE] Add new Nexus outbound task for cancelling activities#9232rkannan82 wants to merge 11 commits intokannan/activity-cancel/persist-worker-keyfrom
Conversation
9eea93d to
835d5bf
Compare
d402868 to
54b6d1a
Compare
5bae03a to
a5e43dd
Compare
417606c to
6246c4e
Compare
f1e1ee5 to
e795849
Compare
fec3c41 to
1dd975d
Compare
proto/internal/temporal/server/api/persistence/v1/executions.proto
Outdated
Show resolved
Hide resolved
| handler.mutableState.AddTasks(&tasks.CancelActivityNexusTask{ | ||
| WorkflowKey: handler.mutableState.GetWorkflowKey(), | ||
| ScheduledEventIDs: []int64{ai.ScheduledEventId}, | ||
| WorkerInstanceKey: ai.WorkerInstanceKey, | ||
| }) |
There was a problem hiding this comment.
I'd move the logic into ApplyActivityTaskCanceledEvent in mutable state impl, and do AddTasks in task_generator.go, to be consistent with rest of the codebase for task generation.
There was a problem hiding this comment.
I kept the top level check in handler because we need to generate the nexus tasks only if the activity is started. Moved the AddTasks code inside the generator as you suggested. Please see if this makes sense.
Just fyi: I have a separate PR to batch activities by the queue. Right now we end up creating 1 task per activity.
service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
what about RespondActivityTaskCanceled api?
There was a problem hiding this comment.
RespondActivityTaskCanceled is what the SDK sends after it has cancelled. Here, we are initiating that cancel (after Step-5). From my understanding, this is how it works e2e.
- User calls RequestCancelWorkflowExecution API
- Server creates a new workflow task for the workflow
- SDK picks up the workflow task
- Workflow code sees cancellation, decides to cancel activities
- SDK sends RespondWorkflowTaskCompleted with RequestCancelActivityTask commands
I think standalone activities needs to be handled separately.
There was a problem hiding this comment.
RespondActivityTaskCanceled is what the SDK sends after it has cancelled.
ah ok make sense. I was thinking about the ByID version of RespondActivityTaskCanceled api, but yeah you are right, they only get called when the activity is actually cancelled.
What about other cases like workflow termination/timeout?
11c74b6 to
2cb3108
Compare
| err = t.processDeleteExecutionTask(ctx, task, false) | ||
| case *tasks.ChasmTask: | ||
| err = t.executeChasmSideEffectTransferTask(ctx, task) | ||
| case *tasks.CancelActivityNexusTask: |
There was a problem hiding this comment.
@yycptt Need help figuring out if it is OK to skip in standby.
There was a problem hiding this comment.
yeah I think it's best effort anyway so ok to skip and won't cause workflow to stuck. Worker also won't poll standby matching I think.
d0f60d7 to
fa4be0a
Compare
89d2cf5 to
79e4e3e
Compare
bf3c061 to
1218cae
Compare
9231795 to
bc58c4f
Compare
04db6f1 to
b72cf9f
Compare
bc58c4f to
3eb028c
Compare
| } | ||
|
|
||
| // ActivityNotificationType specifies the type of notification to send to activities. | ||
| enum ActivityNotificationType { |
There was a problem hiding this comment.
nit: probably should have its own file
There was a problem hiding this comment.
RespondActivityTaskCanceled is what the SDK sends after it has cancelled.
ah ok make sense. I was thinking about the ByID version of RespondActivityTaskCanceled api, but yeah you are right, they only get called when the activity is actually cancelled.
What about other cases like workflow termination/timeout?
| // NotificationType specifies the type of notification. | ||
| NotificationType enumsspb.ActivityNotificationType | ||
| // ScheduledEventIDs of activities to notify (batched by worker). | ||
| ScheduledEventIDs []int64 |
There was a problem hiding this comment.
I think we still need failover version here, scheduledEventID is not unique for global namespace in the case of conflict resolution.
3eb028c to
92b1a81
Compare
b72cf9f to
e319a6c
Compare
92b1a81 to
b775ca6
Compare
- Define ActivityCommandTask in tasks package - Add serialization/deserialization for ActivityCommandTask - Add task generation in TaskGenerator - Add batching logic in workflow task completed handler - Forward WorkerControlTaskQueue through matching service - Add unit tests for task generation and batching - Add metrics support with command type
b775ca6 to
d6af322
Compare
e319a6c to
5390759
Compare
Add new transfer task type for activity cancellation control:
What changed?
Added a new transfer task type called CancelActivityNexusTask transfer task type. This will be used to dispatch activity cancellation requests to workers via their Nexus control queue.
Main changes:
Task definition
Proto
Struct
Task creation
Task dispatch using Nexus
Why?
To support activity cancellation without activity heartbeat.
Overall flow:
How did you test it?
Potential risks
No. The new task creation is disabled by default.