Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ var (
TasksQueueActionDurationSeconds = "{PREFIX}tasks_queue_action_duration_seconds"
// TasksQueueLength shows the current length of the task queue
TasksQueueLength = "{PREFIX}tasks_queue_length"
// TasksQueueCompactionInQueueTasks tracks telemetry for queue compaction
TasksQueueCompactionInQueueTasks = "d8_telemetry_{PREFIX}tasks_queue_compaction_in_queue_tasks"
// TasksQueueCompactionReached tracks when queue compaction is reached
TasksQueueCompactionReached = "d8_telemetry_{PREFIX}tasks_queue_compaction_reached"
// TasksQueueCompactionOperationsTotal counts compaction operations per hook
TasksQueueCompactionOperationsTotal = "d8_telemetry_{PREFIX}tasks_queue_compaction_operations_total"
// TasksQueueCompactionTasksByHook shows the number of tasks in queue for each hook when count exceeds 20. Updated every 10 seconds.
TasksQueueCompactionTasksByHook = "d8_telemetry_{PREFIX}tasks_queue_compaction_tasks_by_hook"

// ============================================================================
// Hook Execution Metrics
Expand Down Expand Up @@ -103,8 +103,8 @@ func InitMetrics(prefix string) {
// ============================================================================
TasksQueueActionDurationSeconds = ReplacePrefix(TasksQueueActionDurationSeconds, prefix)
TasksQueueLength = ReplacePrefix(TasksQueueLength, prefix)
TasksQueueCompactionInQueueTasks = ReplacePrefix(TasksQueueCompactionInQueueTasks, prefix)
TasksQueueCompactionReached = ReplacePrefix(TasksQueueCompactionReached, prefix)
TasksQueueCompactionOperationsTotal = ReplacePrefix(TasksQueueCompactionOperationsTotal, prefix)
TasksQueueCompactionTasksByHook = ReplacePrefix(TasksQueueCompactionTasksByHook, prefix)

// ============================================================================
// Hook Execution Metrics
Expand Down Expand Up @@ -339,6 +339,27 @@ func RegisterTaskQueueMetrics(metricStorage metricsstorage.Storage) error {
return fmt.Errorf("failed to register %s: %w", TasksQueueLength, err)
}

// Compaction metrics labels
compactionLabels := []string{"queue_name", "hook"}

// Register compaction operations counter
_, err = metricStorage.RegisterCounter(
TasksQueueCompactionOperationsTotal, compactionLabels,
options.WithHelp("Counter of compaction operations per hook"),
)
if err != nil {
return fmt.Errorf("failed to register %s: %w", TasksQueueCompactionOperationsTotal, err)
}

// Register compaction tasks by hook gauge
_, err = metricStorage.RegisterGauge(
TasksQueueCompactionTasksByHook, compactionLabels,
options.WithHelp("Gauge showing the number of tasks in queue for each hook when the count exceeds 20. Updated every 10 seconds by sampling the queue state."),
)
if err != nil {
return fmt.Errorf("failed to register %s: %w", TasksQueueCompactionTasksByHook, err)
}

return nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/shell-operator/combine_binding_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Test_CombineBindingContext_MultipleHooks(t *testing.T) {
}, labels)
assert.Nil(t, buckets)
})
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {
metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) {
})

TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
Expand Down Expand Up @@ -141,7 +141,7 @@ func Test_CombineBindingContext_Nil_On_NoCombine(t *testing.T) {
}, labels)
assert.Nil(t, buckets)
})
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {
metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) {
})

TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
Expand Down Expand Up @@ -216,7 +216,7 @@ func Test_CombineBindingContext_Group_Compaction(t *testing.T) {
}, labels)
assert.Nil(t, buckets)
})
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {
metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) {
})

TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
Expand Down Expand Up @@ -334,7 +334,7 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) {
}, labels)
assert.Nil(t, buckets)
})
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {
metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) {
})

TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
Expand Down
2 changes: 1 addition & 1 deletion pkg/shell-operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func Test_Operator_startup_tasks(t *testing.T) {
}, labels)
assert.Nil(t, buckets)
})
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {
metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) {
})

op := NewShellOperator(context.Background(), nil, nil, WithLogger(log.NewNop()))
Expand Down
2 changes: 1 addition & 1 deletion pkg/task/dump/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func Test_Dump(t *testing.T) {
assert.Contains(t, mapSlice, labels)
assert.Nil(t, buckets)
})
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {
metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) {
})

tqs := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
Expand Down
61 changes: 36 additions & 25 deletions pkg/task/queue/task_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"github.com/flant/shell-operator/pkg/task"
)

const taskCap = 100
const (
compactionMetricsThreshold = 20
)

