Skip to content

Commit e1aafa2

Browse files
authored
[fix] Fix possibly deadlock (#680)
Signed-off-by: Pavel Okhlopkov <pavel.okhlopkov@flant.com>
1 parent b010d4e commit e1aafa2

File tree

9 files changed

+33
-25
lines changed

9 files changed

+33
-25
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/dominikbraun/graph v0.23.0
1111
github.com/ettle/strcase v0.2.0
1212
github.com/flant/kube-client v1.3.1
13-
github.com/flant/shell-operator v1.11.0
13+
github.com/flant/shell-operator v1.11.3
1414
github.com/go-chi/chi/v5 v5.2.2
1515
github.com/go-openapi/loads v0.19.5
1616
github.com/go-openapi/spec v0.19.8
@@ -172,7 +172,7 @@ require (
172172
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
173173
github.com/pjbgf/sha1cd v0.3.0 // indirect
174174
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
175-
github.com/prometheus/client_golang v1.22.0 // indirect
175+
github.com/prometheus/client_golang v1.22.0
176176
github.com/prometheus/client_model v0.6.1 // indirect
177177
github.com/prometheus/common v0.62.0 // indirect
178178
github.com/prometheus/procfs v0.15.1 // indirect

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,8 @@ github.com/flant/go-openapi-validate v0.19.12-flant.1 h1:GuB9XEfiLHq3M7fafRLq1AW
164164
github.com/flant/go-openapi-validate v0.19.12-flant.1/go.mod h1:Rzou8hA/CBw8donlS6WNEUQupNvUZ0waH08tGe6kAQ4=
165165
github.com/flant/kube-client v1.3.1 h1:1SdD799sujXNg2F6Z27le/+qkcKQaKf9Z492YGEhVhc=
166166
github.com/flant/kube-client v1.3.1/go.mod h1:mql6hsZMgBLAhdj3Emb8TrP5MVdXduFQ2NLjzn6IF0Y=
167-
github.com/flant/shell-operator v1.11.0 h1:1fbhgVsO0OtZn5LZBsZcn6EU2NtQFI/yQZE7LvgMLjA=
168-
github.com/flant/shell-operator v1.11.0/go.mod h1:TFTCgXpp+yrvSUQSQKgotJbRK720fiqwaQdFVU6dAlU=
167+
github.com/flant/shell-operator v1.11.3 h1:Yp2N/cn/y1glCHKzfAKB8HPcIZ18TidGkgCaP38E6oc=
168+
github.com/flant/shell-operator v1.11.3/go.mod h1:TFTCgXpp+yrvSUQSQKgotJbRK720fiqwaQdFVU6dAlU=
169169
github.com/flopp/go-findfont v0.1.0 h1:lPn0BymDUtJo+ZkV01VS3661HL6F4qFlkhcJN55u6mU=
170170
github.com/flopp/go-findfont v0.1.0/go.mod h1:wKKxRDjD024Rh7VMwoU90i6ikQRCr+JTHB5n4Ejkqvw=
171171
github.com/fluxcd/flagger v1.36.1 h1:X2PumtNwZz9YSGaOtZLFm2zAKLgHhFkbNv8beg7ifyc=

pkg/addon-operator/metrics.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package addon_operator
22

33
import (
4+
"context"
45
"time"
56

67
metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage"
@@ -148,7 +149,7 @@ func StartTasksQueueLengthUpdater(metricStorage metricsstorage.Storage, tqs *que
148149
go func() {
149150
for {
150151
// Gather task queues lengths.
151-
tqs.Iterate(func(queue *queue.TaskQueue) {
152+
tqs.Iterate(context.TODO(), func(_ context.Context, queue *queue.TaskQueue) {
152153
queueLen := float64(queue.Length())
153154
metricStorage.GaugeSet("{PREFIX}tasks_queue_length", queueLen, map[string]string{"queue": queue.Name})
154155
})

pkg/addon-operator/operator_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/deckhouse/deckhouse/pkg/log"
1313
metricstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage"
1414
. "github.com/onsi/gomega"
15+
"github.com/prometheus/client_golang/prometheus"
1516
v1 "k8s.io/api/core/v1"
1617
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1718
k8types "k8s.io/apimachinery/pkg/types"
@@ -104,6 +105,10 @@ func assembleTestAddonOperator(t *testing.T, configPath string) (*AddonOperator,
104105
_, err := kubeClient.CoreV1().ConfigMaps(result.cmNamespace).Create(context.TODO(), cmObj, metav1.CreateOptions{})
105106
g.Expect(err).ShouldNot(HaveOccurred(), "Should create ConfigMap/%s", result.cmName)
106107

108+
registry := prometheus.NewRegistry()
109+
prometheus.DefaultGatherer = registry
110+
prometheus.DefaultRegisterer = registry
111+
107112
// Assemble AddonOperator.
108113
op := NewAddonOperator(context.Background(), WithLogger(log.NewNop()))
109114
op.engine.KubeClient = kubeClient
@@ -136,10 +141,12 @@ func assembleTestAddonOperator(t *testing.T, configPath string) (*AddonOperator,
136141
MetricStorage: metricstorage.NewMetricStorage(
137142
metricstorage.WithPrefix("addon_operator_"),
138143
metricstorage.WithLogger(log.NewNop()),
144+
metricstorage.WithNewRegistry(),
139145
),
140146
HookMetricStorage: metricstorage.NewMetricStorage(
141147
metricstorage.WithPrefix("addon_operator_"),
142148
metricstorage.WithLogger(log.NewNop()),
149+
metricstorage.WithNewRegistry(),
143150
),
144151
}
145152
cfg := module_manager.ModuleManagerConfig{
@@ -215,7 +222,7 @@ func Test_Operator_startup_tasks(t *testing.T) {
215222
}
216223

217224
i := 0
218-
op.engine.TaskQueues.GetMain().Iterate(func(tsk sh_task.Task) {
225+
op.engine.TaskQueues.GetMain().IterateSnapshot(func(tsk sh_task.Task) {
219226
// Stop checking if no expects left.
220227
if i >= len(expectTasks) {
221228
return

pkg/addon-operator/queue.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func ModulesWithPendingModuleRun(q *queue.TaskQueue) map[string]struct{} {
3434

3535
skipFirstTask := true
3636

37-
q.Iterate(func(t sh_task.Task) {
37+
q.IterateSnapshot(func(t sh_task.Task) {
3838
// Skip the first task in the queue as it can be executed already, i.e. "not pending".
3939
if skipFirstTask {
4040
skipFirstTask = false
@@ -63,7 +63,7 @@ func ConvergeTasksInQueue(q *queue.TaskQueue) int {
6363
}
6464

6565
convergeTasks := 0
66-
q.Iterate(func(t sh_task.Task) {
66+
q.IterateSnapshot(func(t sh_task.Task) {
6767
if converge.IsConvergeTask(t) || converge.IsFirstConvergeTask(t) {
6868
convergeTasks++
6969
}
@@ -78,7 +78,7 @@ func ConvergeModulesInQueue(q *queue.TaskQueue) int {
7878
}
7979

8080
tasks := 0
81-
q.Iterate(func(t sh_task.Task) {
81+
q.IterateSnapshot(func(t sh_task.Task) {
8282
taskType := t.GetType()
8383
if converge.IsConvergeTask(t) && (taskType == task.ModuleRun || taskType == task.ModuleDelete) {
8484
tasks++
@@ -101,7 +101,7 @@ func RemoveCurrentConvergeTasks(convergeQueues []*queue.TaskQueue, logLabels map
101101

102102
stop := false
103103

104-
queue.Filter(func(t sh_task.Task) bool {
104+
queue.DeleteFunc(func(t sh_task.Task) bool {
105105
if stop {
106106
return true
107107
}
@@ -152,7 +152,7 @@ func RemoveCurrentConvergeTasksFromId(q *queue.TaskQueue, afterId string, logLab
152152
IDFound := false
153153
convergeDrained := false
154154
stop := false
155-
q.Filter(func(t sh_task.Task) bool {
155+
q.DeleteFunc(func(t sh_task.Task) bool {
156156
if stop {
157157
return true
158158
}
@@ -197,7 +197,7 @@ func RemoveAdjacentConvergeModules(q *queue.TaskQueue, afterId string, logLabels
197197

198198
IDFound := false
199199
stop := false
200-
q.Filter(func(t sh_task.Task) bool {
200+
q.DeleteFunc(func(t sh_task.Task) bool {
201201
if stop {
202202
return true
203203
}
@@ -229,7 +229,7 @@ func ModuleEnsureCRDsTasksInQueueAfterId(q *queue.TaskQueue, afterId string) boo
229229
IDFound := false
230230
taskFound := false
231231
stop := false
232-
q.Filter(func(t sh_task.Task) bool {
232+
q.DeleteFunc(func(t sh_task.Task) bool {
233233
if stop {
234234
return true
235235
}
@@ -257,7 +257,7 @@ func DrainNonMainQueue(q *queue.TaskQueue) {
257257
}
258258

259259
// Remove all tasks.
260-
q.Filter(func(_ sh_task.Task) bool {
260+
q.DeleteFunc(func(_ sh_task.Task) bool {
261261
return false
262262
})
263263
}

pkg/addon-operator/queue_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ func Test_RemoveAdjacentConvergeModules(t *testing.T) {
307307
// Check tasks after remove.
308308
require.Equal(t, len(tt.expect), q.Length(), "queue length should match length of expected tasks")
309309
i := 0
310-
q.Iterate(func(tsk sh_task.Task) {
310+
q.IterateSnapshot(func(tsk sh_task.Task) {
311311
require.Equal(t, tt.expect[i].Id, tsk.GetId(), "ID should match for task %d %+v", i, tsk)
312312
require.Equal(t, tt.expect[i].Type, tsk.GetType(), "Type should match for task %d %+v", i, tsk)
313313
i++
@@ -566,7 +566,7 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
566566
// Check tasks in queue after remove.
567567
require.Equal(t, len(tasks), queues[i].Length(), "length of queue %d should match length of expected tasks", i)
568568
j := 0
569-
queues[i].Iterate(func(tsk sh_task.Task) {
569+
queues[i].IterateSnapshot(func(tsk sh_task.Task) {
570570
require.Equal(t, tt.expectTasks[i][j].Id, tsk.GetId(), "ID should match for task %d %+v", j, tsk)
571571
require.Equal(t, tt.expectTasks[i][j].Type, tsk.GetType(), "Type should match for task %d %+v", j, tsk)
572572
j++
@@ -692,7 +692,7 @@ func Test_RemoveCurrentConvergeTasksFromId(t *testing.T) {
692692
// Check tasks in queue after remove.
693693
require.Equal(t, len(tt.expectTasks), q.Length(), "queue length should match length of expected tasks")
694694
i := 0
695-
q.Iterate(func(tsk sh_task.Task) {
695+
q.IterateSnapshot(func(tsk sh_task.Task) {
696696
require.Equal(t, tt.expectTasks[i].Id, tsk.GetId(), "ID should match for task %d %+v", i, tsk)
697697
require.Equal(t, tt.expectTasks[i].Type, tsk.GetType(), "Type should match for task %d %+v", i, tsk)
698698
i++

pkg/module_manager/module_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1536,7 +1536,7 @@ func modulesWithPendingTasks(q *queue.TaskQueue, taskType sh_task.TaskType) map[
15361536

15371537
skipFirstTask := true
15381538

1539-
q.Iterate(func(t sh_task.Task) {
1539+
q.IterateSnapshot(func(t sh_task.Task) {
15401540
// Skip the first task in the queue as it can be executed already, i.e. "not pending".
15411541
if skipFirstTask {
15421542
skipFirstTask = false

pkg/task/queue/queue.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func convergeTasksInQueue(q *queue.TaskQueue) int {
119119
}
120120

121121
convergeTasks := 0
122-
q.Iterate(func(t sh_task.Task) {
122+
q.IterateSnapshot(func(t sh_task.Task) {
123123
if converge.IsConvergeTask(t) || converge.IsFirstConvergeTask(t) {
124124
convergeTasks++
125125
}
@@ -135,7 +135,7 @@ func (s *Service) DrainNonMainQueue(queueName string) {
135135
}
136136

137137
// Remove all tasks.
138-
q.Filter(func(_ sh_task.Task) bool {
138+
q.DeleteFunc(func(_ sh_task.Task) bool {
139139
return false
140140
})
141141
}
@@ -157,7 +157,7 @@ func (s *Service) RemoveAdjacentConvergeModules(queueName string, afterId string
157157
IDFound := false
158158
stop := false
159159

160-
q.Filter(func(t sh_task.Task) bool {
160+
q.DeleteFunc(func(t sh_task.Task) bool {
161161
if stop {
162162
return true
163163
}
@@ -212,7 +212,7 @@ func modulesWithPendingModuleRun(q *queue.TaskQueue) map[string]struct{} {
212212

213213
skipFirstTask := true
214214

215-
q.Iterate(func(t sh_task.Task) {
215+
q.IterateSnapshot(func(t sh_task.Task) {
216216
// Skip the first task in the queue as it can be executed already, i.e. "not pending".
217217
if skipFirstTask {
218218
skipFirstTask = false
@@ -244,7 +244,7 @@ func (s *Service) ModuleEnsureCRDsTasksInMainQueueAfterId(afterId string) bool {
244244
IDFound := false
245245
taskFound := false
246246
stop := false
247-
q.Filter(func(t sh_task.Task) bool {
247+
q.DeleteFunc(func(t sh_task.Task) bool {
248248
if stop {
249249
return true
250250
}

pkg/task/service/converge.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func ConvergeTasksInQueue(q *queue.TaskQueue) int {
118118
}
119119

120120
convergeTasks := 0
121-
q.Iterate(func(t sh_task.Task) {
121+
q.IterateSnapshot(func(t sh_task.Task) {
122122
if converge.IsConvergeTask(t) || converge.IsFirstConvergeTask(t) {
123123
convergeTasks++
124124
}
@@ -133,7 +133,7 @@ func ConvergeModulesInQueue(q *queue.TaskQueue) int {
133133
}
134134

135135
tasks := 0
136-
q.Iterate(func(t sh_task.Task) {
136+
q.IterateSnapshot(func(t sh_task.Task) {
137137
taskType := t.GetType()
138138
if converge.IsConvergeTask(t) && (taskType == task.ModuleRun || taskType == task.ModuleDelete) {
139139
tasks++

0 commit comments

Comments
 (0)