Skip to content

Commit 54b6d1a

Browse files
committed
Define CancelActivityNexusTask transfer task type
1 parent 07a4d60 commit 54b6d1a

File tree

14 files changed

+470
-223
lines changed

14 files changed

+470
-223
lines changed

api/enums/v1/task.go-helpers.pb.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/enums/v1/task.pb.go

Lines changed: 9 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/persistence/v1/executions.pb.go

Lines changed: 288 additions & 211 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/dynamicconfig/constants.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,11 @@ config as the other services.`,
189189
false,
190190
`EnableActivityEagerExecution indicates if activity eager execution is enabled per namespace`,
191191
)
192+
EnableActivityCancellationNexusTask = NewGlobalBoolSetting(
193+
"system.enableActivityCancellationNexusTask",
194+
false,
195+
`EnableActivityCancellationNexusTask enables pushing activity cancellation to workers via Nexus task`,
196+
)
192197
NamespaceMinRetentionGlobal = NewGlobalDurationSetting(
193198
"system.namespaceMinRetentionGlobal",
194199
24*time.Hour,

common/persistence/serialization/task_serializers.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ func serializeTransferTask(
3838
transferTask = transferDeleteExecutionTaskToProto(task)
3939
case *tasks.ChasmTask:
4040
transferTask = transferChasmTaskToProto(task)
41+
case *tasks.CancelActivityNexusTask:
42+
transferTask = transferCancelActivityNexusTaskToProto(task)
4143
default:
4244
return nil, serviceerror.NewInternalf("Unknown transfer task type: %v", task)
4345
}
@@ -86,6 +88,8 @@ func deserializeTransferTask(
8688
task = transferDeleteExecutionTaskFromProto(transferTask)
8789
case enumsspb.TASK_TYPE_CHASM:
8890
task = transferChasmTaskFromProto(transferTask)
91+
case enumsspb.TASK_TYPE_TRANSFER_CANCEL_ACTIVITY_NEXUS:
92+
task = transferCancelActivityNexusTaskFromProto(transferTask)
8993
default:
9094
return nil, serviceerror.NewInternalf("Unknown transfer task type: %v", transferTask.TaskType)
9195
}
@@ -106,6 +110,40 @@ func transferChasmTaskFromProto(task *persistencespb.TransferTaskInfo) tasks.Tas
106110
}
107111
}
108112

113+
func transferCancelActivityNexusTaskToProto(task *tasks.CancelActivityNexusTask) *persistencespb.TransferTaskInfo {
114+
return &persistencespb.TransferTaskInfo{
115+
NamespaceId: task.WorkflowKey.NamespaceID,
116+
WorkflowId: task.WorkflowKey.WorkflowID,
117+
RunId: task.WorkflowKey.RunID,
118+
TaskId: task.TaskID,
119+
TaskType: task.GetType(),
120+
Version: task.Version,
121+
VisibilityTime: timestamppb.New(task.VisibilityTimestamp),
122+
TaskDetails: &persistencespb.TransferTaskInfo_CancelActivityNexusTaskDetails_{
123+
CancelActivityNexusTaskDetails: &persistencespb.TransferTaskInfo_CancelActivityNexusTaskDetails{
124+
ScheduledEventIds: task.ScheduledEventIDs,
125+
WorkerInstanceKey: task.WorkerInstanceKey,
126+
},
127+
},
128+
}
129+
}
130+
131+
func transferCancelActivityNexusTaskFromProto(task *persistencespb.TransferTaskInfo) tasks.Task {
132+
details := task.GetCancelActivityNexusTaskDetails()
133+
return &tasks.CancelActivityNexusTask{
134+
WorkflowKey: definition.NewWorkflowKey(
135+
task.NamespaceId,
136+
task.WorkflowId,
137+
task.RunId,
138+
),
139+
VisibilityTimestamp: task.VisibilityTime.AsTime(),
140+
TaskID: task.TaskId,
141+
Version: task.Version,
142+
ScheduledEventIDs: details.GetScheduledEventIds(),
143+
WorkerInstanceKey: details.GetWorkerInstanceKey(),
144+
}
145+
}
146+
109147
func serializeTimerTask(
110148
encoder Encoder,
111149
task tasks.Task,

common/persistence/serialization/task_serializers_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,19 @@ func (s *taskSerializerSuite) TestTransferResetTask() {
169169
s.assertEqualTasks(resetTask)
170170
}
171171

172+
func (s *taskSerializerSuite) TestTransferCancelActivityNexusTask() {
173+
cancelActivityNexusTask := &tasks.CancelActivityNexusTask{
174+
WorkflowKey: s.workflowKey,
175+
VisibilityTimestamp: time.Unix(0, rand.Int63()).UTC(),
176+
TaskID: rand.Int63(),
177+
Version: rand.Int63(),
178+
ScheduledEventIDs: []int64{rand.Int63(), rand.Int63(), rand.Int63()},
179+
WorkerInstanceKey: "test-worker-instance-key",
180+
}
181+
182+
s.assertEqualTasks(cancelActivityNexusTask)
183+
}
184+
172185
func (s *taskSerializerSuite) TestTimerWorkflowTask() {
173186
workflowTaskTimer := &tasks.WorkflowTaskTimeoutTask{
174187
WorkflowKey: s.workflowKey,

proto/internal/temporal/server/api/enums/v1/task.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ enum TaskType {
5959

6060
// A task with side effects generated by a CHASM component.
6161
TASK_TYPE_CHASM = 33;
62+
63+
// A task to cancel a running activity via Nexus control queue.
64+
TASK_TYPE_TRANSFER_CANCEL_ACTIVITY_NEXUS = 34;
6265
}
6366

6467
// TaskPriority is only used for replication task as of May 2024

proto/internal/temporal/server/api/persistence/v1/executions.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,11 +349,21 @@ message TransferTaskInfo {
349349
// by some other task, so this task doesn't need to worry about it.
350350
bool can_skip_visibility_archival = 1;
351351
}
352+
353+
// Details for a Nexus task that cancels activities belonging to a specific worker.
354+
message CancelActivityNexusTaskDetails {
355+
// Scheduled event IDs of activities to cancel.
356+
repeated int64 scheduled_event_ids = 1;
357+
string worker_instance_key = 2;
358+
}
359+
352360
oneof task_details {
353361
CloseExecutionTaskDetails close_execution_task_details = 16;
354362

355363
// If the task addresses a CHASM component, this field will be set.
356364
ChasmTaskInfo chasm_task_info = 18;
365+
366+
CancelActivityNexusTaskDetails cancel_activity_nexus_task_details = 19;
357367
}
358368
// Stamp represents the "version" of the entity's internal state for which the transfer task was created.
359369
// It increases monotonically when the entity's options are modified.

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"go.temporal.io/server/service/history/api"
4040
"go.temporal.io/server/service/history/configs"
4141
historyi "go.temporal.io/server/service/history/interfaces"
42+
"go.temporal.io/server/service/history/tasks"
4243
"go.temporal.io/server/service/history/workflow"
4344
"go.temporal.io/server/service/history/workflow/update"
4445
"google.golang.org/protobuf/proto"
@@ -658,6 +659,13 @@ func (handler *workflowTaskCompletedHandler) handleCommandRequestCancelActivity(
658659
return nil, err
659660
}
660661
handler.activityNotStartedCancelled = true
662+
} else if ai.WorkerInstanceKey != "" && handler.config.EnableActivityCancellationNexusTask() {
663+
// Activity has started and worker supports Nexus tasks - create cancel task.
664+
handler.mutableState.AddTasks(&tasks.CancelActivityNexusTask{
665+
WorkflowKey: handler.mutableState.GetWorkflowKey(),
666+
ScheduledEventIDs: []int64{ai.ScheduledEventId},
667+
WorkerInstanceKey: ai.WorkerInstanceKey,
668+
})
661669
}
662670
}
663671
return actCancelReqEvent, nil

service/history/configs/config.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -354,11 +354,12 @@ type Config struct {
354354
ESProcessorFlushInterval dynamicconfig.DurationPropertyFn
355355
ESProcessorAckTimeout dynamicconfig.DurationPropertyFn
356356

357-
EnableCrossNamespaceCommands dynamicconfig.BoolPropertyFn
358-
EnableActivityEagerExecution dynamicconfig.BoolPropertyFnWithNamespaceFilter
359-
EnableActivityRetryStampIncrement dynamicconfig.BoolPropertyFn
360-
EnableEagerWorkflowStart dynamicconfig.BoolPropertyFnWithNamespaceFilter
361-
NamespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn
357+
EnableCrossNamespaceCommands dynamicconfig.BoolPropertyFn
358+
EnableActivityEagerExecution dynamicconfig.BoolPropertyFnWithNamespaceFilter
359+
EnableActivityRetryStampIncrement dynamicconfig.BoolPropertyFn
360+
EnableActivityCancellationNexusTask dynamicconfig.BoolPropertyFn
361+
EnableEagerWorkflowStart dynamicconfig.BoolPropertyFnWithNamespaceFilter
362+
NamespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn
362363

363364
// ArchivalQueueProcessor settings
364365
ArchivalProcessorSchedulerWorkerCount dynamicconfig.TypedSubscribable[int]
@@ -721,11 +722,12 @@ func NewConfig(
721722
ESProcessorFlushInterval: dynamicconfig.WorkerESProcessorFlushInterval.Get(dc),
722723
ESProcessorAckTimeout: dynamicconfig.WorkerESProcessorAckTimeout.Get(dc),
723724

724-
EnableCrossNamespaceCommands: dynamicconfig.EnableCrossNamespaceCommands.Get(dc),
725-
EnableActivityEagerExecution: dynamicconfig.EnableActivityEagerExecution.Get(dc),
726-
EnableActivityRetryStampIncrement: dynamicconfig.EnableActivityRetryStampIncrement.Get(dc),
727-
EnableEagerWorkflowStart: dynamicconfig.EnableEagerWorkflowStart.Get(dc),
728-
NamespaceCacheRefreshInterval: dynamicconfig.NamespaceCacheRefreshInterval.Get(dc),
725+
EnableCrossNamespaceCommands: dynamicconfig.EnableCrossNamespaceCommands.Get(dc),
726+
EnableActivityEagerExecution: dynamicconfig.EnableActivityEagerExecution.Get(dc),
727+
EnableActivityRetryStampIncrement: dynamicconfig.EnableActivityRetryStampIncrement.Get(dc),
728+
EnableActivityCancellationNexusTask: dynamicconfig.EnableActivityCancellationNexusTask.Get(dc),
729+
EnableEagerWorkflowStart: dynamicconfig.EnableEagerWorkflowStart.Get(dc),
730+
NamespaceCacheRefreshInterval: dynamicconfig.NamespaceCacheRefreshInterval.Get(dc),
729731

730732
// Archival related
731733
ArchivalTaskBatchSize: dynamicconfig.ArchivalTaskBatchSize.Get(dc),

0 commit comments

Comments
 (0)