Skip to content

Commit d402868

Browse files
authored
Merge branch 'kannan/activity-cancel/persist-worker-key' into kannan/activity-cancel/task-definition
2 parents 835d5bf + e66ae7e commit d402868

File tree

4 files changed

+87
-14
lines changed

4 files changed

+87
-14
lines changed

api/persistence/v1/executions.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/internal/temporal/server/api/persistence/v1/executions.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,7 @@ message ActivityInfo {
641641

642642
int64 start_version = 50;
643643

644-
// Worker instance key of the worker executing this activity.
644+
// Unique identifier of the worker that is this activity.
645645
string worker_instance_key = 51;
646646
}
647647

service/history/api/respondactivitytaskcompleted/api.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -88,18 +88,18 @@ func Invoke(
8888
// we need to force complete an activity
8989
fabricateStartedEvent = ai.StartedEventId == common.EmptyEventID
9090
if fabricateStartedEvent {
91-
_, err := mutableState.AddActivityTaskStartedEvent(
92-
ai,
93-
scheduledEventID,
94-
"",
95-
req.GetCompleteRequest().GetIdentity(),
96-
nil,
97-
nil,
98-
// TODO (shahab): do we need to do anything with wf redirect in this case or any
99-
// other case where an activity starts?
100-
nil,
101-
"", // workerInstanceKey not available for force complete
102-
)
91+
_, err := mutableState.AddActivityTaskStartedEvent(
92+
ai,
93+
scheduledEventID,
94+
"",
95+
req.GetCompleteRequest().GetIdentity(),
96+
nil,
97+
nil,
98+
// TODO (shahab): do we need to do anything with wf redirect in this case or any
99+
// other case where an activity starts?
100+
nil,
101+
"", // workerInstanceKey not available for force complete
102+
)
103103
if err != nil {
104104
return nil, err
105105
}

service/history/workflow/mutable_state_impl_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6112,3 +6112,76 @@ func (s *mutableStateSuite) TestCHASMNodeSize() {
61126112
expectedTotalSize += len(newNodeKey) + newNode.Size()
61136113
s.Equal(expectedTotalSize, mutableState.GetApproximatePersistedSize())
61146114
}
6115+
6116+
func (s *mutableStateSuite) TestAddActivityTaskStartedEventStoresWorkerInstanceKey() {
6117+
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
6118+
6119+
// Setup workflow execution
6120+
_, err := s.mutableState.AddWorkflowExecutionStartedEvent(
6121+
&commonpb.WorkflowExecution{WorkflowId: tests.WorkflowID, RunId: tests.RunID},
6122+
&historyservice.StartWorkflowExecutionRequest{
6123+
NamespaceId: tests.NamespaceID.String(),
6124+
StartRequest: &workflowservice.StartWorkflowExecutionRequest{
6125+
WorkflowType: &commonpb.WorkflowType{Name: "workflow-type"},
6126+
TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"},
6127+
WorkflowRunTimeout: durationpb.New(200 * time.Second),
6128+
WorkflowTaskTimeout: durationpb.New(1 * time.Second),
6129+
},
6130+
},
6131+
)
6132+
s.NoError(err)
6133+
6134+
di, err := s.mutableState.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_NORMAL)
6135+
s.NoError(err)
6136+
_, _, err = s.mutableState.AddWorkflowTaskStartedEvent(
6137+
di.ScheduledEventID,
6138+
di.RequestID,
6139+
di.TaskQueue,
6140+
"identity",
6141+
nil,
6142+
nil,
6143+
nil,
6144+
false,
6145+
nil,
6146+
)
6147+
s.NoError(err)
6148+
_, err = s.mutableState.AddWorkflowTaskCompletedEvent(
6149+
di,
6150+
&workflowservice.RespondWorkflowTaskCompletedRequest{Identity: "identity"},
6151+
workflowTaskCompletionLimits,
6152+
)
6153+
s.NoError(err)
6154+
6155+
// Schedule activity
6156+
workflowTaskCompletedEventID := int64(4)
6157+
_, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent(
6158+
workflowTaskCompletedEventID,
6159+
&commandpb.ScheduleActivityTaskCommandAttributes{
6160+
ActivityId: "test-activity-1",
6161+
ActivityType: &commonpb.ActivityType{Name: "test-activity-type"},
6162+
TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"},
6163+
},
6164+
false,
6165+
)
6166+
s.NoError(err)
6167+
s.Empty(activityInfo.WorkerInstanceKey, "WorkerInstanceKey should be empty before activity starts")
6168+
6169+
// Start activity with workerInstanceKey
6170+
expectedWorkerInstanceKey := "test-worker-instance-key-12345"
6171+
_, err = s.mutableState.AddActivityTaskStartedEvent(
6172+
activityInfo,
6173+
activityInfo.ScheduledEventId,
6174+
uuid.NewString(),
6175+
"worker-identity",
6176+
nil,
6177+
nil,
6178+
nil,
6179+
expectedWorkerInstanceKey,
6180+
)
6181+
s.NoError(err)
6182+
6183+
// Verify workerInstanceKey is stored
6184+
updatedActivityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
6185+
s.True(ok)
6186+
s.Equal(expectedWorkerInstanceKey, updatedActivityInfo.WorkerInstanceKey)
6187+
}

0 commit comments

Comments
 (0)