diff --git a/internal/event/event.go b/internal/event/event.go index 5a258f88..209fa788 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -7,7 +7,6 @@ import ( "fmt" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/types" - "github.com/icinga/icinga-notifications/internal/utils" "github.com/jmoiron/sqlx" "time" ) @@ -176,7 +175,7 @@ func (e *Event) Sync(ctx context.Context, tx *sqlx.Tx, db *database.DB, objectId } eventRow := NewEventRow(e, objectId) - eventID, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, eventRow, "id"), eventRow) + eventID, err := database.InsertObtainID(ctx, tx, database.BuildInsertStmtWithout(db, eventRow, "id"), eventRow) if err == nil { e.ID = eventID } @@ -206,11 +205,11 @@ func NewEventRow(e *Event, objectId types.Binary) *EventRow { return &EventRow{ Time: types.UnixMilli(e.Time), ObjectID: objectId, - Type: utils.ToDBString(e.Type), + Type: types.MakeString(e.Type, types.TransformEmptyStringToNull), Severity: e.Severity, - Username: utils.ToDBString(e.Username), - Message: utils.ToDBString(e.Message), + Username: types.MakeString(e.Username, types.TransformEmptyStringToNull), + Message: types.MakeString(e.Message, types.TransformEmptyStringToNull), Mute: e.Mute, - MuteReason: utils.ToDBString(e.MuteReason), + MuteReason: types.MakeString(e.MuteReason, types.TransformEmptyStringToNull), } } diff --git a/internal/incident/db_types.go b/internal/incident/db_types.go index 594a3b39..78a26b7f 100644 --- a/internal/incident/db_types.go +++ b/internal/incident/db_types.go @@ -6,7 +6,6 @@ import ( "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" - "github.com/icinga/icinga-notifications/internal/utils" "github.com/jmoiron/sqlx" ) @@ -92,7 +91,7 @@ func (h *HistoryRow) TableName() string { // Sync persists the current state of this history to the database and retrieves the just inserted history ID. // Returns error when failed to execute the query. func (h *HistoryRow) Sync(ctx context.Context, db *database.DB, tx *sqlx.Tx) error { - historyId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, h, "id"), h) + historyId, err := database.InsertObtainID(ctx, tx, database.BuildInsertStmtWithout(db, h, "id"), h) if err != nil { return err } diff --git a/internal/incident/incident.go b/internal/incident/incident.go index e8692b00..6bf5affe 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -4,6 +4,9 @@ import ( "context" "errors" "fmt" + "sync" + "time" + "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/config" @@ -13,11 +16,8 @@ import ( "github.com/icinga/icinga-notifications/internal/object" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" - "github.com/icinga/icinga-notifications/internal/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" - "sync" - "time" ) type ruleID = int64 @@ -252,7 +252,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { var notifications []*NotificationEntry ctx := context.Background() - err = utils.RunInTx(ctx, i.db, func(tx *sqlx.Tx) error { + err = i.db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { err := ev.Sync(ctx, tx, i.db, i.Object.ID) if err != nil { return err @@ -298,12 +298,12 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, hr := &HistoryRow{ IncidentID: i.Id, - EventID: utils.ToDBInt(ev.ID), + EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), Time: types.UnixMilli(time.Now()), Type: IncidentSeverityChanged, NewSeverity: newSeverity, OldSeverity: oldSeverity, - Message: utils.ToDBString(ev.Message), + Message: types.MakeString(ev.Message, types.TransformEmptyStringToNull), } if err := hr.Sync(ctx, i.db, tx); err != nil { @@ -319,7 +319,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, hr = &HistoryRow{ IncidentID: i.Id, - EventID: utils.ToDBInt(ev.ID), + EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), Time: i.RecoveredAt, Type: Closed, } @@ -357,9 +357,9 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, IncidentID: i.Id, Type: Opened, Time: types.UnixMilli(ev.Time), - EventID: utils.ToDBInt(ev.ID), + EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), NewSeverity: i.Severity, - Message: utils.ToDBString(ev.Message), + Message: types.MakeString(ev.Message, types.TransformEmptyStringToNull), } if err := hr.Sync(ctx, i.db, tx); err != nil { @@ -377,7 +377,7 @@ func (i *Incident) handleMuteUnmute(ctx context.Context, tx *sqlx.Tx, ev *event. return nil } - hr := &HistoryRow{IncidentID: i.Id, EventID: utils.ToDBInt(ev.ID), Time: types.UnixMilli(time.Now())} + hr := &HistoryRow{IncidentID: i.Id, EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), Time: types.UnixMilli(time.Now())} logger := i.logger.With(zap.String("event", ev.String())) if i.Object.IsMuted() { hr.Type = Muted @@ -388,7 +388,7 @@ func (i *Incident) handleMuteUnmute(ctx context.Context, tx *sqlx.Tx, ev *event. } else { hr.Type = Unmuted // On the other hand, if an object is unmuted, its mute reason is already reset, and we can't access it anymore. - hr.Message = utils.ToDBString(ev.MuteReason) + hr.Message = types.MakeString(ev.MuteReason, types.TransformEmptyStringToNull) logger.Infow("Unmuting incident", zap.String("reason", ev.MuteReason)) } @@ -426,8 +426,8 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64 hr := &HistoryRow{ IncidentID: i.Id, Time: types.UnixMilli(time.Now()), - EventID: utils.ToDBInt(eventID), - RuleID: utils.ToDBInt(r.ID), + EventID: types.MakeInt(eventID, types.TransformZeroIntToNull), + RuleID: types.MakeInt(r.ID, types.TransformZeroIntToNull), Type: RuleMatched, } if err := hr.Sync(ctx, i.db, tx); err != nil { @@ -534,9 +534,9 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even hr := &HistoryRow{ IncidentID: i.Id, Time: state.TriggeredAt, - EventID: utils.ToDBInt(ev.ID), - RuleEscalationID: utils.ToDBInt(state.RuleEscalationID), - RuleID: utils.ToDBInt(r.ID), + EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), + RuleEscalationID: types.MakeInt(state.RuleEscalationID, types.TransformZeroIntToNull), + RuleID: types.MakeInt(r.ID, types.TransformZeroIntToNull), Type: EscalationTriggered, } @@ -649,12 +649,12 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, hr := &HistoryRow{ IncidentID: i.Id, Key: recipientKey, - EventID: utils.ToDBInt(ev.ID), + EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), Type: RecipientRoleChanged, Time: types.UnixMilli(time.Now()), NewRecipientRole: newRole, OldRecipientRole: oldRole, - Message: utils.ToDBString(ev.Message), + Message: types.MakeString(ev.Message, types.TransformEmptyStringToNull), } if err := hr.Sync(ctx, i.db, tx); err != nil { diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index e35e4f34..07d32795 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -245,7 +245,7 @@ func ProcessEvent( } // There is no active incident, but the event appears to be relevant, so try to persist it in the DB. - err = utils.RunInTx(ctx, db, func(tx *sqlx.Tx) error { return ev.Sync(ctx, tx, db, obj.ID) }) + err = db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { return ev.Sync(ctx, tx, db, obj.ID) }) if err != nil { return errors.New("cannot sync non-state event to the database") } diff --git a/internal/incident/incidents_test.go b/internal/incident/incidents_test.go index 2cf4c0e0..8e620646 100644 --- a/internal/incident/incidents_test.go +++ b/internal/incident/incidents_test.go @@ -9,7 +9,6 @@ import ( "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/object" "github.com/icinga/icinga-notifications/internal/testutils" - "github.com/icinga/icinga-notifications/internal/utils" "github.com/jmoiron/sqlx" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -31,14 +30,14 @@ func TestLoadOpenIncidents(t *testing.T) { source.ChangedAt = types.UnixMilli(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)) source.Deleted = types.Bool{Bool: false, Valid: true} - err := utils.RunInTx(ctx, db, func(tx *sqlx.Tx) error { - id, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, source, "id"), source) + err := db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { + id, err := database.InsertObtainID(ctx, tx, database.BuildInsertStmtWithout(db, source, "id"), source) require.NoError(t, err, "populating source table should not fail") source.ID = id return nil }) - require.NoError(t, err, "utils.RunInTx() should not fail") + require.NoError(t, err, "db.ExecTx should not fail") // Reduce the default placeholders per statement to a meaningful number, so that we can // test some parallelism when loading the incidents. diff --git a/internal/incident/sync.go b/internal/incident/sync.go index c54fe61d..5d292347 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -3,14 +3,15 @@ package incident import ( "context" "fmt" + "time" + + "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" - "github.com/icinga/icinga-notifications/internal/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" - "time" ) // Upsert implements the contracts.Upserter interface. @@ -32,8 +33,8 @@ func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { return fmt.Errorf("failed to upsert incident: %w", err) } } else { - stmt := utils.BuildInsertStmtWithout(i.db, i, "id") - incidentId, err := utils.InsertAndFetchId(ctx, tx, stmt, i) + stmt := database.BuildInsertStmtWithout(i.db, i, "id") + incidentId, err := database.InsertObtainID(ctx, tx, stmt, i) if err != nil { return err } @@ -89,7 +90,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru hr := &HistoryRow{ IncidentID: i.Id, - EventID: utils.ToDBInt(eventId), + EventID: types.MakeInt(eventId, types.TransformZeroIntToNull), Key: cr.Key, Time: types.UnixMilli(time.Now()), Type: RecipientRoleChanged, @@ -147,12 +148,12 @@ func (i *Incident) generateNotifications( hr := &HistoryRow{ IncidentID: i.Id, Key: recipient.ToKey(contact), - EventID: utils.ToDBInt(ev.ID), + EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), Time: types.UnixMilli(time.Now()), Type: Notified, - ChannelID: utils.ToDBInt(chID), + ChannelID: types.MakeInt(chID, types.TransformZeroIntToNull), NotificationState: NotificationStatePending, - Message: utils.ToDBString(ev.Message), + Message: types.MakeString(ev.Message, types.TransformEmptyStringToNull), } if suppress { hr.NotificationState = NotificationStateSuppressed diff --git a/internal/object/object.go b/internal/object/object.go index d4b0fdc5..88f0b256 100644 --- a/internal/object/object.go +++ b/internal/object/object.go @@ -12,7 +12,6 @@ import ( "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/event" - "github.com/icinga/icinga-notifications/internal/utils" "regexp" "sort" "strings" @@ -37,7 +36,7 @@ func New(db *database.DB, ev *event.Event) *Object { SourceID: ev.SourceId, Name: ev.Name, db: db, - URL: utils.ToDBString(ev.URL), + URL: types.MakeString(ev.URL, types.TransformEmptyStringToNull), Tags: ev.Tags, ExtraTags: ev.ExtraTags, } @@ -85,10 +84,10 @@ func FromEvent(ctx context.Context, db *database.DB, ev *event.Event) (*Object, newObject.ExtraTags = ev.ExtraTags newObject.Name = ev.Name - newObject.URL = utils.ToDBString(ev.URL) + newObject.URL = types.MakeString(ev.URL, types.TransformEmptyStringToNull) if ev.Mute.Valid { if ev.Mute.Bool { - newObject.MuteReason = utils.ToDBString(ev.MuteReason) + newObject.MuteReason = types.MakeString(ev.MuteReason, types.TransformEmptyStringToNull) } else { // The ongoing event unmutes the object, so reset the mute reason to null. newObject.MuteReason = types.String{} diff --git a/internal/object/objects_test.go b/internal/object/objects_test.go index 449a463d..cfbb583e 100644 --- a/internal/object/objects_test.go +++ b/internal/object/objects_test.go @@ -6,7 +6,6 @@ import ( "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/testutils" - "github.com/icinga/icinga-notifications/internal/utils" "github.com/jmoiron/sqlx" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -19,20 +18,20 @@ func TestRestoreMutedObjects(t *testing.T) { db := testutils.GetTestDB(ctx, t) var sourceID int64 - err := utils.RunInTx(ctx, db, func(tx *sqlx.Tx) error { + err := db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { args := map[string]any{ "type": "notifications", "name": "Icinga Notifications", "changed_at": int64(1720702049000), } // We can't use config.Source here unfortunately due to cyclic import error! - id, err := utils.InsertAndFetchId(ctx, tx, `INSERT INTO source (type, name, changed_at) VALUES (:type, :name, :changed_at)`, args) + id, err := database.InsertObtainID(ctx, tx, `INSERT INTO source (type, name, changed_at) VALUES (:type, :name, :changed_at)`, args) require.NoError(t, err, "populating source table should not fail") sourceID = id return nil }) - require.NoError(t, err, "utils.RunInTx() should not fail") + require.NoError(t, err, "db.ExecTx should not fail") ClearCache() diff --git a/internal/recipient/recipient.go b/internal/recipient/recipient.go index 58f36866..54edc65e 100644 --- a/internal/recipient/recipient.go +++ b/internal/recipient/recipient.go @@ -3,7 +3,6 @@ package recipient import ( "fmt" "github.com/icinga/icinga-go-library/types" - "github.com/icinga/icinga-notifications/internal/utils" "go.uber.org/zap/zapcore" "time" ) @@ -55,11 +54,11 @@ func (r Key) MarshalText() (text []byte, err error) { func ToKey(r Recipient) Key { switch v := r.(type) { case *Contact: - return Key{ContactID: utils.ToDBInt(v.ID)} + return Key{ContactID: types.MakeInt(v.ID, types.TransformZeroIntToNull)} case *Group: - return Key{GroupID: utils.ToDBInt(v.ID)} + return Key{GroupID: types.MakeInt(v.ID, types.TransformZeroIntToNull)} case *Schedule: - return Key{ScheduleID: utils.ToDBInt(v.ID)} + return Key{ScheduleID: types.MakeInt(v.ID, types.TransformZeroIntToNull)} default: panic(fmt.Sprintf("unexpected recipient type: %T", r)) } diff --git a/internal/utils/utils.go b/internal/utils/utils.go index bdcd3a45..0ebddca3 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -1,86 +1,13 @@ package utils import ( - "cmp" "context" - "database/sql" "fmt" "github.com/icinga/icinga-go-library/database" - "github.com/icinga/icinga-go-library/types" "github.com/jmoiron/sqlx" "github.com/pkg/errors" - "iter" - "slices" - "strings" ) -// BuildInsertStmtWithout builds an insert stmt without the provided column. -func BuildInsertStmtWithout(db *database.DB, into interface{}, withoutColumn string) string { - columns := db.BuildColumns(into) - for i, column := range columns { - if column == withoutColumn { - // Event id is auto incremented, so just erase it from our insert columns - columns = append(columns[:i], columns[i+1:]...) - break - } - } - - return fmt.Sprintf( - `INSERT INTO "%s" ("%s") VALUES (%s)`, - database.TableName(into), strings.Join(columns, `", "`), - fmt.Sprintf(":%s", strings.Join(columns, ", :")), - ) -} - -// RunInTx allows running a function in a database transaction without requiring manual transaction handling. -// -// A new transaction is started on db which is then passed to fn. After fn returns, the transaction is -// committed unless an error was returned. If fn returns an error, that error is returned, otherwise an -// error is returned if a database operation fails. -func RunInTx(ctx context.Context, db *database.DB, fn func(tx *sqlx.Tx) error) error { - tx, err := db.BeginTxx(ctx, nil) - if err != nil { - return err - } - defer func() { _ = tx.Rollback() }() - - err = fn(tx) - if err != nil { - return err - } - - return tx.Commit() -} - -// InsertAndFetchId executes the given query and fetches the last inserted ID. -func InsertAndFetchId(ctx context.Context, tx *sqlx.Tx, stmt string, args any) (int64, error) { - var lastInsertId int64 - if tx.DriverName() == database.PostgreSQL { - preparedStmt, err := tx.PrepareNamedContext(ctx, stmt+" RETURNING id") - if err != nil { - return 0, err - } - defer func() { _ = preparedStmt.Close() }() - - err = preparedStmt.Get(&lastInsertId, args) - if err != nil { - return 0, fmt.Errorf("failed to insert entry for type %T: %w", args, err) - } - } else { - result, err := tx.NamedExecContext(ctx, stmt, args) - if err != nil { - return 0, fmt.Errorf("failed to insert entry for type %T: %w", args, err) - } - - lastInsertId, err = result.LastInsertId() - if err != nil { - return 0, fmt.Errorf("failed to fetch last insert id for type %T: %w", args, err) - } - } - - return lastInsertId, nil -} - // ExecAndApply applies the provided restoreFunc callback for each successfully retrieved row of the specified type. // Returns error on any database failure or fails to acquire the table semaphore. func ExecAndApply[Row any](ctx context.Context, db *database.DB, stmt string, args []interface{}, restoreFunc func(*Row)) error { @@ -125,43 +52,3 @@ func ForEachRow[Row, Id any](ctx context.Context, db *database.DB, idColumn stri return ExecAndApply(ctx, db, stmt, args, restoreFunc) } - -// ToDBString transforms the given string to types.String. -func ToDBString(value string) types.String { - str := types.String{NullString: sql.NullString{String: value}} - if value != "" { - str.Valid = true - } - - return str -} - -// ToDBInt transforms the given value to types.Int. -func ToDBInt(value int64) types.Int { - val := types.Int{NullInt64: sql.NullInt64{Int64: value}} - if value != 0 { - val.Valid = true - } - - return val -} - -// IterateOrderedMap implements iter.Seq2 to iterate over a map in the key's order. -// -// This function returns a func yielding key-value-pairs from a given map in the order of their keys, if their type -// is cmp.Ordered. -func IterateOrderedMap[K cmp.Ordered, V any](m map[K]V) iter.Seq2[K, V] { - keys := make([]K, 0, len(m)) - for key := range m { - keys = append(keys, key) - } - slices.Sort(keys) - - return func(yield func(K, V) bool) { - for _, key := range keys { - if !yield(key, m[key]) { - return - } - } - } -} diff --git a/internal/utils/utils_test.go b/internal/utils/utils_test.go deleted file mode 100644 index 3c9f10e8..00000000 --- a/internal/utils/utils_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package utils - -import ( - "github.com/stretchr/testify/assert" - "testing" -) - -func TestIterateOrderedMap(t *testing.T) { - tests := []struct { - name string - in map[int]string - outKeys []int - }{ - {"empty", map[int]string{}, nil}, - {"single", map[int]string{1: "foo"}, []int{1}}, - {"few-numbers", map[int]string{1: "a", 2: "b", 3: "c"}, []int{1, 2, 3}}, - { - "1k-numbers", - func() map[int]string { - m := make(map[int]string) - for i := 0; i < 1000; i++ { - m[i] = "foo" - } - return m - }(), - func() []int { - keys := make([]int, 1000) - for i := 0; i < 1000; i++ { - keys[i] = i - } - return keys - }(), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - var outKeys []int - for k, v := range IterateOrderedMap(tt.in) { - assert.Equal(t, tt.in[k], v) - outKeys = append(outKeys, k) - } - - assert.Equal(t, tt.outKeys, outKeys) - }) - } -} diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go index b9a49d2c..ba292b95 100644 --- a/pkg/plugin/plugin.go +++ b/pkg/plugin/plugin.go @@ -6,8 +6,8 @@ import ( "errors" "fmt" "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-go-library/utils" "github.com/icinga/icinga-notifications/internal/event" - "github.com/icinga/icinga-notifications/internal/utils" "github.com/icinga/icinga-notifications/pkg/rpc" "io" "log"