Skip to content

Commit c5c83e3

Browse files
committed
Don't bulk execute sla lifecycle updates
1 parent ccced2d commit c5c83e3

File tree

3 files changed

+4
-8
lines changed

3 files changed

+4
-8
lines changed

pkg/icingadb/runtime_updates.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func (r *RuntimeUpdates) Sync(
204204
g.Go(upsertEntityFunc(entities))
205205

206206
// Start the regular Icinga DB checkables delete stream.
207-
g.Go(deleteEntityFunc(StreamIDsFromUpdatedSlaLifecycles(ctx, r.db, s.Entity(), g, r.logger, deleteEntities, upsertCount)))
207+
g.Go(deleteEntityFunc(StreamIDsFromUpdatedSlaLifecycles(ctx, r.db, s.Entity(), g, r.logger, deleteEntities)))
208208
default:
209209
// For non-checkables runtime updates of upsert event
210210
g.Go(upsertEntityFunc(upsertEntities))

pkg/icingadb/sla_lifecycle.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func CreateSlaLifecyclesFromCheckables(
101101
// It's unlikely, but when a given Checkable doesn't already have a `create_time` entry in the database, the update
102102
// query won't update anything. Either way the entities IDs are streamed into the returned chan.
103103
func StreamIDsFromUpdatedSlaLifecycles(
104-
ctx context.Context, db *database.DB, subject database.Entity, g *errgroup.Group, logger *logging.Logger, entities <-chan database.Entity, bulkSize int,
104+
ctx context.Context, db *database.DB, subject database.Entity, g *errgroup.Group, logger *logging.Logger, entities <-chan database.Entity,
105105
) <-chan any {
106106
deleteEntityIDs := make(chan any, 1)
107107

@@ -118,15 +118,11 @@ func StreamIDsFromUpdatedSlaLifecycles(
118118
sem := db.GetSemaphoreForTable(tableName)
119119
stmt := fmt.Sprintf(`UPDATE %s SET delete_time = :delete_time WHERE "id" = :id AND "delete_time" = 0`, tableName)
120120

121-
if bulkSize <= 0 {
122-
bulkSize = db.Options.MaxPlaceholdersPerStatement
123-
}
124-
125121
// extractEntityId is used as a callback for the on success mechanism to extract the checkables id.
126122
extractEntityId := func(e database.Entity) any { return e.(*v1.SlaLifecycle).SourceEntity.ID() }
127123

128124
return db.NamedBulkExec(
129-
ctx, stmt, bulkSize, sem, CreateSlaLifecyclesFromCheckables(ctx, subject, g, entities, true),
125+
ctx, stmt, 1, sem, CreateSlaLifecyclesFromCheckables(ctx, subject, g, entities, true),
130126
com.NeverSplit[database.Entity], OnSuccessApplyAndSendTo[database.Entity, any](deleteEntityIDs, extractEntityId))
131127
})
132128

pkg/icingadb/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
186186
case *v1.Host, *v1.Service:
187187
g.Go(func() error {
188188
return s.db.DeleteStreamed(
189-
ctx, entity, StreamIDsFromUpdatedSlaLifecycles(ctx, s.db, entity, g, s.logger, delta.Delete.Entities(ctx), 0),
189+
ctx, entity, StreamIDsFromUpdatedSlaLifecycles(ctx, s.db, entity, g, s.logger, delta.Delete.Entities(ctx)),
190190
database.OnSuccessIncrement[any](stat))
191191
})
192192
default:

0 commit comments

Comments
 (0)