Skip to content

Commit 93d6f3f

Browse files
committed
wip
1 parent efec262 commit 93d6f3f

File tree

7 files changed

+241
-6
lines changed

7 files changed

+241
-6
lines changed

pkg/icingadb/db.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,9 @@ func (db *DB) BatchSizeByPlaceholders(n int) int {
506506
// YieldAll executes the query with the supplied scope,
507507
// scans each resulting row into an entity returned by the factory function,
508508
// and streams them into a returned channel.
509-
func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, scope interface{}) (<-chan contracts.Entity, <-chan error) {
509+
func (db *DB) YieldAll(
510+
ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, namedQueryParams bool, scope ...interface{},
511+
) (<-chan contracts.Entity, <-chan error) {
510512
entities := make(chan contracts.Entity, 1)
511513
g, ctx := errgroup.WithContext(ctx)
512514

@@ -515,7 +517,14 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF
515517
defer db.log(ctx, query, &counter).Stop()
516518
defer close(entities)
517519

518-
rows, err := db.NamedQueryContext(ctx, query, scope)
520+
var rows *sqlx.Rows
521+
var err error
522+
if namedQueryParams {
523+
rows, err = db.NamedQueryContext(ctx, query, scope)
524+
} else {
525+
rows, err = db.QueryxContext(ctx, query, scope...)
526+
}
527+
519528
if err != nil {
520529
return internal.CantPerformQuery(err, query)
521530
}

pkg/icingadb/sla.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package icingadb
2+
3+
import (
4+
"context"
5+
"github.com/icinga/icingadb/pkg/contracts"
6+
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
7+
"github.com/icinga/icingadb/pkg/types"
8+
"github.com/pkg/errors"
9+
"golang.org/x/sync/errgroup"
10+
"time"
11+
)
12+
13+
type SlaHistoryTrail struct {
14+
Id types.Int `json:"id" db:"-"`
15+
v1.EnvironmentMeta `json:",inline"`
16+
HostId types.Binary `json:"host_id"`
17+
ServiceId types.Binary `json:"service_id"`
18+
EventType string `json:"event_type"`
19+
EventTime types.UnixMilli `json:"event_time"`
20+
}
21+
22+
// Fingerprint implements the contracts.Fingerprinter interface.
23+
func (sht SlaHistoryTrail) Fingerprint() contracts.Fingerprinter {
24+
return sht
25+
}
26+
27+
// ID implements part of the contracts.IDer interface.
28+
func (sht SlaHistoryTrail) ID() contracts.ID {
29+
return sht.Id
30+
}
31+
32+
// SetID implements part of the contracts.IDer interface.
33+
func (sht *SlaHistoryTrail) SetID(id contracts.ID) {
34+
sht.Id = id.(types.Int)
35+
}
36+
37+
type SlaServiceHistoryTrailColumns struct {
38+
v1.EntityWithoutChecksum `json:",inline"`
39+
v1.EnvironmentMeta `json:",inline"`
40+
HostId types.Binary `json:"host_id"`
41+
}
42+
43+
func CheckableToSlaTrailEntities(ctx context.Context, g *errgroup.Group, checkables <-chan contracts.Entity, eventType string) <-chan contracts.Entity {
44+
entities := make(chan contracts.Entity, 1)
45+
46+
g.Go(func() error {
47+
defer close(entities)
48+
49+
env, ok := v1.EnvironmentFromContext(ctx)
50+
if !ok {
51+
return errors.New("can't get environment from context")
52+
}
53+
54+
// Use the same event time for all chackables
55+
now := time.Now()
56+
57+
for {
58+
select {
59+
case checkable, ok := <-checkables:
60+
if !ok {
61+
return nil
62+
}
63+
64+
entity := &SlaHistoryTrail{
65+
EnvironmentMeta: v1.EnvironmentMeta{EnvironmentId: env.Id},
66+
EventType: eventType,
67+
EventTime: types.UnixMilli(now),
68+
}
69+
70+
switch ptr := checkable.(type) {
71+
case *v1.Host:
72+
entity.HostId = ptr.Id
73+
case *v1.Service:
74+
entity.HostId = ptr.HostId
75+
entity.ServiceId = ptr.Id
76+
}
77+
78+
entities <- entity
79+
case <-ctx.Done():
80+
return ctx.Err()
81+
}
82+
}
83+
})
84+
85+
return entities
86+
}
87+
88+
var (
89+
_ contracts.Entity = (*SlaHistoryTrail)(nil)
90+
)

pkg/icingadb/sync.go

Lines changed: 85 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ import (
1010
"github.com/icinga/icingadb/pkg/icingaredis"
1111
"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
1212
"github.com/icinga/icingadb/pkg/logging"
13+
"github.com/icinga/icingadb/pkg/types"
1314
"github.com/icinga/icingadb/pkg/utils"
1415
"github.com/pkg/errors"
1516
"go.uber.org/zap"
1617
"golang.org/x/sync/errgroup"
1718
"runtime"
19+
"strings"
1820
"time"
1921
)
2022

@@ -85,7 +87,8 @@ func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error {
8587

8688
actual, dbErrs := s.db.YieldAll(
8789
ctx, subject.FactoryForDelta(),
88-
s.db.BuildSelectStmt(NewScopedEntity(subject.Entity(), e.Meta()), subject.Entity().Fingerprint()), e.Meta(),
90+
s.db.BuildSelectStmt(NewScopedEntity(subject.Entity(), e.Meta()), subject.Entity().Fingerprint()),
91+
true, e.Meta(),
8992
)
9093
// Let errors from DB cancel our group.
9194
com.ErrgroupReceive(g, dbErrs)
@@ -128,9 +131,31 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
128131
entities = delta.Create.Entities(ctx)
129132
}
130133

134+
var slaTrailEntities chan contracts.Entity
135+
onSuccessHandlers := []OnSuccess[contracts.Entity]{
136+
OnSuccessIncrement[contracts.Entity](stat),
137+
}
138+
139+
switch delta.Subject.Entity().(type) {
140+
case *v1.Host, *v1.Service:
141+
slaTrailEntities = make(chan contracts.Entity)
142+
onSuccessHandlers = append(onSuccessHandlers, OnSuccessSendTo[contracts.Entity](slaTrailEntities))
143+
}
144+
131145
g.Go(func() error {
132-
return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat))
146+
if slaTrailEntities != nil {
147+
defer close(slaTrailEntities)
148+
}
149+
150+
return s.db.CreateStreamed(ctx, entities, onSuccessHandlers...)
133151
})
152+
153+
if slaTrailEntities != nil {
154+
s.logger.Infof("Inserting %d items of type %s sla history trails of type create", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
155+
g.Go(func() error {
156+
return s.db.CreateStreamed(ctx, CheckableToSlaTrailEntities(ctx, g, slaTrailEntities, "create"))
157+
})
158+
}
134159
}
135160

