diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index d98489eb..28e3ac3d 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -42,10 +42,10 @@ var ( TasksQueueActionDurationSeconds = "{PREFIX}tasks_queue_action_duration_seconds" // TasksQueueLength shows the current length of the task queue TasksQueueLength = "{PREFIX}tasks_queue_length" - // TasksQueueCompactionInQueueTasks tracks telemetry for queue compaction - TasksQueueCompactionInQueueTasks = "d8_telemetry_{PREFIX}tasks_queue_compaction_in_queue_tasks" - // TasksQueueCompactionReached tracks when queue compaction is reached - TasksQueueCompactionReached = "d8_telemetry_{PREFIX}tasks_queue_compaction_reached" + // TasksQueueCompactionOperationsTotal counts compaction operations per hook + TasksQueueCompactionOperationsTotal = "d8_telemetry_{PREFIX}tasks_queue_compaction_operations_total" + // TasksQueueCompactionTasksByHook shows the number of tasks in queue for each hook when count exceeds 20. Updated every 10 seconds. + TasksQueueCompactionTasksByHook = "d8_telemetry_{PREFIX}tasks_queue_compaction_tasks_by_hook" // ============================================================================ // Hook Execution Metrics @@ -103,8 +103,8 @@ func InitMetrics(prefix string) { // ============================================================================ TasksQueueActionDurationSeconds = ReplacePrefix(TasksQueueActionDurationSeconds, prefix) TasksQueueLength = ReplacePrefix(TasksQueueLength, prefix) - TasksQueueCompactionInQueueTasks = ReplacePrefix(TasksQueueCompactionInQueueTasks, prefix) - TasksQueueCompactionReached = ReplacePrefix(TasksQueueCompactionReached, prefix) + TasksQueueCompactionOperationsTotal = ReplacePrefix(TasksQueueCompactionOperationsTotal, prefix) + TasksQueueCompactionTasksByHook = ReplacePrefix(TasksQueueCompactionTasksByHook, prefix) // ============================================================================ // Hook Execution Metrics @@ -339,6 +339,27 @@ func RegisterTaskQueueMetrics(metricStorage metricsstorage.Storage) error { return fmt.Errorf("failed to register %s: %w", TasksQueueLength, err) } + // Compaction metrics labels + compactionLabels := []string{"queue_name", "hook"} + + // Register compaction operations counter + _, err = metricStorage.RegisterCounter( + TasksQueueCompactionOperationsTotal, compactionLabels, + options.WithHelp("Counter of compaction operations per hook"), + ) + if err != nil { + return fmt.Errorf("failed to register %s: %w", TasksQueueCompactionOperationsTotal, err) + } + + // Register compaction tasks by hook gauge + _, err = metricStorage.RegisterGauge( + TasksQueueCompactionTasksByHook, compactionLabels, + options.WithHelp("Gauge showing the number of tasks in queue for each hook when the count exceeds 20. Updated every 10 seconds by sampling the queue state."), + ) + if err != nil { + return fmt.Errorf("failed to register %s: %w", TasksQueueCompactionTasksByHook, err) + } + return nil } diff --git a/pkg/shell-operator/combine_binding_context_test.go b/pkg/shell-operator/combine_binding_context_test.go index 3d2f8bda..de69eae8 100644 --- a/pkg/shell-operator/combine_binding_context_test.go +++ b/pkg/shell-operator/combine_binding_context_test.go @@ -31,7 +31,7 @@ func Test_CombineBindingContext_MultipleHooks(t *testing.T) { }, labels) assert.Nil(t, buckets) }) - metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { + metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) { }) TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage) @@ -141,7 +141,7 @@ func Test_CombineBindingContext_Nil_On_NoCombine(t *testing.T) { }, labels) assert.Nil(t, buckets) }) - metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { + metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) { }) TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage) @@ -216,7 +216,7 @@ func Test_CombineBindingContext_Group_Compaction(t *testing.T) { }, labels) assert.Nil(t, buckets) }) - metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { + metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) { }) TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage) @@ -334,7 +334,7 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) { }, labels) assert.Nil(t, buckets) }) - metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { + metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) { }) TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage) diff --git a/pkg/shell-operator/operator_test.go b/pkg/shell-operator/operator_test.go index 1ced1558..570c9ceb 100644 --- a/pkg/shell-operator/operator_test.go +++ b/pkg/shell-operator/operator_test.go @@ -32,7 +32,7 @@ func Test_Operator_startup_tasks(t *testing.T) { }, labels) assert.Nil(t, buckets) }) - metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { + metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) { }) op := NewShellOperator(context.Background(), nil, nil, WithLogger(log.NewNop())) diff --git a/pkg/task/dump/dump_test.go b/pkg/task/dump/dump_test.go index efd404fc..92984cb0 100644 --- a/pkg/task/dump/dump_test.go +++ b/pkg/task/dump/dump_test.go @@ -95,7 +95,7 @@ func Test_Dump(t *testing.T) { assert.Contains(t, mapSlice, labels) assert.Nil(t, buckets) }) - metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { + metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) { }) tqs := queue.NewTaskQueueSet().WithMetricStorage(metricStorage) diff --git a/pkg/task/queue/task_counter.go b/pkg/task/queue/task_counter.go index 78c729f7..6c08ed45 100644 --- a/pkg/task/queue/task_counter.go +++ b/pkg/task/queue/task_counter.go @@ -9,7 +9,9 @@ import ( "github.com/flant/shell-operator/pkg/task" ) -const taskCap = 100 +const ( + compactionMetricsThreshold = 20 +) type TaskCounter struct { mu sync.RWMutex @@ -53,21 +55,10 @@ func (tc *TaskCounter) Add(task task.Task) { } counter++ - tc.counter[id] = counter - tc.metricStorage.GaugeSet(metrics.TasksQueueCompactionInQueueTasks, float64(counter), map[string]string{ - "queue_name": tc.queueName, - "task_id": id, - }) - - if counter == taskCap { + if counter == compactionThreshold { tc.reachedCap[id] = struct{}{} - - tc.metricStorage.GaugeSet(metrics.TasksQueueCompactionReached, 1, map[string]string{ - "queue_name": tc.queueName, - "task_id": id, - }) } } @@ -84,20 +75,28 @@ func (tc *TaskCounter) Remove(task task.Task) { id := task.GetCompactionID() counter, ok := tc.counter[id] - if ok { - counter-- + if !ok { + delete(tc.reachedCap, id) + return } + if counter == 0 { + delete(tc.counter, id) + delete(tc.reachedCap, id) + return + } + + counter-- + if counter == 0 { delete(tc.counter, id) } else { tc.counter[id] = counter } - tc.metricStorage.GaugeSet(metrics.TasksQueueCompactionInQueueTasks, float64(counter), map[string]string{ - "queue_name": task.GetQueueName(), - "task_id": id, - }) + if counter < compactionThreshold { + delete(tc.reachedCap, id) + } } func (tc *TaskCounter) GetReachedCap() map[string]struct{} { @@ -118,12 +117,24 @@ func (tc *TaskCounter) ResetReachedCap() { tc.mu.Lock() defer tc.mu.Unlock() - for id := range tc.reachedCap { - tc.metricStorage.GaugeSet(metrics.TasksQueueCompactionReached, 0, map[string]string{ - "queue_name": tc.queueName, - "task_id": id, - }) + tc.reachedCap = make(map[string]struct{}, 32) +} + +// UpdateHookMetricsFromSnapshot updates metrics for all hooks based on a snapshot of hook counts. +// Only hooks with task count above the threshold are published to avoid metric cardinality explosion. +func (tc *TaskCounter) UpdateHookMetricsFromSnapshot(hookCounts map[string]uint) { + if tc.metricStorage == nil { + return } - tc.reachedCap = make(map[string]struct{}, 32) + // Update metrics only for hooks above threshold + for hookName, count := range hookCounts { + if count > compactionMetricsThreshold { + labels := map[string]string{ + "queue_name": tc.queueName, + "hook": hookName, + } + tc.metricStorage.GaugeSet(metrics.TasksQueueCompactionTasksByHook, float64(count), labels) + } + } } diff --git a/pkg/task/queue/task_counter_test.go b/pkg/task/queue/task_counter_test.go new file mode 100644 index 00000000..78bb230a --- /dev/null +++ b/pkg/task/queue/task_counter_test.go @@ -0,0 +1,99 @@ +package queue + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/flant/shell-operator/pkg/metric" + "github.com/flant/shell-operator/pkg/metrics" +) + +func TestTaskCounterUpdateHookMetricsFromSnapshot(t *testing.T) { + metricStorage := metric.NewStorageMock(t) + + type gaugeCall struct { + metric string + value float64 + labels map[string]string + } + + var ( + mu sync.Mutex + calls []gaugeCall + ) + + metricStorage.GaugeSetMock.Set(func(metric string, value float64, labels map[string]string) { + cloned := make(map[string]string, len(labels)) + for k, v := range labels { + cloned[k] = v + } + + mu.Lock() + calls = append(calls, gaugeCall{ + metric: metric, + value: value, + labels: cloned, + }) + mu.Unlock() + }) + + tc := NewTaskCounter("main", nil, metricStorage) + + // Simulate initial state with hooks above threshold by setting up a snapshot + initialSnapshot := map[string]uint{ + "hook1": 26, + "hook2": 31, + "hook3": 51, + } + tc.UpdateHookMetricsFromSnapshot(initialSnapshot) + + mu.Lock() + calls = nil // Clear previous calls + mu.Unlock() + + // Update with new snapshot where: + // - hook1 still has high count (25 tasks) + // - hook2 dropped below threshold (15 tasks) - should not be published + // - hook3 is completely gone (0 tasks in new snapshot) - should not be published + // - hook4 is new (30 tasks) + newSnapshot := map[string]uint{ + "hook1": 25, + "hook2": 15, + "hook4": 30, + } + + tc.UpdateHookMetricsFromSnapshot(newSnapshot) + + mu.Lock() + defer mu.Unlock() + + // Verify that metrics were set correctly + require.NotEmpty(t, calls) + + // Build a map of last call for each hook + lastCallByHook := make(map[string]gaugeCall) + for _, call := range calls { + if call.metric == metrics.TasksQueueCompactionTasksByHook { + hook := call.labels["hook"] + lastCallByHook[hook] = call + } + } + + // hook1: should have value 25 + require.Contains(t, lastCallByHook, "hook1") + require.Equal(t, float64(25), lastCallByHook["hook1"].value) + require.Equal(t, "main", lastCallByHook["hook1"].labels["queue_name"]) + + // hook2: should NOT be published (below threshold) + require.NotContains(t, lastCallByHook, "hook2") + + // hook3: should NOT be published (removed from snapshot and was above threshold before) + require.NotContains(t, lastCallByHook, "hook3") + + // hook4: should have value 30 + require.Contains(t, lastCallByHook, "hook4") + require.Equal(t, float64(30), lastCallByHook["hook4"].value) + require.Equal(t, "main", lastCallByHook["hook4"].labels["queue_name"]) +} diff --git a/pkg/task/queue/task_queue.go b/pkg/task/queue/task_queue.go index 97d81ef7..b5144a87 100644 --- a/pkg/task/queue/task_queue.go +++ b/pkg/task/queue/task_queue.go @@ -35,7 +35,10 @@ config parameter. This implementation uses container/list for O(1) queue operations and a map for O(1) task lookup by ID. */ -const compactionThreshold = 100 +const ( + compactionThreshold = 100 + metricsMonitoringUpdateInterval = 10 * time.Second +) var ( DefaultWaitLoopCheckInterval = 125 * time.Millisecond @@ -459,7 +462,7 @@ func (q *TaskQueue) compaction(compactionIDs map[string]struct{}) { }) // Second pass: merge with pooled slices - for _, group := range hookGroups { + for hookName, group := range hookGroups { if len(group.elementsToMerge) == 0 { continue } @@ -538,6 +541,14 @@ func (q *TaskQueue) compaction(compactionIDs map[string]struct{}) { withContext = targetMonitorIDsSetter.SetMonitorIDs(newMonitorIDs) targetTask.UpdateMetadata(withContext) + // Record compaction operation metric + if q.metricStorage != nil && len(group.elementsToMerge) > 0 { + q.metricStorage.CounterAdd(metrics.TasksQueueCompactionOperationsTotal, 1, map[string]string{ + "queue_name": q.Name, + "hook": hookName, + }) + } + // Call compaction callback if set if q.compactionCallback != nil && len(group.elementsToMerge) > 0 { compactedTasks := make([]task.Task, 0, len(group.elementsToMerge)) @@ -675,6 +686,62 @@ func (q *TaskQueue) Stop() { } } +// startMetricsMonitoring starts a goroutine that periodically updates compaction metrics +// by taking a snapshot of the queue and counting tasks per hook. +func (q *TaskQueue) startMetricsMonitoring() { + if q.metricStorage == nil || q.queueTasksCounter == nil { + return + } + + go func() { + // Update metrics immediately on start + q.updateCompactionMetrics() + + ticker := time.NewTicker(metricsMonitoringUpdateInterval) + defer ticker.Stop() + + for { + select { + case <-q.ctx.Done(): + return + case <-ticker.C: + q.updateCompactionMetrics() + } + } + }() +} + +// updateCompactionMetrics counts tasks by hook and updates metrics +func (q *TaskQueue) updateCompactionMetrics() { + hookCounts := make(map[string]uint) + + // Use IterateSnapshot to avoid holding locks during iteration + q.IterateSnapshot(func(t task.Task) { + // Only count compactable task types + if _, ok := q.compactableTypes[t.GetType()]; !ok { + return + } + + metadata := t.GetMetadata() + if isNil(metadata) { + return + } + + hookNameAccessor, ok := metadata.(task_metadata.HookNameAccessor) + if !ok { + return + } + + hookName := hookNameAccessor.GetHookName() + if hookName != "" { + hookCounts[hookName]++ + } + }) + + // Update metrics based on current state + q.queueTasksCounter.UpdateHookMetricsFromSnapshot(hookCounts) +} + // lazydebug evaluates args only if debug log is enabled. // It is used to avoid unnecessary allocations when logging is disabled. // Queue MUST remain fast and not allocate memory when logging is disabled. @@ -695,6 +762,9 @@ func (q *TaskQueue) Start(ctx context.Context) { return } + // Start metrics monitoring goroutine + q.startMetricsMonitoring() + go func() { q.SetStatus(QueueStatusIdle) var sleepDelay time.Duration diff --git a/pkg/task/queue/task_queue_benchmark_test.go b/pkg/task/queue/task_queue_benchmark_test.go index 7081cc7d..19db5c13 100644 --- a/pkg/task/queue/task_queue_benchmark_test.go +++ b/pkg/task/queue/task_queue_benchmark_test.go @@ -149,6 +149,8 @@ func newBenchmarkTasksQueue(b *testing.B) *TaskQueue { }) metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { }) + metricStorage.CounterAddMock.Set(func(_ string, _ float64, _ map[string]string) { + }) return NewTasksQueue("test", metricStorage) } diff --git a/pkg/task/queue/task_queue_compaction_test.go b/pkg/task/queue/task_queue_compaction_test.go index f0b2934a..a81cbcd1 100644 --- a/pkg/task/queue/task_queue_compaction_test.go +++ b/pkg/task/queue/task_queue_compaction_test.go @@ -180,34 +180,53 @@ func (t *mockTask) GetCompactionID() string { return t.Id } +// newTestMetricStorage creates a metric storage mock with all required methods stubbed +func newTestMetricStorage(t *testing.T, expectCompaction bool) *metric.StorageMock { + metricStorage := metric.NewStorageMock(t) + metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) { + }) + // CounterAdd is only called during actual compaction + if expectCompaction { + metricStorage.CounterAddMock.Set(func(_ string, _ float64, _ map[string]string) { + }) + } + // Note: GaugeSet is no longer called during compaction tests since metrics + // are updated asynchronously in a separate goroutine that runs with Start() + return metricStorage +} + func TestTaskQueueList_AddLast_GreedyMerge(t *testing.T) { tests := []struct { - name string - initialQueue []task.Task - taskToAdd task.Task - expectedIDs []string - expectedBCs map[string]string // map[taskID] -> expected number of binding contexts + name string + initialQueue []task.Task + taskToAdd task.Task + expectedIDs []string + expectedBCs map[string]string // map[taskID] -> expected number of binding contexts + expectCompaction bool // whether compaction is expected to happen }{ { - name: "Simple merge into last task", - initialQueue: []task.Task{newHookTask("h1_A", "hook-1")}, - taskToAdd: newHookTask("h1_B", "hook-1"), - expectedIDs: []string{"h1_A"}, - expectedBCs: map[string]string{"h1_A": "bc_for_h1_B"}, + name: "Simple merge into last task", + initialQueue: []task.Task{newHookTask("h1_A", "hook-1")}, + taskToAdd: newHookTask("h1_B", "hook-1"), + expectedIDs: []string{"h1_A"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_B"}, + expectCompaction: true, }, { - name: "No merge for different hook", - initialQueue: []task.Task{newHookTask("h1_A", "hook-1")}, - taskToAdd: newHookTask("h2_B", "hook-2"), - expectedIDs: []string{"h1_A", "h2_B"}, - expectedBCs: map[string]string{"h1_A": "bc_for_h1_A", "h2_B": "bc_for_h2_B"}, + name: "No merge for different hook", + initialQueue: []task.Task{newHookTask("h1_A", "hook-1")}, + taskToAdd: newHookTask("h2_B", "hook-2"), + expectedIDs: []string{"h1_A", "h2_B"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_A", "h2_B": "bc_for_h2_B"}, + expectCompaction: false, }, { - name: "Greedy merge over a different hook task", - initialQueue: []task.Task{newHookTask("h1_A", "hook-1"), newHookTask("h2_B", "hook-2")}, - taskToAdd: newHookTask("h1_C", "hook-1"), - expectedIDs: []string{"h1_A", "h2_B"}, - expectedBCs: map[string]string{"h1_A": "bc_for_h1_C", "h2_B": "bc_for_h2_B"}, + name: "Greedy merge over a different hook task", + initialQueue: []task.Task{newHookTask("h1_A", "hook-1"), newHookTask("h2_B", "hook-2")}, + taskToAdd: newHookTask("h1_C", "hook-1"), + expectedIDs: []string{"h1_A", "h2_B"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_C", "h2_B": "bc_for_h2_B"}, + expectCompaction: true, }, { name: "Do not merge into a processing task, add new", @@ -216,9 +235,10 @@ func TestTaskQueueList_AddLast_GreedyMerge(t *testing.T) { t.SetProcessing(true) return t }()}, - taskToAdd: newHookTask("h1_B", "hook-1"), - expectedIDs: []string{"h1_A", "h1_B"}, - expectedBCs: map[string]string{"h1_A": "bc_for_h1_A", "h1_B": "bc_for_h1_B"}, + taskToAdd: newHookTask("h1_B", "hook-1"), + expectedIDs: []string{"h1_A", "h1_B"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_A", "h1_B": "bc_for_h1_B"}, + expectCompaction: false, }, { name: "Merge into the second pile, not the processing one", @@ -230,9 +250,10 @@ func TestTaskQueueList_AddLast_GreedyMerge(t *testing.T) { }(), newHookTask("h1_B", "hook-1"), }, - taskToAdd: newHookTask("h1_C", "hook-1"), - expectedIDs: []string{"h1_A", "h1_B"}, - expectedBCs: map[string]string{"h1_A": "bc_for_h1_A", "h1_B": "bc_for_h1_C"}, + taskToAdd: newHookTask("h1_C", "hook-1"), + expectedIDs: []string{"h1_A", "h1_B"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_A", "h1_B": "bc_for_h1_C"}, + expectCompaction: true, }, { name: "Merge over a processing task of the same kind", @@ -244,23 +265,26 @@ func TestTaskQueueList_AddLast_GreedyMerge(t *testing.T) { return t }(), }, - taskToAdd: newHookTask("h1_C", "hook-1"), - expectedIDs: []string{"h1_A", "h1_B"}, - expectedBCs: map[string]string{"h1_A": "bc_for_h1_C", "h1_B": "bc_for_h1_B"}, + taskToAdd: newHookTask("h1_C", "hook-1"), + expectedIDs: []string{"h1_A", "h1_B"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_C", "h1_B": "bc_for_h1_B"}, + expectCompaction: true, }, { - name: "Add service task, no merge", - initialQueue: []task.Task{newHookTask("h1_A", "hook-1")}, - taskToAdd: newServiceTask("service_B"), - expectedIDs: []string{"h1_A", "service_B"}, - expectedBCs: map[string]string{"h1_A": "bc_for_h1_A"}, + name: "Add service task, no merge", + initialQueue: []task.Task{newHookTask("h1_A", "hook-1")}, + taskToAdd: newServiceTask("service_B"), + expectedIDs: []string{"h1_A", "service_B"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_A"}, + expectCompaction: false, }, { - name: "Merge hook task over a service task", - initialQueue: []task.Task{newHookTask("h1_A", "hook-1"), newServiceTask("service_B")}, - taskToAdd: newHookTask("h1_C", "hook-1"), - expectedIDs: []string{"h1_A", "service_B"}, - expectedBCs: map[string]string{"h1_A": "bc_for_h1_C"}, + name: "Merge hook task over a service task", + initialQueue: []task.Task{newHookTask("h1_A", "hook-1"), newServiceTask("service_B")}, + taskToAdd: newHookTask("h1_C", "hook-1"), + expectedIDs: []string{"h1_A", "service_B"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_C"}, + expectCompaction: true, }, { name: "Greedy merge should compact the entire queue", @@ -284,16 +308,13 @@ func TestTaskQueueList_AddLast_GreedyMerge(t *testing.T) { "h1_B": "bc_for_h1_E", // own (dropped) + h1_C (dropped) + h1_D (dropped) + h1_E (latest kept) "h2_A": "bc_for_h2_B", // own (dropped) + h2_B (latest kept) }, + expectCompaction: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - metricStorage := metric.NewStorageMock(t) - metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) { - }) - metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { - }) + metricStorage := newTestMetricStorage(t, tt.expectCompaction) q := NewTasksQueue("test", metricStorage, WithCompactableTypes(task_metadata.HookRun)) diff --git a/pkg/task/queue/task_queue_requeue_test.go b/pkg/task/queue/task_queue_requeue_test.go index 57a87468..fd0e01cf 100644 --- a/pkg/task/queue/task_queue_requeue_test.go +++ b/pkg/task/queue/task_queue_requeue_test.go @@ -21,15 +21,17 @@ func Test_TaskQueueList_Requeue(t *testing.T) { metricStorage := metric.NewStorageMock(t) metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) { - assert.Equal(t, metric, metrics.TasksQueueActionDurationSeconds) - assert.NotZero(t, value) - assert.Equal(t, map[string]string{ - "queue_action": "AddLast", - "queue_name": "requeue-test-queue", - }, labels) - assert.Nil(t, buckets) + // Optional: only validate if it's the AddLast action + if labels["queue_action"] == "AddLast" { + assert.Equal(t, metric, metrics.TasksQueueActionDurationSeconds) + assert.NotZero(t, value) + } + }) + metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) { + // Optional: accept any gauge set calls }) - metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) { + metricStorage.CounterAddMock.Optional().Set(func(_ string, _ float64, _ map[string]string) { + // Optional: accept any counter add calls }) // A channel to control when RequeueTask can finish.