From 5579f281289e3ea2af40777d58cdebfc658f839d Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Wed, 31 Jul 2024 13:26:41 +0200 Subject: [PATCH 1/3] Incident: Log ignored superfluous events --- internal/incident/incident.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/incident/incident.go b/internal/incident/incident.go index d674ba9b..64561c07 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -298,8 +298,8 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, oldSeverity := i.Severity newSeverity := ev.Severity if oldSeverity == newSeverity { - err := fmt.Errorf("%w: %s state event from source %d", event.ErrSuperfluousStateChange, ev.Severity.String(), ev.SourceId) - return err + i.logger.Debugw("Ignoring superfluous severity change event", zap.Int64("source_id", ev.SourceId), zap.Stringer("event", ev)) + return event.ErrSuperfluousStateChange } i.logger.Infof("Incident severity changed from %s to %s", oldSeverity.String(), newSeverity.String()) From f49956e62cea6b9c83f9e34ae3686d90ba961e53 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Tue, 4 Nov 2025 17:12:43 +0100 Subject: [PATCH 2/3] Extract common params into a base struct & use that as an argument It's getting ridiculous that those fields have to be passed around everywhere into functions and methods. This becomes even more ridiculous with upcoming PR of mine, so I just bite the bullet and make a struct for it. This struct is typically embedded into other structs that need access to these common resources, and each function or method has just to take the `RuntimeConfig` as a parameter, and this will be populated from there. --- cmd/icinga-notifications/main.go | 15 +++--- internal/channel/channel.go | 11 ++-- internal/channel/plugin.go | 10 ++-- internal/config/channel.go | 4 +- internal/config/incremental_sync.go | 4 +- internal/config/runtime.go | 33 ++++++++---- internal/daemon/config.go | 22 ++------ internal/incident/incident.go | 82 ++++++++++++++--------------- internal/incident/incidents.go | 54 ++++++------------- internal/incident/incidents_test.go | 30 ++++++----- internal/incident/sync.go | 16 +++--- internal/listener/listener.go | 56 +++++++++----------- internal/testutils/testutils.go | 10 ++++ 13 files changed, 168 insertions(+), 179 deletions(-) diff --git a/cmd/icinga-notifications/main.go b/cmd/icinga-notifications/main.go index 6ac3f434..66d4cc71 100644 --- a/cmd/icinga-notifications/main.go +++ b/cmd/icinga-notifications/main.go @@ -19,8 +19,7 @@ import ( ) func main() { - daemon.ParseFlagsAndConfig() - conf := daemon.Config() + conf := daemon.ParseFlagsAndConfig() logs, err := logging.NewLoggingFromConfig("icinga-notifications", conf.Logging) if err != nil { @@ -45,17 +44,19 @@ func main() { logger.Fatalf("Cannot connect to the database: %+v", err) } - channel.UpsertPlugins(ctx, conf.ChannelsDir, logs.GetChildLogger("channel"), db) + channel.UpsertPlugins(ctx, conf, logs.GetChildLogger("channel"), db) - runtimeConfig := config.NewRuntimeConfig(logs, db) + resources := config.MakeResources(nil, conf, db, logs) + runtimeConfig := config.NewRuntimeConfig(resources) + resources.RuntimeConfig = runtimeConfig if err := runtimeConfig.UpdateFromDatabase(ctx); err != nil { logger.Fatalf("Failed to load config from database %+v", err) } go runtimeConfig.PeriodicUpdates(ctx, 1*time.Second) - err = incident.LoadOpenIncidents(ctx, db, logs.GetChildLogger("incident"), runtimeConfig) - if err != nil { + logger.Info("Loading all active incidents from database") + if err = incident.LoadOpenIncidents(ctx, resources); err != nil { logger.Fatalf("Cannot load incidents from database: %+v", err) } @@ -68,7 +69,7 @@ func main() { // When Icinga Notifications is started by systemd, we've to notify systemd that we're ready. _ = sdnotify.Ready() - if err := listener.NewListener(db, runtimeConfig, logs).Run(ctx); err != nil { + if err := listener.NewListener(resources).Run(ctx); err != nil { logger.Errorf("Listener has finished with an error: %+v", err) } else { logger.Info("Listener has finished") diff --git a/internal/channel/channel.go b/internal/channel/channel.go index acde0bb8..0c33a8e4 100644 --- a/internal/channel/channel.go +++ b/internal/channel/channel.go @@ -7,6 +7,7 @@ import ( "github.com/icinga/icinga-go-library/notifications/plugin" "github.com/icinga/icinga-notifications/internal/config/baseconf" "github.com/icinga/icinga-notifications/internal/contracts" + "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" "go.uber.org/zap" @@ -23,8 +24,9 @@ type Channel struct { Logger *zap.SugaredLogger `db:"-"` - restartCh chan newConfig - pluginCh chan *Plugin + daemonConfig *daemon.ConfigFile + restartCh chan newConfig + pluginCh chan *Plugin pluginCtx context.Context pluginCtxCancel func() @@ -50,8 +52,9 @@ type newConfig struct { } // Start initializes the channel and starts the plugin in the background -func (c *Channel) Start(ctx context.Context, logger *zap.SugaredLogger) { +func (c *Channel) Start(ctx context.Context, conf *daemon.ConfigFile, logger *zap.SugaredLogger) { c.Logger = logger.With(zap.Object("channel", c)) + c.daemonConfig = conf c.restartCh = make(chan newConfig) c.pluginCh = make(chan *Plugin) c.pluginCtx, c.pluginCtxCancel = context.WithCancel(ctx) @@ -63,7 +66,7 @@ func (c *Channel) Start(ctx context.Context, logger *zap.SugaredLogger) { func (c *Channel) initPlugin(cType string, config string) *Plugin { c.Logger.Debug("Initializing channel plugin") - p, err := NewPlugin(cType, c.Logger) + p, err := NewPlugin(cType, c.daemonConfig.ChannelsDir, c.Logger) if err != nil { c.Logger.Errorw("Failed to initialize channel plugin", zap.Error(err)) return nil diff --git a/internal/channel/plugin.go b/internal/channel/plugin.go index 59fa24cf..86497304 100644 --- a/internal/channel/plugin.go +++ b/internal/channel/plugin.go @@ -30,8 +30,8 @@ type Plugin struct { } // NewPlugin starts and returns a new plugin instance. If the start of the plugin fails, an error is returned -func NewPlugin(pluginType string, logger *zap.SugaredLogger) (*Plugin, error) { - file := filepath.Join(daemon.Config().ChannelsDir, pluginType) +func NewPlugin(pluginType, channelsDir string, logger *zap.SugaredLogger) (*Plugin, error) { + file := filepath.Join(channelsDir, pluginType) logger.Debugw("Starting new channel plugin process", zap.String("path", file)) @@ -167,9 +167,9 @@ func forwardLogs(errPipe io.Reader, logger *zap.SugaredLogger) { } // UpsertPlugins upsert the available_channel_type table with working plugins -func UpsertPlugins(ctx context.Context, channelPluginDir string, logger *logging.Logger, db *database.DB) { +func UpsertPlugins(ctx context.Context, conf *daemon.ConfigFile, logger *logging.Logger, db *database.DB) { logger.Debug("Updating available channel types") - files, err := os.ReadDir(channelPluginDir) + files, err := os.ReadDir(conf.ChannelsDir) if err != nil { logger.Errorw("Failed to read the channel plugin directory", zap.Error(err)) } @@ -185,7 +185,7 @@ func UpsertPlugins(ctx context.Context, channelPluginDir string, logger *logging continue } - p, err := NewPlugin(pluginType, pluginLogger) + p, err := NewPlugin(pluginType, conf.ChannelsDir, pluginLogger) if err != nil { pluginLogger.Errorw("Failed to start plugin", zap.Error(err)) continue diff --git a/internal/config/channel.go b/internal/config/channel.go index 1d49dda9..9b848f93 100644 --- a/internal/config/channel.go +++ b/internal/config/channel.go @@ -11,7 +11,7 @@ func (r *RuntimeConfig) applyPendingChannels() { r, &r.Channels, &r.configChange.Channels, func(newElement *channel.Channel) error { - newElement.Start(context.TODO(), r.logs.GetChildLogger("channel").SugaredLogger) + newElement.Start(context.TODO(), r.DaemonConfig, r.Logs.GetChildLogger("channel").SugaredLogger) return nil }, func(curElement, update *channel.Channel) error { @@ -19,7 +19,7 @@ func (r *RuntimeConfig) applyPendingChannels() { curElement.Name = update.Name curElement.Type = update.Type curElement.Config = update.Config - curElement.Restart(r.logs.GetChildLogger("channel").SugaredLogger) + curElement.Restart(r.Logs.GetChildLogger("channel").SugaredLogger) return nil }, func(delElement *channel.Channel) error { diff --git a/internal/config/incremental_sync.go b/internal/config/incremental_sync.go index 5959ec4a..67e379b0 100644 --- a/internal/config/incremental_sync.go +++ b/internal/config/incremental_sync.go @@ -58,7 +58,7 @@ func incrementalFetch[ stmtLogger := r.logger.With(zap.String("table", tableName)) var ( - stmt = r.db.BuildSelectStmt(typePtr, typePtr) + stmt = r.DB.BuildSelectStmt(typePtr, typePtr) stmtArgs []any ) if hasChangedAt { @@ -67,7 +67,7 @@ func incrementalFetch[ stmtArgs = []any{changedAt} } - stmt = r.db.Rebind(stmt + ` ORDER BY "changed_at"`) + stmt = r.DB.Rebind(stmt + ` ORDER BY "changed_at"`) stmtLogger = stmtLogger.With(zap.String("query", stmt)) var ts []T diff --git a/internal/config/runtime.go b/internal/config/runtime.go index 65b028a7..8a1447ce 100644 --- a/internal/config/runtime.go +++ b/internal/config/runtime.go @@ -8,6 +8,7 @@ import ( "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/channel" + "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" "github.com/icinga/icinga-notifications/internal/timeperiod" @@ -19,12 +20,30 @@ import ( "time" ) +// Resources holds references to commonly used objects. +type Resources struct { + DaemonConfig *daemon.ConfigFile `db:"-" json:"-"` + RuntimeConfig *RuntimeConfig `db:"-" json:"-"` + DB *database.DB `db:"-" json:"-"` + Logs *logging.Logging `db:"-" json:"-"` +} + +// MakeResources creates a new Resources instance with the provided components. +// +// This function initializes the Resources struct by assigning the given [RuntimeConfig], +// database connection, and logging facilities. +func MakeResources(rc *RuntimeConfig, file *daemon.ConfigFile, db *database.DB, logs *logging.Logging) *Resources { + return &Resources{RuntimeConfig: rc, DaemonConfig: file, DB: db, Logs: logs} +} + // RuntimeConfig stores the runtime representation of the configuration present in the database. type RuntimeConfig struct { // ConfigSet is the current live config. It is embedded to allow direct access to its members. // Accessing it requires a lock that is obtained with RLock() and released with RUnlock(). ConfigSet + *Resources // Contains references to commonly used objects and to itself. + // configChange contains incremental changes to config objects to be merged into the live configuration. // // It will be both created and deleted within RuntimeConfig.UpdateFromDatabase. To keep track of the known state, @@ -33,24 +52,18 @@ type RuntimeConfig struct { configChangeAvailable bool configChangeTimestamps map[string]types.UnixMilli - logs *logging.Logging logger *logging.Logger - db *database.DB // mu is used to synchronize access to the live ConfigSet. mu sync.RWMutex } -func NewRuntimeConfig( - logs *logging.Logging, - db *database.DB, -) *RuntimeConfig { +func NewRuntimeConfig(resouces *Resources) *RuntimeConfig { return &RuntimeConfig{ configChangeTimestamps: make(map[string]types.UnixMilli), - logs: logs, - logger: logs.GetChildLogger("runtime-updates"), - db: db, + Resources: resouces, + logger: resouces.Logs.GetChildLogger("runtime-updates"), } } @@ -235,7 +248,7 @@ func (r *RuntimeConfig) GetSourceFromCredentials(user, pass string, logger *logg } func (r *RuntimeConfig) fetchFromDatabase(ctx context.Context) error { - tx, err := r.db.BeginTxx(ctx, &sql.TxOptions{ + tx, err := r.DB.BeginTxx(ctx, &sql.TxOptions{ Isolation: sql.LevelRepeatableRead, ReadOnly: true, }) diff --git a/internal/daemon/config.go b/internal/daemon/config.go index 72e9742d..c7fb5898 100644 --- a/internal/daemon/config.go +++ b/internal/daemon/config.go @@ -61,23 +61,10 @@ type Flags struct { Config string `short:"c" long:"config" description:"path to config file"` } -// daemonConfig holds the configuration state as a singleton. -// It is initialised by the ParseFlagsAndConfig func and exposed through the Config function. -var daemonConfig *ConfigFile - -// Config returns the config that was loaded while starting the daemon. -// Panics when ParseFlagsAndConfig was not called earlier. -func Config() *ConfigFile { - if daemonConfig == nil { - panic("ERROR: daemon.Config() called before daemon.ParseFlagsAndConfig()") - } - - return daemonConfig -} - // ParseFlagsAndConfig parses the CLI flags provided to the executable and tries to load the config from the YAML file. -// Prints any error during parsing or config loading to os.Stderr and exits. -func ParseFlagsAndConfig() { +// +// Prints any error during parsing or config loading to os.Stderr and exits, otherwise returns the loaded ConfigFile. +func ParseFlagsAndConfig() *ConfigFile { flags := Flags{Config: internal.SysConfDir + "/icinga-notifications/config.yml"} if err := config.ParseFlags(&flags); err != nil { if errors.Is(err, config.ErrInvalidArgument) { @@ -92,7 +79,7 @@ func ParseFlagsAndConfig() { os.Exit(ExitSuccess) } - daemonConfig = new(ConfigFile) + daemonConfig := new(ConfigFile) if err := config.FromYAMLFile(flags.Config, daemonConfig); err != nil { if errors.Is(err, config.ErrInvalidArgument) { panic(err) @@ -100,4 +87,5 @@ func ParseFlagsAndConfig() { utils.PrintErrorThenExit(err, ExitFailure) } + return daemonConfig } diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 64561c07..46281817 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -4,12 +4,10 @@ import ( "context" "errors" "fmt" - "github.com/icinga/icinga-go-library/database" baseEv "github.com/icinga/icinga-go-library/notifications/event" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/contracts" - "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/object" "github.com/icinga/icinga-notifications/internal/recipient" @@ -49,28 +47,30 @@ type Incident struct { // This prevents us from generating multiple muted histories when receiving several events that mute our Object. isMuted bool - db *database.DB - logger *zap.SugaredLogger - runtimeConfig *config.RuntimeConfig + *config.Resources // Contains DB, RuntimeConfig, and Logs + + logger *zap.SugaredLogger sync.Mutex } -func NewIncident( - db *database.DB, obj *object.Object, runtimeConfig *config.RuntimeConfig, logger *zap.SugaredLogger, -) *Incident { +// NewIncident creates a new incident for the given [object.Object]. +// +// If obj is nil, the returned incident won't be associated with any object, and its ObjectID will be zeroed. +// The returned incident won't be persisted to the database, that has to be done by the caller using [Incident.Sync]. +func NewIncident(obj *object.Object, resources *config.Resources) *Incident { i := &Incident{ - db: db, Object: obj, - logger: logger, - runtimeConfig: runtimeConfig, EscalationState: map[escalationID]*EscalationState{}, Rules: map[ruleID]struct{}{}, Recipients: map[recipient.Key]*RecipientState{}, + Resources: resources, + logger: resources.Logs.GetChildLogger("incident").SugaredLogger, } if obj != nil { i.ObjectID = obj.ID + i.logger = i.logger.With(zap.String("object", obj.DisplayName())) } return i @@ -94,7 +94,7 @@ func (i *Incident) ID() int64 { func (i *Incident) HasManager() bool { for recipientKey, state := range i.Recipients { - if i.runtimeConfig.GetRecipient(recipientKey) == nil { + if i.RuntimeConfig.GetRecipient(recipientKey) == nil { i.logger.Debugw("Incident refers unknown recipient key, might got deleted", zap.Inline(recipientKey)) continue } @@ -123,8 +123,8 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { i.Lock() defer i.Unlock() - i.runtimeConfig.RLock() - defer i.runtimeConfig.RUnlock() + i.RuntimeConfig.RLock() + defer i.RuntimeConfig.RUnlock() // These event types are not like the others used to mute an object/incident, such as DowntimeStart, which // uniquely identify themselves why an incident is being muted, but are rather super generic types, and as @@ -137,14 +137,14 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { return event.ErrSuperfluousMuteUnmuteEvent } - tx, err := i.db.BeginTxx(ctx, nil) + tx, err := i.DB.BeginTxx(ctx, nil) if err != nil { i.logger.Errorw("Cannot start a db transaction", zap.Error(err)) return err } defer func() { _ = tx.Rollback() }() - if err = ev.Sync(ctx, tx, i.db, i.Object.ID); err != nil { + if err = ev.Sync(ctx, tx, i.DB, i.Object.ID); err != nil { i.logger.Errorw("Failed to insert event and fetch its ID", zap.String("event", ev.String()), zap.Error(err)) return err } @@ -234,8 +234,8 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { i.Lock() defer i.Unlock() - i.runtimeConfig.RLock() - defer i.runtimeConfig.RUnlock() + i.RuntimeConfig.RLock() + defer i.RuntimeConfig.RUnlock() if !i.RecoveredAt.Time().IsZero() { // Incident is recovered in the meantime. @@ -260,8 +260,8 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { var notifications []*NotificationEntry ctx := context.Background() - err = i.db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { - err := ev.Sync(ctx, tx, i.db, i.Object.ID) + err = i.DB.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { + err := ev.Sync(ctx, tx, i.DB, i.Object.ID) if err != nil { return err } @@ -314,7 +314,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, Message: types.MakeString(ev.Message, types.TransformEmptyStringToNull), } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw("Failed to insert incident severity changed history", zap.Error(err)) return err } @@ -332,7 +332,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, Type: Closed, } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw("Cannot insert incident closed history to the database", zap.Error(err)) return err } @@ -370,7 +370,7 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, Message: types.MakeString(ev.Message, types.TransformEmptyStringToNull), } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw("Cannot insert incident opened history event", zap.Error(err)) return err } @@ -397,7 +397,7 @@ func (i *Incident) handleUnmute(ctx context.Context, tx *sqlx.Tx, ev *event.Even // On the other hand, if an object is unmuted, its mute reason is already reset, and we can't access it anymore. Message: types.MakeString(ev.MuteReason, types.TransformEmptyStringToNull), } - return hr.Sync(ctx, i.db, tx) + return hr.Sync(ctx, i.DB, tx) } // handleMute generates the corresponding Muted history if the incident is not yet muted but is going to be muted now. @@ -420,7 +420,7 @@ func (i *Incident) handleMute(ctx context.Context, tx *sqlx.Tx, ev *event.Event) // existed, we have to use the mute reason from this object and not from the ongoing event. Message: i.Object.MuteReason, } - return hr.Sync(ctx, i.db, tx) + return hr.Sync(ctx, i.DB, tx) } // applyMatchingRules walks through the rule IDs obtained from source and generates a RuleMatched history entry. @@ -436,7 +436,7 @@ func (i *Incident) applyMatchingRules(ctx context.Context, tx *sqlx.Tx, ev *even return fmt.Errorf("cannot convert rule id %q to an int: %w", ruleId, err) } - r, ok := i.runtimeConfig.Rules[ruleIdInt] + r, ok := i.RuntimeConfig.Rules[ruleIdInt] if !ok { i.logger.Errorw("Event refers to non-existing event rule, might got deleted", zap.Int64("rule_id", ruleIdInt)) return fmt.Errorf("cannot apply unknown rule %d", ruleIdInt) @@ -467,7 +467,7 @@ func (i *Incident) applyMatchingRules(ctx context.Context, tx *sqlx.Tx, ev *even RuleID: types.MakeInt(r.ID, types.TransformZeroIntToNull), Type: RuleMatched, } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw("Failed to insert rule matched incident history", zap.Object("rule", r), zap.Error(err)) return err } @@ -498,7 +498,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, retryAfter := rule.RetryNever for rID := range i.Rules { - r := i.runtimeConfig.Rules[rID] + r := i.RuntimeConfig.Rules[rID] if r == nil { i.logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", rID)) continue @@ -551,7 +551,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, // Returns an error on database failure. func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, escalations []*rule.Escalation) error { for _, escalation := range escalations { - r := i.runtimeConfig.Rules[escalation.RuleID] + r := i.RuntimeConfig.Rules[escalation.RuleID] if r == nil { i.logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", escalation.RuleID)) continue @@ -579,7 +579,7 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even Type: EscalationTriggered, } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw( "Failed to insert escalation triggered incident history", zap.Object("rule", r), zap.Object("escalation", escalation), zap.Error(err), @@ -599,7 +599,7 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even // Returns error on database failure or if the provided context is cancelled. func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifications []*NotificationEntry) error { for _, notification := range notifications { - contact := i.runtimeConfig.Contacts[notification.ContactID] + contact := i.RuntimeConfig.Contacts[notification.ContactID] if contact == nil { i.logger.Debugw("Incident refers unknown contact, might got deleted", zap.Int64("contact_id", notification.ContactID)) continue @@ -612,8 +612,8 @@ func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifica } notification.SentAt = types.UnixMilli(time.Now()) - stmt, _ := i.db.BuildUpdateStmt(notification) - if _, err := i.db.NamedExecContext(ctx, stmt, notification); err != nil { + stmt, _ := i.DB.BuildUpdateStmt(notification) + if _, err := i.DB.NamedExecContext(ctx, stmt, notification); err != nil { i.logger.Errorw( "Failed to update contact notified incident history", zap.String("contact", contact.String()), zap.Error(err), @@ -630,7 +630,7 @@ func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifica // notifyContact notifies the given recipient via a channel matching the given ID. func (i *Incident) notifyContact(contact *recipient.Contact, ev *event.Event, chID int64) error { - ch := i.runtimeConfig.Channels[chID] + ch := i.RuntimeConfig.Channels[chID] if ch == nil { i.logger.Errorw("Could not find config for channel", zap.Int64("channel_id", chID)) @@ -640,7 +640,7 @@ func (i *Incident) notifyContact(contact *recipient.Contact, ev *event.Event, ch i.logger.Infow(fmt.Sprintf("Notify contact %q via %q of type %q", contact.FullName, ch.Name, ch.Type), zap.Int64("channel_id", chID), zap.String("event_type", ev.Type.String())) - err := ch.Notify(contact, i, ev, daemon.Config().Icingaweb2URL) + err := ch.Notify(contact, i, ev, i.DaemonConfig.Icingaweb2URL) if err != nil { i.logger.Errorw("Failed to send notification via channel plugin", zap.String("type", ch.Type), zap.Error(err)) return err @@ -660,7 +660,7 @@ var errSuperfluousAckEvent = errors.New("superfluous acknowledgement set event, // Promotes the ack author to incident.RoleManager if it's not already the case and generates a history entry. // Returns error on database failure. func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { - contact := i.runtimeConfig.GetContact(ev.Username) + contact := i.RuntimeConfig.GetContact(ev.Username) if contact == nil { i.logger.Warnw("Ignoring acknowledgement event from an unknown author", zap.String("author", ev.Username)) @@ -696,14 +696,14 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, Message: types.MakeString(ev.Message, types.TransformEmptyStringToNull), } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw("Failed to add recipient role changed history", zap.String("recipient", contact.String()), zap.Error(err)) return err } cr := &ContactRow{IncidentID: hr.IncidentID, Key: recipientKey, Role: newRole} - stmt, _ := i.db.BuildUpsertStmt(cr) + stmt, _ := i.DB.BuildUpsertStmt(cr) _, err := tx.NamedExecContext(ctx, stmt, cr) if err != nil { i.logger.Errorw("Failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err)) @@ -718,7 +718,7 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { contactChs := make(rule.ContactChannels) // Load all escalations recipients channels for escalationID := range i.EscalationState { - escalation := i.runtimeConfig.GetRuleEscalation(escalationID) + escalation := i.RuntimeConfig.GetRuleEscalation(escalationID) if escalation == nil { i.logger.Debugw("Incident refers unknown escalation, might got deleted", zap.Int64("escalation_id", escalationID)) continue @@ -731,7 +731,7 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { // When a recipient has subscribed/managed this incident via the UI or using an ACK, fallback // to the default contact channel. for recipientKey, state := range i.Recipients { - r := i.runtimeConfig.GetRecipient(recipientKey) + r := i.RuntimeConfig.GetRecipient(recipientKey) if r == nil { i.logger.Debugw("Incident refers unknown recipient key, might got deleted", zap.Inline(recipientKey)) continue @@ -764,7 +764,7 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { func (i *Incident) restoreRecipients(ctx context.Context) error { contact := &ContactRow{} var contacts []*ContactRow - err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.Id) + err := i.DB.SelectContext(ctx, &contacts, i.DB.Rebind(i.DB.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.Id) if err != nil { i.logger.Errorw("Failed to restore incident recipients from the database", zap.Error(err)) return err diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 90497088..ba813f94 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "github.com/icinga/icinga-go-library/com" - "github.com/icinga/icinga-go-library/database" - "github.com/icinga/icinga-go-library/logging" baseEv "github.com/icinga/icinga-go-library/notifications/event" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/config" @@ -27,16 +25,14 @@ var ( // LoadOpenIncidents loads all active (not yet closed) incidents from the database and restores all their states. // Returns error on any database failure. -func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig) error { - logger.Info("Loading all active incidents from database") - +func LoadOpenIncidents(ctx context.Context, resources *config.Resources) error { g, ctx := errgroup.WithContext(ctx) incidents := make(chan *Incident) g.Go(func() error { defer close(incidents) - rows, err := db.QueryxContext(ctx, db.BuildSelectStmt(new(Incident), new(Incident))+` WHERE "recovered_at" IS NULL`) + rows, err := resources.DB.QueryxContext(ctx, resources.DB.BuildSelectStmt(new(Incident), new(Incident))+` WHERE "recovered_at" IS NULL`) if err != nil { return err } @@ -46,7 +42,7 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log defer func() { _ = rows.Close() }() for rows.Next() { - i := NewIncident(db, nil, runtimeConfig, nil) + i := NewIncident(nil, resources) if err := rows.StructScan(i); err != nil { return err } @@ -62,7 +58,7 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log }) g.Go(func() error { - bulks := com.Bulk(ctx, incidents, db.Options.MaxPlaceholdersPerStatement, com.NeverSplit[*Incident]) + bulks := com.Bulk(ctx, incidents, resources.DB.Options.MaxPlaceholdersPerStatement, com.NeverSplit[*Incident]) for { select { @@ -89,20 +85,20 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log } // Restore all incident objects matching the given object ids - if err := object.RestoreObjects(ctx, db, objectIds); err != nil { + if err := object.RestoreObjects(ctx, resources.DB, objectIds); err != nil { return err } // Restore all escalation states and incident rules matching the given incident ids. - err := utils.ForEachRow[EscalationState](ctx, db, "incident_id", incidentIds, func(state *EscalationState) { + err := utils.ForEachRow[EscalationState](ctx, resources.DB, "incident_id", incidentIds, func(state *EscalationState) { i := incidentsById[state.IncidentID] i.EscalationState[state.RuleEscalationID] = state // Restore the incident rule matching the current escalation state if any. - i.runtimeConfig.RLock() - defer i.runtimeConfig.RUnlock() + i.RuntimeConfig.RLock() + defer i.RuntimeConfig.RUnlock() - escalation := i.runtimeConfig.GetRuleEscalation(state.RuleEscalationID) + escalation := i.RuntimeConfig.GetRuleEscalation(state.RuleEscalationID) if escalation != nil { i.Rules[escalation.RuleID] = struct{}{} } @@ -112,7 +108,7 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log } // Restore incident recipients matching the given incident ids. - err = utils.ForEachRow[ContactRow](ctx, db, "incident_id", incidentIds, func(c *ContactRow) { + err = utils.ForEachRow[ContactRow](ctx, resources.DB, "incident_id", incidentIds, func(c *ContactRow) { incidentsById[c.IncidentID].Recipients[c.Key] = &RecipientState{Role: c.Role} }) if err != nil { @@ -122,7 +118,7 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log for _, i := range incidentsById { i.Object = object.GetFromCache(i.ObjectID) i.isMuted = i.Object.IsMuted() - i.logger = logger.With(zap.String("object", i.Object.DisplayName()), + i.logger = i.logger.With(zap.String("object", i.Object.DisplayName()), zap.String("incident", i.String())) currentIncidentsMu.Lock() @@ -147,18 +143,14 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log return g.Wait() } -func GetCurrent( - ctx context.Context, db *database.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, - create bool, -) (*Incident, error) { +func GetCurrent(ctx context.Context, obj *object.Object, resources *config.Resources, create bool) (*Incident, error) { currentIncidentsMu.Lock() defer currentIncidentsMu.Unlock() currentIncident := currentIncidents[obj] if currentIncident == nil && create { - incidentLogger := logger.With(zap.String("object", obj.DisplayName())) - currentIncident = NewIncident(db, obj, runtimeConfig, incidentLogger) + currentIncident = NewIncident(obj, resources) currentIncidents[obj] = currentIncident } @@ -206,31 +198,19 @@ func GetCurrentIncidents() map[int64]*Incident { // checks, it calls the Incident.ProcessEvent method. // // The returned error might be wrapped around event.ErrSuperfluousStateChange. -func ProcessEvent( - ctx context.Context, - db *database.DB, - logs *logging.Logging, - runtimeConfig *config.RuntimeConfig, - ev *event.Event, -) error { +func ProcessEvent(ctx context.Context, resources *config.Resources, ev *event.Event) error { var wasObjectMuted bool if obj := object.GetFromCache(object.ID(ev.SourceId, ev.Tags)); obj != nil { wasObjectMuted = obj.IsMuted() } - obj, err := object.FromEvent(ctx, db, ev) + obj, err := object.FromEvent(ctx, resources.DB, ev) if err != nil { return fmt.Errorf("cannot sync event object: %w", err) } createIncident := ev.Severity != baseEv.SeverityNone && ev.Severity != baseEv.SeverityOK - currentIncident, err := GetCurrent( - ctx, - db, - obj, - logs.GetChildLogger("incident"), - runtimeConfig, - createIncident) + currentIncident, err := GetCurrent(ctx, obj, resources, createIncident) if err != nil { return fmt.Errorf("cannot get current incident for %q: %w", obj.DisplayName(), err) } @@ -248,7 +228,7 @@ func ProcessEvent( } // There is no active incident, but the event appears to be relevant, so try to persist it in the DB. - err = db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { return ev.Sync(ctx, tx, db, obj.ID) }) + err = resources.DB.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { return ev.Sync(ctx, tx, resources.DB, obj.ID) }) if err != nil { return errors.New("cannot sync non-state event to the database") } diff --git a/internal/incident/incidents_test.go b/internal/incident/incidents_test.go index fd06b7ed..65d482a1 100644 --- a/internal/incident/incidents_test.go +++ b/internal/incident/incidents_test.go @@ -13,7 +13,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" + "go.uber.org/zap" "testing" "time" ) @@ -21,6 +21,10 @@ import ( func TestLoadOpenIncidents(t *testing.T) { ctx := context.Background() db := testutils.GetTestDB(ctx, t) + logs := logging.NewLoggingWithFactory("testing", zap.DebugLevel, time.Hour, testutils.NewTestLoggerFactory(t)) + resources := config.MakeResources(nil, nil, db, logs) + runtimeC := config.NewRuntimeConfig(resources) + resources.RuntimeConfig = runtimeC // Insert a dummy source for our test cases! source := &config.Source{ @@ -50,12 +54,12 @@ func TestLoadOpenIncidents(t *testing.T) { testData := make(map[string]*Incident, 10*db.Options.MaxPlaceholdersPerStatement) for j := 1; j <= 10*db.Options.MaxPlaceholdersPerStatement; j++ { - i := makeIncident(ctx, db, t, source.ID, false) + i := makeIncident(ctx, t, resources, source.ID, false) testData[i.ObjectID.String()] = i } t.Run("WithNoRecoveredIncidents", func(t *testing.T) { - assertIncidents(ctx, db, t, testData) + assertIncidents(ctx, t, resources, testData) }) t.Run("WithSomeRecoveredIncidents", func(t *testing.T) { @@ -82,16 +86,16 @@ func TestLoadOpenIncidents(t *testing.T) { for j := 1; j <= db.Options.MaxPlaceholdersPerStatement/2; j++ { // We don't need to cache recovered incidents in memory. - _ = makeIncident(ctx, db, t, source.ID, true) + _ = makeIncident(ctx, t, resources, source.ID, true) if j%2 == 0 { // Add some extra new not recovered incidents to fully simulate a daemon reload. - i := makeIncident(ctx, db, t, source.ID, false) + i := makeIncident(ctx, t, resources, source.ID, false) testData[i.ObjectID.String()] = i } } - assertIncidents(ctx, db, t, testData) + assertIncidents(ctx, t, resources, testData) }) } @@ -99,9 +103,7 @@ func TestLoadOpenIncidents(t *testing.T) { // // The incident loading process is limited to a maximum duration of 10 seconds and will be // aborted and causes the entire test suite to fail immediately, if it takes longer. -func assertIncidents(ctx context.Context, db *database.DB, t *testing.T, testData map[string]*Incident) { - logger := logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Hour) - +func assertIncidents(ctx context.Context, t *testing.T, resources *config.Resources, testData map[string]*Incident) { // Since we have been using object.FromEvent() to persist the test objects to the database, // these will be automatically added to the objects cache as well. So clear the cache before // reloading the incidents, otherwise it will panic in object.RestoreObjects(). @@ -112,7 +114,7 @@ func assertIncidents(ctx context.Context, db *database.DB, t *testing.T, testDat ctx, cancelFunc := context.WithDeadline(ctx, time.Now().Add(10*time.Second)) defer cancelFunc() - err := LoadOpenIncidents(ctx, db, logger, &config.RuntimeConfig{}) + err := LoadOpenIncidents(ctx, resources) require.NoError(t, err, "failed to load not recovered incidents") incidents := GetCurrentIncidents() @@ -145,7 +147,7 @@ func assertIncidents(ctx context.Context, db *database.DB, t *testing.T, testDat // This will firstly create and synchronise a new object from a freshly generated dummy event with distinct // tags and name, and ensures that no error is returned, otherwise it will cause the entire test suite to fail. // Once the object has been successfully synchronised, an incident is created and synced with the database. -func makeIncident(ctx context.Context, db *database.DB, t *testing.T, sourceID int64, recovered bool) *Incident { +func makeIncident(ctx context.Context, t *testing.T, resources *config.Resources, sourceID int64, recovered bool) *Incident { ev := &event.Event{ Time: time.Time{}, SourceId: sourceID, @@ -162,10 +164,10 @@ func makeIncident(ctx context.Context, db *database.DB, t *testing.T, sourceID i }, } - o, err := object.FromEvent(ctx, db, ev) + o, err := object.FromEvent(ctx, resources.DB, ev) require.NoError(t, err) - i := NewIncident(db, o, &config.RuntimeConfig{}, nil) + i := NewIncident(o, resources) i.StartedAt = types.UnixMilli(time.Now().Add(-2 * time.Hour).Truncate(time.Second)) i.Severity = baseEv.SeverityCrit if recovered { @@ -173,7 +175,7 @@ func makeIncident(ctx context.Context, db *database.DB, t *testing.T, sourceID i i.RecoveredAt = types.UnixMilli(time.Now()) } - tx, err := db.BeginTxx(ctx, nil) + tx, err := resources.DB.BeginTxx(ctx, nil) require.NoError(t, err, "starting a transaction should not fail") require.NoError(t, i.Sync(ctx, tx), "failed to insert incident") require.NoError(t, tx.Commit(), "committing a transaction should not fail") diff --git a/internal/incident/sync.go b/internal/incident/sync.go index 61bafd3d..fc308ff1 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -27,13 +27,13 @@ func (i *Incident) Upsert() interface{} { // Returns an error on db failure. func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { if i.Id != 0 { - stmt, _ := i.db.BuildUpsertStmt(i) + stmt, _ := i.DB.BuildUpsertStmt(i) _, err := tx.NamedExecContext(ctx, stmt, i) if err != nil { return fmt.Errorf("failed to upsert incident: %w", err) } } else { - stmt := database.BuildInsertStmtWithout(i.db, i, "id") + stmt := database.BuildInsertStmtWithout(i.DB, i, "id") incidentId, err := database.InsertObtainID(ctx, tx, stmt, i) if err != nil { return err @@ -48,7 +48,7 @@ func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, state *EscalationState) error { state.IncidentID = i.Id - stmt, _ := i.db.BuildUpsertStmt(state) + stmt, _ := i.DB.BuildUpsertStmt(state) _, err := tx.NamedExecContext(ctx, stmt, state) return err @@ -57,7 +57,7 @@ func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, stat // AddEvent Inserts incident history record to the database and returns an error on db failure. func (i *Incident) AddEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { ie := &EventRow{IncidentID: i.Id, EventID: ev.ID} - stmt, _ := i.db.BuildInsertStmt(ie) + stmt, _ := i.DB.BuildInsertStmt(ie) _, err := tx.NamedExecContext(ctx, stmt, ie) return err @@ -98,7 +98,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru OldRecipientRole: oldRole, } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw( "Failed to insert recipient role changed incident history", zap.Object("escalation", escalation), zap.String("recipients", r.String()), zap.Error(err), @@ -109,7 +109,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru cr.Role = state.Role } - stmt, _ := i.db.BuildUpsertStmt(cr) + stmt, _ := i.DB.BuildUpsertStmt(cr) _, err := tx.NamedExecContext(ctx, stmt, cr) if err != nil { i.logger.Errorw( @@ -127,7 +127,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru // Returns an error on database failure. func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule) error { rr := &RuleRow{IncidentID: i.Id, RuleID: r.ID} - stmt, _ := i.db.BuildUpsertStmt(rr) + stmt, _ := i.DB.BuildUpsertStmt(rr) _, err := tx.NamedExecContext(ctx, stmt, rr) return err @@ -159,7 +159,7 @@ func (i *Incident) generateNotifications( hr.NotificationState = NotificationStateSuppressed } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw("Failed to insert incident notification history", zap.String("contact", contact.FullName), zap.Bool("incident_muted", i.isMuted), diff --git a/internal/listener/listener.go b/internal/listener/listener.go index 683a5529..94025ae2 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -6,13 +6,11 @@ import ( "encoding/json" "errors" "fmt" - "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" baseEv "github.com/icinga/icinga-go-library/notifications/event" baseSource "github.com/icinga/icinga-go-library/notifications/source" "github.com/icinga/icinga-notifications/internal" "github.com/icinga/icinga-notifications/internal/config" - "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/incident" "go.uber.org/zap" @@ -22,21 +20,15 @@ import ( ) type Listener struct { - db *database.DB - logger *logging.Logger - runtimeConfig *config.RuntimeConfig + *config.Resources // Embedding this struct to have easy access to all resources. - logs *logging.Logging - mux http.ServeMux + logger *logging.Logger + + mux http.ServeMux } -func NewListener(db *database.DB, runtimeConfig *config.RuntimeConfig, logs *logging.Logging) *Listener { - l := &Listener{ - db: db, - logger: logs.GetChildLogger("listener"), - logs: logs, - runtimeConfig: runtimeConfig, - } +func NewListener(resources *config.Resources) *Listener { + l := &Listener{Resources: resources, logger: resources.Logs.GetChildLogger("listener")} debugMux := http.NewServeMux() debugMux.HandleFunc("/dump-config", l.DumpConfig) @@ -61,7 +53,7 @@ func (l *Listener) ServeHTTP(rw http.ResponseWriter, req *http.Request) { // // An error is returned in every case except for a gracefully context-based shutdown without hitting the time limit. func (l *Listener) Run(ctx context.Context) error { - listenAddr := daemon.Config().Listen + listenAddr := l.DaemonConfig.Listen l.logger.Infof("Starting listener on http://%s", listenAddr) server := &http.Server{ Addr: listenAddr, @@ -90,7 +82,7 @@ func (l *Listener) Run(ctx context.Context) error { // returned and 401 was written back to the response writer. func (l *Listener) sourceFromAuthOrAbort(w http.ResponseWriter, r *http.Request) (*config.Source, bool) { if authUser, authPass, authOk := r.BasicAuth(); authOk { - src := l.runtimeConfig.GetSourceFromCredentials(authUser, authPass, l.logger) + src := l.RuntimeConfig.GetSourceFromCredentials(authUser, authPass, l.logger) if src != nil { return src, true } @@ -138,7 +130,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) { // If the client uses an outdated rules version, reject the request but also send the current rules version // and rules for this source back to the client, so it can retry the request with the updated rules. - if latestRuleVersion := l.runtimeConfig.GetRulesVersionFor(src.ID); ev.RulesVersion != latestRuleVersion { + if latestRuleVersion := l.RuntimeConfig.GetRulesVersionFor(src.ID); ev.RulesVersion != latestRuleVersion { w.WriteHeader(http.StatusPreconditionFailed) l.writeSourceRulesInfo(w, src) @@ -149,7 +141,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) { return } - ev.CompleteURL(daemon.Config().Icingaweb2URL) + ev.CompleteURL(l.Resources.DaemonConfig.Icingaweb2URL) ev.Time = time.Now() ev.SourceId = src.ID if ev.Type == baseEv.TypeUnknown { @@ -166,7 +158,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) { } l.logger.Infow("Processing event", zap.String("event", ev.String())) - err := incident.ProcessEvent(context.Background(), l.db, l.logs, l.runtimeConfig, &ev) + err := incident.ProcessEvent(context.Background(), l.Resources, &ev) if errors.Is(err, event.ErrSuperfluousStateChange) || errors.Is(err, event.ErrSuperfluousMuteUnmuteEvent) { abort(http.StatusNotAcceptable, &ev, "%v", err) return @@ -187,7 +179,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) { // configured or the supplied password is incorrect, it sends an error code and does not redirect the request. func (l *Listener) requireDebugAuth(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - expectedPassword := daemon.Config().DebugPassword + expectedPassword := l.DaemonConfig.DebugPassword if expectedPassword == "" { w.WriteHeader(http.StatusForbidden) _, _ = fmt.Fprintln(w, "config dump disabled, no debug-password set in config") @@ -220,7 +212,7 @@ func (l *Listener) DumpConfig(w http.ResponseWriter, r *http.Request) { enc := json.NewEncoder(w) enc.SetIndent("", " ") - _ = enc.Encode(&l.runtimeConfig.ConfigSet) + _ = enc.Encode(&l.RuntimeConfig.ConfigSet) } // DumpIncidents is used as /debug prefixed endpoint to dump all incidents. The authorization has to be done beforehand. @@ -268,10 +260,10 @@ func (l *Listener) DumpSchedules(w http.ResponseWriter, r *http.Request) { return } - l.runtimeConfig.RLock() - defer l.runtimeConfig.RUnlock() + l.RuntimeConfig.RLock() + defer l.RuntimeConfig.RUnlock() - for _, schedule := range l.runtimeConfig.Schedules { + for _, schedule := range l.RuntimeConfig.Schedules { _, _ = fmt.Fprintf(w, "[id=%d] %q:\n", schedule.ID, schedule.Name) // Iterate in 30 minute steps as this is the granularity Icinga Notifications Web allows in the configuration. @@ -294,12 +286,12 @@ func (l *Listener) DumpRules(w http.ResponseWriter, r *http.Request) { return } - l.runtimeConfig.RLock() - defer l.runtimeConfig.RUnlock() + l.RuntimeConfig.RLock() + defer l.RuntimeConfig.RUnlock() enc := json.NewEncoder(w) enc.SetIndent("", " ") - _ = enc.Encode(l.runtimeConfig.Rules) + _ = enc.Encode(l.RuntimeConfig.Rules) } // writeSourceRulesInfo writes the rules information for a specific source to the response writer. @@ -309,18 +301,18 @@ func (l *Listener) writeSourceRulesInfo(w http.ResponseWriter, source *config.So var rulesInfo baseSource.RulesInfo func() { // Use a function to ensure that the RLock and RUnlock are called before writing the response. - l.runtimeConfig.RLock() - defer l.runtimeConfig.RUnlock() + l.RuntimeConfig.RLock() + defer l.RuntimeConfig.RUnlock() - if sourceInfo, ok := l.runtimeConfig.RulesBySource[source.ID]; ok { + if sourceInfo, ok := l.RuntimeConfig.RulesBySource[source.ID]; ok { rulesInfo.Version = sourceInfo.Version.String() rulesInfo.Rules = make(map[string]string) for _, rID := range sourceInfo.RuleIDs { id := strconv.FormatInt(rID, 10) filterExpr := "" - if l.runtimeConfig.Rules[rID].ObjectFilterExpr.Valid { - filterExpr = l.runtimeConfig.Rules[rID].ObjectFilterExpr.String + if l.RuntimeConfig.Rules[rID].ObjectFilterExpr.Valid { + filterExpr = l.RuntimeConfig.Rules[rID].ObjectFilterExpr.String } rulesInfo.Rules[id] = filterExpr diff --git a/internal/testutils/testutils.go b/internal/testutils/testutils.go index 055c8277..eb5b95b3 100644 --- a/internal/testutils/testutils.go +++ b/internal/testutils/testutils.go @@ -8,6 +8,8 @@ import ( "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" "os" "strconv" @@ -66,3 +68,11 @@ func MakeRandomString(t *testing.T) string { return fmt.Sprintf("%x", buf) } + +// NewTestLoggerFactory creates a new zap logger factory for testing purposes. +// It uses zaptest to create a logger that writes to the testing output. +func NewTestLoggerFactory(t *testing.T) logging.CoreFactory { + return func(level zap.AtomicLevel) zapcore.Core { + return zaptest.NewLogger(t, zaptest.Level(level.Level())).Core() + } +} From 5d0d19c75f69f3d0b73da4afa74b4ab9ec283c40 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 18 Sep 2025 16:14:34 +0200 Subject: [PATCH 3/3] Validate & cache IW2 url instance at daemon start up --- internal/channel/channel.go | 6 ++---- internal/daemon/config.go | 17 +++++++++++++++++ internal/event/event.go | 12 ++++++------ internal/incident/incident.go | 2 +- internal/listener/listener.go | 2 +- 5 files changed, 27 insertions(+), 12 deletions(-) diff --git a/internal/channel/channel.go b/internal/channel/channel.go index 0c33a8e4..0fae3969 100644 --- a/internal/channel/channel.go +++ b/internal/channel/channel.go @@ -12,7 +12,6 @@ import ( "github.com/icinga/icinga-notifications/internal/recipient" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "net/url" ) type Channel struct { @@ -163,7 +162,7 @@ func (c *Channel) Restart(logger *zap.SugaredLogger) { } // Notify prepares and sends the notification request, returns a non-error on fails, nil on success -func (c *Channel) Notify(contact *recipient.Contact, i contracts.Incident, ev *event.Event, icingaweb2Url string) error { +func (c *Channel) Notify(contact *recipient.Contact, i contracts.Incident, ev *event.Event) error { p := c.getPlugin() if p == nil { return errors.New("plugin could not be started") @@ -174,8 +173,7 @@ func (c *Channel) Notify(contact *recipient.Contact, i contracts.Incident, ev *e contactStruct.Addresses = append(contactStruct.Addresses, &plugin.Address{Type: addr.Type, Address: addr.Address}) } - baseUrl, _ := url.Parse(icingaweb2Url) - incidentUrl := baseUrl.JoinPath("/notifications/incident") + incidentUrl := c.daemonConfig.IcingaWeb2UrlParsed.JoinPath("/notifications/incident") incidentUrl.RawQuery = fmt.Sprintf("id=%d", i.ID()) object := i.IncidentObject() diff --git a/internal/daemon/config.go b/internal/daemon/config.go index c7fb5898..635a5644 100644 --- a/internal/daemon/config.go +++ b/internal/daemon/config.go @@ -2,12 +2,14 @@ package daemon import ( "errors" + "fmt" "github.com/creasty/defaults" "github.com/icinga/icinga-go-library/config" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-go-library/utils" "github.com/icinga/icinga-notifications/internal" + "net/url" "os" "time" ) @@ -25,6 +27,12 @@ type ConfigFile struct { Icingaweb2URL string `yaml:"icingaweb2-url"` Database database.Config `yaml:"database"` Logging logging.Config `yaml:"logging"` + + // IcingaWeb2UrlParsed holds the parsed Icinga Web 2 URL after validation of the config file. + // + // This field is not part of the YAML config and is only populated after successful validation. + // The resulting URL always ends with a trailing slash, making it easier to resolve relative paths against it. + IcingaWeb2UrlParsed *url.URL } // SetDefaults implements the defaults.Setter interface. @@ -44,6 +52,15 @@ func (c *ConfigFile) Validate() error { return err } + if c.Icingaweb2URL == "" { + return errors.New("icingaweb2-url must be set") + } + + parsedUrl, err := url.Parse(c.Icingaweb2URL) + if err != nil { + return fmt.Errorf("invalid icingaweb2-url: %w", err) + } + c.IcingaWeb2UrlParsed = parsedUrl.JoinPath("/") return nil } diff --git a/internal/event/event.go b/internal/event/event.go index c8fb5a32..da5d59f5 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -36,18 +36,18 @@ type Event struct { } // CompleteURL prefixes the URL with the given Icinga Web 2 base URL unless it already carries a URL or is empty. -func (e *Event) CompleteURL(icingaWebBaseUrl string) { +func (e *Event) CompleteURL(icingaWebBaseUrl *url.URL) { if e.URL == "" { return } - if !strings.HasSuffix(icingaWebBaseUrl, "/") { - icingaWebBaseUrl += "/" + u, err := url.Parse(strings.TrimLeft(e.URL, "/")) + if err != nil { + return // leave as is on parse error } - u, err := url.Parse(e.URL) - if err != nil || u.Scheme == "" { - e.URL = icingaWebBaseUrl + e.URL + if !u.IsAbs() { + e.URL = icingaWebBaseUrl.ResolveReference(u).String() } } diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 46281817..19b4bc4c 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -640,7 +640,7 @@ func (i *Incident) notifyContact(contact *recipient.Contact, ev *event.Event, ch i.logger.Infow(fmt.Sprintf("Notify contact %q via %q of type %q", contact.FullName, ch.Name, ch.Type), zap.Int64("channel_id", chID), zap.String("event_type", ev.Type.String())) - err := ch.Notify(contact, i, ev, i.DaemonConfig.Icingaweb2URL) + err := ch.Notify(contact, i, ev) if err != nil { i.logger.Errorw("Failed to send notification via channel plugin", zap.String("type", ch.Type), zap.Error(err)) return err diff --git a/internal/listener/listener.go b/internal/listener/listener.go index 94025ae2..b91bdf3d 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -141,7 +141,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) { return } - ev.CompleteURL(l.Resources.DaemonConfig.Icingaweb2URL) + ev.CompleteURL(l.DaemonConfig.IcingaWeb2UrlParsed) ev.Time = time.Now() ev.SourceId = src.ID if ev.Type == baseEv.TypeUnknown {