Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/dominikbraun/graph v0.23.0
github.com/ettle/strcase v0.2.0
github.com/flant/kube-client v1.3.1
github.com/flant/shell-operator v1.11.0
github.com/flant/shell-operator v1.11.3
github.com/go-chi/chi/v5 v5.2.2
github.com/go-openapi/loads v0.19.5
github.com/go-openapi/spec v0.19.8
Expand Down Expand Up @@ -172,7 +172,7 @@ require (
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.22.0 // indirect
github.com/prometheus/client_golang v1.22.0
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ github.com/flant/go-openapi-validate v0.19.12-flant.1 h1:GuB9XEfiLHq3M7fafRLq1AW
github.com/flant/go-openapi-validate v0.19.12-flant.1/go.mod h1:Rzou8hA/CBw8donlS6WNEUQupNvUZ0waH08tGe6kAQ4=
github.com/flant/kube-client v1.3.1 h1:1SdD799sujXNg2F6Z27le/+qkcKQaKf9Z492YGEhVhc=
github.com/flant/kube-client v1.3.1/go.mod h1:mql6hsZMgBLAhdj3Emb8TrP5MVdXduFQ2NLjzn6IF0Y=
github.com/flant/shell-operator v1.11.0 h1:1fbhgVsO0OtZn5LZBsZcn6EU2NtQFI/yQZE7LvgMLjA=
github.com/flant/shell-operator v1.11.0/go.mod h1:TFTCgXpp+yrvSUQSQKgotJbRK720fiqwaQdFVU6dAlU=
github.com/flant/shell-operator v1.11.3 h1:Yp2N/cn/y1glCHKzfAKB8HPcIZ18TidGkgCaP38E6oc=
github.com/flant/shell-operator v1.11.3/go.mod h1:TFTCgXpp+yrvSUQSQKgotJbRK720fiqwaQdFVU6dAlU=
github.com/flopp/go-findfont v0.1.0 h1:lPn0BymDUtJo+ZkV01VS3661HL6F4qFlkhcJN55u6mU=
github.com/flopp/go-findfont v0.1.0/go.mod h1:wKKxRDjD024Rh7VMwoU90i6ikQRCr+JTHB5n4Ejkqvw=
github.com/fluxcd/flagger v1.36.1 h1:X2PumtNwZz9YSGaOtZLFm2zAKLgHhFkbNv8beg7ifyc=
Expand Down
3 changes: 2 additions & 1 deletion pkg/addon-operator/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package addon_operator

import (
"context"
"time"

metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage"
Expand Down Expand Up @@ -148,7 +149,7 @@ func StartTasksQueueLengthUpdater(metricStorage metricsstorage.Storage, tqs *que
go func() {
for {
// Gather task queues lengths.
tqs.Iterate(func(queue *queue.TaskQueue) {
tqs.Iterate(context.TODO(), func(_ context.Context, queue *queue.TaskQueue) {
queueLen := float64(queue.Length())
metricStorage.GaugeSet("{PREFIX}tasks_queue_length", queueLen, map[string]string{"queue": queue.Name})
})
Expand Down
9 changes: 8 additions & 1 deletion pkg/addon-operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/deckhouse/deckhouse/pkg/log"
metricstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8types "k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -104,6 +105,10 @@ func assembleTestAddonOperator(t *testing.T, configPath string) (*AddonOperator,
_, err := kubeClient.CoreV1().ConfigMaps(result.cmNamespace).Create(context.TODO(), cmObj, metav1.CreateOptions{})
g.Expect(err).ShouldNot(HaveOccurred(), "Should create ConfigMap/%s", result.cmName)

registry := prometheus.NewRegistry()
prometheus.DefaultGatherer = registry
prometheus.DefaultRegisterer = registry

// Assemble AddonOperator.
op := NewAddonOperator(context.Background(), WithLogger(log.NewNop()))
op.engine.KubeClient = kubeClient
Expand Down Expand Up @@ -136,10 +141,12 @@ func assembleTestAddonOperator(t *testing.T, configPath string) (*AddonOperator,
MetricStorage: metricstorage.NewMetricStorage(
metricstorage.WithPrefix("addon_operator_"),
metricstorage.WithLogger(log.NewNop()),
metricstorage.WithNewRegistry(),
),
HookMetricStorage: metricstorage.NewMetricStorage(
metricstorage.WithPrefix("addon_operator_"),
metricstorage.WithLogger(log.NewNop()),
metricstorage.WithNewRegistry(),
),
}
cfg := module_manager.ModuleManagerConfig{
Expand Down Expand Up @@ -215,7 +222,7 @@ func Test_Operator_startup_tasks(t *testing.T) {
}

i := 0
op.engine.TaskQueues.GetMain().Iterate(func(tsk sh_task.Task) {
op.engine.TaskQueues.GetMain().IterateSnapshot(func(tsk sh_task.Task) {
// Stop checking if no expects left.
if i >= len(expectTasks) {
return
Expand Down
16 changes: 8 additions & 8 deletions pkg/addon-operator/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func ModulesWithPendingModuleRun(q *queue.TaskQueue) map[string]struct{} {

skipFirstTask := true

q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
// Skip the first task in the queue as it can be executed already, i.e. "not pending".
if skipFirstTask {
skipFirstTask = false
Expand Down Expand Up @@ -63,7 +63,7 @@ func ConvergeTasksInQueue(q *queue.TaskQueue) int {
}

convergeTasks := 0
q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
if converge.IsConvergeTask(t) || converge.IsFirstConvergeTask(t) {
convergeTasks++
}
Expand All @@ -78,7 +78,7 @@ func ConvergeModulesInQueue(q *queue.TaskQueue) int {
}

tasks := 0
q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
taskType := t.GetType()
if converge.IsConvergeTask(t) && (taskType == task.ModuleRun || taskType == task.ModuleDelete) {
tasks++
Expand All @@ -101,7 +101,7 @@ func RemoveCurrentConvergeTasks(convergeQueues []*queue.TaskQueue, logLabels map

stop := false

queue.Filter(func(t sh_task.Task) bool {
queue.DeleteFunc(func(t sh_task.Task) bool {
if stop {
return true
}
Expand Down Expand Up @@ -152,7 +152,7 @@ func RemoveCurrentConvergeTasksFromId(q *queue.TaskQueue, afterId string, logLab
IDFound := false
convergeDrained := false
stop := false
q.Filter(func(t sh_task.Task) bool {
q.DeleteFunc(func(t sh_task.Task) bool {
if stop {
return true
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func RemoveAdjacentConvergeModules(q *queue.TaskQueue, afterId string, logLabels

IDFound := false
stop := false
q.Filter(func(t sh_task.Task) bool {
q.DeleteFunc(func(t sh_task.Task) bool {
if stop {
return true
}
Expand Down Expand Up @@ -229,7 +229,7 @@ func ModuleEnsureCRDsTasksInQueueAfterId(q *queue.TaskQueue, afterId string) boo
IDFound := false
taskFound := false
stop := false
q.Filter(func(t sh_task.Task) bool {
q.DeleteFunc(func(t sh_task.Task) bool {
if stop {
return true
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func DrainNonMainQueue(q *queue.TaskQueue) {
}

// Remove all tasks.
q.Filter(func(_ sh_task.Task) bool {
q.DeleteFunc(func(_ sh_task.Task) bool {
return false
})
}
6 changes: 3 additions & 3 deletions pkg/addon-operator/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func Test_RemoveAdjacentConvergeModules(t *testing.T) {
// Check tasks after remove.
require.Equal(t, len(tt.expect), q.Length(), "queue length should match length of expected tasks")
i := 0
q.Iterate(func(tsk sh_task.Task) {
q.IterateSnapshot(func(tsk sh_task.Task) {
require.Equal(t, tt.expect[i].Id, tsk.GetId(), "ID should match for task %d %+v", i, tsk)
require.Equal(t, tt.expect[i].Type, tsk.GetType(), "Type should match for task %d %+v", i, tsk)
i++
Expand Down Expand Up @@ -566,7 +566,7 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
// Check tasks in queue after remove.
require.Equal(t, len(tasks), queues[i].Length(), "length of queue %d should match length of expected tasks", i)
j := 0
queues[i].Iterate(func(tsk sh_task.Task) {
queues[i].IterateSnapshot(func(tsk sh_task.Task) {
require.Equal(t, tt.expectTasks[i][j].Id, tsk.GetId(), "ID should match for task %d %+v", j, tsk)
require.Equal(t, tt.expectTasks[i][j].Type, tsk.GetType(), "Type should match for task %d %+v", j, tsk)
j++
Expand Down Expand Up @@ -692,7 +692,7 @@ func Test_RemoveCurrentConvergeTasksFromId(t *testing.T) {
// Check tasks in queue after remove.
require.Equal(t, len(tt.expectTasks), q.Length(), "queue length should match length of expected tasks")
i := 0
q.Iterate(func(tsk sh_task.Task) {
q.IterateSnapshot(func(tsk sh_task.Task) {
require.Equal(t, tt.expectTasks[i].Id, tsk.GetId(), "ID should match for task %d %+v", i, tsk)
require.Equal(t, tt.expectTasks[i].Type, tsk.GetType(), "Type should match for task %d %+v", i, tsk)
i++
Expand Down
2 changes: 1 addition & 1 deletion pkg/module_manager/module_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ func modulesWithPendingTasks(q *queue.TaskQueue, taskType sh_task.TaskType) map[

skipFirstTask := true

q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
// Skip the first task in the queue as it can be executed already, i.e. "not pending".
if skipFirstTask {
skipFirstTask = false
Expand Down
10 changes: 5 additions & 5 deletions pkg/task/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func convergeTasksInQueue(q *queue.TaskQueue) int {
}

convergeTasks := 0
q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
if converge.IsConvergeTask(t) || converge.IsFirstConvergeTask(t) {
convergeTasks++
}
Expand All @@ -135,7 +135,7 @@ func (s *Service) DrainNonMainQueue(queueName string) {
}

// Remove all tasks.
q.Filter(func(_ sh_task.Task) bool {
q.DeleteFunc(func(_ sh_task.Task) bool {
return false
})
}
Expand All @@ -157,7 +157,7 @@ func (s *Service) RemoveAdjacentConvergeModules(queueName string, afterId string
IDFound := false
stop := false

q.Filter(func(t sh_task.Task) bool {
q.DeleteFunc(func(t sh_task.Task) bool {
if stop {
return true
}
Expand Down Expand Up @@ -212,7 +212,7 @@ func modulesWithPendingModuleRun(q *queue.TaskQueue) map[string]struct{} {

skipFirstTask := true

q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
// Skip the first task in the queue as it can be executed already, i.e. "not pending".
if skipFirstTask {
skipFirstTask = false
Expand Down Expand Up @@ -244,7 +244,7 @@ func (s *Service) ModuleEnsureCRDsTasksInMainQueueAfterId(afterId string) bool {
IDFound := false
taskFound := false
stop := false
q.Filter(func(t sh_task.Task) bool {
q.DeleteFunc(func(t sh_task.Task) bool {
if stop {
return true
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/task/service/converge.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func ConvergeTasksInQueue(q *queue.TaskQueue) int {
}

convergeTasks := 0
q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
if converge.IsConvergeTask(t) || converge.IsFirstConvergeTask(t) {
convergeTasks++
}
Expand All @@ -133,7 +133,7 @@ func ConvergeModulesInQueue(q *queue.TaskQueue) int {
}

tasks := 0
q.Iterate(func(t sh_task.Task) {
q.IterateSnapshot(func(t sh_task.Task) {
taskType := t.GetType()
if converge.IsConvergeTask(t) && (taskType == task.ModuleRun || taskType == task.ModuleDelete) {
tasks++
Expand Down
Loading