diff --git a/cmd/icinga-notifications/main.go b/cmd/icinga-notifications/main.go index 6ac3f434..66d4cc71 100644 --- a/cmd/icinga-notifications/main.go +++ b/cmd/icinga-notifications/main.go @@ -19,8 +19,7 @@ import ( ) func main() { - daemon.ParseFlagsAndConfig() - conf := daemon.Config() + conf := daemon.ParseFlagsAndConfig() logs, err := logging.NewLoggingFromConfig("icinga-notifications", conf.Logging) if err != nil { @@ -45,17 +44,19 @@ func main() { logger.Fatalf("Cannot connect to the database: %+v", err) } - channel.UpsertPlugins(ctx, conf.ChannelsDir, logs.GetChildLogger("channel"), db) + channel.UpsertPlugins(ctx, conf, logs.GetChildLogger("channel"), db) - runtimeConfig := config.NewRuntimeConfig(logs, db) + resources := config.MakeResources(nil, conf, db, logs) + runtimeConfig := config.NewRuntimeConfig(resources) + resources.RuntimeConfig = runtimeConfig if err := runtimeConfig.UpdateFromDatabase(ctx); err != nil { logger.Fatalf("Failed to load config from database %+v", err) } go runtimeConfig.PeriodicUpdates(ctx, 1*time.Second) - err = incident.LoadOpenIncidents(ctx, db, logs.GetChildLogger("incident"), runtimeConfig) - if err != nil { + logger.Info("Loading all active incidents from database") + if err = incident.LoadOpenIncidents(ctx, resources); err != nil { logger.Fatalf("Cannot load incidents from database: %+v", err) } @@ -68,7 +69,7 @@ func main() { // When Icinga Notifications is started by systemd, we've to notify systemd that we're ready. _ = sdnotify.Ready() - if err := listener.NewListener(db, runtimeConfig, logs).Run(ctx); err != nil { + if err := listener.NewListener(resources).Run(ctx); err != nil { logger.Errorf("Listener has finished with an error: %+v", err) } else { logger.Info("Listener has finished") diff --git a/internal/channel/channel.go b/internal/channel/channel.go index acde0bb8..0fae3969 100644 --- a/internal/channel/channel.go +++ b/internal/channel/channel.go @@ -7,11 +7,11 @@ import ( "github.com/icinga/icinga-go-library/notifications/plugin" "github.com/icinga/icinga-notifications/internal/config/baseconf" "github.com/icinga/icinga-notifications/internal/contracts" + "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "net/url" ) type Channel struct { @@ -23,8 +23,9 @@ type Channel struct { Logger *zap.SugaredLogger `db:"-"` - restartCh chan newConfig - pluginCh chan *Plugin + daemonConfig *daemon.ConfigFile + restartCh chan newConfig + pluginCh chan *Plugin pluginCtx context.Context pluginCtxCancel func() @@ -50,8 +51,9 @@ type newConfig struct { } // Start initializes the channel and starts the plugin in the background -func (c *Channel) Start(ctx context.Context, logger *zap.SugaredLogger) { +func (c *Channel) Start(ctx context.Context, conf *daemon.ConfigFile, logger *zap.SugaredLogger) { c.Logger = logger.With(zap.Object("channel", c)) + c.daemonConfig = conf c.restartCh = make(chan newConfig) c.pluginCh = make(chan *Plugin) c.pluginCtx, c.pluginCtxCancel = context.WithCancel(ctx) @@ -63,7 +65,7 @@ func (c *Channel) Start(ctx context.Context, logger *zap.SugaredLogger) { func (c *Channel) initPlugin(cType string, config string) *Plugin { c.Logger.Debug("Initializing channel plugin") - p, err := NewPlugin(cType, c.Logger) + p, err := NewPlugin(cType, c.daemonConfig.ChannelsDir, c.Logger) if err != nil { c.Logger.Errorw("Failed to initialize channel plugin", zap.Error(err)) return nil @@ -160,7 +162,7 @@ func (c *Channel) Restart(logger *zap.SugaredLogger) { } // Notify prepares and sends the notification request, returns a non-error on fails, nil on success -func (c *Channel) Notify(contact *recipient.Contact, i contracts.Incident, ev *event.Event, icingaweb2Url string) error { +func (c *Channel) Notify(contact *recipient.Contact, i contracts.Incident, ev *event.Event) error { p := c.getPlugin() if p == nil { return errors.New("plugin could not be started") @@ -171,8 +173,7 @@ func (c *Channel) Notify(contact *recipient.Contact, i contracts.Incident, ev *e contactStruct.Addresses = append(contactStruct.Addresses, &plugin.Address{Type: addr.Type, Address: addr.Address}) } - baseUrl, _ := url.Parse(icingaweb2Url) - incidentUrl := baseUrl.JoinPath("/notifications/incident") + incidentUrl := c.daemonConfig.IcingaWeb2UrlParsed.JoinPath("/notifications/incident") incidentUrl.RawQuery = fmt.Sprintf("id=%d", i.ID()) object := i.IncidentObject() diff --git a/internal/channel/plugin.go b/internal/channel/plugin.go index 59fa24cf..86497304 100644 --- a/internal/channel/plugin.go +++ b/internal/channel/plugin.go @@ -30,8 +30,8 @@ type Plugin struct { } // NewPlugin starts and returns a new plugin instance. If the start of the plugin fails, an error is returned -func NewPlugin(pluginType string, logger *zap.SugaredLogger) (*Plugin, error) { - file := filepath.Join(daemon.Config().ChannelsDir, pluginType) +func NewPlugin(pluginType, channelsDir string, logger *zap.SugaredLogger) (*Plugin, error) { + file := filepath.Join(channelsDir, pluginType) logger.Debugw("Starting new channel plugin process", zap.String("path", file)) @@ -167,9 +167,9 @@ func forwardLogs(errPipe io.Reader, logger *zap.SugaredLogger) { } // UpsertPlugins upsert the available_channel_type table with working plugins -func UpsertPlugins(ctx context.Context, channelPluginDir string, logger *logging.Logger, db *database.DB) { +func UpsertPlugins(ctx context.Context, conf *daemon.ConfigFile, logger *logging.Logger, db *database.DB) { logger.Debug("Updating available channel types") - files, err := os.ReadDir(channelPluginDir) + files, err := os.ReadDir(conf.ChannelsDir) if err != nil { logger.Errorw("Failed to read the channel plugin directory", zap.Error(err)) } @@ -185,7 +185,7 @@ func UpsertPlugins(ctx context.Context, channelPluginDir string, logger *logging continue } - p, err := NewPlugin(pluginType, pluginLogger) + p, err := NewPlugin(pluginType, conf.ChannelsDir, pluginLogger) if err != nil { pluginLogger.Errorw("Failed to start plugin", zap.Error(err)) continue diff --git a/internal/config/channel.go b/internal/config/channel.go index 1d49dda9..9b848f93 100644 --- a/internal/config/channel.go +++ b/internal/config/channel.go @@ -11,7 +11,7 @@ func (r *RuntimeConfig) applyPendingChannels() { r, &r.Channels, &r.configChange.Channels, func(newElement *channel.Channel) error { - newElement.Start(context.TODO(), r.logs.GetChildLogger("channel").SugaredLogger) + newElement.Start(context.TODO(), r.DaemonConfig, r.Logs.GetChildLogger("channel").SugaredLogger) return nil }, func(curElement, update *channel.Channel) error { @@ -19,7 +19,7 @@ func (r *RuntimeConfig) applyPendingChannels() { curElement.Name = update.Name curElement.Type = update.Type curElement.Config = update.Config - curElement.Restart(r.logs.GetChildLogger("channel").SugaredLogger) + curElement.Restart(r.Logs.GetChildLogger("channel").SugaredLogger) return nil }, func(delElement *channel.Channel) error { diff --git a/internal/config/incremental_sync.go b/internal/config/incremental_sync.go index 5959ec4a..67e379b0 100644 --- a/internal/config/incremental_sync.go +++ b/internal/config/incremental_sync.go @@ -58,7 +58,7 @@ func incrementalFetch[ stmtLogger := r.logger.With(zap.String("table", tableName)) var ( - stmt = r.db.BuildSelectStmt(typePtr, typePtr) + stmt = r.DB.BuildSelectStmt(typePtr, typePtr) stmtArgs []any ) if hasChangedAt { @@ -67,7 +67,7 @@ func incrementalFetch[ stmtArgs = []any{changedAt} } - stmt = r.db.Rebind(stmt + ` ORDER BY "changed_at"`) + stmt = r.DB.Rebind(stmt + ` ORDER BY "changed_at"`) stmtLogger = stmtLogger.With(zap.String("query", stmt)) var ts []T diff --git a/internal/config/runtime.go b/internal/config/runtime.go index 65b028a7..8a1447ce 100644 --- a/internal/config/runtime.go +++ b/internal/config/runtime.go @@ -8,6 +8,7 @@ import ( "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/channel" + "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" "github.com/icinga/icinga-notifications/internal/timeperiod" @@ -19,12 +20,30 @@ import ( "time" ) +// Resources holds references to commonly used objects. +type Resources struct { + DaemonConfig *daemon.ConfigFile `db:"-" json:"-"` + RuntimeConfig *RuntimeConfig `db:"-" json:"-"` + DB *database.DB `db:"-" json:"-"` + Logs *logging.Logging `db:"-" json:"-"` +} + +// MakeResources creates a new Resources instance with the provided components. +// +// This function initializes the Resources struct by assigning the given [RuntimeConfig], +// database connection, and logging facilities. +func MakeResources(rc *RuntimeConfig, file *daemon.ConfigFile, db *database.DB, logs *logging.Logging) *Resources { + return &Resources{RuntimeConfig: rc, DaemonConfig: file, DB: db, Logs: logs} +} + // RuntimeConfig stores the runtime representation of the configuration present in the database. type RuntimeConfig struct { // ConfigSet is the current live config. It is embedded to allow direct access to its members. // Accessing it requires a lock that is obtained with RLock() and released with RUnlock(). ConfigSet + *Resources // Contains references to commonly used objects and to itself. + // configChange contains incremental changes to config objects to be merged into the live configuration. // // It will be both created and deleted within RuntimeConfig.UpdateFromDatabase. To keep track of the known state, @@ -33,24 +52,18 @@ type RuntimeConfig struct { configChangeAvailable bool configChangeTimestamps map[string]types.UnixMilli - logs *logging.Logging logger *logging.Logger - db *database.DB // mu is used to synchronize access to the live ConfigSet. mu sync.RWMutex } -func NewRuntimeConfig( - logs *logging.Logging, - db *database.DB, -) *RuntimeConfig { +func NewRuntimeConfig(resouces *Resources) *RuntimeConfig { return &RuntimeConfig{ configChangeTimestamps: make(map[string]types.UnixMilli), - logs: logs, - logger: logs.GetChildLogger("runtime-updates"), - db: db, + Resources: resouces, + logger: resouces.Logs.GetChildLogger("runtime-updates"), } } @@ -235,7 +248,7 @@ func (r *RuntimeConfig) GetSourceFromCredentials(user, pass string, logger *logg } func (r *RuntimeConfig) fetchFromDatabase(ctx context.Context) error { - tx, err := r.db.BeginTxx(ctx, &sql.TxOptions{ + tx, err := r.DB.BeginTxx(ctx, &sql.TxOptions{ Isolation: sql.LevelRepeatableRead, ReadOnly: true, }) diff --git a/internal/daemon/config.go b/internal/daemon/config.go index 72e9742d..635a5644 100644 --- a/internal/daemon/config.go +++ b/internal/daemon/config.go @@ -2,12 +2,14 @@ package daemon import ( "errors" + "fmt" "github.com/creasty/defaults" "github.com/icinga/icinga-go-library/config" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-go-library/utils" "github.com/icinga/icinga-notifications/internal" + "net/url" "os" "time" ) @@ -25,6 +27,12 @@ type ConfigFile struct { Icingaweb2URL string `yaml:"icingaweb2-url"` Database database.Config `yaml:"database"` Logging logging.Config `yaml:"logging"` + + // IcingaWeb2UrlParsed holds the parsed Icinga Web 2 URL after validation of the config file. + // + // This field is not part of the YAML config and is only populated after successful validation. + // The resulting URL always ends with a trailing slash, making it easier to resolve relative paths against it. + IcingaWeb2UrlParsed *url.URL } // SetDefaults implements the defaults.Setter interface. @@ -44,6 +52,15 @@ func (c *ConfigFile) Validate() error { return err } + if c.Icingaweb2URL == "" { + return errors.New("icingaweb2-url must be set") + } + + parsedUrl, err := url.Parse(c.Icingaweb2URL) + if err != nil { + return fmt.Errorf("invalid icingaweb2-url: %w", err) + } + c.IcingaWeb2UrlParsed = parsedUrl.JoinPath("/") return nil } @@ -61,23 +78,10 @@ type Flags struct { Config string `short:"c" long:"config" description:"path to config file"` } -// daemonConfig holds the configuration state as a singleton. -// It is initialised by the ParseFlagsAndConfig func and exposed through the Config function. -var daemonConfig *ConfigFile - -// Config returns the config that was loaded while starting the daemon. -// Panics when ParseFlagsAndConfig was not called earlier. -func Config() *ConfigFile { - if daemonConfig == nil { - panic("ERROR: daemon.Config() called before daemon.ParseFlagsAndConfig()") - } - - return daemonConfig -} - // ParseFlagsAndConfig parses the CLI flags provided to the executable and tries to load the config from the YAML file. -// Prints any error during parsing or config loading to os.Stderr and exits. -func ParseFlagsAndConfig() { +// +// Prints any error during parsing or config loading to os.Stderr and exits, otherwise returns the loaded ConfigFile. +func ParseFlagsAndConfig() *ConfigFile { flags := Flags{Config: internal.SysConfDir + "/icinga-notifications/config.yml"} if err := config.ParseFlags(&flags); err != nil { if errors.Is(err, config.ErrInvalidArgument) { @@ -92,7 +96,7 @@ func ParseFlagsAndConfig() { os.Exit(ExitSuccess) } - daemonConfig = new(ConfigFile) + daemonConfig := new(ConfigFile) if err := config.FromYAMLFile(flags.Config, daemonConfig); err != nil { if errors.Is(err, config.ErrInvalidArgument) { panic(err) @@ -100,4 +104,5 @@ func ParseFlagsAndConfig() { utils.PrintErrorThenExit(err, ExitFailure) } + return daemonConfig } diff --git a/internal/event/event.go b/internal/event/event.go index c8fb5a32..da5d59f5 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -36,18 +36,18 @@ type Event struct { } // CompleteURL prefixes the URL with the given Icinga Web 2 base URL unless it already carries a URL or is empty. -func (e *Event) CompleteURL(icingaWebBaseUrl string) { +func (e *Event) CompleteURL(icingaWebBaseUrl *url.URL) { if e.URL == "" { return } - if !strings.HasSuffix(icingaWebBaseUrl, "/") { - icingaWebBaseUrl += "/" + u, err := url.Parse(strings.TrimLeft(e.URL, "/")) + if err != nil { + return // leave as is on parse error } - u, err := url.Parse(e.URL) - if err != nil || u.Scheme == "" { - e.URL = icingaWebBaseUrl + e.URL + if !u.IsAbs() { + e.URL = icingaWebBaseUrl.ResolveReference(u).String() } } diff --git a/internal/incident/incident.go b/internal/incident/incident.go index d674ba9b..19b4bc4c 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -4,12 +4,10 @@ import ( "context" "errors" "fmt" - "github.com/icinga/icinga-go-library/database" baseEv "github.com/icinga/icinga-go-library/notifications/event" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/contracts" - "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/object" "github.com/icinga/icinga-notifications/internal/recipient" @@ -49,28 +47,30 @@ type Incident struct { // This prevents us from generating multiple muted histories when receiving several events that mute our Object. isMuted bool - db *database.DB - logger *zap.SugaredLogger - runtimeConfig *config.RuntimeConfig + *config.Resources // Contains DB, RuntimeConfig, and Logs + + logger *zap.SugaredLogger sync.Mutex } -func NewIncident( - db *database.DB, obj *object.Object, runtimeConfig *config.RuntimeConfig, logger *zap.SugaredLogger, -) *Incident { +// NewIncident creates a new incident for the given [object.Object]. +// +// If obj is nil, the returned incident won't be associated with any object, and its ObjectID will be zeroed. +// The returned incident won't be persisted to the database, that has to be done by the caller using [Incident.Sync]. +func NewIncident(obj *object.Object, resources *config.Resources) *Incident { i := &Incident{ - db: db, Object: obj, - logger: logger, - runtimeConfig: runtimeConfig, EscalationState: map[escalationID]*EscalationState{}, Rules: map[ruleID]struct{}{}, Recipients: map[recipient.Key]*RecipientState{}, + Resources: resources, + logger: resources.Logs.GetChildLogger("incident").SugaredLogger, } if obj != nil { i.ObjectID = obj.ID + i.logger = i.logger.With(zap.String("object", obj.DisplayName())) } return i @@ -94,7 +94,7 @@ func (i *Incident) ID() int64 { func (i *Incident) HasManager() bool { for recipientKey, state := range i.Recipients { - if i.runtimeConfig.GetRecipient(recipientKey) == nil { + if i.RuntimeConfig.GetRecipient(recipientKey) == nil { i.logger.Debugw("Incident refers unknown recipient key, might got deleted", zap.Inline(recipientKey)) continue } @@ -123,8 +123,8 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { i.Lock() defer i.Unlock() - i.runtimeConfig.RLock() - defer i.runtimeConfig.RUnlock() + i.RuntimeConfig.RLock() + defer i.RuntimeConfig.RUnlock() // These event types are not like the others used to mute an object/incident, such as DowntimeStart, which // uniquely identify themselves why an incident is being muted, but are rather super generic types, and as @@ -137,14 +137,14 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { return event.ErrSuperfluousMuteUnmuteEvent } - tx, err := i.db.BeginTxx(ctx, nil) + tx, err := i.DB.BeginTxx(ctx, nil) if err != nil { i.logger.Errorw("Cannot start a db transaction", zap.Error(err)) return err } defer func() { _ = tx.Rollback() }() - if err = ev.Sync(ctx, tx, i.db, i.Object.ID); err != nil { + if err = ev.Sync(ctx, tx, i.DB, i.Object.ID); err != nil { i.logger.Errorw("Failed to insert event and fetch its ID", zap.String("event", ev.String()), zap.Error(err)) return err } @@ -234,8 +234,8 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { i.Lock() defer i.Unlock() - i.runtimeConfig.RLock() - defer i.runtimeConfig.RUnlock() + i.RuntimeConfig.RLock() + defer i.RuntimeConfig.RUnlock() if !i.RecoveredAt.Time().IsZero() { // Incident is recovered in the meantime. @@ -260,8 +260,8 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { var notifications []*NotificationEntry ctx := context.Background() - err = i.db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { - err := ev.Sync(ctx, tx, i.db, i.Object.ID) + err = i.DB.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { + err := ev.Sync(ctx, tx, i.DB, i.Object.ID) if err != nil { return err } @@ -298,8 +298,8 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, oldSeverity := i.Severity newSeverity := ev.Severity if oldSeverity == newSeverity { - err := fmt.Errorf("%w: %s state event from source %d", event.ErrSuperfluousStateChange, ev.Severity.String(), ev.SourceId) - return err + i.logger.Debugw("Ignoring superfluous severity change event", zap.Int64("source_id", ev.SourceId), zap.Stringer("event", ev)) + return event.ErrSuperfluousStateChange } i.logger.Infof("Incident severity changed from %s to %s", oldSeverity.String(), newSeverity.String()) @@ -314,7 +314,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, Message: types.MakeString(ev.Message, types.TransformEmptyStringToNull), } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw("Failed to insert incident severity changed history", zap.Error(err)) return err } @@ -332,7 +332,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, Type: Closed, } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw("Cannot insert incident closed history to the database", zap.Error(err)) return err } @@ -370,7 +370,7 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, Message: types.MakeString(ev.Message, types.TransformEmptyStringToNull), } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw("Cannot insert incident opened history event", zap.Error(err)) return err } @@ -397,7 +397,7 @@ func (i *Incident) handleUnmute(ctx context.Context, tx *sqlx.Tx, ev *event.Even // On the other hand, if an object is unmuted, its mute reason is already reset, and we can't access it anymore. Message: types.MakeString(ev.MuteReason, types.TransformEmptyStringToNull), } - return hr.Sync(ctx, i.db, tx) + return hr.Sync(ctx, i.DB, tx) } // handleMute generates the corresponding Muted history if the incident is not yet muted but is going to be muted now. @@ -420,7 +420,7 @@ func (i *Incident) handleMute(ctx context.Context, tx *sqlx.Tx, ev *event.Event) // existed, we have to use the mute reason from this object and not from the ongoing event. Message: i.Object.MuteReason, } - return hr.Sync(ctx, i.db, tx) + return hr.Sync(ctx, i.DB, tx) } // applyMatchingRules walks through the rule IDs obtained from source and generates a RuleMatched history entry. @@ -436,7 +436,7 @@ func (i *Incident) applyMatchingRules(ctx context.Context, tx *sqlx.Tx, ev *even return fmt.Errorf("cannot convert rule id %q to an int: %w", ruleId, err) } - r, ok := i.runtimeConfig.Rules[ruleIdInt] + r, ok := i.RuntimeConfig.Rules[ruleIdInt] if !ok { i.logger.Errorw("Event refers to non-existing event rule, might got deleted", zap.Int64("rule_id", ruleIdInt)) return fmt.Errorf("cannot apply unknown rule %d", ruleIdInt) @@ -467,7 +467,7 @@ func (i *Incident) applyMatchingRules(ctx context.Context, tx *sqlx.Tx, ev *even RuleID: types.MakeInt(r.ID, types.TransformZeroIntToNull), Type: RuleMatched, } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw("Failed to insert rule matched incident history", zap.Object("rule", r), zap.Error(err)) return err } @@ -498,7 +498,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, retryAfter := rule.RetryNever for rID := range i.Rules { - r := i.runtimeConfig.Rules[rID] + r := i.RuntimeConfig.Rules[rID] if r == nil { i.logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", rID)) continue @@ -551,7 +551,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, // Returns an error on database failure. func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, escalations []*rule.Escalation) error { for _, escalation := range escalations { - r := i.runtimeConfig.Rules[escalation.RuleID] + r := i.RuntimeConfig.Rules[escalation.RuleID] if r == nil { i.logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", escalation.RuleID)) continue @@ -579,7 +579,7 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even Type: EscalationTriggered, } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw( "Failed to insert escalation triggered incident history", zap.Object("rule", r), zap.Object("escalation", escalation), zap.Error(err), @@ -599,7 +599,7 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even // Returns error on database failure or if the provided context is cancelled. func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifications []*NotificationEntry) error { for _, notification := range notifications { - contact := i.runtimeConfig.Contacts[notification.ContactID] + contact := i.RuntimeConfig.Contacts[notification.ContactID] if contact == nil { i.logger.Debugw("Incident refers unknown contact, might got deleted", zap.Int64("contact_id", notification.ContactID)) continue @@ -612,8 +612,8 @@ func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifica } notification.SentAt = types.UnixMilli(time.Now()) - stmt, _ := i.db.BuildUpdateStmt(notification) - if _, err := i.db.NamedExecContext(ctx, stmt, notification); err != nil { + stmt, _ := i.DB.BuildUpdateStmt(notification) + if _, err := i.DB.NamedExecContext(ctx, stmt, notification); err != nil { i.logger.Errorw( "Failed to update contact notified incident history", zap.String("contact", contact.String()), zap.Error(err), @@ -630,7 +630,7 @@ func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifica // notifyContact notifies the given recipient via a channel matching the given ID. func (i *Incident) notifyContact(contact *recipient.Contact, ev *event.Event, chID int64) error { - ch := i.runtimeConfig.Channels[chID] + ch := i.RuntimeConfig.Channels[chID] if ch == nil { i.logger.Errorw("Could not find config for channel", zap.Int64("channel_id", chID)) @@ -640,7 +640,7 @@ func (i *Incident) notifyContact(contact *recipient.Contact, ev *event.Event, ch i.logger.Infow(fmt.Sprintf("Notify contact %q via %q of type %q", contact.FullName, ch.Name, ch.Type), zap.Int64("channel_id", chID), zap.String("event_type", ev.Type.String())) - err := ch.Notify(contact, i, ev, daemon.Config().Icingaweb2URL) + err := ch.Notify(contact, i, ev) if err != nil { i.logger.Errorw("Failed to send notification via channel plugin", zap.String("type", ch.Type), zap.Error(err)) return err @@ -660,7 +660,7 @@ var errSuperfluousAckEvent = errors.New("superfluous acknowledgement set event, // Promotes the ack author to incident.RoleManager if it's not already the case and generates a history entry. // Returns error on database failure. func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { - contact := i.runtimeConfig.GetContact(ev.Username) + contact := i.RuntimeConfig.GetContact(ev.Username) if contact == nil { i.logger.Warnw("Ignoring acknowledgement event from an unknown author", zap.String("author", ev.Username)) @@ -696,14 +696,14 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, Message: types.MakeString(ev.Message, types.TransformEmptyStringToNull), } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw("Failed to add recipient role changed history", zap.String("recipient", contact.String()), zap.Error(err)) return err } cr := &ContactRow{IncidentID: hr.IncidentID, Key: recipientKey, Role: newRole} - stmt, _ := i.db.BuildUpsertStmt(cr) + stmt, _ := i.DB.BuildUpsertStmt(cr) _, err := tx.NamedExecContext(ctx, stmt, cr) if err != nil { i.logger.Errorw("Failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err)) @@ -718,7 +718,7 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { contactChs := make(rule.ContactChannels) // Load all escalations recipients channels for escalationID := range i.EscalationState { - escalation := i.runtimeConfig.GetRuleEscalation(escalationID) + escalation := i.RuntimeConfig.GetRuleEscalation(escalationID) if escalation == nil { i.logger.Debugw("Incident refers unknown escalation, might got deleted", zap.Int64("escalation_id", escalationID)) continue @@ -731,7 +731,7 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { // When a recipient has subscribed/managed this incident via the UI or using an ACK, fallback // to the default contact channel. for recipientKey, state := range i.Recipients { - r := i.runtimeConfig.GetRecipient(recipientKey) + r := i.RuntimeConfig.GetRecipient(recipientKey) if r == nil { i.logger.Debugw("Incident refers unknown recipient key, might got deleted", zap.Inline(recipientKey)) continue @@ -764,7 +764,7 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { func (i *Incident) restoreRecipients(ctx context.Context) error { contact := &ContactRow{} var contacts []*ContactRow - err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.Id) + err := i.DB.SelectContext(ctx, &contacts, i.DB.Rebind(i.DB.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.Id) if err != nil { i.logger.Errorw("Failed to restore incident recipients from the database", zap.Error(err)) return err diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 90497088..ba813f94 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "github.com/icinga/icinga-go-library/com" - "github.com/icinga/icinga-go-library/database" - "github.com/icinga/icinga-go-library/logging" baseEv "github.com/icinga/icinga-go-library/notifications/event" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/config" @@ -27,16 +25,14 @@ var ( // LoadOpenIncidents loads all active (not yet closed) incidents from the database and restores all their states. // Returns error on any database failure. -func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig) error { - logger.Info("Loading all active incidents from database") - +func LoadOpenIncidents(ctx context.Context, resources *config.Resources) error { g, ctx := errgroup.WithContext(ctx) incidents := make(chan *Incident) g.Go(func() error { defer close(incidents) - rows, err := db.QueryxContext(ctx, db.BuildSelectStmt(new(Incident), new(Incident))+` WHERE "recovered_at" IS NULL`) + rows, err := resources.DB.QueryxContext(ctx, resources.DB.BuildSelectStmt(new(Incident), new(Incident))+` WHERE "recovered_at" IS NULL`) if err != nil { return err } @@ -46,7 +42,7 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log defer func() { _ = rows.Close() }() for rows.Next() { - i := NewIncident(db, nil, runtimeConfig, nil) + i := NewIncident(nil, resources) if err := rows.StructScan(i); err != nil { return err } @@ -62,7 +58,7 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log }) g.Go(func() error { - bulks := com.Bulk(ctx, incidents, db.Options.MaxPlaceholdersPerStatement, com.NeverSplit[*Incident]) + bulks := com.Bulk(ctx, incidents, resources.DB.Options.MaxPlaceholdersPerStatement, com.NeverSplit[*Incident]) for { select { @@ -89,20 +85,20 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log } // Restore all incident objects matching the given object ids - if err := object.RestoreObjects(ctx, db, objectIds); err != nil { + if err := object.RestoreObjects(ctx, resources.DB, objectIds); err != nil { return err } // Restore all escalation states and incident rules matching the given incident ids. - err := utils.ForEachRow[EscalationState](ctx, db, "incident_id", incidentIds, func(state *EscalationState) { + err := utils.ForEachRow[EscalationState](ctx, resources.DB, "incident_id", incidentIds, func(state *EscalationState) { i := incidentsById[state.IncidentID] i.EscalationState[state.RuleEscalationID] = state // Restore the incident rule matching the current escalation state if any. - i.runtimeConfig.RLock() - defer i.runtimeConfig.RUnlock() + i.RuntimeConfig.RLock() + defer i.RuntimeConfig.RUnlock() - escalation := i.runtimeConfig.GetRuleEscalation(state.RuleEscalationID) + escalation := i.RuntimeConfig.GetRuleEscalation(state.RuleEscalationID) if escalation != nil { i.Rules[escalation.RuleID] = struct{}{} } @@ -112,7 +108,7 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log } // Restore incident recipients matching the given incident ids. - err = utils.ForEachRow[ContactRow](ctx, db, "incident_id", incidentIds, func(c *ContactRow) { + err = utils.ForEachRow[ContactRow](ctx, resources.DB, "incident_id", incidentIds, func(c *ContactRow) { incidentsById[c.IncidentID].Recipients[c.Key] = &RecipientState{Role: c.Role} }) if err != nil { @@ -122,7 +118,7 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log for _, i := range incidentsById { i.Object = object.GetFromCache(i.ObjectID) i.isMuted = i.Object.IsMuted() - i.logger = logger.With(zap.String("object", i.Object.DisplayName()), + i.logger = i.logger.With(zap.String("object", i.Object.DisplayName()), zap.String("incident", i.String())) currentIncidentsMu.Lock() @@ -147,18 +143,14 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log return g.Wait() } -func GetCurrent( - ctx context.Context, db *database.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, - create bool, -) (*Incident, error) { +func GetCurrent(ctx context.Context, obj *object.Object, resources *config.Resources, create bool) (*Incident, error) { currentIncidentsMu.Lock() defer currentIncidentsMu.Unlock() currentIncident := currentIncidents[obj] if currentIncident == nil && create { - incidentLogger := logger.With(zap.String("object", obj.DisplayName())) - currentIncident = NewIncident(db, obj, runtimeConfig, incidentLogger) + currentIncident = NewIncident(obj, resources) currentIncidents[obj] = currentIncident } @@ -206,31 +198,19 @@ func GetCurrentIncidents() map[int64]*Incident { // checks, it calls the Incident.ProcessEvent method. // // The returned error might be wrapped around event.ErrSuperfluousStateChange. -func ProcessEvent( - ctx context.Context, - db *database.DB, - logs *logging.Logging, - runtimeConfig *config.RuntimeConfig, - ev *event.Event, -) error { +func ProcessEvent(ctx context.Context, resources *config.Resources, ev *event.Event) error { var wasObjectMuted bool if obj := object.GetFromCache(object.ID(ev.SourceId, ev.Tags)); obj != nil { wasObjectMuted = obj.IsMuted() } - obj, err := object.FromEvent(ctx, db, ev) + obj, err := object.FromEvent(ctx, resources.DB, ev) if err != nil { return fmt.Errorf("cannot sync event object: %w", err) } createIncident := ev.Severity != baseEv.SeverityNone && ev.Severity != baseEv.SeverityOK - currentIncident, err := GetCurrent( - ctx, - db, - obj, - logs.GetChildLogger("incident"), - runtimeConfig, - createIncident) + currentIncident, err := GetCurrent(ctx, obj, resources, createIncident) if err != nil { return fmt.Errorf("cannot get current incident for %q: %w", obj.DisplayName(), err) } @@ -248,7 +228,7 @@ func ProcessEvent( } // There is no active incident, but the event appears to be relevant, so try to persist it in the DB. - err = db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { return ev.Sync(ctx, tx, db, obj.ID) }) + err = resources.DB.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { return ev.Sync(ctx, tx, resources.DB, obj.ID) }) if err != nil { return errors.New("cannot sync non-state event to the database") } diff --git a/internal/incident/incidents_test.go b/internal/incident/incidents_test.go index fd06b7ed..65d482a1 100644 --- a/internal/incident/incidents_test.go +++ b/internal/incident/incidents_test.go @@ -13,7 +13,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" + "go.uber.org/zap" "testing" "time" ) @@ -21,6 +21,10 @@ import ( func TestLoadOpenIncidents(t *testing.T) { ctx := context.Background() db := testutils.GetTestDB(ctx, t) + logs := logging.NewLoggingWithFactory("testing", zap.DebugLevel, time.Hour, testutils.NewTestLoggerFactory(t)) + resources := config.MakeResources(nil, nil, db, logs) + runtimeC := config.NewRuntimeConfig(resources) + resources.RuntimeConfig = runtimeC // Insert a dummy source for our test cases! source := &config.Source{ @@ -50,12 +54,12 @@ func TestLoadOpenIncidents(t *testing.T) { testData := make(map[string]*Incident, 10*db.Options.MaxPlaceholdersPerStatement) for j := 1; j <= 10*db.Options.MaxPlaceholdersPerStatement; j++ { - i := makeIncident(ctx, db, t, source.ID, false) + i := makeIncident(ctx, t, resources, source.ID, false) testData[i.ObjectID.String()] = i } t.Run("WithNoRecoveredIncidents", func(t *testing.T) { - assertIncidents(ctx, db, t, testData) + assertIncidents(ctx, t, resources, testData) }) t.Run("WithSomeRecoveredIncidents", func(t *testing.T) { @@ -82,16 +86,16 @@ func TestLoadOpenIncidents(t *testing.T) { for j := 1; j <= db.Options.MaxPlaceholdersPerStatement/2; j++ { // We don't need to cache recovered incidents in memory. - _ = makeIncident(ctx, db, t, source.ID, true) + _ = makeIncident(ctx, t, resources, source.ID, true) if j%2 == 0 { // Add some extra new not recovered incidents to fully simulate a daemon reload. - i := makeIncident(ctx, db, t, source.ID, false) + i := makeIncident(ctx, t, resources, source.ID, false) testData[i.ObjectID.String()] = i } } - assertIncidents(ctx, db, t, testData) + assertIncidents(ctx, t, resources, testData) }) } @@ -99,9 +103,7 @@ func TestLoadOpenIncidents(t *testing.T) { // // The incident loading process is limited to a maximum duration of 10 seconds and will be // aborted and causes the entire test suite to fail immediately, if it takes longer. -func assertIncidents(ctx context.Context, db *database.DB, t *testing.T, testData map[string]*Incident) { - logger := logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Hour) - +func assertIncidents(ctx context.Context, t *testing.T, resources *config.Resources, testData map[string]*Incident) { // Since we have been using object.FromEvent() to persist the test objects to the database, // these will be automatically added to the objects cache as well. So clear the cache before // reloading the incidents, otherwise it will panic in object.RestoreObjects(). @@ -112,7 +114,7 @@ func assertIncidents(ctx context.Context, db *database.DB, t *testing.T, testDat ctx, cancelFunc := context.WithDeadline(ctx, time.Now().Add(10*time.Second)) defer cancelFunc() - err := LoadOpenIncidents(ctx, db, logger, &config.RuntimeConfig{}) + err := LoadOpenIncidents(ctx, resources) require.NoError(t, err, "failed to load not recovered incidents") incidents := GetCurrentIncidents() @@ -145,7 +147,7 @@ func assertIncidents(ctx context.Context, db *database.DB, t *testing.T, testDat // This will firstly create and synchronise a new object from a freshly generated dummy event with distinct // tags and name, and ensures that no error is returned, otherwise it will cause the entire test suite to fail. // Once the object has been successfully synchronised, an incident is created and synced with the database. -func makeIncident(ctx context.Context, db *database.DB, t *testing.T, sourceID int64, recovered bool) *Incident { +func makeIncident(ctx context.Context, t *testing.T, resources *config.Resources, sourceID int64, recovered bool) *Incident { ev := &event.Event{ Time: time.Time{}, SourceId: sourceID, @@ -162,10 +164,10 @@ func makeIncident(ctx context.Context, db *database.DB, t *testing.T, sourceID i }, } - o, err := object.FromEvent(ctx, db, ev) + o, err := object.FromEvent(ctx, resources.DB, ev) require.NoError(t, err) - i := NewIncident(db, o, &config.RuntimeConfig{}, nil) + i := NewIncident(o, resources) i.StartedAt = types.UnixMilli(time.Now().Add(-2 * time.Hour).Truncate(time.Second)) i.Severity = baseEv.SeverityCrit if recovered { @@ -173,7 +175,7 @@ func makeIncident(ctx context.Context, db *database.DB, t *testing.T, sourceID i i.RecoveredAt = types.UnixMilli(time.Now()) } - tx, err := db.BeginTxx(ctx, nil) + tx, err := resources.DB.BeginTxx(ctx, nil) require.NoError(t, err, "starting a transaction should not fail") require.NoError(t, i.Sync(ctx, tx), "failed to insert incident") require.NoError(t, tx.Commit(), "committing a transaction should not fail") diff --git a/internal/incident/sync.go b/internal/incident/sync.go index 61bafd3d..fc308ff1 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -27,13 +27,13 @@ func (i *Incident) Upsert() interface{} { // Returns an error on db failure. func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { if i.Id != 0 { - stmt, _ := i.db.BuildUpsertStmt(i) + stmt, _ := i.DB.BuildUpsertStmt(i) _, err := tx.NamedExecContext(ctx, stmt, i) if err != nil { return fmt.Errorf("failed to upsert incident: %w", err) } } else { - stmt := database.BuildInsertStmtWithout(i.db, i, "id") + stmt := database.BuildInsertStmtWithout(i.DB, i, "id") incidentId, err := database.InsertObtainID(ctx, tx, stmt, i) if err != nil { return err @@ -48,7 +48,7 @@ func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, state *EscalationState) error { state.IncidentID = i.Id - stmt, _ := i.db.BuildUpsertStmt(state) + stmt, _ := i.DB.BuildUpsertStmt(state) _, err := tx.NamedExecContext(ctx, stmt, state) return err @@ -57,7 +57,7 @@ func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, stat // AddEvent Inserts incident history record to the database and returns an error on db failure. func (i *Incident) AddEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { ie := &EventRow{IncidentID: i.Id, EventID: ev.ID} - stmt, _ := i.db.BuildInsertStmt(ie) + stmt, _ := i.DB.BuildInsertStmt(ie) _, err := tx.NamedExecContext(ctx, stmt, ie) return err @@ -98,7 +98,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru OldRecipientRole: oldRole, } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw( "Failed to insert recipient role changed incident history", zap.Object("escalation", escalation), zap.String("recipients", r.String()), zap.Error(err), @@ -109,7 +109,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru cr.Role = state.Role } - stmt, _ := i.db.BuildUpsertStmt(cr) + stmt, _ := i.DB.BuildUpsertStmt(cr) _, err := tx.NamedExecContext(ctx, stmt, cr) if err != nil { i.logger.Errorw( @@ -127,7 +127,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru // Returns an error on database failure. func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule) error { rr := &RuleRow{IncidentID: i.Id, RuleID: r.ID} - stmt, _ := i.db.BuildUpsertStmt(rr) + stmt, _ := i.DB.BuildUpsertStmt(rr) _, err := tx.NamedExecContext(ctx, stmt, rr) return err @@ -159,7 +159,7 @@ func (i *Incident) generateNotifications( hr.NotificationState = NotificationStateSuppressed } - if err := hr.Sync(ctx, i.db, tx); err != nil { + if err := hr.Sync(ctx, i.DB, tx); err != nil { i.logger.Errorw("Failed to insert incident notification history", zap.String("contact", contact.FullName), zap.Bool("incident_muted", i.isMuted), diff --git a/internal/listener/listener.go b/internal/listener/listener.go index 683a5529..b91bdf3d 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -6,13 +6,11 @@ import ( "encoding/json" "errors" "fmt" - "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" baseEv "github.com/icinga/icinga-go-library/notifications/event" baseSource "github.com/icinga/icinga-go-library/notifications/source" "github.com/icinga/icinga-notifications/internal" "github.com/icinga/icinga-notifications/internal/config" - "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/incident" "go.uber.org/zap" @@ -22,21 +20,15 @@ import ( ) type Listener struct { - db *database.DB - logger *logging.Logger - runtimeConfig *config.RuntimeConfig + *config.Resources // Embedding this struct to have easy access to all resources. - logs *logging.Logging - mux http.ServeMux + logger *logging.Logger + + mux http.ServeMux } -func NewListener(db *database.DB, runtimeConfig *config.RuntimeConfig, logs *logging.Logging) *Listener { - l := &Listener{ - db: db, - logger: logs.GetChildLogger("listener"), - logs: logs, - runtimeConfig: runtimeConfig, - } +func NewListener(resources *config.Resources) *Listener { + l := &Listener{Resources: resources, logger: resources.Logs.GetChildLogger("listener")} debugMux := http.NewServeMux() debugMux.HandleFunc("/dump-config", l.DumpConfig) @@ -61,7 +53,7 @@ func (l *Listener) ServeHTTP(rw http.ResponseWriter, req *http.Request) { // // An error is returned in every case except for a gracefully context-based shutdown without hitting the time limit. func (l *Listener) Run(ctx context.Context) error { - listenAddr := daemon.Config().Listen + listenAddr := l.DaemonConfig.Listen l.logger.Infof("Starting listener on http://%s", listenAddr) server := &http.Server{ Addr: listenAddr, @@ -90,7 +82,7 @@ func (l *Listener) Run(ctx context.Context) error { // returned and 401 was written back to the response writer. func (l *Listener) sourceFromAuthOrAbort(w http.ResponseWriter, r *http.Request) (*config.Source, bool) { if authUser, authPass, authOk := r.BasicAuth(); authOk { - src := l.runtimeConfig.GetSourceFromCredentials(authUser, authPass, l.logger) + src := l.RuntimeConfig.GetSourceFromCredentials(authUser, authPass, l.logger) if src != nil { return src, true } @@ -138,7 +130,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) { // If the client uses an outdated rules version, reject the request but also send the current rules version // and rules for this source back to the client, so it can retry the request with the updated rules. - if latestRuleVersion := l.runtimeConfig.GetRulesVersionFor(src.ID); ev.RulesVersion != latestRuleVersion { + if latestRuleVersion := l.RuntimeConfig.GetRulesVersionFor(src.ID); ev.RulesVersion != latestRuleVersion { w.WriteHeader(http.StatusPreconditionFailed) l.writeSourceRulesInfo(w, src) @@ -149,7 +141,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) { return } - ev.CompleteURL(daemon.Config().Icingaweb2URL) + ev.CompleteURL(l.DaemonConfig.IcingaWeb2UrlParsed) ev.Time = time.Now() ev.SourceId = src.ID if ev.Type == baseEv.TypeUnknown { @@ -166,7 +158,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) { } l.logger.Infow("Processing event", zap.String("event", ev.String())) - err := incident.ProcessEvent(context.Background(), l.db, l.logs, l.runtimeConfig, &ev) + err := incident.ProcessEvent(context.Background(), l.Resources, &ev) if errors.Is(err, event.ErrSuperfluousStateChange) || errors.Is(err, event.ErrSuperfluousMuteUnmuteEvent) { abort(http.StatusNotAcceptable, &ev, "%v", err) return @@ -187,7 +179,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) { // configured or the supplied password is incorrect, it sends an error code and does not redirect the request. func (l *Listener) requireDebugAuth(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - expectedPassword := daemon.Config().DebugPassword + expectedPassword := l.DaemonConfig.DebugPassword if expectedPassword == "" { w.WriteHeader(http.StatusForbidden) _, _ = fmt.Fprintln(w, "config dump disabled, no debug-password set in config") @@ -220,7 +212,7 @@ func (l *Listener) DumpConfig(w http.ResponseWriter, r *http.Request) { enc := json.NewEncoder(w) enc.SetIndent("", " ") - _ = enc.Encode(&l.runtimeConfig.ConfigSet) + _ = enc.Encode(&l.RuntimeConfig.ConfigSet) } // DumpIncidents is used as /debug prefixed endpoint to dump all incidents. The authorization has to be done beforehand. @@ -268,10 +260,10 @@ func (l *Listener) DumpSchedules(w http.ResponseWriter, r *http.Request) { return } - l.runtimeConfig.RLock() - defer l.runtimeConfig.RUnlock() + l.RuntimeConfig.RLock() + defer l.RuntimeConfig.RUnlock() - for _, schedule := range l.runtimeConfig.Schedules { + for _, schedule := range l.RuntimeConfig.Schedules { _, _ = fmt.Fprintf(w, "[id=%d] %q:\n", schedule.ID, schedule.Name) // Iterate in 30 minute steps as this is the granularity Icinga Notifications Web allows in the configuration. @@ -294,12 +286,12 @@ func (l *Listener) DumpRules(w http.ResponseWriter, r *http.Request) { return } - l.runtimeConfig.RLock() - defer l.runtimeConfig.RUnlock() + l.RuntimeConfig.RLock() + defer l.RuntimeConfig.RUnlock() enc := json.NewEncoder(w) enc.SetIndent("", " ") - _ = enc.Encode(l.runtimeConfig.Rules) + _ = enc.Encode(l.RuntimeConfig.Rules) } // writeSourceRulesInfo writes the rules information for a specific source to the response writer. @@ -309,18 +301,18 @@ func (l *Listener) writeSourceRulesInfo(w http.ResponseWriter, source *config.So var rulesInfo baseSource.RulesInfo func() { // Use a function to ensure that the RLock and RUnlock are called before writing the response. - l.runtimeConfig.RLock() - defer l.runtimeConfig.RUnlock() + l.RuntimeConfig.RLock() + defer l.RuntimeConfig.RUnlock() - if sourceInfo, ok := l.runtimeConfig.RulesBySource[source.ID]; ok { + if sourceInfo, ok := l.RuntimeConfig.RulesBySource[source.ID]; ok { rulesInfo.Version = sourceInfo.Version.String() rulesInfo.Rules = make(map[string]string) for _, rID := range sourceInfo.RuleIDs { id := strconv.FormatInt(rID, 10) filterExpr := "" - if l.runtimeConfig.Rules[rID].ObjectFilterExpr.Valid { - filterExpr = l.runtimeConfig.Rules[rID].ObjectFilterExpr.String + if l.RuntimeConfig.Rules[rID].ObjectFilterExpr.Valid { + filterExpr = l.RuntimeConfig.Rules[rID].ObjectFilterExpr.String } rulesInfo.Rules[id] = filterExpr diff --git a/internal/testutils/testutils.go b/internal/testutils/testutils.go index 055c8277..eb5b95b3 100644 --- a/internal/testutils/testutils.go +++ b/internal/testutils/testutils.go @@ -8,6 +8,8 @@ import ( "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" "os" "strconv" @@ -66,3 +68,11 @@ func MakeRandomString(t *testing.T) string { return fmt.Sprintf("%x", buf) } + +// NewTestLoggerFactory creates a new zap logger factory for testing purposes. +// It uses zaptest to create a logger that writes to the testing output. +func NewTestLoggerFactory(t *testing.T) logging.CoreFactory { + return func(level zap.AtomicLevel) zapcore.Core { + return zaptest.NewLogger(t, zaptest.Level(level.Level())).Core() + } +}