diff --git a/.appveyor.yml b/.appveyor.yml index f93914dc05..0a97c84a6f 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -16,7 +16,7 @@ cache: environment: GOPATH: c:\gopath GOROOT: c:\Program Files\Go - GOVERSION: 1.21.11 + GOVERSION: 1.23.4 GO111MODULE: 'on' GOPROXY: 'https://proxy.golang.org' @@ -28,7 +28,7 @@ environment: install: - rmdir c:\go /s /q - - appveyor DownloadFile https://storage.googleapis.com/golang/go%GOVERSION%.windows-%GOARCH%.msi + - appveyor DownloadFile https://go.dev/dl/go%GOVERSION%.windows-%GOARCH%.msi - msiexec /i go%GOVERSION%.windows-%GOARCH%.msi /q - echo %PATH% - echo %GOPATH% diff --git a/.github/workflows/static-check.yml b/.github/workflows/static-check.yml index 4c91b5ce7e..dbf82e8b72 100644 --- a/.github/workflows/static-check.yml +++ b/.github/workflows/static-check.yml @@ -12,17 +12,17 @@ jobs: staticcheck: name: staticcheck (project) runs-on: ubuntu-latest + env: + GO_VERSION: 1.23.4 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 1 - uses: WillAbides/setup-go-faster@v1.14.0 with: - go-version: "1.21.x" + go-version: "1.23.x" - uses: dominikh/staticcheck-action@v1.3.1 with: version: "2025.1.1" install-go: false - cache-key: "1.21.x" -env: - GO_VERSION: 1.21.3 + cache-key: "1.23.x" \ No newline at end of file diff --git a/CHANGELOG-6.md b/CHANGELOG-6.md index 1e258bd06f..37fb15f353 100644 --- a/CHANGELOG-6.md +++ b/CHANGELOG-6.md @@ -5,13 +5,22 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). - ## [Unreleased] ### Added +- Added `continue_on_error` field to `core/v2.Pipeline`. + This flag controls whether pipeline execution should continue when an error occurs in a handler, filter, mutator, or during asset resolution. + Defaults to `false`, preserving existing behavior. - Added `ttl_status` field to `Check` and `CheckConfig` to configure the status (warning or critical) for TTL failure events. This field accepts values 1 (warning) or 2 (critical), with warning (1) as the default to maintain backward compatibility. - Added Prometheus metrics collection for HTTP API requests, including request count (`sensu_go_http_requests_total`), request duration (`sensu_go_http_request_duration_seconds`), and client error count (`sensu_go_http_client_errors_total`) for better observability of backend API performance. +### Changed +- Updated `GetAssets` method to return an error when one or more required assets are missing. + When `continue_on_error` is `false`, missing assets or other errors will stop the pipeline execution (existing behavior). + When set to `true`, the pipeline will continue executing subsequent workflows until one succeeds. +- Modified handler execution behavior to return a non-zero status code on failure. + Previously, handler errors were only logged without affecting pipeline. + ## [6.13.1] - 2025-05-28 ### Fixed diff --git a/agent/check_handler.go b/agent/check_handler.go index e9677beb2f..1e656362fc 100644 --- a/agent/check_handler.go +++ b/agent/check_handler.go @@ -175,6 +175,7 @@ func (a *Agent) executeCheck(ctx context.Context, request *corev2.CheckRequest, event.Check.Executed = time.Now().Unix() event.Check.Issued = request.Issued event.Pipelines = checkConfig.Pipelines + event.FallbackPipeline = checkConfig.FallbackPipeline // To guard against publishing sensitive/redacted client attribute values // the original command value is reinstated. diff --git a/asset/store.go b/asset/store.go index 2b3c99ce0e..129dfe099b 100644 --- a/asset/store.go +++ b/asset/store.go @@ -5,16 +5,19 @@ import ( "github.com/sensu/sensu-go/backend/store" "github.com/sensu/sensu-go/types" + "go.uber.org/multierr" ) // GetAssets retrieves all Assets from the store if contained in the list of asset names -func GetAssets(ctx context.Context, store store.Store, assetList []string) []types.Asset { +func GetAssets(ctx context.Context, store store.Store, assetList []string) ([]types.Asset, error) { assets := []types.Asset{} + var errors error for _, assetName := range assetList { asset, err := store.GetAssetByName(ctx, assetName) if err != nil { logger.WithField("asset", assetName).WithError(err).Error("error fetching asset from store") + errors = multierr.Append(errors, err) } else if asset == nil { logger.WithField("asset", assetName).Info("asset does not exist") } else { @@ -22,5 +25,5 @@ func GetAssets(ctx context.Context, store store.Store, assetList []string) []typ } } - return assets + return assets, errors } diff --git a/asset/store_test.go b/asset/store_test.go index 71db3c8df7..66569d5eb5 100644 --- a/asset/store_test.go +++ b/asset/store_test.go @@ -56,7 +56,7 @@ func TestGetAssets(t *testing.T) { store.On("GetAssetByName", mock.Anything, "foo").Return(nilAsset, nil) store.On("GetAssetByName", mock.Anything, "bar").Return(nilAsset, errors.New("error")) - assets := GetAssets(context.Background(), store, tc.assetList) + assets, _ := GetAssets(context.Background(), store, tc.assetList) assert.EqualValues(t, tc.expectedAssets, assets) }) } diff --git a/backend/backend.go b/backend/backend.go index 1fe9ecf4ed..1b3eb31193 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -164,6 +164,7 @@ type Backend struct { PipelineAdapterV1 pipeline.AdapterV1 LicenseGetter licensing.Getter Bus messaging.MessageBus + CommonAdapters pipeline.CommonAdapter ctx context.Context runCtx context.Context @@ -422,6 +423,8 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) { return nil, fmt.Errorf("error initializing %s: %s", pipelineDaemon.Name(), err) } + pipelineDaemon.AddPipelineResourceGetter(&pipelined.PipelineResourceGetterImpl{}) + // Initialize PipelineAdapterV1 storeTimeout := 2 * time.Minute b.PipelineAdapterV1 = pipeline.AdapterV1{ @@ -439,13 +442,15 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) { isIncidentFilterAdapter := &filter.IsIncidentAdapter{} notSilencedFilterAdapter := &filter.NotSilencedAdapter{} - b.PipelineAdapterV1.FilterAdapters = []pipeline.FilterAdapter{ + b.CommonAdapters.FilterAdapters = []pipeline.FilterAdapter{ legacyFilterAdapter, hasMetricsFilterAdapter, isIncidentFilterAdapter, notSilencedFilterAdapter, } + b.PipelineAdapterV1.FilterAdapters = b.CommonAdapters.FilterAdapters + // Initialize PipelineAdapterV1 mutator adapters legacyMutatorAdapter := &mutator.LegacyAdapter{ AssetGetter: assetGetter, @@ -457,12 +462,14 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) { onlyCheckOutputMutatorAdapter := &mutator.OnlyCheckOutputAdapter{} jsonMutatorAdapter := &mutator.JSONAdapter{} - b.PipelineAdapterV1.MutatorAdapters = []pipeline.MutatorAdapter{ + b.CommonAdapters.MutatorAdapters = []pipeline.MutatorAdapter{ legacyMutatorAdapter, onlyCheckOutputMutatorAdapter, jsonMutatorAdapter, } + b.PipelineAdapterV1.MutatorAdapters = b.CommonAdapters.MutatorAdapters + // Initialize PipelineAdapterV1 handler adapters legacyHandlerAdapter := &handler.LegacyAdapter{ AssetGetter: assetGetter, @@ -473,9 +480,10 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) { StoreTimeout: storeTimeout, } - b.PipelineAdapterV1.HandlerAdapters = []pipeline.HandlerAdapter{ + b.CommonAdapters.HandlerAdapters = []pipeline.HandlerAdapter{ legacyHandlerAdapter, } + b.PipelineAdapterV1.HandlerAdapters = b.CommonAdapters.HandlerAdapters pipelineDaemon.AddAdapter(&b.PipelineAdapterV1) b.Daemons = append(b.Daemons, pipelineDaemon) diff --git a/backend/pipeline/adapter.go b/backend/pipeline/adapter.go index c1844dbd0a..1b8695b158 100644 --- a/backend/pipeline/adapter.go +++ b/backend/pipeline/adapter.go @@ -40,3 +40,9 @@ func (e *errNoLegacyHandlers) Error() string { } func (e *errNoLegacyHandlers) MisconfiguredPipeline() {} + +type CommonAdapter struct { + FilterAdapters []FilterAdapter + MutatorAdapters []MutatorAdapter + HandlerAdapters []HandlerAdapter +} diff --git a/backend/pipeline/adapterv1.go b/backend/pipeline/adapterv1.go index caea9e7c98..d9a87e7a30 100644 --- a/backend/pipeline/adapterv1.go +++ b/backend/pipeline/adapterv1.go @@ -12,6 +12,7 @@ import ( "github.com/sensu/sensu-go/backend/store" metricspkg "github.com/sensu/sensu-go/metrics" "github.com/sirupsen/logrus" + "go.uber.org/multierr" ) const ( @@ -174,6 +175,7 @@ func (a *AdapterV1) Run(ctx context.Context, ref *corev2.ResourceReference, reso return &ErrNoWorkflows{} } + var allErrors error for _, workflow := range pipeline.Workflows { ctx = context.WithValue(ctx, corev2.PipelineWorkflowKey, workflow.Name) @@ -182,9 +184,10 @@ func (a *AdapterV1) Run(ctx context.Context, ref *corev2.ResourceReference, reso // Process the event through the workflow filters filtered, err := a.processFilters(ctx, workflow.Filters, event) - if err != nil { + if err != nil && !pipeline.ContinueOnError { return err } + allErrors = multierr.Append(allErrors, err) if filtered { continue } @@ -200,20 +203,22 @@ func (a *AdapterV1) Run(ctx context.Context, ref *corev2.ResourceReference, reso // Process the event through the workflow mutator mutatedData, err := a.processMutator(ctx, workflow.Mutator, event) - if err != nil { + if err != nil && !pipeline.ContinueOnError { return err } + allErrors = multierr.Append(allErrors, err) // Process the event through the workflow handler handlerRequestsTotalCounter.Inc() err = a.processHandler(ctx, workflow.Handler, event, mutatedData) incrementCounter(workflow.Handler, err) - if err != nil { + if err != nil && !pipeline.ContinueOnError { return err } + allErrors = multierr.Append(allErrors, err) } - return nil + return allErrors } func incrementCounter(handler *corev2.ResourceReference, err error) { diff --git a/backend/pipeline/filter/legacy.go b/backend/pipeline/filter/legacy.go index e2f58f62c5..0b38b12a9f 100644 --- a/backend/pipeline/filter/legacy.go +++ b/backend/pipeline/filter/legacy.go @@ -89,7 +89,11 @@ func (l *LegacyAdapter) Filter(ctx context.Context, ref *corev2.ResourceReferenc // expressions against the event. The event is rejected // if the product of all expressions is true. ctx = corev2.SetContextFromResource(ctx, filter) - matchedAssets := asset.GetAssets(ctx, l.Store, filter.RuntimeAssets) + matchedAssets, errors := asset.GetAssets(ctx, l.Store, filter.RuntimeAssets) + if errors != nil { + logger.WithFields(fields).WithError(errors).Error("failed to retrieve assets for filter") + return false, errors + } assets, err := asset.GetAll(ctx, l.AssetGetter, matchedAssets) if err != nil { logger.WithFields(fields).WithError(err).Error("failed to retrieve assets for filter") diff --git a/backend/pipeline/handler.go b/backend/pipeline/handler.go index 6bcb7f53db..7dfb0f1ddc 100644 --- a/backend/pipeline/handler.go +++ b/backend/pipeline/handler.go @@ -64,3 +64,4 @@ func (a *AdapterV1) getHandlerAdapterForResource(ctx context.Context, ref *corev } return nil, fmt.Errorf("no handler adapters were found that can handle the resource: %s.%s = %s", ref.APIVersion, ref.Type, ref.Name) } + diff --git a/backend/pipeline/handler/legacy.go b/backend/pipeline/handler/legacy.go index aef06ba748..fe70aa55c0 100644 --- a/backend/pipeline/handler/legacy.go +++ b/backend/pipeline/handler/legacy.go @@ -88,6 +88,7 @@ func (l *LegacyAdapter) Handle(ctx context.Context, ref *corev2.ResourceReferenc logger.WithFields(fields).Info("event pipe handler executed") } else { logger.WithFields(fields).Error("event pipe handler returned non ok status code") + return fmt.Errorf("event pipe handler returned non ok status code: %d", result.Status) } case "tcp", "udp": err := l.socketHandler(ctx, handler, event, mutatedData) @@ -143,21 +144,16 @@ func (l *LegacyAdapter) pipeHandler(ctx context.Context, handler *corev2.Handler if len(handler.RuntimeAssets) != 0 { logger.WithFields(fields).Debug("fetching assets for handler") // Fetch and install all assets required for handler execution - // TODO: check for errors here once GetAssets() has been updated to - // return errors. - // See issue #4407: https://github.com/sensu/sensu-go/issues/4407 - matchedAssets := asset.GetAssets(ctx, l.Store, handler.RuntimeAssets) + matchedAssets, errors := asset.GetAssets(ctx, l.Store, handler.RuntimeAssets) + if errors != nil { + logger.WithFields(fields).WithError(errors).Error("failed to retrieve assets for handler") + return nil, errors + } assets, err := asset.GetAll(ctx, l.AssetGetter, matchedAssets) if err != nil { logger.WithFields(fields).WithError(err).Error("failed to retrieve assets for handler") - // TODO(jk): I think we should return an error here regardless of // nosemgrep:dgryski.semgrep-go.errtodo.err-todo - // the type of error. - // See issue #4407: https://github.com/sensu/sensu-go/issues/4407 - if _, ok := err.(*store.ErrInternal); ok { - // Fatal error - return nil, err - } + return nil, err } else { handlerExec.Env = environment.MergeEnvironments(os.Environ(), assets.Env(), handler.EnvVars, secrets) } diff --git a/backend/pipeline/mutator/legacy.go b/backend/pipeline/mutator/legacy.go index ce4ae08489..ceb39c5fda 100644 --- a/backend/pipeline/mutator/legacy.go +++ b/backend/pipeline/mutator/legacy.go @@ -88,7 +88,11 @@ func (l *LegacyAdapter) Mutate(ctx context.Context, ref *corev2.ResourceReferenc logger.WithFields(fields).Debug("fetching assets for mutator") // Fetch and install all assets required for handler execution - matchedAssets := asset.GetAssets(ctx, l.Store, mutator.RuntimeAssets) + matchedAssets, errors := asset.GetAssets(ctx, l.Store, mutator.RuntimeAssets) + if errors != nil { + logger.WithFields(fields).WithError(errors).Error("failed to retrieve assets for mutator") + return nil, errors + } var err error assets, err = asset.GetAll(ctx, l.AssetGetter, matchedAssets) diff --git a/backend/pipelined/pipelined.go b/backend/pipelined/pipelined.go index 9bd52f277f..36083a9554 100644 --- a/backend/pipelined/pipelined.go +++ b/backend/pipelined/pipelined.go @@ -44,17 +44,18 @@ var ( // handler configuration determines which Sensu filters and mutator // are used. type Pipelined struct { - stopping chan struct{} - running *atomic.Value - wg *sync.WaitGroup - errChan chan error - eventChan chan interface{} - subscription messaging.Subscription - bus messaging.MessageBus - workerCount int - store store.Store - storeTimeout time.Duration - adapters []pipeline.Adapter + stopping chan struct{} + running *atomic.Value + wg *sync.WaitGroup + errChan chan error + eventChan chan interface{} + subscription messaging.Subscription + bus messaging.MessageBus + workerCount int + store store.Store + storeTimeout time.Duration + adapters []pipeline.Adapter + pipelineGetter []PipelineResourceGetter } // Config configures a Pipelined. @@ -73,9 +74,43 @@ type Option func(*Pipelined) error // slice of Pipeline resource references. type PipelineGetter interface { GetPipelines() []*corev2.ResourceReference +} + +// PipelineResourceGetter defines an interface for any structures which can return a +// slice of Pipeline resource references. It also defines a Match method +// which is used to determine if the object is a match for the getter. +type PipelineResourceGetter interface { + Match(obj any) bool + Get(obj any) []*corev2.ResourceReference +} + +type PipelineLogGetter interface { LogFields(bool) map[string]interface{} } +// AddPipelineResourceGetter adds a PipelineResourceGetter to the Pipelined. +func (p *Pipelined) AddPipelineResourceGetter(getter PipelineResourceGetter) { + p.pipelineGetter = append(p.pipelineGetter, getter) +} + +// PipelineResourceGetterImpl is a default implementation of the PipelineResourceGetter +type PipelineResourceGetterImpl struct{} + +// Match checks if the object is a PipelineGetter. +func (p *PipelineResourceGetterImpl) Match(obj any) bool { + _, ok := obj.(PipelineGetter) + return ok +} + +// Get returns a slice of Pipeline resource references from the object if it +// implements the PipelineGetter interface. +func (p *PipelineResourceGetterImpl) Get(obj any) []*corev2.ResourceReference { + if event, ok := obj.(PipelineGetter); ok { + return event.GetPipelines() + } + return nil +} + // New creates a new Pipelined with supplied Options applied. func New(c Config, options ...Option) (*Pipelined, error) { if c.BufferSize == 0 { @@ -209,13 +244,19 @@ func (p *Pipelined) handleMessage(ctx context.Context, msg interface{}) (hadPipe Observe(float64(duration) / float64(time.Millisecond)) }() - getter, ok := msg.(PipelineGetter) + logGetter, ok := msg.(PipelineLogGetter) if !ok { - panic("message received was not a PipelineGetter") + panic("message received was not a PipelineLogGetter") } - fields := getter.LogFields(false) - pipelineRefs := getter.GetPipelines() + fields := logGetter.LogFields(false) + var pipelineRefs []*corev2.ResourceReference + + for _, getter := range p.pipelineGetter { + if getter.Match(msg) { + pipelineRefs = append(pipelineRefs, getter.Get(msg)...) + } + } // Add a legacy pipeline "reference" if msg is a // corev2.Event & has handlers. @@ -228,7 +269,7 @@ func (p *Pipelined) handleMessage(ctx context.Context, msg interface{}) (hadPipe } if len(pipelineRefs) == 0 { - logger.WithFields(fields).Info("no pipelines defined in resource") + logger.WithFields(fields).Info("no pipelines or fallback_pipeline defined in resource") return false, nil } diff --git a/go.mod b/go.mod index c0d85fca36..09dbf4d934 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.22 toolchain go1.23.4 - require ( github.com/AlecAivazis/survey/v2 v2.2.14 github.com/ash2k/stager v0.0.0-20170622123058-6e9c7b0eacd4 // indirect @@ -86,14 +85,14 @@ require ( github.com/kr/pty v1.1.8 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/sensu/core/v2 v2.20.1-0.20251104042553-0603aa304a04 - github.com/sensu/core/v3 v3.9.0 + github.com/sensu/core/v3 v3.9.1-0.20251104042553-0603aa304a04 github.com/sensu/sensu-api-tools v0.2.1 github.com/sensu/sensu-go/types v0.13.0 github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect - go.uber.org/multierr v1.11.0 // indirect + go.uber.org/multierr v1.11.0 google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect diff --git a/go.sum b/go.sum index 1985216dde..9a0efcd180 100644 --- a/go.sum +++ b/go.sum @@ -1575,12 +1575,13 @@ github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/schollz/progressbar/v2 v2.13.2/go.mod h1:6YZjqdthH6SCZKv2rqGryrxPtfmRB/DWZxSMfCXPyD8= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/sensu/core/v2 v2.20.0 h1:QJQbqZiXSQ2dUE8LUvwUcKaqxHjYYCQwJ1Mil0cap4U= github.com/sensu/core/v2 v2.20.0/go.mod h1:fzr3qioLAHyLsBfq9I6ELVjL01NUNjqmQx4kj2tbDLo= github.com/sensu/core/v2 v2.20.1-0.20251104042553-0603aa304a04 h1:pfmJDcFVu/cIlwEim5wExlLQuhCt0it4W8RCDNraWQg= github.com/sensu/core/v2 v2.20.1-0.20251104042553-0603aa304a04/go.mod h1:AjLvqZm/fv1hhWPJLrOZiwLvmWURrKwfevEZ/sAZgkE= github.com/sensu/core/v3 v3.9.0 h1:7wz4ILGwPbHHTT/8sKTtflBAgOuJgTaKhUlN4lyUdSY= github.com/sensu/core/v3 v3.9.0/go.mod h1:AuupbIzmjYdzEkaEdmpzP/ZgrqQgHTovXVejzyTN6u4= +github.com/sensu/core/v3 v3.9.1-0.20251104042553-0603aa304a04 h1:uwUSN9kh+uhdrAgO+AIbnuuII6DL1AkTTsPpYG+TebI= +github.com/sensu/core/v3 v3.9.1-0.20251104042553-0603aa304a04/go.mod h1:KWdeuxHsPczT5KF+/fLxDLsVwENnfDr6yx7DE3AQt4E= github.com/sensu/lasr v1.2.1 h1:4H1QfOrPkwYHMFE5qAI6GwKEFkcI1YRyjjWidz1MihQ= github.com/sensu/lasr v1.2.1/go.mod h1:VIMtIK67Bcef6dTfctRCBg8EY9M9TtCY9NEFT6Zw5xQ= github.com/sensu/sensu-api-tools v0.0.0-20221025205055-db03ae2f8099/go.mod h1:SNISS4OhwNSZI9/YKTQr1bghOEwed9ZT4v+ztKk1Mq0=