type TaskCounter struct {
mu sync.RWMutex
Expand Down Expand Up @@ -53,21 +55,10 @@ func (tc *TaskCounter) Add(task task.Task) {
}

counter++

tc.counter[id] = counter

tc.metricStorage.GaugeSet(metrics.TasksQueueCompactionInQueueTasks, float64(counter), map[string]string{
"queue_name": tc.queueName,
"task_id": id,
})

if counter == taskCap {
if counter == compactionThreshold {
tc.reachedCap[id] = struct{}{}

tc.metricStorage.GaugeSet(metrics.TasksQueueCompactionReached, 1, map[string]string{
"queue_name": tc.queueName,
"task_id": id,
})
}
}

Expand All @@ -84,20 +75,28 @@ func (tc *TaskCounter) Remove(task task.Task) {
id := task.GetCompactionID()

counter, ok := tc.counter[id]
if ok {
counter--
if !ok {
delete(tc.reachedCap, id)
return
}

if counter == 0 {
delete(tc.counter, id)
delete(tc.reachedCap, id)
return
}

counter--

if counter == 0 {
delete(tc.counter, id)
} else {
tc.counter[id] = counter
}

tc.metricStorage.GaugeSet(metrics.TasksQueueCompactionInQueueTasks, float64(counter), map[string]string{
"queue_name": task.GetQueueName(),
"task_id": id,
})
if counter < compactionThreshold {
delete(tc.reachedCap, id)
}
}

func (tc *TaskCounter) GetReachedCap() map[string]struct{} {
Expand All @@ -118,12 +117,24 @@ func (tc *TaskCounter) ResetReachedCap() {
tc.mu.Lock()
defer tc.mu.Unlock()

for id := range tc.reachedCap {
tc.metricStorage.GaugeSet(metrics.TasksQueueCompactionReached, 0, map[string]string{
"queue_name": tc.queueName,
"task_id": id,
})
tc.reachedCap = make(map[string]struct{}, 32)
}

// UpdateHookMetricsFromSnapshot updates metrics for all hooks based on a snapshot of hook counts.
// Only hooks with task count above the threshold are published to avoid metric cardinality explosion.
func (tc *TaskCounter) UpdateHookMetricsFromSnapshot(hookCounts map[string]uint) {
if tc.metricStorage == nil {
return
}

tc.reachedCap = make(map[string]struct{}, 32)
// Update metrics only for hooks above threshold
for hookName, count := range hookCounts {
if count > compactionMetricsThreshold {
labels := map[string]string{
"queue_name": tc.queueName,
"hook": hookName,
}
tc.metricStorage.GaugeSet(metrics.TasksQueueCompactionTasksByHook, float64(count), labels)
}
}
}
99 changes: 99 additions & 0 deletions pkg/task/queue/task_counter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package queue

import (
"sync"
"testing"

"github.com/stretchr/testify/require"

"github.com/flant/shell-operator/pkg/metric"
"github.com/flant/shell-operator/pkg/metrics"
)

func TestTaskCounterUpdateHookMetricsFromSnapshot(t *testing.T) {
metricStorage := metric.NewStorageMock(t)

type gaugeCall struct {
metric string
value float64
labels map[string]string
}

var (
mu sync.Mutex
calls []gaugeCall
)

metricStorage.GaugeSetMock.Set(func(metric string, value float64, labels map[string]string) {
cloned := make(map[string]string, len(labels))
for k, v := range labels {
cloned[k] = v
}

mu.Lock()
calls = append(calls, gaugeCall{
metric: metric,
value: value,
labels: cloned,
})
mu.Unlock()
})

tc := NewTaskCounter("main", nil, metricStorage)

// Simulate initial state with hooks above threshold by setting up a snapshot
initialSnapshot := map[string]uint{
"hook1": 26,
"hook2": 31,
"hook3": 51,
}
tc.UpdateHookMetricsFromSnapshot(initialSnapshot)

mu.Lock()
calls = nil // Clear previous calls
mu.Unlock()

// Update with new snapshot where:
// - hook1 still has high count (25 tasks)
// - hook2 dropped below threshold (15 tasks) - should not be published
// - hook3 is completely gone (0 tasks in new snapshot) - should not be published
// - hook4 is new (30 tasks)
newSnapshot := map[string]uint{
"hook1": 25,
"hook2": 15,
"hook4": 30,
}

tc.UpdateHookMetricsFromSnapshot(newSnapshot)

mu.Lock()
defer mu.Unlock()

// Verify that metrics were set correctly
require.NotEmpty(t, calls)

// Build a map of last call for each hook
lastCallByHook := make(map[string]gaugeCall)
for _, call := range calls {
if call.metric == metrics.TasksQueueCompactionTasksByHook {
hook := call.labels["hook"]
lastCallByHook[hook] = call
}
}

// hook1: should have value 25
require.Contains(t, lastCallByHook, "hook1")
require.Equal(t, float64(25), lastCallByHook["hook1"].value)
require.Equal(t, "main", lastCallByHook["hook1"].labels["queue_name"])

// hook2: should NOT be published (below threshold)
require.NotContains(t, lastCallByHook, "hook2")

// hook3: should NOT be published (removed from snapshot and was above threshold before)
require.NotContains(t, lastCallByHook, "hook3")

// hook4: should have value 30
require.Contains(t, lastCallByHook, "hook4")
require.Equal(t, float64(30), lastCallByHook["hook4"].value)
require.Equal(t, "main", lastCallByHook["hook4"].labels["queue_name"])
}
Loading
Loading