Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/dominikbraun/graph v0.23.0
github.com/ettle/strcase v0.2.0
github.com/flant/kube-client v1.3.1
github.com/flant/shell-operator v1.11.0
github.com/flant/shell-operator v1.11.2-0.20251007073542-ec1fe1f2a11b
github.com/go-chi/chi/v5 v5.2.2
github.com/go-openapi/loads v0.19.5
github.com/go-openapi/spec v0.19.8
Expand Down Expand Up @@ -172,7 +172,7 @@ require (
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.22.0 // indirect
github.com/prometheus/client_golang v1.22.0
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ github.com/flant/go-openapi-validate v0.19.12-flant.1 h1:GuB9XEfiLHq3M7fafRLq1AW
github.com/flant/go-openapi-validate v0.19.12-flant.1/go.mod h1:Rzou8hA/CBw8donlS6WNEUQupNvUZ0waH08tGe6kAQ4=
github.com/flant/kube-client v1.3.1 h1:1SdD799sujXNg2F6Z27le/+qkcKQaKf9Z492YGEhVhc=
github.com/flant/kube-client v1.3.1/go.mod h1:mql6hsZMgBLAhdj3Emb8TrP5MVdXduFQ2NLjzn6IF0Y=
github.com/flant/shell-operator v1.11.0 h1:1fbhgVsO0OtZn5LZBsZcn6EU2NtQFI/yQZE7LvgMLjA=
github.com/flant/shell-operator v1.11.0/go.mod h1:TFTCgXpp+yrvSUQSQKgotJbRK720fiqwaQdFVU6dAlU=
github.com/flant/shell-operator v1.11.2-0.20251007073542-ec1fe1f2a11b h1:cqAelRoZ4+J8TJKXwxWU+UTQWjkD2Dgob6qDBAEp0fk=
github.com/flant/shell-operator v1.11.2-0.20251007073542-ec1fe1f2a11b/go.mod h1:TFTCgXpp+yrvSUQSQKgotJbRK720fiqwaQdFVU6dAlU=
github.com/flopp/go-findfont v0.1.0 h1:lPn0BymDUtJo+ZkV01VS3661HL6F4qFlkhcJN55u6mU=
github.com/flopp/go-findfont v0.1.0/go.mod h1:wKKxRDjD024Rh7VMwoU90i6ikQRCr+JTHB5n4Ejkqvw=
github.com/fluxcd/flagger v1.36.1 h1:X2PumtNwZz9YSGaOtZLFm2zAKLgHhFkbNv8beg7ifyc=
Expand Down
3 changes: 2 additions & 1 deletion pkg/addon-operator/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package addon_operator

import (
"context"
"time"

metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage"
Expand Down Expand Up @@ -148,7 +149,7 @@ func StartTasksQueueLengthUpdater(metricStorage metricsstorage.Storage, tqs *que
go func() {
for {
// Gather task queues lengths.
tqs.Iterate(func(queue *queue.TaskQueue) {
tqs.Iterate(context.TODO(), func(_ context.Context, queue *queue.TaskQueue) {
queueLen := float64(queue.Length())
metricStorage.GaugeSet("{PREFIX}tasks_queue_length", queueLen, map[string]string{"queue": queue.Name})
})
Expand Down
9 changes: 8 additions & 1 deletion pkg/addon-operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/deckhouse/deckhouse/pkg/log"
metricstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8types "k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -104,6 +105,10 @@ func assembleTestAddonOperator(t *testing.T, configPath string) (*AddonOperator,
_, err := kubeClient.CoreV1().ConfigMaps(result.cmNamespace).Create(context.TODO(), cmObj, metav1.CreateOptions{})
g.Expect(err).ShouldNot(HaveOccurred(), "Should create ConfigMap/%s", result.cmName)

registry := prometheus.NewRegistry()
prometheus.DefaultGatherer = registry
prometheus.DefaultRegisterer = registry

// Assemble AddonOperator.
op := NewAddonOperator(context.Background(), WithLogger(log.NewNop()))
op.engine.KubeClient = kubeClient
Expand Down Expand Up @@ -136,10 +141,12 @@ func assembleTestAddonOperator(t *testing.T, configPath string) (*AddonOperator,
MetricStorage: metricstorage.NewMetricStorage(
metricstorage.WithPrefix("addon_operator_"),
metricstorage.WithLogger(log.NewNop()),
metricstorage.WithNewRegistry(),
),
HookMetricStorage: metricstorage.NewMetricStorage(
metricstorage.WithPrefix("addon_operator_"),
metricstorage.WithLogger(log.NewNop()),
metricstorage.WithNewRegistry(),
),
}
cfg := module_manager.ModuleManagerConfig{
Expand Down Expand Up @@ -215,7 +222,7 @@ func Test_Operator_startup_tasks(t *testing.T) {
}

i := 0
op.engine.TaskQueues.GetMain().Iterate(func(tsk sh_task.Task) {
op.engine.TaskQueues.GetMain().IterateSnapshot(func(tsk sh_task.Task) {
// Stop checking if no expects left.
if i >= len(expectTasks) {
return
Expand Down
16 changes: 8 additions & 8 deletions pkg/addon-operator/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func ModulesWithPendingModuleRun(q *queue.TaskQueue) map[string]struct{} {

skipFirstTask := true

q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
// Skip the first task in the queue as it can be executed already, i.e. "not pending".
if skipFirstTask {
skipFirstTask = false
Expand Down Expand Up @@ -63,7 +63,7 @@ func ConvergeTasksInQueue(q *queue.TaskQueue) int {
}

convergeTasks := 0
q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
if converge.IsConvergeTask(t) || converge.IsFirstConvergeTask(t) {
convergeTasks++
}
Expand All @@ -78,7 +78,7 @@ func ConvergeModulesInQueue(q *queue.TaskQueue) int {
}

tasks := 0
q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
taskType := t.GetType()
if converge.IsConvergeTask(t) && (taskType == task.ModuleRun || taskType == task.ModuleDelete) {
tasks++
Expand All @@ -101,7 +101,7 @@ func RemoveCurrentConvergeTasks(convergeQueues []*queue.TaskQueue, logLabels map

stop := false

queue.Filter(func(t sh_task.Task) bool {
queue.DeleteFunc(func(t sh_task.Task) bool {
if stop {
return true
}
Expand Down Expand Up @@ -152,7 +152,7 @@ func RemoveCurrentConvergeTasksFromId(q *queue.TaskQueue, afterId string, logLab
IDFound := false
convergeDrained := false
stop := false
q.Filter(func(t sh_task.Task) bool {
q.DeleteFunc(func(t sh_task.Task) bool {
if stop {
return true
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func RemoveAdjacentConvergeModules(q *queue.TaskQueue, afterId string, logLabels

IDFound := false
stop := false
q.Filter(func(t sh_task.Task) bool {
q.DeleteFunc(func(t sh_task.Task) bool {
if stop {
return true
}
Expand Down Expand Up @@ -229,7 +229,7 @@ func ModuleEnsureCRDsTasksInQueueAfterId(q *queue.TaskQueue, afterId string) boo
IDFound := false
taskFound := false
stop := false
q.Filter(func(t sh_task.Task) bool {
q.DeleteFunc(func(t sh_task.Task) bool {
if stop {
return true
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func DrainNonMainQueue(q *queue.TaskQueue) {
}

// Remove all tasks.
q.Filter(func(_ sh_task.Task) bool {
q.DeleteFunc(func(_ sh_task.Task) bool {
return false
})
}
6 changes: 3 additions & 3 deletions pkg/addon-operator/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func Test_RemoveAdjacentConvergeModules(t *testing.T) {
// Check tasks after remove.
require.Equal(t, len(tt.expect), q.Length(), "queue length should match length of expected tasks")
i := 0
q.Iterate(func(tsk sh_task.Task) {
q.IterateSnapshot(func(tsk sh_task.Task) {
require.Equal(t, tt.expect[i].Id, tsk.GetId(), "ID should match for task %d %+v", i, tsk)
require.Equal(t, tt.expect[i].Type, tsk.GetType(), "Type should match for task %d %+v", i, tsk)
i++
Expand Down Expand Up @@ -566,7 +566,7 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
// Check tasks in queue after remove.
require.Equal(t, len(tasks), queues[i].Length(), "length of queue %d should match length of expected tasks", i)
j := 0
queues[i].Iterate(func(tsk sh_task.Task) {
queues[i].IterateSnapshot(func(tsk sh_task.Task) {
require.Equal(t, tt.expectTasks[i][j].Id, tsk.GetId(), "ID should match for task %d %+v", j, tsk)
require.Equal(t, tt.expectTasks[i][j].Type, tsk.GetType(), "Type should match for task %d %+v", j, tsk)
j++
Expand Down Expand Up @@ -692,7 +692,7 @@ func Test_RemoveCurrentConvergeTasksFromId(t *testing.T) {
// Check tasks in queue after remove.
require.Equal(t, len(tt.expectTasks), q.Length(), "queue length should match length of expected tasks")
i := 0
q.Iterate(func(tsk sh_task.Task) {
q.IterateSnapshot(func(tsk sh_task.Task) {
require.Equal(t, tt.expectTasks[i].Id, tsk.GetId(), "ID should match for task %d %+v", i, tsk)
require.Equal(t, tt.expectTasks[i].Type, tsk.GetType(), "Type should match for task %d %+v", i, tsk)
i++
Expand Down
2 changes: 1 addition & 1 deletion pkg/module_manager/module_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ func modulesWithPendingTasks(q *queue.TaskQueue, taskType sh_task.TaskType) map[

skipFirstTask := true

q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
// Skip the first task in the queue as it can be executed already, i.e. "not pending".
if skipFirstTask {
skipFirstTask = false
Expand Down
10 changes: 5 additions & 5 deletions pkg/task/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func convergeTasksInQueue(q *queue.TaskQueue) int {
}

convergeTasks := 0
q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
if converge.IsConvergeTask(t) || converge.IsFirstConvergeTask(t) {
convergeTasks++
}
Expand All @@ -135,7 +135,7 @@ func (s *Service) DrainNonMainQueue(queueName string) {
}

// Remove all tasks.
q.Filter(func(_ sh_task.Task) bool {
q.DeleteFunc(func(_ sh_task.Task) bool {
return false
})
}
Expand All @@ -157,7 +157,7 @@ func (s *Service) RemoveAdjacentConvergeModules(queueName string, afterId string
IDFound := false
stop := false

q.Filter(func(t sh_task.Task) bool {
q.DeleteFunc(func(t sh_task.Task) bool {
if stop {
return true
}
Expand Down Expand Up @@ -212,7 +212,7 @@ func modulesWithPendingModuleRun(q *queue.TaskQueue) map[string]struct{} {

skipFirstTask := true

q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
// Skip the first task in the queue as it can be executed already, i.e. "not pending".
if skipFirstTask {
skipFirstTask = false
Expand Down Expand Up @@ -244,7 +244,7 @@ func (s *Service) ModuleEnsureCRDsTasksInMainQueueAfterId(afterId string) bool {
IDFound := false
taskFound := false
stop := false
q.Filter(func(t sh_task.Task) bool {
q.DeleteFunc(func(t sh_task.Task) bool {
if stop {
return true
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/task/service/converge.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func ConvergeTasksInQueue(q *queue.TaskQueue) int {
}

convergeTasks := 0
q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
if converge.IsConvergeTask(t) || converge.IsFirstConvergeTask(t) {
convergeTasks++
}
Expand All @@ -133,7 +133,7 @@ func ConvergeModulesInQueue(q *queue.TaskQueue) int {
}

tasks := 0
q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
taskType := t.GetType()
if converge.IsConvergeTask(t) && (taskType == task.ModuleRun || taskType == task.ModuleDelete) {
tasks++
Expand Down
Loading