Skip to content

Commit 06e35b1

Browse files
committed
Extract some duplicated code snippets
1 parent fbc641f commit 06e35b1

File tree

3 files changed

+51
-44
lines changed

3 files changed

+51
-44
lines changed

pkg/icingadb/runtime_updates.go

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -203,32 +203,8 @@ func (r *RuntimeUpdates) Sync(
203203
// Start the regular Icinga DB checkables upsert stream.
204204
g.Go(upsertEntityFunc(entities))
205205

206-
deletedIds := make(chan any, r.redis.Options.XReadCount)
207-
g.Go(func() error {
208-
defer close(deletedIds)
209-
210-
var counter com.Counter
211-
defer periodic.Start(ctx, r.logger.Interval(), func(_ periodic.Tick) {
212-
if count := counter.Reset(); count > 0 {
213-
r.logger.Infof("Updating %d %s sla lifecycles", count, s.Name())
214-
}
215-
}).Stop()
216-
217-
sl := v1.NewSlaLifecycle()
218-
stmt := fmt.Sprintf(`UPDATE %s SET delete_time = :delete_time WHERE "id" = :id AND "delete_time" = 0`, database.TableName(sl))
219-
sem := r.db.GetSemaphoreForTable(database.TableName(sl))
220-
221-
// extractEntityId is used as a callback for the on success mechanism to extract the checkables id.
222-
extractEntityId := func(e database.Entity) any { return e.(*v1.SlaLifecycle).SourceEntity.ID() }
223-
224-
return r.db.NamedBulkExec(
225-
ctx, stmt, upsertCount, sem, CreateSlaLifecyclesFromCheckables(ctx, g, deleteEntities, true),
226-
com.NeverSplit[database.Entity], database.OnSuccessIncrement[database.Entity](&counter),
227-
OnSuccessApplyAndSendTo[database.Entity, any](deletedIds, extractEntityId))
228-
})
229-
230206
// Start the regular Icinga DB checkables delete stream.
231-
g.Go(deleteEntityFunc(deletedIds))
207+
g.Go(deleteEntityFunc(StreamIDsFromUpdatedSlaLifecycles(ctx, r.db, g, r.logger, deleteEntities, upsertCount)))
232208
default:
233209
// For non-checkables runtime updates of upsert event
234210
g.Go(upsertEntityFunc(upsertEntities))

pkg/icingadb/sla_lifecycle.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,21 @@ package icingadb
22

33
import (
44
"context"
5+
"fmt"
6+
"github.com/icinga/icinga-go-library/com"
57
"github.com/icinga/icinga-go-library/database"
8+
"github.com/icinga/icinga-go-library/logging"
9+
"github.com/icinga/icinga-go-library/periodic"
610
"github.com/icinga/icinga-go-library/types"
711
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
812
"github.com/pkg/errors"
913
"golang.org/x/sync/errgroup"
1014
"time"
1115
)
1216

17+
// tableName defines the table name of v1.SlaLifecycle type.
18+
var tableName = database.TableName(v1.NewSlaLifecycle())
19+
1320
// GetCheckableFromSlaLifecycle returns the original checkable from which the specified sla lifecycle were transformed.
1421
// When the passed entity is not of type *SlaLifecycle, it is returned as is.
1522
func GetCheckableFromSlaLifecycle(e database.Entity) database.Entity {
@@ -86,3 +93,42 @@ func CreateSlaLifecyclesFromCheckables(
8693

8794
return slaLifecycles
8895
}
96+
97+
// StreamIDsFromUpdatedSlaLifecycles updates the `delete_time` of the sla lifecycle table for each of the Checkables
98+
// consumed from the provided "entities" chan and upon successful execution of the query streams the original IDs
99+
// of the entities into the returned channel.
100+
//
101+
// It's unlikely, but when a given Checkable doesn't already have a `create_time` entry in the database, the update
102+
// query won't update anything. Either way the entities IDs are streamed into the returned chan.
103+
func StreamIDsFromUpdatedSlaLifecycles(
104+
ctx context.Context, db *database.DB, g *errgroup.Group, logger *logging.Logger, entities <-chan database.Entity, bulkSize int,
105+
) <-chan any {
106+
deleteEntityIDs := make(chan any, 1)
107+
108+
g.Go(func() error {
109+
defer close(deleteEntityIDs)
110+
111+
var counter com.Counter
112+
defer periodic.Start(ctx, logger.Interval(), func(_ periodic.Tick) {
113+
if count := counter.Reset(); count > 0 {
114+
logger.Infof("Updated %d sla lifecycles", count)
115+
}
116+
}).Stop()
117+
118+
sem := db.GetSemaphoreForTable(tableName)
119+
stmt := fmt.Sprintf(`UPDATE %s SET delete_time = :delete_time WHERE "id" = :id AND "delete_time" = 0`, tableName)
120+
121+
if bulkSize <= 0 {
122+
bulkSize = db.Options.MaxPlaceholdersPerStatement
123+
}
124+
125+
// extractEntityId is used as a callback for the on success mechanism to extract the checkables id.
126+
extractEntityId := func(e database.Entity) any { return e.(*v1.SlaLifecycle).SourceEntity.ID() }
127+
128+
return db.NamedBulkExec(
129+
ctx, stmt, bulkSize, sem, CreateSlaLifecyclesFromCheckables(ctx, g, entities, true),
130+
com.NeverSplit[database.Entity], OnSuccessApplyAndSendTo[database.Entity, any](deleteEntityIDs, extractEntityId))
131+
})
132+
133+
return deleteEntityIDs
134+
}

pkg/icingadb/sync.go

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -184,27 +184,12 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
184184
entity := delta.Subject.Entity()
185185
switch entity.(type) {
186186
case *v1.Host, *v1.Service:
187-
deleteIds := make(chan any, len(delta.Delete))
188-
g.Go(func() error {
189-
defer close(deleteIds)
190-
191-
s.logger.Infof("Updating %d %s sla lifecycles", len(delta.Delete), delta.Subject.Name())
192-
193-
slTableName := database.TableName(v1.NewSlaLifecycle())
194-
sem := s.db.GetSemaphoreForTable(slTableName)
195-
stmt := fmt.Sprintf(`UPDATE %s SET delete_time = :delete_time WHERE "id" = :id AND "delete_time" = 0`, slTableName)
196-
197-
// extractEntityId is used as a callback for the on success mechanism to extract the checkables id.
198-
extractEntityId := func(e database.Entity) any { return e.(*v1.SlaLifecycle).SourceEntity.ID() }
199-
200-
return s.db.NamedBulkExec(
201-
ctx, stmt, s.db.Options.MaxPlaceholdersPerStatement, sem,
202-
CreateSlaLifecyclesFromCheckables(ctx, g, delta.Delete.Entities(ctx), true),
203-
com.NeverSplit[database.Entity], OnSuccessApplyAndSendTo[database.Entity, any](deleteIds, extractEntityId))
204-
})
187+
s.logger.Infof("Updating %d %s sla lifecycles", len(delta.Delete), delta.Subject.Name())
205188

206189
g.Go(func() error {
207-
return s.db.DeleteStreamed(ctx, delta.Subject.Entity(), deleteIds, database.OnSuccessIncrement[any](stat))
190+
return s.db.DeleteStreamed(
191+
ctx, entity, StreamIDsFromUpdatedSlaLifecycles(ctx, s.db, g, s.logger, delta.Delete.Entities(ctx), 0),
192+
database.OnSuccessIncrement[any](stat))
208193
})
209194
default:
210195
g.Go(func() error {

0 commit comments

Comments
 (0)