Skip to content

Commit fa92bcc

Browse files
committed
Add unit tests for NotifyActivityTask batching and generation
1 parent 092d6b7 commit fa92bcc

File tree

3 files changed

+146
-0
lines changed

3 files changed

+146
-0
lines changed

common/testing/testvars/test_vars.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,15 @@ func (tv *TestVars) WorkerIdentity() string {
396396
return getOrCreate(tv, "worker_identity", tv.uniqueString, tv.stringNSetter)
397397
}
398398

399+
func (tv *TestVars) WorkerInstanceKey() string {
400+
return getOrCreate(tv, "worker_instance_key", tv.uniqueString, tv.stringNSetter)
401+
}
402+
403+
// ControlQueueName returns the Nexus task queue name used to deliver control tasks to this worker.
404+
func (tv *TestVars) ControlQueueName(ns string) string {
405+
return fmt.Sprintf("/temporal-sys/worker-commands/%s/%s", ns, tv.WorkerInstanceKey())
406+
}
407+
399408
func (tv *TestVars) TimerID() string {
400409
return getOrCreate(tv, "timer_id", tv.uniqueString, tv.stringNSetter)
401410
}

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,3 +384,63 @@ func mustMarshalAny(t *testing.T, pb proto.Message) *anypb.Any {
384384
require.NoError(t, a.MarshalFrom(pb))
385385
return &a
386386
}
387+
388+
func TestFlushBatchedNotifyActivityTasks(t *testing.T) {
389+
t.Parallel()
390+
391+
t.Run("batches activities by control queue", func(t *testing.T) {
392+
ctrl := gomock.NewController(t)
393+
ms := historyi.NewMockMutableState(ctrl)
394+
395+
ms.EXPECT().AddNotifyActivityTasks(
396+
[]int64{5, 6, 7},
397+
"control-queue-1",
398+
gomock.Any(),
399+
).Return(nil).Times(1)
400+
401+
handler := &workflowTaskCompletedHandler{
402+
mutableState: ms,
403+
pendingActivityCancelsByControlQueue: map[string][]int64{
404+
"control-queue-1": {5, 6, 7},
405+
},
406+
}
407+
408+
err := handler.flushBatchedNotifyActivityTasks()
409+
require.NoError(t, err)
410+
})
411+
412+
t.Run("creates separate tasks for different control queues", func(t *testing.T) {
413+
ctrl := gomock.NewController(t)
414+
ms := historyi.NewMockMutableState(ctrl)
415+
416+
ms.EXPECT().AddNotifyActivityTasks(
417+
gomock.Any(),
418+
gomock.Any(),
419+
gomock.Any(),
420+
).Return(nil).Times(2)
421+
422+
handler := &workflowTaskCompletedHandler{
423+
mutableState: ms,
424+
pendingActivityCancelsByControlQueue: map[string][]int64{
425+
"control-queue-1": {5, 6},
426+
"control-queue-2": {7, 8},
427+
},
428+
}
429+
430+
err := handler.flushBatchedNotifyActivityTasks()
431+
require.NoError(t, err)
432+
})
433+
434+
t.Run("does nothing when no pending cancels", func(t *testing.T) {
435+
ctrl := gomock.NewController(t)
436+
ms := historyi.NewMockMutableState(ctrl)
437+
438+
handler := &workflowTaskCompletedHandler{
439+
mutableState: ms,
440+
pendingActivityCancelsByControlQueue: nil,
441+
}
442+
443+
err := handler.flushBatchedNotifyActivityTasks()
444+
require.NoError(t, err)
445+
})
446+
}

service/history/workflow/task_generator_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,3 +1066,80 @@ func TestTaskGeneratorImpl_GenerateDeleteHistoryEventTask_ActivityRetention(t *t
10661066
})
10671067
}
10681068
}
1069+
1070+
func TestGenerateNotifyActivityTasks(t *testing.T) {
1071+
t.Parallel()
1072+
1073+
testCases := []struct {
1074+
name string
1075+
featureEnabled bool
1076+
scheduledEventIDs []int64
1077+
controlQueue string
1078+
expectTask bool
1079+
}{
1080+
{
1081+
name: "creates task when enabled with valid inputs",
1082+
featureEnabled: true,
1083+
scheduledEventIDs: []int64{5, 6, 7},
1084+
controlQueue: "test-control-queue",
1085+
expectTask: true,
1086+
},
1087+
{
1088+
name: "no task when feature disabled",
1089+
featureEnabled: false,
1090+
scheduledEventIDs: []int64{5, 6, 7},
1091+
controlQueue: "test-control-queue",
1092+
expectTask: false,
1093+
},
1094+
{
1095+
name: "no task when scheduledEventIDs empty",
1096+
featureEnabled: true,
1097+
scheduledEventIDs: []int64{},
1098+
controlQueue: "test-control-queue",
1099+
expectTask: false,
1100+
},
1101+
{
1102+
name: "no task when controlQueue empty",
1103+
featureEnabled: true,
1104+
scheduledEventIDs: []int64{5, 6, 7},
1105+
controlQueue: "",
1106+
expectTask: false,
1107+
},
1108+
}
1109+
1110+
for _, tc := range testCases {
1111+
t.Run(tc.name, func(t *testing.T) {
1112+
ctrl := gomock.NewController(t)
1113+
1114+
mutableState := historyi.NewMockMutableState(ctrl)
1115+
mutableState.EXPECT().GetWorkflowKey().Return(definition.NewWorkflowKey(
1116+
tests.NamespaceID.String(), tests.WorkflowID, tests.RunID,
1117+
)).AnyTimes()
1118+
1119+
var capturedTasks []tasks.Task
1120+
if tc.expectTask {
1121+
mutableState.EXPECT().AddTasks(gomock.Any()).Do(func(ts ...tasks.Task) {
1122+
capturedTasks = append(capturedTasks, ts...)
1123+
}).Times(1)
1124+
}
1125+
1126+
cfg := &configs.Config{
1127+
EnableActivityCancellationNexusTask: func() bool { return tc.featureEnabled },
1128+
}
1129+
1130+
taskGenerator := NewTaskGenerator(nil, mutableState, cfg, nil, log.NewTestLogger())
1131+
err := taskGenerator.GenerateNotifyActivityTasks(tc.scheduledEventIDs, tc.controlQueue, enumsspb.ACTIVITY_NOTIFICATION_TYPE_CANCEL)
1132+
require.NoError(t, err)
1133+
1134+
if tc.expectTask {
1135+
require.Len(t, capturedTasks, 1)
1136+
notifyTask, ok := capturedTasks[0].(*tasks.NotifyActivityTask)
1137+
require.True(t, ok)
1138+
assert.Equal(t, tc.scheduledEventIDs, notifyTask.ScheduledEventIDs)
1139+
assert.Equal(t, tc.controlQueue, notifyTask.Destination)
1140+
assert.Equal(t, tests.NamespaceID.String(), notifyTask.NamespaceID)
1141+
assert.Equal(t, enumsspb.ACTIVITY_NOTIFICATION_TYPE_CANCEL, notifyTask.NotificationType)
1142+
}
1143+
})
1144+
}
1145+
}

0 commit comments

Comments
 (0)