Skip to content

Commit 9519d38

Browse files
committed
feat: add compaction and refactor queue as doubly-linked list
Signed-off-by: Timur Tuktamyshev <timur.tuktamyshev@flant.com>
1 parent 7d5a582 commit 9519d38

File tree

18 files changed

+2198
-224
lines changed

18 files changed

+2198
-224
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@
1919

2020
# C Dependency
2121
libjq
22-
addon-operator
22+
addon-operator
23+
deckhouse

hooks/pods-hook.sh

Lines changed: 0 additions & 22 deletions
This file was deleted.

pkg/debug/server.go

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"fmt"
66
"io"
77
"log/slog"
8+
"net"
89
"net/http"
910
"os"
11+
"path"
1012
"path/filepath"
1113
"strings"
1214

@@ -15,6 +17,7 @@ import (
1517
"github.com/go-chi/chi/v5/middleware"
1618
"gopkg.in/yaml.v3"
1719

20+
utils "github.com/flant/shell-operator/pkg/utils/file"
1821
structuredLogger "github.com/flant/shell-operator/pkg/utils/structured-logger"
1922
)
2023

@@ -44,37 +47,37 @@ func NewServer(prefix, socketPath, httpAddr string, logger *log.Logger) *Server
4447
}
4548

4649
func (s *Server) Init() error {
47-
// address := s.SocketPath
48-
49-
// if err := os.MkdirAll(path.Dir(address), 0o700); err != nil {
50-
// return fmt.Errorf("Debug HTTP server fail to create socket '%s': %w", address, err)
51-
// }
52-
53-
// exists, err := utils.FileExists(address)
54-
// if err != nil {
55-
// return fmt.Errorf("Debug HTTP server fail to check socket '%s': %w", address, err)
56-
// }
57-
58-
// if exists {
59-
// if err := os.Remove(address); err != nil {
60-
// return fmt.Errorf("Debug HTTP server fail to check socket '%s': %w", address, err)
61-
// }
62-
// }
63-
64-
// // Check if socket is available
65-
// listener, err := net.Listen("unix", address)
66-
// if err != nil {
67-
// return fmt.Errorf("Debug HTTP server fail to listen on '%s': %w", address, err)
68-
// }
69-
70-
// s.logger.Info("Debug endpoint listen on address", slog.String("address", address))
71-
72-
// go func() {
73-
// if err := http.Serve(listener, s.Router); err != nil {
74-
// s.logger.Error("Error starting Debug socket server", log.Err(err))
75-
// os.Exit(1)
76-
// }
77-
// }()
50+
address := s.SocketPath
51+
52+
if err := os.MkdirAll(path.Dir(address), 0o700); err != nil {
53+
return fmt.Errorf("Debug HTTP server fail to create socket '%s': %w", address, err)
54+
}
55+
56+
exists, err := utils.FileExists(address)
57+
if err != nil {
58+
return fmt.Errorf("Debug HTTP server fail to check socket '%s': %w", address, err)
59+
}
60+
61+
if exists {
62+
if err := os.Remove(address); err != nil {
63+
return fmt.Errorf("Debug HTTP server fail to check socket '%s': %w", address, err)
64+
}
65+
}
66+
67+
// Check if socket is available
68+
listener, err := net.Listen("unix", address)
69+
if err != nil {
70+
return fmt.Errorf("Debug HTTP server fail to listen on '%s': %w", address, err)
71+
}
72+
73+
s.logger.Info("Debug endpoint listen on address", slog.String("address", address))
74+
75+
go func() {
76+
if err := http.Serve(listener, s.Router); err != nil {
77+
s.logger.Error("Error starting Debug socket server", log.Err(err))
78+
os.Exit(1)
79+
}
80+
}()
7881

7982
if s.HttpAddr != "" {
8083
go func() {

pkg/hook/task_metadata/task_metadata.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ type BindingContextAccessor interface {
2626
GetBindingContext() []bindingcontext.BindingContext
2727
}
2828

29+
type BindingContextSetter interface {
30+
SetBindingContext([]bindingcontext.BindingContext) interface{}
31+
}
32+
33+
type MonitorIDSetter interface {
34+
SetMonitorIDs([]string) interface{}
35+
}
36+
2937
type MonitorIDAccessor interface {
3038
GetMonitorIDs() []string
3139
}
@@ -74,6 +82,12 @@ func (m HookMetadata) GetBindingContext() []bindingcontext.BindingContext {
7482
return m.BindingContext
7583
}
7684

85+
func (m HookMetadata) SetBindingContext(context []bindingcontext.BindingContext) interface{} {
86+
m.BindingContext = context
87+
88+
return m
89+
}
90+
7791
func (m HookMetadata) GetAllowFailure() bool {
7892
return m.AllowFailure
7993
}
@@ -82,6 +96,11 @@ func (m HookMetadata) GetMonitorIDs() []string {
8296
return m.MonitorIDs
8397
}
8498

99+
func (m HookMetadata) SetMonitorIDs(monitorIDs []string) interface{} {
100+
m.MonitorIDs = monitorIDs
101+
return m
102+
}
103+
85104
func (m *HookMetadata) WithHookName(name string) *HookMetadata {
86105
m.HookName = name
87106
return m
Lines changed: 57 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,13 @@
11
package task_metadata
22

33
import (
4-
"fmt"
5-
"strings"
64
"testing"
75

86
. "github.com/onsi/gomega"
9-
"github.com/stretchr/testify/assert"
107

118
bctx "github.com/flant/shell-operator/pkg/hook/binding_context"
129
htypes "github.com/flant/shell-operator/pkg/hook/types"
13-
"github.com/flant/shell-operator/pkg/metric"
1410
"github.com/flant/shell-operator/pkg/task"
15-
"github.com/flant/shell-operator/pkg/task/queue"
1611
)
1712

1813
func Test_HookMetadata_Access(t *testing.T) {
@@ -39,73 +34,72 @@ func Test_HookMetadata_Access(t *testing.T) {
3934
g.Expect(hm.BindingContext[1].Binding).Should(Equal("each_5_min"))
4035
}
4136

42-
func Test_HookMetadata_QueueDump_Task_Description(t *testing.T) {
37+
func Test_HookMetadata_GetDescription(t *testing.T) {
4338
g := NewWithT(t)
4439

45-
logLabels := map[string]string{
46-
"hook": "hook1.sh",
40+
hm1 := HookMetadata{
41+
HookName: "hook1.sh",
42+
BindingType: htypes.OnKubernetesEvent,
43+
Binding: "monitor_pods",
4744
}
45+
g.Expect(hm1.GetDescription()).Should(Equal(":kubernetes:hook1.sh:monitor_pods"))
4846

49-
metricStorage := metric.NewStorageMock(t)
50-
metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) {
51-
assert.Equal(t, metric, "{PREFIX}tasks_queue_action_duration_seconds")
52-
assert.NotZero(t, value)
53-
assert.Equal(t, map[string]string{
54-
"queue_action": "AddLast",
55-
"queue_name": "",
56-
}, labels)
57-
assert.Nil(t, buckets)
58-
})
47+
hm2 := HookMetadata{
48+
HookName: "hook1.sh",
49+
BindingType: htypes.Schedule,
50+
Binding: "every 1 sec",
51+
Group: "monitor_pods",
52+
}
53+
g.Expect(hm2.GetDescription()).Should(Equal(":schedule:hook1.sh:group=monitor_pods:every 1 sec"))
5954

60-
q := queue.NewTasksQueue().WithMetricStorage(metricStorage)
55+
hm3 := HookMetadata{
56+
HookName: "hook1.sh",
57+
BindingType: htypes.OnStartup,
58+
}
59+
g.Expect(hm3.GetDescription()).Should(Equal(":onstartup:hook1.sh"))
6160

62-
q.AddLast(task.NewTask(EnableKubernetesBindings).
63-
WithMetadata(HookMetadata{
64-
HookName: "hook1.sh",
65-
Binding: string(EnableKubernetesBindings),
66-
}))
61+
hm4 := HookMetadata{
62+
HookName: "hook1.sh",
63+
BindingType: htypes.Schedule,
64+
Group: "monitor_pods",
65+
}
66+
g.Expect(hm4.GetDescription()).Should(Equal(":schedule:hook1.sh:group=monitor_pods"))
6767

68-
q.AddLast(task.NewTask(HookRun).
69-
WithMetadata(HookMetadata{
70-
HookName: "hook1.sh",
71-
BindingType: htypes.OnKubernetesEvent,
72-
Binding: "monitor_pods",
73-
}).
74-
WithLogLabels(logLabels).
75-
WithQueueName("main"))
76-
77-
q.AddLast(task.NewTask(HookRun).
78-
WithMetadata(HookMetadata{
79-
HookName: "hook1.sh",
80-
BindingType: htypes.Schedule,
81-
AllowFailure: true,
82-
Binding: "every 1 sec",
83-
Group: "monitor_pods",
84-
}).
85-
WithLogLabels(logLabels).
86-
WithQueueName("main"))
87-
88-
queueDump := taskQueueToText(q)
89-
90-
g.Expect(queueDump).Should(ContainSubstring("hook1.sh"), "Queue dump should reveal a hook name.")
91-
g.Expect(queueDump).Should(ContainSubstring("EnableKubernetesBindings"), "Queue dump should reveal EnableKubernetesBindings.")
92-
g.Expect(queueDump).Should(ContainSubstring(":kubernetes:"), "Queue dump should show kubernetes binding.")
93-
g.Expect(queueDump).Should(ContainSubstring(":schedule:"), "Queue dump should show schedule binding.")
94-
g.Expect(queueDump).Should(ContainSubstring("group=monitor_pods"), "Queue dump should show group name.")
68+
hm5 := HookMetadata{
69+
HookName: "hook1.sh",
70+
BindingType: htypes.Schedule,
71+
Binding: "every 1 sec",
72+
}
73+
g.Expect(hm5.GetDescription()).Should(Equal(":schedule:hook1.sh:every 1 sec"))
9574
}
9675

97-
func taskQueueToText(q *queue.TaskQueue) string {
98-
var buf strings.Builder
99-
buf.WriteString(fmt.Sprintf("Queue '%s': length %d, status: '%s'\n", q.Name, q.Length(), q.Status))
100-
buf.WriteString("\n")
76+
func Test_HookMetadata_IsSynchronization(t *testing.T) {
77+
g := NewWithT(t)
10178

102-
index := 1
103-
q.Iterate(func(task task.Task) {
104-
buf.WriteString(fmt.Sprintf("%2d. ", index))
105-
buf.WriteString(task.GetDescription())
106-
buf.WriteString("\n")
107-
index++
108-
})
79+
hm1 := HookMetadata{
80+
HookName: "hook1.sh",
81+
BindingContext: []bctx.BindingContext{
82+
{
83+
Binding: "monitor_pods",
84+
Type: "Synchronization",
85+
},
86+
},
87+
}
88+
g.Expect(hm1.IsSynchronization()).Should(BeTrue())
89+
90+
hm2 := HookMetadata{
91+
HookName: "hook1.sh",
92+
BindingContext: []bctx.BindingContext{
93+
{
94+
Binding: "monitor_pods",
95+
Type: "Event",
96+
},
97+
},
98+
}
99+
g.Expect(hm2.IsSynchronization()).Should(BeFalse())
109100

110-
return buf.String()
101+
hm3 := HookMetadata{
102+
HookName: "hook1.sh",
103+
}
104+
g.Expect(hm3.IsSynchronization()).Should(BeFalse())
111105
}

pkg/kube_events_manager/resource_informer.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -389,8 +389,6 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty
389389
Objects: []kemtypes.ObjectAndFilterResult{*objFilterRes},
390390
}
391391

392-
fmt.Printf("[TRACE] KubeEvent created: monitorId=%s, eventType=%s\n", kubeEvent.MonitorId, string(eventType))
393-
394392
// fix race with enableKubeEventCb.
395393
eventCbEnabled := false
396394
ei.eventBufLock.Lock()

pkg/shell-operator/combine_binding_context.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ func (op *ShellOperator) combineBindingContextForHook(tqs *queue.TaskQueueSet, q
3434
}
3535
hookName := taskMeta.(HookNameAccessor).GetHookName()
3636

37-
fmt.Printf("[TRACE] combineBindingContextForHook called: hook=%s, task=%s\n", hookName, t.GetId())
38-
3937
res := new(CombineResult)
4038

4139
otherTasks := make([]task.Task, 0)
@@ -45,8 +43,6 @@ func (op *ShellOperator) combineBindingContextForHook(tqs *queue.TaskQueueSet, q
4543
return
4644
}
4745

48-
fmt.Printf("[TRACE] Iterating task in queue: taskId=%s, hookName=%s\n", tsk.GetId(), hookName)
49-
5046
// ignore current task
5147
if tsk.GetId() == t.GetId() {
5248
return
@@ -78,8 +74,6 @@ func (op *ShellOperator) combineBindingContextForHook(tqs *queue.TaskQueueSet, q
7874
return nil
7975
}
8076

81-
fmt.Printf("[TRACE] Found tasks to combine: count=%d\n", len(otherTasks))
82-
8377
// Combine binding context and make a map to delete excess tasks
8478
combinedContext := make([]bctx.BindingContext, 0)
8579
monitorIDs := taskMeta.(MonitorIDAccessor).GetMonitorIDs()
@@ -96,8 +90,6 @@ func (op *ShellOperator) combineBindingContextForHook(tqs *queue.TaskQueueSet, q
9690
tasksFilter[tsk.GetId()] = false
9791
}
9892

99-
fmt.Printf("[TRACE] Combined binding contexts before compaction: contexts=%+v\n", combinedContext)
100-
10193
// Delete tasks with false in tasksFilter map
10294
tqs.GetByName(t.GetQueueName()).Filter(func(tsk task.Task) bool {
10395
if v, ok := tasksFilter[tsk.GetId()]; ok {
@@ -116,7 +108,6 @@ func (op *ShellOperator) combineBindingContextForHook(tqs *queue.TaskQueueSet, q
116108
if groupName != "" && (i+1 <= len(combinedContext)-1) && combinedContext[i+1].Metadata.Group == groupName {
117109
keep = false
118110
}
119-
fmt.Printf("[TRACE] Compacting binding context: group=%s, keep=%t\n", groupName, keep)
120111

121112
if keep {
122113
compactedContext = append(compactedContext, combinedContext[i])

0 commit comments

Comments
 (0)