@@ -10,6 +10,7 @@ import (
1010 "github.com/icinga/icinga-go-library/redis"
1111 "github.com/icinga/icinga-go-library/strcase"
1212 "github.com/icinga/icinga-go-library/structify"
13+ "github.com/icinga/icinga-go-library/types"
1314 "github.com/icinga/icingadb/pkg/common"
1415 "github.com/icinga/icingadb/pkg/contracts"
1516 v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
@@ -58,7 +59,7 @@ func (r *RuntimeUpdates) ClearStreams(ctx context.Context) (config, state redis.
5859}
5960
6061// Sync synchronizes runtime update streams from s.redis to s.db and deletes the original data on success.
61- // Note that Sync must be only be called configuration synchronization has been completed.
62+ // Note that Sync must only be called once configuration synchronization has been completed.
6263// allowParallel allows synchronizing out of order (not FIFO).
6364func (r * RuntimeUpdates ) Sync (
6465 ctx context.Context , factoryFuncs []database.EntityFactoryFunc , streams redis.Streams , allowParallel bool ,
@@ -71,10 +72,22 @@ func (r *RuntimeUpdates) Sync(
7172 s := common .NewSyncSubject (factoryFunc )
7273 stat := getCounterForEntity (s .Entity ())
7374
75+ // Multiplexer channels used to distribute the Redis entities to several consumers.
76+ upsertEntitiesMultiplexer := make (chan database.Entity , 1 )
77+ deleteIdsMultiplexer := make (chan any , 1 )
78+
7479 updateMessages := make (chan redis.XMessage , r .redis .Options .XReadCount )
7580 upsertEntities := make (chan database.Entity , r .redis .Options .XReadCount )
7681 deleteIds := make (chan interface {}, r .redis .Options .XReadCount )
7782
83+ var insertSlaEntities chan database.Entity
84+ var updateSlaEntities chan database.Entity
85+ switch s .Entity ().(type ) {
86+ case * v1.Host , * v1.Service :
87+ insertSlaEntities = make (chan database.Entity , r .redis .Options .XReadCount )
88+ updateSlaEntities = make (chan database.Entity , r .redis .Options .XReadCount )
89+ }
90+
7891 var upsertedFifo chan database.Entity
7992 var deletedFifo chan interface {}
8093 var upsertCount int
@@ -95,13 +108,47 @@ func (r *RuntimeUpdates) Sync(
95108 r .logger .Debugf ("Syncing runtime updates of %s" , s .Name ())
96109
97110 g .Go (structifyStream (
98- ctx , updateMessages , upsertEntities , upsertedFifo , deleteIds , deletedFifo ,
111+ ctx , updateMessages , upsertEntitiesMultiplexer , upsertedFifo , deleteIdsMultiplexer , deletedFifo ,
99112 structify .MakeMapStructifier (
100113 reflect .TypeOf (s .Entity ()).Elem (),
101114 "json" ,
102115 contracts .SafeInit ),
103116 ))
104117
118+ // This worker consumes the "upsert" event from Redis and redistributes the entities to the "upsertEntities"
119+ // channel and for Host/Service entities also to the "insertSlaEntities" channel.
120+ g .Go (func () error {
121+ defer close (upsertEntities )
122+ if insertSlaEntities != nil {
123+ defer close (insertSlaEntities )
124+ }
125+
126+ for {
127+ select {
128+ case <- ctx .Done ():
129+ return ctx .Err ()
130+ case entity , ok := <- upsertEntitiesMultiplexer :
131+ if ! ok {
132+ return nil
133+ }
134+
135+ select {
136+ case upsertEntities <- entity :
137+ case <- ctx .Done ():
138+ return ctx .Err ()
139+ }
140+
141+ if insertSlaEntities != nil {
142+ select {
143+ case insertSlaEntities <- entity :
144+ case <- ctx .Done ():
145+ return ctx .Err ()
146+ }
147+ }
148+ }
149+ }
150+ })
151+
105152 g .Go (func () error {
106153 var counter com.Counter
107154 defer periodic .Start (ctx , r .logger .Interval (), func (_ periodic.Tick ) {
@@ -125,6 +172,59 @@ func (r *RuntimeUpdates) Sync(
125172 )
126173 })
127174
175+ // Consumes from the "insertSlaEntities" channel and bulk inserts into the "sla_lifecycle" table.
176+ g .Go (func () error {
177+ var counter com.Counter
178+ defer periodic .Start (ctx , r .logger .Interval (), func (_ periodic.Tick ) {
179+ if count := counter .Reset (); count > 0 {
180+ r .logger .Infof ("Inserted %d %s sla lifecycles" , count , s .Name ())
181+ }
182+ }).Stop ()
183+
184+ stmt , _ := r .db .BuildInsertIgnoreStmt (v1 .NewSlaLifecycle ())
185+ return r .db .NamedBulkExec (
186+ ctx , stmt , upsertCount , r .db .GetSemaphoreForTable (slaLifecycleTable ),
187+ CreateSlaLifecyclesFromCheckables (ctx , s .Entity (), g , insertSlaEntities , false ),
188+ com .NeverSplit [database .Entity ], database.OnSuccessIncrement [database.Entity ](& counter ))
189+ })
190+
191+ // This worker consumes the "delete" event from Redis and redistributes the IDs to the "deleteIds"
192+ // channel and for Host/Service entities also to the "updateSlaEntities" channel.
193+ g .Go (func () error {
194+ defer close (deleteIds )
195+ if updateSlaEntities != nil {
196+ defer close (updateSlaEntities )
197+ }
198+
199+ for {
200+ select {
201+ case <- ctx .Done ():
202+ return ctx .Err ()
203+ case deleteId , ok := <- deleteIdsMultiplexer :
204+ if ! ok {
205+ return nil
206+ }
207+
208+ select {
209+ case deleteIds <- deleteId :
210+ case <- ctx .Done ():
211+ return ctx .Err ()
212+ }
213+
214+ if updateSlaEntities != nil {
215+ entity := factoryFunc ()
216+ entity .SetID (deleteId .(types.Binary ))
217+
218+ select {
219+ case updateSlaEntities <- entity :
220+ case <- ctx .Done ():
221+ return ctx .Err ()
222+ }
223+ }
224+ }
225+ }
226+ })
227+
128228 g .Go (func () error {
129229 var counter com.Counter
130230 defer periodic .Start (ctx , r .logger .Interval (), func (_ periodic.Tick ) {
@@ -142,6 +242,23 @@ func (r *RuntimeUpdates) Sync(
142242
143243 return r .db .BulkExec (ctx , r .db .BuildDeleteStmt (s .Entity ()), deleteCount , sem , deleteIds , onSuccess ... )
144244 })
245+
246+ // Consumes from the "updateSlaEntities" channel and updates the "delete_time" of each
247+ // SLA lifecycle entry with "delete_time = 0" to now.
248+ g .Go (func () error {
249+ var counter com.Counter
250+ defer periodic .Start (ctx , r .logger .Interval (), func (_ periodic.Tick ) {
251+ if count := counter .Reset (); count > 0 {
252+ r .logger .Infof ("Updated %d %s sla lifecycles" , count , s .Name ())
253+ }
254+ }).Stop ()
255+
256+ stmt := fmt .Sprintf (`UPDATE %s SET delete_time = :delete_time WHERE "id" = :id AND "delete_time" = 0` , slaLifecycleTable )
257+ return r .db .NamedBulkExec (
258+ ctx , stmt , deleteCount , r .db .GetSemaphoreForTable (slaLifecycleTable ),
259+ CreateSlaLifecyclesFromCheckables (ctx , s .Entity (), g , updateSlaEntities , true ),
260+ com .NeverSplit [database .Entity ], database.OnSuccessIncrement [database.Entity ](& counter ))
261+ })
145262 }
146263
147264 // customvar and customvar_flat sync.
0 commit comments