Skip to content

Commit a5e43dd

Browse files
committed
Store worker_control_task_queue in ActivityInfo
Add workerControlTaskQueue parameter to AddActivityTaskStartedEvent and persist it in ActivityInfo when an activity starts. This enables routing activity cancellation requests to the correct worker's control queue via Nexus. Changes: - Add worker_control_task_queue field to ActivityInfo proto - Update MutableState interface and implementation - Pass workerControlTaskQueue from poll request for regular activities - Pass from RespondWorkflowTaskCompleted request for eager activities - Update all test call sites
1 parent f74ef2e commit a5e43dd

File tree

13 files changed

+48
-12
lines changed

13 files changed

+48
-12
lines changed

api/persistence/v1/executions.pb.go

Lines changed: 14 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/internal/temporal/server/api/persistence/v1/executions.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,9 @@ message ActivityInfo {
633633

634634
// Unique identifier of the worker that is this activity.
635635
string worker_instance_key = 51;
636+
637+
// The task queue on which the server will send control tasks to the worker running this activity.
638+
string worker_control_task_queue = 52;
636639
}
637640

638641
// timer_map column

service/history/api/recordactivitytaskstarted/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ func recordActivityTaskStarted(
243243
ai, scheduledEventID, requestID, request.PollRequest.GetIdentity(),
244244
versioningStamp, pollerDeployment, request.GetBuildIdRedirectInfo(),
245245
request.PollRequest.GetWorkerInstanceKey(),
246+
request.PollRequest.GetWorkerControlTaskQueue(),
246247
); err != nil {
247248
return nil, rejectCodeUndefined, err
248249
}

service/history/api/respondactivitytaskcompleted/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ func Invoke(
9999
// other case where an activity starts?
100100
nil,
101101
"", // workerInstanceKey not available for force complete
102+
"", // workerControlTaskQueue not available for force complete
102103
)
103104
if err != nil {
104105
return nil, err

service/history/api/respondworkflowtaskcompleted/api.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,8 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
389389

390390
workflowTaskHandler := newWorkflowTaskCompletedHandler(
391391
request.GetIdentity(),
392+
request.GetWorkerInstanceKey(),
393+
request.GetWorkerControlTaskQueue(),
392394
completedEvent.GetEventId(), // If completedEvent is nil, then GetEventId() returns 0 and this value shouldn't be used in workflowTaskHandler.
393395
ms,
394396
updateRegistry,

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ type (
5353

5454
workflowTaskCompletedHandler struct {
5555
identity string
56+
workerInstanceKey string
57+
workerControlTaskQueue string
5658
workflowTaskCompletedID int64
5759

5860
// internal state
@@ -104,6 +106,8 @@ type (
104106

105107
func newWorkflowTaskCompletedHandler(
106108
identity string,
109+
workerInstanceKey string,
110+
workerControlTaskQueue string,
107111
workflowTaskCompletedID int64,
108112
mutableState historyi.MutableState,
109113
updateRegistry update.Registry,
@@ -122,7 +126,9 @@ func newWorkflowTaskCompletedHandler(
122126
versionMembershipCache worker_versioning.VersionMembershipCache,
123127
) *workflowTaskCompletedHandler {
124128
return &workflowTaskCompletedHandler{
125-
identity: identity,
129+
identity: identity,
130+
workerInstanceKey: workerInstanceKey,
131+
workerControlTaskQueue: workerControlTaskQueue,
126132
workflowTaskCompletedID: workflowTaskCompletedID,
127133

128134
// internal state
@@ -559,7 +565,8 @@ func (handler *workflowTaskCompletedHandler) handlePostCommandEagerExecuteActivi
559565
stamp,
560566
nil,
561567
nil,
562-
"", // workerInstanceKey not available for eager dispatch
568+
handler.workerInstanceKey,
569+
handler.workerControlTaskQueue,
563570
); err != nil {
564571
return nil, err
565572
}

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ func TestCommandProtocolMessage(t *testing.T) {
8484
)
8585
out.handler = newWorkflowTaskCompletedHandler( // 😲
8686
t.Name(), // identity
87+
"", // workerInstanceKey
88+
"", // workerControlTaskQueue
8789
123, // workflowTaskCompletedID
8890
out.ms,
8991
out.updates,

service/history/history_engine_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6679,6 +6679,7 @@ func addActivityTaskStartedEvent(ms historyi.MutableState, scheduledEventID int6
66796679
nil,
66806680
nil,
66816681
"",
6682+
"",
66826683
)
66836684
return event
66846685
}

service/history/interfaces/mutable_state.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type (
5858
*deploymentpb.Deployment,
5959
*taskqueuespb.BuildIdRedirectInfo,
6060
string, // workerInstanceKey
61+
string, // workerControlTaskQueue
6162
) (*historypb.HistoryEvent, error)
6263
AddActivityTaskTimedOutEvent(int64, int64, *failurepb.Failure, enumspb.RetryState) (*historypb.HistoryEvent, error)
6364
AddChildWorkflowExecutionCanceledEvent(int64, *commonpb.WorkflowExecution, *historypb.WorkflowExecutionCanceledEventAttributes) (*historypb.HistoryEvent, error)

service/history/interfaces/mutable_state_mock.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)