Skip to content

Commit 9eea93d

Browse files
committed
Define CancelActivityNexusTask transfer task type
1 parent 07a4d60 commit 9eea93d

File tree

17 files changed

+950
-727
lines changed

17 files changed

+950
-727
lines changed

api/enums/v1/task.go-helpers.pb.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/enums/v1/task.pb.go

Lines changed: 9 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/persistence/v1/executions.pb.go

Lines changed: 288 additions & 211 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/dynamicconfig/constants.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,11 @@ config as the other services.`,
189189
false,
190190
`EnableActivityEagerExecution indicates if activity eager execution is enabled per namespace`,
191191
)
192+
EnableActivityCancellationNexusTask = NewGlobalBoolSetting(
193+
"system.enableActivityCancellationNexusTask",
194+
false,
195+
`EnableActivityCancellationNexusTask enables pushing activity cancellation to workers via Nexus task`,
196+
)
192197
NamespaceMinRetentionGlobal = NewGlobalDurationSetting(
193198
"system.namespaceMinRetentionGlobal",
194199
24*time.Hour,

common/persistence/serialization/task_serializers.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ func serializeTransferTask(
3838
transferTask = transferDeleteExecutionTaskToProto(task)
3939
case *tasks.ChasmTask:
4040
transferTask = transferChasmTaskToProto(task)
41+
case *tasks.CancelActivityNexusTask:
42+
transferTask = transferCancelActivityNexusTaskToProto(task)
4143
default:
4244
return nil, serviceerror.NewInternalf("Unknown transfer task type: %v", task)
4345
}
@@ -86,6 +88,8 @@ func deserializeTransferTask(
8688
task = transferDeleteExecutionTaskFromProto(transferTask)
8789
case enumsspb.TASK_TYPE_CHASM:
8890
task = transferChasmTaskFromProto(transferTask)
91+
case enumsspb.TASK_TYPE_TRANSFER_CANCEL_ACTIVITY_NEXUS:
92+
task = transferCancelActivityNexusTaskFromProto(transferTask)
8993
default:
9094
return nil, serviceerror.NewInternalf("Unknown transfer task type: %v", transferTask.TaskType)
9195
}
@@ -106,6 +110,40 @@ func transferChasmTaskFromProto(task *persistencespb.TransferTaskInfo) tasks.Tas
106110
}
107111
}
108112

113+
func transferCancelActivityNexusTaskToProto(task *tasks.CancelActivityNexusTask) *persistencespb.TransferTaskInfo {
114+
return &persistencespb.TransferTaskInfo{
115+
NamespaceId: task.WorkflowKey.NamespaceID,
116+
WorkflowId: task.WorkflowKey.WorkflowID,
117+
RunId: task.WorkflowKey.RunID,
118+
TaskId: task.TaskID,
119+
TaskType: task.GetType(),
120+
Version: task.Version,
121+
VisibilityTime: timestamppb.New(task.VisibilityTimestamp),
122+
TaskDetails: &persistencespb.TransferTaskInfo_CancelActivityNexusTaskDetails_{
123+
CancelActivityNexusTaskDetails: &persistencespb.TransferTaskInfo_CancelActivityNexusTaskDetails{
124+
ScheduledEventIds: task.ScheduledEventIDs,
125+
WorkerInstanceKey: task.WorkerInstanceKey,
126+
},
127+
},
128+
}
129+
}
130+
131+
func transferCancelActivityNexusTaskFromProto(task *persistencespb.TransferTaskInfo) tasks.Task {
132+
details := task.GetCancelActivityNexusTaskDetails()
133+
return &tasks.CancelActivityNexusTask{
134+
WorkflowKey: definition.NewWorkflowKey(
135+
task.NamespaceId,
136+
task.WorkflowId,
137+
task.RunId,
138+
),
139+
VisibilityTimestamp: task.VisibilityTime.AsTime(),
140+
TaskID: task.TaskId,
141+
Version: task.Version,
142+
ScheduledEventIDs: details.GetScheduledEventIds(),
143+
WorkerInstanceKey: details.GetWorkerInstanceKey(),
144+
}
145+
}
146+
109147
func serializeTimerTask(
110148
encoder Encoder,
111149
task tasks.Task,

go.mod

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ require (
111111
github.com/go-logr/logr v1.4.2 // indirect
112112
github.com/go-logr/stdr v1.2.2 // indirect
113113
github.com/gogo/protobuf v1.3.2 // indirect
114-
github.com/golang/mock v1.6.0 // indirect
114+
github.com/golang/mock v1.6.0
115115
github.com/golang/snappy v0.0.4 // indirect
116116
github.com/google/s2a-go v0.1.9 // indirect
117117
github.com/googleapis/enterprise-certificate-proxy v0.3.5 // indirect
@@ -171,4 +171,3 @@ require (
171171
modernc.org/mathutil v1.7.1 // indirect
172172
modernc.org/memory v1.11.0 // indirect
173173
)
174-

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
373373
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
374374
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
375375
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
376+
go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed h1:g3CgsK5BXL2rQy0ZIJVRpNUDdtPM1y4bGv5ZoKsqR74=
377+
go.temporal.io/api v1.61.1-0.20260128230845-c246540cf2ed/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
376378
go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4=
377379
go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE=
378380
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=

proto/internal/temporal/server/api/enums/v1/task.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ enum TaskType {
5959

6060
// A task with side effects generated by a CHASM component.
6161
TASK_TYPE_CHASM = 33;
62+
63+
// A task to cancel a running activity via Nexus control queue.
64+
TASK_TYPE_TRANSFER_CANCEL_ACTIVITY_NEXUS = 34;
6265
}
6366

6467
// TaskPriority is only used for replication task as of May 2024

proto/internal/temporal/server/api/persistence/v1/executions.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,11 +349,21 @@ message TransferTaskInfo {
349349
// by some other task, so this task doesn't need to worry about it.
350350
bool can_skip_visibility_archival = 1;
351351
}
352+
353+
// Details for a Nexus task that cancels activities belonging to a specific worker.
354+
message CancelActivityNexusTaskDetails {
355+
// Scheduled event IDs of activities to cancel.
356+
repeated int64 scheduled_event_ids = 1;
357+
string worker_instance_key = 2;
358+
}
359+
352360
oneof task_details {
353361
CloseExecutionTaskDetails close_execution_task_details = 16;
354362

355363
// If the task addresses a CHASM component, this field will be set.
356364
ChasmTaskInfo chasm_task_info = 18;
365+
366+
CancelActivityNexusTaskDetails cancel_activity_nexus_task_details = 19;
357367
}
358368
// Stamp represents the "version" of the entity's internal state for which the transfer task was created.
359369
// It increases monotonically when the entity's options are modified.

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"go.temporal.io/server/service/history/api"
4040
"go.temporal.io/server/service/history/configs"
4141
historyi "go.temporal.io/server/service/history/interfaces"
42+
"go.temporal.io/server/service/history/tasks"
4243
"go.temporal.io/server/service/history/workflow"
4344
"go.temporal.io/server/service/history/workflow/update"
4445
"google.golang.org/protobuf/proto"
@@ -658,6 +659,13 @@ func (handler *workflowTaskCompletedHandler) handleCommandRequestCancelActivity(
658659
return nil, err
659660
}
660661
handler.activityNotStartedCancelled = true
662+
} else if ai.WorkerInstanceKey != "" && handler.config.EnableActivityCancellationNexusTask() {
663+
// Activity has started and worker supports Nexus tasks - create cancel task.
664+
handler.mutableState.AddTasks(&tasks.CancelActivityNexusTask{
665+
WorkflowKey: handler.mutableState.GetWorkflowKey(),
666+
ScheduledEventIDs: []int64{ai.ScheduledEventId},
667+
WorkerInstanceKey: ai.WorkerInstanceKey,
668+
})
661669
}
662670
}
663671
return actCancelReqEvent, nil

0 commit comments

Comments
 (0)