Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
da0b98e
added basic structure for new fallack pipelines's adapter
rakibhossainctr Jul 16, 2025
6aa33f2
configured backend with new fallback pipelines adapter
rakibhossainctr Jul 16, 2025
27d1e6d
configured pipelined to handle fallback pipeline with available adapter
rakibhossainctr Jul 16, 2025
9f4cded
added some basic method in fallback pipelines adapter
rakibhossainctr Jul 16, 2025
2714dfb
completed mutate, handle and filter method for fallback pipelines
rakibhossainctr Jul 17, 2025
3f77079
separated each file in it's correct location
rakibhossainctr Jul 17, 2025
2a6d3af
added basic structure code to retrieve fallback pipelines from etcd
rakibhossainctr Jul 18, 2025
bb97a27
added fallback pipelines fetching code from etcd
rakibhossainctr Jul 21, 2025
abb2d11
fixed pipelines return statement from getFallbackPipelinesFromStore
rakibhossainctr Jul 22, 2025
fdc1685
removed fallback pipeline adapter from sensu-go and stored common ada…
rakibhossainctr Jul 24, 2025
446fd83
changed pipeline getter pattern
rakibhossainctr Jul 24, 2025
88ab3b3
changed the panic statement
rakibhossainctr Jul 24, 2025
d4a20b6
deleted fallback pipelines store
rakibhossainctr Jul 24, 2025
a917bc1
removed traces from proxy store
rakibhossainctr Jul 24, 2025
b1d5e1d
changed the resource getter pattern
rakibhossainctr Jul 26, 2025
1819335
fixed logger in pipelined
rakibhossainctr Jul 29, 2025
89df026
fixed the fallback pipeline refer
rakibhossainctr Jul 29, 2025
73af2b3
added continue on error on existing pipeline and changed non-zero han…
rakibhossainctr Jul 30, 2025
9661edd
raised error on missing runtime asset
rakibhossainctr Jul 30, 2025
25c3d60
reviewed proper documentation
rakibhossainctr Aug 4, 2025
c967ef6
updating go.mod for testing purposes
rakibhossainctr Aug 5, 2025
e30f271
updating changelog
rakibhossainctr Oct 25, 2025
69cf327
updating go.mod after resolving merge conflict
rakibhossainctr Nov 4, 2025
38648c7
Merge branch 'develop/6' into fallback-experiment
rakibhossainctr Dec 5, 2025
b2050c1
updating core module in go.mod
rakibhossainctr Dec 5, 2025
9450fe3
updating change log
rakibhossainctr Dec 5, 2025
24cc478
static check workflow fix
rakibhossainctr Dec 5, 2025
71cd9d7
updating go version in appveyor
rakibhossainctr Feb 5, 2026
4ff59c7
updating go downloadable url for appveyor
rakibhossainctr Feb 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ cache:
environment:
GOPATH: c:\gopath
GOROOT: c:\Program Files\Go
GOVERSION: 1.21.11
GOVERSION: 1.23.4
GO111MODULE: 'on'
GOPROXY: 'https://proxy.golang.org'

Expand All @@ -28,7 +28,7 @@ environment:

