@@ -58,7 +58,7 @@ func (r *RuntimeUpdates) ClearStreams(ctx context.Context) (config, state redis.
5858}
5959
6060// Sync synchronizes runtime update streams from s.redis to s.db and deletes the original data on success.
61- // Note that Sync must be only be called configuration synchronization has been completed.
61+ // Note that Sync must only be called once configuration synchronization has been completed.
6262// allowParallel allows synchronizing out of order (not FIFO).
6363func (r * RuntimeUpdates ) Sync (
6464 ctx context.Context , factoryFuncs []database.EntityFactoryFunc , streams redis.Streams , allowParallel bool ,
@@ -73,7 +73,7 @@ func (r *RuntimeUpdates) Sync(
7373
7474 updateMessages := make (chan redis.XMessage , r .redis .Options .XReadCount )
7575 upsertEntities := make (chan database.Entity , r .redis .Options .XReadCount )
76- deleteIds := make (chan interface {} , r .redis .Options .XReadCount )
76+ deleteEntities := make (chan database. Entity , r .redis .Options .XReadCount )
7777
7878 var upsertedFifo chan database.Entity
7979 var deletedFifo chan interface {}
@@ -95,60 +95,176 @@ func (r *RuntimeUpdates) Sync(
9595 r .logger .Debugf ("Syncing runtime updates of %s" , s .Name ())
9696
9797 g .Go (structifyStream (
98- ctx , updateMessages , upsertEntities , upsertedFifo , deleteIds , deletedFifo ,
98+ ctx , updateMessages , upsertEntities , upsertedFifo , deleteEntities , deletedFifo ,
9999 structify .MakeMapStructifier (
100100 reflect .TypeOf (s .Entity ()).Elem (),
101101 "json" ,
102102 contracts .SafeInit ),
103103 ))
104104
105- g .Go (func () error {
106- var counter com.Counter
107- defer periodic .Start (ctx , r .logger .Interval (), func (_ periodic.Tick ) {
108- if count := counter .Reset (); count > 0 {
109- r .logger .Infof ("Upserted %d %s items" , count , s .Name ())
105+ // upsertEntityFunc returns a closure that is used to upsert the regular Icinga DB entities.
106+ // The returned func is used to directly start a separate goroutine that selects events
107+ // sequentially (!allowParallel) from the given chan.
108+ upsertEntityFunc := func (entities <- chan database.Entity ) func () error {
109+ return func () error {
110+ var counter com.Counter
111+ defer periodic .Start (ctx , r .logger .Interval (), func (_ periodic.Tick ) {
112+ if count := counter .Reset (); count > 0 {
113+ r .logger .Infof ("Upserted %d %s items" , count , s .Name ())
114+ }
115+ }).Stop ()
116+
117+ onSuccess := []database.OnSuccess [database.Entity ]{
118+ database.OnSuccessIncrement [database.Entity ](& counter ),
119+ database.OnSuccessIncrement [database.Entity ](stat ),
120+ }
121+ if ! allowParallel {
122+ onSuccess = append (onSuccess , database .OnSuccessSendTo (upsertedFifo ))
110123 }
111- }).Stop ()
112124
113- // Updates must be executed in order, ensure this by using a semaphore with maximum 1.
114- sem := semaphore .NewWeighted (1 )
125+ // Updates must be executed in order, ensure this by using a semaphore with maximum 1.
126+ sem := semaphore .NewWeighted (1 )
115127
116- onSuccess := []database.OnSuccess [database.Entity ]{
117- database.OnSuccessIncrement [database.Entity ](& counter ), database.OnSuccessIncrement [database.Entity ](stat ),
118- }
119- if ! allowParallel {
120- onSuccess = append (onSuccess , database .OnSuccessSendTo (upsertedFifo ))
128+ return r .db .NamedBulkExec (
129+ ctx , upsertStmt , upsertCount , sem , entities , database .SplitOnDupId [database .Entity ], onSuccess ... ,
130+ )
121131 }
132+ }
122133
123- return r .db .NamedBulkExec (
124- ctx , upsertStmt , upsertCount , sem , upsertEntities , database .SplitOnDupId [database .Entity ], onSuccess ... ,
125- )
126- })
134+ // deleteEntityFunc returns a closure that is used to delete the regular Icinga DB entities
135+ // based on their ids. The returned func is used to directly start a separate goroutine that
136+ // selects events sequentially (!allowParallel) from the given chan.
137+ deleteEntityFunc := func (deleteIds <- chan any ) func () error {
138+ return func () error {
139+ var counter com.Counter
140+ defer periodic .Start (ctx , r .logger .Interval (), func (_ periodic.Tick ) {
141+ if count := counter .Reset (); count > 0 {
142+ r .logger .Infof ("Deleted %d %s items" , count , s .Name ())
143+ }
144+ }).Stop ()
127145
128- g .Go (func () error {
129- var counter com.Counter
130- defer periodic .Start (ctx , r .logger .Interval (), func (_ periodic.Tick ) {
131- if count := counter .Reset (); count > 0 {
132- r .logger .Infof ("Deleted %d %s items" , count , s .Name ())
146+ onSuccess := []database.OnSuccess [any ]{database.OnSuccessIncrement [any ](& counter ), database.OnSuccessIncrement [any ](stat )}
147+ if ! allowParallel {
148+ onSuccess = append (onSuccess , database .OnSuccessSendTo (deletedFifo ))
133149 }
134- }).Stop ()
135150
136- sem := r .db .GetSemaphoreForTable (database .TableName (s .Entity ()))
151+ sem := r .db .GetSemaphoreForTable (database .TableName (s .Entity ()))
137152
138- onSuccess := []database.OnSuccess [any ]{database.OnSuccessIncrement [any ](& counter ), database.OnSuccessIncrement [any ](stat )}
139- if ! allowParallel {
140- onSuccess = append (onSuccess , database .OnSuccessSendTo (deletedFifo ))
153+ return r .db .BulkExec (ctx , r .db .BuildDeleteStmt (s .Entity ()), deleteCount , sem , deleteIds , onSuccess ... )
141154 }
155+ }
142156
143- return r .db .BulkExec (ctx , r .db .BuildDeleteStmt (s .Entity ()), deleteCount , sem , deleteIds , onSuccess ... )
144- })
157+ // In order to always get the sla entries written even in case of system errors, we need to process these
158+ // first. Otherwise, Icinga DB may be stopped after the regular queries have been processed and the events deleted
159+ // from the Redis stream, thus we won't be able to generate sla lifecycle entries for these entities.
160+ //
161+ // The general event process flow looks as follows:
162+ // structifyStream() -> Reads `upsert` & `delete` events from redis and streams the entities to the
163+ // respective chans `upsertEntities`, `deleteEntities` and waits for `upserted`
164+ // and `deleted` chans (!allowParallel) before consuming the next one from redis.
165+ // - Starts a goroutine that consumes from `upsertEntities` (when the current sync subject is of type checkable,
166+ // this bulk inserts into the sla lifecycle table with semaphore 1 in an `INSERT ... IGNORE ON ERROR` fashion
167+ // and forwards the entities to the next one, which then inserts the checkables into the regular Icinga DB
168+ // tables). After successfully upserting the entities, (!allowParallel) they are passed sequentially to
169+ // the `upserted` stream.
170+ //
171+ // - Starts another goroutine that consumes from `deleteEntities` concurrently. When the current sync subject is
172+ // of type checkable, this performs sla lifecycle updates matching the checkable's id and `delete_time` 0.
173+ // Since the upgrade script should've generated an sla entry for all existing Checkables, it's unlikely to
174+ // happen, but when there is no tracked `created_at` event for a given checkable, this update is essentially
175+ // a no-op, but forwards the original entities' IDs nonetheless to the next one. And finally, the original
176+ // checkables are deleted from the database and (!allowParallel) they are passed sequentially to the
177+ // `deleted` stream.
178+ switch s .Entity ().(type ) {
179+ case * v1.Host , * v1.Service :
180+ entities := make (chan database.Entity , 1 )
181+ g .Go (func () error {
182+ defer close (entities )
183+
184+ var counter com.Counter
185+ defer periodic .Start (ctx , r .logger .Interval (), func (_ periodic.Tick ) {
186+ if count := counter .Reset (); count > 0 {
187+ r .logger .Infof ("Upserted %d %s sla lifecycles" , count , s .Name ())
188+ }
189+ }).Stop ()
190+
191+ stmt , _ := r .db .BuildInsertIgnoreStmt (v1 .NewSlaLifecycle ())
192+
193+ // Not to mess up the already existing FIFO mechanism, we have to perform only a single query
194+ // (semaphore 1) at a time, even though the sla queries could be executed concurrently.
195+ // After successfully upserting a lifecycle entity, the original checkable entity is streamed to "entities".
196+ fromSlaEntities := CreateSlaLifecyclesFromCheckables (ctx , g , upsertEntities , false )
197+ return r .db .NamedBulkExec (
198+ ctx , stmt , upsertCount , semaphore .NewWeighted (1 ), fromSlaEntities ,
199+ com .NeverSplit [database .Entity ], database.OnSuccessIncrement [database.Entity ](& counter ),
200+ OnSuccessApplyAndSendTo (entities , GetCheckableFromSlaLifecycle ))
201+ })
202+
203+ // Start the regular Icinga DB checkables upsert stream.
204+ g .Go (upsertEntityFunc (entities ))
205+
206+ deletedIds := make (chan any , r .redis .Options .XReadCount )
207+ g .Go (func () error {
208+ defer close (deletedIds )
209+
210+ var counter com.Counter
211+ defer periodic .Start (ctx , r .logger .Interval (), func (_ periodic.Tick ) {
212+ if count := counter .Reset (); count > 0 {
213+ r .logger .Infof ("Updating %d %s sla lifecycles" , count , s .Name ())
214+ }
215+ }).Stop ()
216+
217+ sl := v1 .NewSlaLifecycle ()
218+ stmt := fmt .Sprintf (`UPDATE %s SET delete_time = :delete_time WHERE "id" = :id AND "delete_time" = 0` , database .TableName (sl ))
219+ sem := r .db .GetSemaphoreForTable (database .TableName (sl ))
220+
221+ // extractEntityId is used as a callback for the on success mechanism to extract the checkables id.
222+ extractEntityId := func (e database.Entity ) any { return e .(* v1.SlaLifecycle ).SourceEntity .ID () }
223+
224+ return r .db .NamedBulkExec (
225+ ctx , stmt , upsertCount , sem , CreateSlaLifecyclesFromCheckables (ctx , g , deleteEntities , true ),
226+ com .NeverSplit [database .Entity ], database.OnSuccessIncrement [database.Entity ](& counter ),
227+ OnSuccessApplyAndSendTo [database.Entity , any ](deletedIds , extractEntityId ))
228+ })
229+
230+ // Start the regular Icinga DB checkables delete stream.
231+ g .Go (deleteEntityFunc (deletedIds ))
232+ default :
233+ // For non-checkable runtime updates of upsert events.
234+ g .Go (upsertEntityFunc (upsertEntities ))
235+
236+ // For non-checkable runtime updates of delete events.
237+ deleteIds := make (chan any , r .redis .Options .XReadCount )
238+ g .Go (func () error {
239+ defer close (deleteIds )
240+
241+ for {
242+ select {
243+ case <- ctx .Done ():
244+ return ctx .Err ()
245+ case entity , ok := <- deleteEntities :
246+ if ! ok {
247+ return nil
248+ }
249+
250+ select {
251+ case deleteIds <- entity .ID ():
252+ case <- ctx .Done ():
253+ return ctx .Err ()
254+ }
255+ }
256+ }
257+ })
258+
259+ g .Go (deleteEntityFunc (deleteIds ))
260+ }
145261 }
146262
147263 // customvar and customvar_flat sync.
148264 {
149265 updateMessages := make (chan redis.XMessage , r .redis .Options .XReadCount )
150266 upsertEntities := make (chan database.Entity , r .redis .Options .XReadCount )
151- deleteIds := make (chan interface {} , r .redis .Options .XReadCount )
267+ deleteEntities := make (chan database. Entity , r .redis .Options .XReadCount )
152268
153269 cv := common .NewSyncSubject (v1 .NewCustomvar )
154270 cvFlat := common .NewSyncSubject (v1 .NewCustomvarFlat )
@@ -158,7 +274,7 @@ func (r *RuntimeUpdates) Sync(
158274
159275 updateMessagesByKey ["icinga:" + strcase .Delimited (cv .Name (), ':' )] = updateMessages
160276 g .Go (structifyStream (
161- ctx , updateMessages , upsertEntities , nil , deleteIds , nil ,
277+ ctx , updateMessages , upsertEntities , nil , deleteEntities , nil ,
162278 structify .MakeMapStructifier (
163279 reflect .TypeOf (cv .Entity ()).Elem (),
164280 "json" ,
@@ -212,7 +328,7 @@ func (r *RuntimeUpdates) Sync(
212328 var once sync.Once
213329 for {
214330 select {
215- case _ , ok := <- deleteIds :
331+ case _ , ok := <- deleteEntities :
216332 if ! ok {
217333 return nil
218334 }
@@ -302,7 +418,7 @@ func (r *RuntimeUpdates) xRead(ctx context.Context, updateMessagesByKey map[stri
302418// Converted entities are inserted into the upsertEntities or deleteEntities channel depending on the "runtime_type" message field.
303419func structifyStream (
304420 ctx context.Context , updateMessages <- chan redis.XMessage , upsertEntities , upserted chan database.Entity ,
305- deleteIds , deleted chan interface {}, structifier structify.MapStructifier ,
421+ deleteEntities chan database. Entity , deleted chan interface {}, structifier structify.MapStructifier ,
306422) func () error {
307423 if upserted == nil {
308424 upserted = make (chan database.Entity )
@@ -317,7 +433,7 @@ func structifyStream(
317433 return func () error {
318434 defer func () {
319435 close (upsertEntities )
320- close (deleteIds )
436+ close (deleteEntities )
321437 }()
322438
323439 for {
@@ -353,7 +469,7 @@ func structifyStream(
353469 }
354470 } else if runtimeType == "delete" {
355471 select {
356- case deleteIds <- entity . ID () :
472+ case deleteEntities <- entity :
357473 case <- ctx .Done ():
358474 return ctx .Err ()
359475 }
0 commit comments