136161
// Update
@@ -160,6 +185,60 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
160185
// Delete
161186
if len(delta.Delete) > 0 {
162187
s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
188+
entity := delta.Subject.Entity()
189+
switch entity.(type) {
190+
case *v1.Host, *v1.Service:
191+
s.logger.Infof("Inserting %d items of type %s sla history trails of type delete", len(delta.Delete), utils.Key(utils.Name(entity), ' '))
192+
if _, ok := entity.(*v1.Host); ok {
193+
entities := make(chan contracts.Entity, 1)
194+
g.Go(func() error {
195+
defer close(entities)
196+
197+
env, ok := v1.EnvironmentFromContext(ctx)
198+
if !ok {
199+
return errors.New("can't get environment from context")
200+
}
201+
202+
now := time.Now()
203+
for _, id := range delta.Delete.IDs() {
204+
sht := &SlaHistoryTrail{
205+
EnvironmentMeta: v1.EnvironmentMeta{EnvironmentId: env.Id},
206+
HostId: id.(types.Binary),
207+
EventType: "delete",
208+
EventTime: types.UnixMilli(now),
209+
}
210+
211+
entities <- sht
212+
}
213+
214+
return nil
215+
})
216+
217+
g.Go(func() error {
218+
return s.db.CreateStreamed(ctx, entities)
219+
})
220+
} else {
221+
g.Go(func() error {
222+
columns := &SlaServiceHistoryTrailColumns{}
223+
query := s.db.BuildSelectStmt(entity, columns)
224+
if len(delta.Delete) == 1 {
225+
query += ` WHERE id = ?`
226+
} else {
227+
var placeholders []string
228+
for i := 0; i < len(delta.Delete); i++ {
229+
placeholders = append(placeholders, "?")
230+
}
231+
232+
query += fmt.Sprintf(` WHERE id IN (%s)`, strings.Join(placeholders, `, `))
233+
}
234+
entities, err := s.db.YieldAll(ctx, delta.Subject.Factory(), query, false, delta.Delete.IDs()...)
235+
com.ErrgroupReceive(g, err)
236+
237+
return s.db.CreateStreamed(ctx, entities)
238+
})
239+
}
240+
}
241+
163242
g.Go(func() error {
164243
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat))
165244
})
@@ -187,7 +266,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error {
187266

188267
actualCvs, errs := s.db.YieldAll(
189268
ctx, cv.FactoryForDelta(),
190-
s.db.BuildSelectStmt(NewScopedEntity(cv.Entity(), e.Meta()), cv.Entity().Fingerprint()), e.Meta(),
269+
s.db.BuildSelectStmt(NewScopedEntity(cv.Entity(), e.Meta()), cv.Entity().Fingerprint()),
270+
true, e.Meta(),
191271
)
192272
com.ErrgroupReceive(g, errs)
193273

@@ -199,7 +279,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error {
199279

200280
actualFlatCvs, errs := s.db.YieldAll(
201281
ctx, flatCv.FactoryForDelta(),
202-
s.db.BuildSelectStmt(NewScopedEntity(flatCv.Entity(), e.Meta()), flatCv.Entity().Fingerprint()), e.Meta(),
282+
s.db.BuildSelectStmt(NewScopedEntity(flatCv.Entity(), e.Meta()), flatCv.Entity().Fingerprint()),
283+
true, e.Meta(),
203284
)
204285
com.ErrgroupReceive(g, errs)
205286

pkg/types/int.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"database/sql/driver"
77
"encoding"
88
"encoding/json"
9+
"fmt"
910
"github.com/icinga/icingadb/internal"
11+
"github.com/icinga/icingadb/pkg/contracts"
1012
"strconv"
1113
)
1214

@@ -58,11 +60,16 @@ func (i *Int) UnmarshalJSON(data []byte) error {
5860
return nil
5961
}
6062

63+
func (i Int) String() string {
64+
return fmt.Sprintf("%d", i.Int64)
65+
}
66+
6167
// Assert interface compliance.
6268
var (
6369
_ json.Marshaler = Int{}
6470
_ json.Unmarshaler = (*Int)(nil)
6571
_ encoding.TextUnmarshaler = (*Int)(nil)
6672
_ driver.Valuer = Int{}
6773
_ sql.Scanner = (*Int)(nil)
74+
_ contracts.ID = (*Int)(nil)
6875
)

schema/mysql/schema.sql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,6 +1321,18 @@ CREATE TABLE sla_history_downtime (
13211321
INDEX idx_sla_history_downtime_env_downtime_end (environment_id, downtime_end) COMMENT 'Filter for sla history retention'
13221322
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
13231323

1324+
CREATE TABLE sla_history_trail (
1325+
id bigint NOT NULL AUTO_INCREMENT,
1326+
environment_id binary(20) NOT NULL COMMENT 'environment.id',
1327+
host_id binary(20) NOT NULL COMMENT 'host.id (may reference already deleted hosts)',
1328+
service_id binary(20) DEFAULT NULL COMMENT 'service.id (may reference already deleted services)',
1329+
1330+
event_type enum('delete', 'create') NOT NULL,
1331+
event_time bigint unsigned NOT NULL COMMENT 'unix timestamp the event occurred',
1332+
1333+
PRIMARY KEY (id)
1334+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
1335+
13241336
CREATE TABLE icingadb_schema (
13251337
id int unsigned NOT NULL AUTO_INCREMENT,
13261338
version smallint unsigned NOT NULL,

schema/pgsql/schema.sql

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ CREATE TYPE boolenum AS ENUM ( 'n', 'y' );
1717
CREATE TYPE acked AS ENUM ( 'n', 'y', 'sticky' );
1818
CREATE TYPE state_type AS ENUM ( 'hard', 'soft' );
1919
CREATE TYPE checkable_type AS ENUM ( 'host', 'service' );
20+
CREATE TYPE sla_trail_event_type AS ENUM ( 'create', 'delete' );
2021
CREATE TYPE comment_type AS ENUM ( 'comment', 'ack' );
2122
CREATE TYPE notification_type AS ENUM ( 'downtime_start', 'downtime_end', 'downtime_removed', 'custom', 'acknowledgement', 'problem', 'recovery', 'flapping_start', 'flapping_end' );
2223
CREATE TYPE history_type AS ENUM ( 'notification', 'state_change', 'downtime_start', 'downtime_end', 'comment_add', 'comment_remove', 'flapping_start', 'flapping_end', 'ack_set', 'ack_clear' );
@@ -2147,6 +2148,23 @@ COMMENT ON COLUMN sla_history_downtime.downtime_id IS 'downtime.id (may referenc
21472148
COMMENT ON COLUMN sla_history_downtime.downtime_start IS 'start time of the downtime';
21482149
COMMENT ON COLUMN sla_history_downtime.downtime_end IS 'end time of the downtime';
21492150

2151+
CREATE TABLE sla_history_trail (
2152+
id bigserial NOT NULL,
2153+
environment_id bytea20 NOT NULL,
2154+
host_id bytea20 NOT NULL,
2155+
service_id bytea20 DEFAULT NULL,
2156+
2157+
event_type sla_trail_event_type NOT NULL,
2158+
event_time biguint NOT NULL,
2159+
2160+
CONSTRAINT pk_sla_history_trail PRIMARY KEY (id)
2161+
);
2162+
2163+
COMMENT ON COLUMN sla_history_trail.environment_id IS 'environment.id';
2164+
COMMENT ON COLUMN sla_history_trail.host_id IS 'host.id (may reference already deleted hosts)';
2165+
COMMENT ON COLUMN sla_history_trail.service_id IS 'service.id (may reference already deleted services)';
2166+
COMMENT ON COLUMN sla_history_trail.event_time IS 'unix timestamp the event occurred';
2167+
21502168
CREATE SEQUENCE icingadb_schema_id_seq;
21512169

21522170
CREATE TABLE icingadb_schema (

tests/object_sync_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,24 @@ func TestObjectSync(t *testing.T) {
310310
t.Skip()
311311
})
312312

313+
t.Run("Sla History Trail", func(t *testing.T) {
314+
t.Parallel()
315+
316+
assert.Eventuallyf(t, func() bool {
317+
var count int
318+
err := db.Get(&count, "SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NULL")
319+
require.NoError(t, err, "querying hosts sla history trail should not fail")
320+
return count == len(data.Hosts)
321+
}, 20*time.Second, 200*time.Millisecond, "Newly created hosts should exists in database")
322+
323+
assert.Eventuallyf(t, func() bool {
324+
var count int
325+
err := db.Get(&count, "SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NOT NULL")
326+
require.NoError(t, err, "querying services sla history trail should not fail")
327+
return count == len(data.Services)
328+
}, 20*time.Second, 200*time.Millisecond, "Newly created services should exists in database")
329+
})
330+
313331
t.Run("RuntimeUpdates", func(t *testing.T) {
314332
t.Parallel()
315333

0 commit comments

Comments
 (0)