install:
- rmdir c:\go /s /q
- appveyor DownloadFile https://storage.googleapis.com/golang/go%GOVERSION%.windows-%GOARCH%.msi
- appveyor DownloadFile https://go.dev/dl/go%GOVERSION%.windows-%GOARCH%.msi
- msiexec /i go%GOVERSION%.windows-%GOARCH%.msi /q
- echo %PATH%
- echo %GOPATH%
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/static-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ jobs:
staticcheck:
name: staticcheck (project)
runs-on: ubuntu-latest
env:
GO_VERSION: 1.23.4
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
fetch-depth: 1
- uses: WillAbides/setup-go-faster@v1.14.0
with:
go-version: "1.21.x"
go-version: "1.23.x"
- uses: dominikh/staticcheck-action@v1.3.1
with:
version: "2025.1.1"
install-go: false
cache-key: "1.21.x"
env:
GO_VERSION: 1.21.3
cache-key: "1.23.x"
11 changes: 10 additions & 1 deletion CHANGELOG-6.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
and this project adheres to [Semantic
Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added
- Added `continue_on_error` field to `core/v2.Pipeline`.
This flag controls whether pipeline execution should continue when an error occurs in a handler, filter, mutator, or during asset resolution.
Defaults to `false`, preserving existing behavior.
- Added `ttl_status` field to `Check` and `CheckConfig` to configure the status (warning or critical) for TTL failure events. This field accepts values 1 (warning) or 2 (critical), with warning (1) as the default to maintain backward compatibility.
- Added Prometheus metrics collection for HTTP API requests, including request count (`sensu_go_http_requests_total`), request duration (`sensu_go_http_request_duration_seconds`), and client error count (`sensu_go_http_client_errors_total`) for better observability of backend API performance.

### Changed
- Updated `GetAssets` method to return an error when one or more required assets are missing.
When `continue_on_error` is `false`, missing assets or other errors will stop the pipeline execution (existing behavior).
When set to `true`, the pipeline will continue executing subsequent workflows until one succeeds.
- Modified handler execution behavior to return a non-zero status code on failure.
Previously, handler errors were only logged without affecting pipeline.

## [6.13.1] - 2025-05-28

### Fixed
Expand Down
1 change: 1 addition & 0 deletions agent/check_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (a *Agent) executeCheck(ctx context.Context, request *corev2.CheckRequest,
event.Check.Executed = time.Now().Unix()
event.Check.Issued = request.Issued
event.Pipelines = checkConfig.Pipelines
event.FallbackPipeline = checkConfig.FallbackPipeline

// To guard against publishing sensitive/redacted client attribute values
// the original command value is reinstated.
Expand Down
7 changes: 5 additions & 2 deletions asset/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,25 @@ import (

"github.com/sensu/sensu-go/backend/store"
"github.com/sensu/sensu-go/types"
"go.uber.org/multierr"
)

// GetAssets retrieves all Assets from the store if contained in the list of asset names
func GetAssets(ctx context.Context, store store.Store, assetList []string) []types.Asset {
func GetAssets(ctx context.Context, store store.Store, assetList []string) ([]types.Asset, error) {
assets := []types.Asset{}

var errors error
for _, assetName := range assetList {
asset, err := store.GetAssetByName(ctx, assetName)
if err != nil {
logger.WithField("asset", assetName).WithError(err).Error("error fetching asset from store")
errors = multierr.Append(errors, err)
} else if asset == nil {
logger.WithField("asset", assetName).Info("asset does not exist")
} else {
assets = append(assets, *asset)
}
}

return assets
return assets, errors
}
2 changes: 1 addition & 1 deletion asset/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestGetAssets(t *testing.T) {
store.On("GetAssetByName", mock.Anything, "foo").Return(nilAsset, nil)
store.On("GetAssetByName", mock.Anything, "bar").Return(nilAsset, errors.New("error"))

assets := GetAssets(context.Background(), store, tc.assetList)
assets, _ := GetAssets(context.Background(), store, tc.assetList)
assert.EqualValues(t, tc.expectedAssets, assets)
})
}
Expand Down
14 changes: 11 additions & 3 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ type Backend struct {
PipelineAdapterV1 pipeline.AdapterV1
LicenseGetter licensing.Getter
Bus messaging.MessageBus
CommonAdapters pipeline.CommonAdapter

ctx context.Context
runCtx context.Context
Expand Down Expand Up @@ -422,6 +423,8 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) {
return nil, fmt.Errorf("error initializing %s: %s", pipelineDaemon.Name(), err)
}

pipelineDaemon.AddPipelineResourceGetter(&pipelined.PipelineResourceGetterImpl{})

// Initialize PipelineAdapterV1
storeTimeout := 2 * time.Minute
b.PipelineAdapterV1 = pipeline.AdapterV1{
Expand All @@ -439,13 +442,15 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) {
isIncidentFilterAdapter := &filter.IsIncidentAdapter{}
notSilencedFilterAdapter := &filter.NotSilencedAdapter{}

b.PipelineAdapterV1.FilterAdapters = []pipeline.FilterAdapter{
b.CommonAdapters.FilterAdapters = []pipeline.FilterAdapter{
legacyFilterAdapter,
hasMetricsFilterAdapter,
isIncidentFilterAdapter,
notSilencedFilterAdapter,
}

b.PipelineAdapterV1.FilterAdapters = b.CommonAdapters.FilterAdapters

// Initialize PipelineAdapterV1 mutator adapters
legacyMutatorAdapter := &mutator.LegacyAdapter{
AssetGetter: assetGetter,
Expand All @@ -457,12 +462,14 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) {
onlyCheckOutputMutatorAdapter := &mutator.OnlyCheckOutputAdapter{}
jsonMutatorAdapter := &mutator.JSONAdapter{}

b.PipelineAdapterV1.MutatorAdapters = []pipeline.MutatorAdapter{
b.CommonAdapters.MutatorAdapters = []pipeline.MutatorAdapter{
legacyMutatorAdapter,
onlyCheckOutputMutatorAdapter,
jsonMutatorAdapter,
}

b.PipelineAdapterV1.MutatorAdapters = b.CommonAdapters.MutatorAdapters

// Initialize PipelineAdapterV1 handler adapters
legacyHandlerAdapter := &handler.LegacyAdapter{
AssetGetter: assetGetter,
Expand All @@ -473,9 +480,10 @@ func Initialize(ctx context.Context, config *Config) (*Backend, error) {
StoreTimeout: storeTimeout,
}

b.PipelineAdapterV1.HandlerAdapters = []pipeline.HandlerAdapter{
b.CommonAdapters.HandlerAdapters = []pipeline.HandlerAdapter{
legacyHandlerAdapter,
}
b.PipelineAdapterV1.HandlerAdapters = b.CommonAdapters.HandlerAdapters

pipelineDaemon.AddAdapter(&b.PipelineAdapterV1)
b.Daemons = append(b.Daemons, pipelineDaemon)
Expand Down
6 changes: 6 additions & 0 deletions backend/pipeline/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,9 @@ func (e *errNoLegacyHandlers) Error() string {
}

func (e *errNoLegacyHandlers) MisconfiguredPipeline() {}

type CommonAdapter struct {
FilterAdapters []FilterAdapter
MutatorAdapters []MutatorAdapter
HandlerAdapters []HandlerAdapter
}
13 changes: 9 additions & 4 deletions backend/pipeline/adapterv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/sensu/sensu-go/backend/store"
metricspkg "github.com/sensu/sensu-go/metrics"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"
)

const (
Expand Down Expand Up @@ -174,6 +175,7 @@ func (a *AdapterV1) Run(ctx context.Context, ref *corev2.ResourceReference, reso
return &ErrNoWorkflows{}
}

var allErrors error
for _, workflow := range pipeline.Workflows {
ctx = context.WithValue(ctx, corev2.PipelineWorkflowKey, workflow.Name)

Expand All @@ -182,9 +184,10 @@ func (a *AdapterV1) Run(ctx context.Context, ref *corev2.ResourceReference, reso

// Process the event through the workflow filters
filtered, err := a.processFilters(ctx, workflow.Filters, event)
if err != nil {
if err != nil && !pipeline.ContinueOnError {
return err
}
allErrors = multierr.Append(allErrors, err)
if filtered {
continue
}
Expand All @@ -200,20 +203,22 @@ func (a *AdapterV1) Run(ctx context.Context, ref *corev2.ResourceReference, reso

// Process the event through the workflow mutator
mutatedData, err := a.processMutator(ctx, workflow.Mutator, event)
if err != nil {
if err != nil && !pipeline.ContinueOnError {
return err
}
allErrors = multierr.Append(allErrors, err)

// Process the event through the workflow handler
handlerRequestsTotalCounter.Inc()
err = a.processHandler(ctx, workflow.Handler, event, mutatedData)
incrementCounter(workflow.Handler, err)
if err != nil {
if err != nil && !pipeline.ContinueOnError {
return err
}
allErrors = multierr.Append(allErrors, err)
}

return nil
return allErrors
}

func incrementCounter(handler *corev2.ResourceReference, err error) {
Expand Down
6 changes: 5 additions & 1 deletion backend/pipeline/filter/legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ func (l *LegacyAdapter) Filter(ctx context.Context, ref *corev2.ResourceReferenc
// expressions against the event. The event is rejected
// if the product of all expressions is true.
ctx = corev2.SetContextFromResource(ctx, filter)
matchedAssets := asset.GetAssets(ctx, l.Store, filter.RuntimeAssets)
matchedAssets, errors := asset.GetAssets(ctx, l.Store, filter.RuntimeAssets)
if errors != nil {
logger.WithFields(fields).WithError(errors).Error("failed to retrieve assets for filter")
return false, errors
}
assets, err := asset.GetAll(ctx, l.AssetGetter, matchedAssets)
if err != nil {
logger.WithFields(fields).WithError(err).Error("failed to retrieve assets for filter")
Expand Down
1 change: 1 addition & 0 deletions backend/pipeline/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ func (a *AdapterV1) getHandlerAdapterForResource(ctx context.Context, ref *corev
}
return nil, fmt.Errorf("no handler adapters were found that can handle the resource: %s.%s = %s", ref.APIVersion, ref.Type, ref.Name)
}

18 changes: 7 additions & 11 deletions backend/pipeline/handler/legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (l *LegacyAdapter) Handle(ctx context.Context, ref *corev2.ResourceReferenc
logger.WithFields(fields).Info("event pipe handler executed")
} else {
logger.WithFields(fields).Error("event pipe handler returned non ok status code")
return fmt.Errorf("event pipe handler returned non ok status code: %d", result.Status)
}
case "tcp", "udp":
err := l.socketHandler(ctx, handler, event, mutatedData)
Expand Down Expand Up @@ -143,21 +144,16 @@ func (l *LegacyAdapter) pipeHandler(ctx context.Context, handler *corev2.Handler
if len(handler.RuntimeAssets) != 0 {
logger.WithFields(fields).Debug("fetching assets for handler")
// Fetch and install all assets required for handler execution
// TODO: check for errors here once GetAssets() has been updated to
// return errors.
// See issue #4407: https://github.com/sensu/sensu-go/issues/4407
matchedAssets := asset.GetAssets(ctx, l.Store, handler.RuntimeAssets)
matchedAssets, errors := asset.GetAssets(ctx, l.Store, handler.RuntimeAssets)
if errors != nil {
logger.WithFields(fields).WithError(errors).Error("failed to retrieve assets for handler")
return nil, errors
}

assets, err := asset.GetAll(ctx, l.AssetGetter, matchedAssets)
if err != nil {
logger.WithFields(fields).WithError(err).Error("failed to retrieve assets for handler")
// TODO(jk): I think we should return an error here regardless of // nosemgrep:dgryski.semgrep-go.errtodo.err-todo
// the type of error.
// See issue #4407: https://github.com/sensu/sensu-go/issues/4407
if _, ok := err.(*store.ErrInternal); ok {
// Fatal error
return nil, err
}
return nil, err
} else {
handlerExec.Env = environment.MergeEnvironments(os.Environ(), assets.Env(), handler.EnvVars, secrets)
}
Expand Down
6 changes: 5 additions & 1 deletion backend/pipeline/mutator/legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ func (l *LegacyAdapter) Mutate(ctx context.Context, ref *corev2.ResourceReferenc
logger.WithFields(fields).Debug("fetching assets for mutator")

// Fetch and install all assets required for handler execution
matchedAssets := asset.GetAssets(ctx, l.Store, mutator.RuntimeAssets)
matchedAssets, errors := asset.GetAssets(ctx, l.Store, mutator.RuntimeAssets)
if errors != nil {
logger.WithFields(fields).WithError(errors).Error("failed to retrieve assets for mutator")
return nil, errors
}

var err error
assets, err = asset.GetAll(ctx, l.AssetGetter, matchedAssets)
Expand Down
Loading