@@ -10,6 +10,7 @@ import (
1010 "github.com/icinga/icinga-go-library/redis"
1111 "github.com/icinga/icinga-go-library/strcase"
1212 "github.com/icinga/icinga-go-library/structify"
13+ "github.com/icinga/icinga-go-library/types"
1314 "github.com/icinga/icingadb/pkg/common"
1415 "github.com/icinga/icingadb/pkg/contracts"
1516 v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
@@ -71,9 +72,21 @@ func (r *RuntimeUpdates) Sync(
 		s := common.NewSyncSubject(factoryFunc)
 		stat := getCounterForEntity(s.Entity())
 
+		// Multiplexer channels used to distribute the Redis entities to several consumers.
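+		// structifyStream produces into them, and the fan-out workers started below forward each
+		// element to every interested consumer.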
+		upsertEntitiesMultiplexer := make(chan database.Entity, 1)
+		deleteIdsMultiplexer := make(chan any, 1)
+
 		updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount)
 		upsertEntities := make(chan database.Entity, r.redis.Options.XReadCount)
-		deleteEntities := make(chan database.Entity, r.redis.Options.XReadCount)
+		deleteIds := make(chan interface{}, r.redis.Options.XReadCount)
+
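+		// For checkables (hosts and services), runtime updates additionally maintain SLA lifecycle
+		// entries; for every other entity type these channels stay nil.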
+		var insertSlaEntities chan database.Entity
+		var updateSlaEntities chan database.Entity
+		switch s.Entity().(type) {
+		case *v1.Host, *v1.Service:
+			insertSlaEntities = make(chan database.Entity, r.redis.Options.XReadCount)
+			updateSlaEntities = make(chan database.Entity, r.redis.Options.XReadCount)
+		}
 
 		var upsertedFifo chan database.Entity
 		var deletedFifo chan interface{}
@@ -95,152 +108,164 @@ func (r *RuntimeUpdates) Sync(
 		r.logger.Debugf("Syncing runtime updates of %s", s.Name())
 
 		g.Go(structifyStream(
-			ctx, updateMessages, upsertEntities, upsertedFifo, deleteEntities, deletedFifo,
+			ctx, updateMessages, upsertEntitiesMultiplexer, upsertedFifo, deleteIdsMultiplexer, deletedFifo,
 			structify.MakeMapStructifier(
 				reflect.TypeOf(s.Entity()).Elem(),
 				"json",
 				contracts.SafeInit),
 		))
 
-		// upsertEntityFunc returns a closure that is used to upsert the regular Icinga DB entities.
-		// The returned func is used to directly start a separate goroutine that selects events
-		// sequentially (!allowParallel) from the given chan.
-		upsertEntityFunc := func(entities <-chan database.Entity) func() error {
-			return func() error {
-				var counter com.Counter
-				defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
-					if count := counter.Reset(); count > 0 {
-						r.logger.Infof("Upserted %d %s items", count, s.Name())
+		// This worker consumes the "upsert" events from Redis and redistributes the entities to the
+		// "upsertEntities" channel and, for host and service entities, also to the "insertSlaEntities" channel.
+		g.Go(func() error {
+			defer close(upsertEntities)
+			if insertSlaEntities != nil {
+				defer close(insertSlaEntities)
+			}
+
+			for {
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case entity, ok := <-upsertEntitiesMultiplexer:
+					if !ok {
+						return nil
 					}
-				}).Stop()
 
-				onSuccess := []database.OnSuccess[database.Entity]{
-					database.OnSuccessIncrement[database.Entity](&counter),
-					database.OnSuccessIncrement[database.Entity](stat),
+					select {
+					case upsertEntities <- entity:
+					case <-ctx.Done():
+						return ctx.Err()
+					}
+
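+					// insertSlaEntities is nil for non-checkable types, in which case the SLA fan-out is skipped.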
+					if insertSlaEntities != nil {
+						select {
+						case insertSlaEntities <- entity:
+						case <-ctx.Done():
+							return ctx.Err()
+						}
+					}
 				}
-				if !allowParallel {
-					onSuccess = append(onSuccess, database.OnSuccessSendTo(upsertedFifo))
+			}
+		})
+
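+		// Consumes from the "upsertEntities" channel and upserts the entities into the regular Icinga DB tables.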
+		g.Go(func() error {
+			var counter com.Counter
+			defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
+				if count := counter.Reset(); count > 0 {
+					r.logger.Infof("Upserted %d %s items", count, s.Name())
 				}
+			}).Stop()
 
-				// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
-				sem := semaphore.NewWeighted(1)
+			// Updates must be executed in order, ensure this by using a semaphore with maximum 1.
+			sem := semaphore.NewWeighted(1)
 
-				return r.db.NamedBulkExec(
-					ctx, upsertStmt, upsertCount, sem, entities, database.SplitOnDupId[database.Entity], onSuccess...,
-				)
+			onSuccess := []database.OnSuccess[database.Entity]{
+				database.OnSuccessIncrement[database.Entity](&counter), database.OnSuccessIncrement[database.Entity](stat),
+			}
+			if !allowParallel {
+				onSuccess = append(onSuccess, database.OnSuccessSendTo(upsertedFifo))
 			}
-		}
 
-		// deleteEntityFunc returns a closure that is used to delete the regular Icinga DB entities
-		// based on their ids. The returned func is used to directly start a separate goroutine that
-		// selects events sequentially (!allowParallel) from the given chan.
-		deleteEntityFunc := func(deleteIds <-chan any) func() error {
-			return func() error {
-				var counter com.Counter
-				defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
-					if count := counter.Reset(); count > 0 {
-						r.logger.Infof("Deleted %d %s items", count, s.Name())
-					}
-				}).Stop()
+			return r.db.NamedBulkExec(
+				ctx, upsertStmt, upsertCount, sem, upsertEntities, database.SplitOnDupId[database.Entity], onSuccess...,
+			)
+		})
 
-				onSuccess := []database.OnSuccess[any]{database.OnSuccessIncrement[any](&counter), database.OnSuccessIncrement[any](stat)}
-				if !allowParallel {
-					onSuccess = append(onSuccess, database.OnSuccessSendTo(deletedFifo))
+		// Consumes from the "insertSlaEntities" channel and bulk inserts into the "sla_lifecycle" table.
+		g.Go(func() error {
+			var counter com.Counter
+			defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
+				if count := counter.Reset(); count > 0 {
+					r.logger.Infof("Inserted %d %s sla lifecycles", count, s.Name())
 				}
+			}).Stop()
 
-				sem := r.db.GetSemaphoreForTable(database.TableName(s.Entity()))
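+			// The error returned by BuildInsertIgnoreStmt is deliberately ignored; building the
+			// statement for the known SlaLifecycle type is assumed not to fail here.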
+			stmt, _ := r.db.BuildInsertIgnoreStmt(v1.NewSlaLifecycle())
+			return r.db.NamedBulkExec(
+				ctx, stmt, upsertCount, r.db.GetSemaphoreForTable(slaLifecycleTable),
+				CreateSlaLifecyclesFromCheckables(ctx, s.Entity(), g, insertSlaEntities, false),
+				com.NeverSplit[database.Entity], database.OnSuccessIncrement[database.Entity](&counter))
+		})
 
-				return r.db.BulkExec(ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, onSuccess...)
+		// This worker consumes the "delete" events from Redis and redistributes the IDs to the
+		// "deleteIds" channel and, for host and service entities, also to the "updateSlaEntities" channel.
+		g.Go(func() error {
+			defer close(deleteIds)
+			if updateSlaEntities != nil {
+				defer close(updateSlaEntities)
 			}
-		}
 
-		// In order to always get the sla entries written even in case of system errors, we need to process these
-		// first. Otherwise, Icinga DB may be stopped after the regular queries have been processed, and deleted
-		// from the Redis stream, thus we won't be able to generate sla lifecycle for these entities.
-		//
-		// The general event process flow looks as follows:
-		// structifyStream() -> Reads `upsert` & `delete` events from redis and streams the entities to the
-		//                      respective chans `upsertEntities`, `deleteEntities` and waits for `upserted`
-		//                      and `deleted` chans (!allowParallel) before consuming the next one from redis.
-		// - Starts a goroutine that consumes from `upsertEntities` (when the current sync subject is of type checkable,
-		//   this bulk inserts into the sla lifecycle table with semaphore 1 in a `INSERT ... IGNORE ON ERROR` fashion
-		//   and forwards the entities to the next one, which then inserts the checkables into the regular Icinga DB
-		//   tables). After successfully upserting the entities, (!allowParallel) they are passed sequentially to
-		//   the `upserted` stream.
-		//
-		// - Starts another goroutine that consumes from `deleteEntities` concurrently. When the current sync subject is
-		//   of type checkable, this performs sla lifecycle updates matching the checkables id and `delete_time` 0.
-		//   Since the upgrade script should've generated a sla entry for all exiting Checkables, it's unlikely to
-		//   happen, but when there is no tracked `created_at` event for a given checkable, this update is essentially
-		//   a no-op, but forwards the original entities IDs nonetheless to the next one. And finally, the original
-		//   checkables are deleted from the database and (!allowParallel) they are passed sequentially to the
-		//   `deleted` stream.
-		switch s.Entity().(type) {
-		case *v1.Host, *v1.Service:
-			entities := make(chan database.Entity, 1)
-			g.Go(func() error {
-				defer close(entities)
-
-				var counter com.Counter
-				defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
-					if count := counter.Reset(); count > 0 {
-						r.logger.Infof("Upserted %d %s sla lifecycles", count, s.Name())
+			for {
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case deleteId, ok := <-deleteIdsMultiplexer:
+					if !ok {
+						return nil
 					}
-				}).Stop()
-
-				stmt, _ := r.db.BuildInsertIgnoreStmt(v1.NewSlaLifecycle())
-
-				// Not to mess up the already existing FIFO mechanism, we have to perform only a single query
-				// (semaphore 1) at a time, even though, the sla queries could be executed concurrently.
-				// After successfully upserting a lifecycle entity, the original checkable entity is streamed to "entities".
-				fromSlaEntities := CreateSlaLifecyclesFromCheckables(ctx, s.Entity(), g, upsertEntities, false)
-				return r.db.NamedBulkExec(
-					ctx, stmt, upsertCount, semaphore.NewWeighted(1), fromSlaEntities,
-					com.NeverSplit[database.Entity], database.OnSuccessIncrement[database.Entity](&counter),
-					OnSuccessApplyAndSendTo(entities, GetCheckableFromSlaLifecycle))
-			})
-
-			// Start the regular Icinga DB checkables upsert stream.
-			g.Go(upsertEntityFunc(entities))
 
-			// Start the regular Icinga DB checkables delete stream.
-			g.Go(deleteEntityFunc(StreamIDsFromUpdatedSlaLifecycles(ctx, r.db, s.Entity(), g, r.logger, deleteEntities)))
-		default:
-			// For non-checkables runtime updates of upsert event
-			g.Go(upsertEntityFunc(upsertEntities))
-
-			// For non-checkables runtime updates of delete event
-			deleteIds := make(chan any, r.redis.Options.XReadCount)
-			g.Go(func() error {
-				defer close(deleteIds)
-
-				for {
 					select {
+					case deleteIds <- deleteId:
 					case <-ctx.Done():
 						return ctx.Err()
-					case entity, ok := <-deleteEntities:
-						if !ok {
-							return nil
-						}
+					}
+
+					if updateSlaEntities != nil {
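+						// structifyStream forwards the raw entity ID, which is expected to be a types.Binary.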
+						entity := factoryFunc()
+						entity.SetID(deleteId.(types.Binary))
 
 						select {
-						case deleteIds <- entity.ID():
+						case updateSlaEntities <- entity:
 						case <-ctx.Done():
 							return ctx.Err()
 						}
 					}
 				}
-			})
+			}
+		})
 
-			g.Go(deleteEntityFunc(deleteIds))
-		}
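+		// Consumes from the "deleteIds" channel and bulk deletes the entities from the regular Icinga DB tables.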
+		g.Go(func() error {
+			var counter com.Counter
+			defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
+				if count := counter.Reset(); count > 0 {
+					r.logger.Infof("Deleted %d %s items", count, s.Name())
+				}
+			}).Stop()
+
+			sem := r.db.GetSemaphoreForTable(database.TableName(s.Entity()))
+
+			onSuccess := []database.OnSuccess[any]{database.OnSuccessIncrement[any](&counter), database.OnSuccessIncrement[any](stat)}
+			if !allowParallel {
+				onSuccess = append(onSuccess, database.OnSuccessSendTo(deletedFifo))
+			}
+
+			return r.db.BulkExec(ctx, r.db.BuildDeleteStmt(s.Entity()), deleteCount, sem, deleteIds, onSuccess...)
+		})
+
+		// Consumes from the "updateSlaEntities" channel and sets the "delete_time" of each pending
+		// SLA lifecycle entry (i.e. one whose "delete_time" is still 0) to the current timestamp.
+		g.Go(func() error {
+			var counter com.Counter
+			defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
+				if count := counter.Reset(); count > 0 {
+					r.logger.Infof("Updated %d %s sla lifecycles", count, s.Name())
+				}
+			}).Stop()
+
+			stmt := fmt.Sprintf(`UPDATE %s SET delete_time = :delete_time WHERE "id" = :id AND "delete_time" = 0`, slaLifecycleTable)
+			return r.db.NamedBulkExec(
+				ctx, stmt, deleteCount, r.db.GetSemaphoreForTable(slaLifecycleTable),
+				CreateSlaLifecyclesFromCheckables(ctx, s.Entity(), g, updateSlaEntities, true),
+				com.NeverSplit[database.Entity], database.OnSuccessIncrement[database.Entity](&counter))
+		})
 	}
 
 	// customvar and customvar_flat sync.
 	{
 		updateMessages := make(chan redis.XMessage, r.redis.Options.XReadCount)
 		upsertEntities := make(chan database.Entity, r.redis.Options.XReadCount)
-		deleteEntities := make(chan database.Entity, r.redis.Options.XReadCount)
+		deleteIds := make(chan interface{}, r.redis.Options.XReadCount)
 
 		cv := common.NewSyncSubject(v1.NewCustomvar)
 		cvFlat := common.NewSyncSubject(v1.NewCustomvarFlat)
@@ -250,7 +275,7 @@ func (r *RuntimeUpdates) Sync(
 
 		updateMessagesByKey["icinga:"+strcase.Delimited(cv.Name(), ':')] = updateMessages
 		g.Go(structifyStream(
-			ctx, updateMessages, upsertEntities, nil, deleteEntities, nil,
+			ctx, updateMessages, upsertEntities, nil, deleteIds, nil,
 			structify.MakeMapStructifier(
 				reflect.TypeOf(cv.Entity()).Elem(),
 				"json",
@@ -304,7 +329,7 @@ func (r *RuntimeUpdates) Sync(
 			var once sync.Once
 			for {
 				select {
-				case _, ok := <-deleteEntities:
+				case _, ok := <-deleteIds:
 					if !ok {
 						return nil
 					}
@@ -394,7 +419,7 @@ func (r *RuntimeUpdates) xRead(ctx context.Context, updateMessagesByKey map[stri
 // Converted entities are inserted into the upsertEntities or deleteIds channel depending on the "runtime_type" message field.
 func structifyStream(
 	ctx context.Context, updateMessages <-chan redis.XMessage, upsertEntities, upserted chan database.Entity,
-	deleteEntities chan database.Entity, deleted chan interface{}, structifier structify.MapStructifier,
+	deleteIds, deleted chan interface{}, structifier structify.MapStructifier,
 ) func() error {
 	if upserted == nil {
 		upserted = make(chan database.Entity)
@@ -409,7 +434,7 @@ func structifyStream(
 	return func() error {
 		defer func() {
 			close(upsertEntities)
-			close(deleteEntities)
+			close(deleteIds)
 		}()
 
 		for {
@@ -445,7 +470,7 @@ func structifyStream(
 					}
 				} else if runtimeType == "delete" {
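+					// Only the entity ID is forwarded for deletions; a consumer that needs the full
+					// entity reconstructs one via the factory function (see the SLA delete fan-out above).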
 					select {
-					case deleteEntities <- entity:
+					case deleteIds <- entity.ID():
 					case <-ctx.Done():
 						return ctx.Err()
 					}