From 2244efe2ad747c913257e2ad7c2b7892c35d04d5 Mon Sep 17 00:00:00 2001 From: Chris Randles Date: Sat, 21 Feb 2026 10:25:44 -0500 Subject: [PATCH] feat: [alb] add concurrency limit options Signed-off-by: Chris Randles --- docs/alb.md | 5 +- examples/conf/example.full.yaml | 10 +-- pkg/backends/alb/alb_502_test.go | 1 - pkg/backends/alb/mech/fr/first_response.go | 46 +++++------- .../alb/mech/nlm/newest_last_modified.go | 21 ++++-- .../alb/mech/tsm/time_series_merge.go | 73 ++++++++++++------- pkg/backends/alb/options/options.go | 56 +++++++++++++- pkg/backends/alb/options/options_data_test.go | 3 +- pkg/config/config_test.go | 7 +- 9 files changed, 149 insertions(+), 73 deletions(-) diff --git a/docs/alb.md b/docs/alb.md index c71b948eb..fa0894dcb 100644 --- a/docs/alb.md +++ b/docs/alb.md @@ -193,7 +193,7 @@ This mechanism is useful in applications such as live internet television. Consi #### Custom Good Status Codes List -By default, fgr will return the first response with a status code < 400. However, you can optionally provide an explicit list of good status codes using the `fgr_status_codes` configuration setting, as shown in the example below. When set, Trickster will return the first response to be returned that has a status code found in the configured list. +By default, fgr will return the first response with a status code < 400. However, you can optionally provide an explicit list of good status codes using the `fgr.status_codes` configuration setting, as shown in the example below. When set, Trickster will return the first response to be returned that has a status code found in the configured list. #### First Good Response Configuration Example @@ -216,10 +216,11 @@ backends: provider: alb alb: mechanism: fgr # first good response - fgr_status_codes: [ 200, 201, 204 ] # only consider these codes when selecting a response pool: - node01 - node02 + fgr: + status_codes: [ 200, 201, 204 ] # only consider these codes when selecting a response ``` Here is the visual representation of this configuration: diff --git a/examples/conf/example.full.yaml b/examples/conf/example.full.yaml index f7b55b85a..87d2a29c0 100644 --- a/examples/conf/example.full.yaml +++ b/examples/conf/example.full.yaml @@ -557,11 +557,11 @@ backends: # # -1 includes all backends, regardless of reporting state # # default is 0 # healthy_floor: 0 - -# # fgr_status_codes is a list of status codes considered 'good' when using the fgr mechanism -# # when this is not set, any response code < 400 is considered good. Use this setting to -# # provide an explicit list. -# fgr_status_codes: [ 200 ] # this would consider only 200 OK's good, and not 204, 302, etc. +# fgr: # First Good Response mechanism options, only applicable when mechanism is set to fgr +# # status_codes is a list of status codes considered 'good' when using the fgr mechanism +# # when this is not set, any response code < 400 is considered good. Use this setting to +# # provide an explicit list. +# status_codes: [ 200 ] # this would consider only 200 OK's good, and not 204, 302, etc. # # Configuration Options for Request Routing Rules - see /docs/rule.md for more information diff --git a/pkg/backends/alb/alb_502_test.go b/pkg/backends/alb/alb_502_test.go index 7a6fa0e6e..7babb7a71 100644 --- a/pkg/backends/alb/alb_502_test.go +++ b/pkg/backends/alb/alb_502_test.go @@ -23,7 +23,6 @@ import ( "time" "github.com/stretchr/testify/require" - "github.com/trickstercache/trickster/v2/pkg/backends/alb/mech/tsm" "github.com/trickstercache/trickster/v2/pkg/backends/alb/names" ao "github.com/trickstercache/trickster/v2/pkg/backends/alb/options" diff --git a/pkg/backends/alb/mech/fr/first_response.go b/pkg/backends/alb/mech/fr/first_response.go index 60cded5c1..0f74ec7f5 100644 --- a/pkg/backends/alb/mech/fr/first_response.go +++ b/pkg/backends/alb/mech/fr/first_response.go @@ -19,7 +19,6 @@ package fr import ( "context" "net/http" - "sync" "sync/atomic" "github.com/trickstercache/trickster/v2/pkg/backends/alb/mech/types" @@ -32,6 +31,7 @@ import ( "github.com/trickstercache/trickster/v2/pkg/proxy/request" "github.com/trickstercache/trickster/v2/pkg/proxy/response/capture" "github.com/trickstercache/trickster/v2/pkg/util/sets" + "golang.org/x/sync/errgroup" ) const ( @@ -46,6 +46,7 @@ type handler struct { pool pool.Pool fgr bool fgrCodes sets.Set[int] + options options.FirstGoodResponseOptions } func RegistryEntry() types.RegistryEntry { @@ -60,6 +61,7 @@ func NewFGR(o *options.Options, _ rt.Lookup) (types.Mechanism, error) { return &handler{ fgr: true, fgrCodes: o.FgrCodesLookup, + options: o.FGROptions, }, nil } @@ -109,13 +111,11 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // otherwise iterate the fanout var claimed int64 = -1 - contexts := make([]context.Context, l) - cancels := make([]context.CancelFunc, l) - for i := range l { - contexts[i], cancels[i] = context.WithCancel(r.Context()) - } captures := make([]*capture.CaptureResponseWriter, l) - var wg sync.WaitGroup + var eg errgroup.Group + if limit := h.options.ConcurrencyOptions.GetQueryConcurrencyLimit(); limit > 0 { + eg.SetLimit(limit) + } responseWritten := make(chan struct{}, 1) serve := func(crw *capture.CaptureResponseWriter) { @@ -126,26 +126,16 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { responseWritten <- struct{}{} } - serveAndCancelOthers := func(i int, crw *capture.CaptureResponseWriter) { - go func() { - // cancels all other contexts - for j, cancel := range cancels { - if j != i { - cancel() - } - } - }() - serve(crw) - } - // fanout to all healthy targets + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() for i := range l { if hl[i] == nil { continue } - wg.Go(func() { + eg.Go(func() error { r2, _ := request.Clone(r) - r2 = r2.WithContext(contexts[i]) + r2 = r2.WithContext(ctx) r2 = request.SetResources(r2, &request.Resources{Cancelable: true}) crw := capture.NewCaptureResponseWriter() captures[i] = crw @@ -153,20 +143,20 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { statusCode := crw.StatusCode() custom := h.fgr && len(h.fgrCodes) > 0 isGood := custom && h.fgrCodes.Contains(statusCode) - // this checks if the response qualifies as a client response - if (!h.fgr || (!custom && statusCode < 400) || isGood) && - // this checks that the qualifying response is the first response - atomic.CompareAndSwapInt64(&claimed, -1, int64(i)) { - // this serves only the first qualifying response - serveAndCancelOthers(i, crw) + + if (!h.fgr || (!custom && statusCode < 400) || isGood) && // this checks if the response qualifies as a client response + atomic.CompareAndSwapInt64(&claimed, -1, int64(i)) { // this checks that the qualifying response is the first response + serve(crw) + cancel() } + return nil }) } // this is a fallback case for when no qualifying upstream response arrives, // the first response is used, regardless of qualification go func() { - wg.Wait() + eg.Wait() // if claimed is still -1, the fallback case must be used if atomic.CompareAndSwapInt64(&claimed, -1, -2) && r.Context().Err() == nil { // this iterates the captures and serves the first non-nil response diff --git a/pkg/backends/alb/mech/nlm/newest_last_modified.go b/pkg/backends/alb/mech/nlm/newest_last_modified.go index 466d3f5ac..1867cab6f 100644 --- a/pkg/backends/alb/mech/nlm/newest_last_modified.go +++ b/pkg/backends/alb/mech/nlm/newest_last_modified.go @@ -30,6 +30,7 @@ import ( "github.com/trickstercache/trickster/v2/pkg/proxy/headers" "github.com/trickstercache/trickster/v2/pkg/proxy/request" "github.com/trickstercache/trickster/v2/pkg/proxy/response/capture" + "golang.org/x/sync/errgroup" ) const ( @@ -38,15 +39,18 @@ const ( ) type handler struct { - pool pool.Pool + pool pool.Pool + options options.NewestLastModifiedOptions } func RegistryEntry() types.RegistryEntry { return types.RegistryEntry{ID: ID, Name: Name, ShortName: names.MechanismNLM, New: New} } -func New(_ *options.Options, _ rt.Lookup) (types.Mechanism, error) { - return &handler{}, nil +func New(o *options.Options, _ rt.Lookup) (types.Mechanism, error) { + return &handler{ + options: o.NLMOptions, + }, nil } func (h *handler) SetPool(p pool.Pool) { @@ -94,15 +98,17 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Capture all responses captures := make([]*capture.CaptureResponseWriter, l) - var wg sync.WaitGroup - + var eg errgroup.Group + if limit := h.options.ConcurrencyOptions.GetQueryConcurrencyLimit(); limit > 0 { + eg.SetLimit(limit) + } // Fanout to all healthy targets for i := range l { if hl[i] == nil { continue } idx := i - wg.Go(func() { + eg.Go(func() error { r2, _ := request.Clone(r) r2 = request.ClearResources(r2.WithContext(ctx)) crw := capture.NewCaptureResponseWriter() @@ -120,11 +126,12 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { mu.Unlock() } } + return nil }) } // Wait for all responses to complete - wg.Wait() + eg.Wait() // Write the response with the newest Last-Modified if newestIdx >= 0 && newestIdx < len(captures) && captures[newestIdx] != nil { diff --git a/pkg/backends/alb/mech/tsm/time_series_merge.go b/pkg/backends/alb/mech/tsm/time_series_merge.go index e024dff81..351944904 100644 --- a/pkg/backends/alb/mech/tsm/time_series_merge.go +++ b/pkg/backends/alb/mech/tsm/time_series_merge.go @@ -17,9 +17,9 @@ package tsm import ( + stderrors "errors" "net/http" "strings" - "sync" "github.com/trickstercache/trickster/v2/pkg/backends" "github.com/trickstercache/trickster/v2/pkg/backends/alb/errors" @@ -30,12 +30,14 @@ import ( "github.com/trickstercache/trickster/v2/pkg/backends/alb/pool" "github.com/trickstercache/trickster/v2/pkg/backends/providers" rt "github.com/trickstercache/trickster/v2/pkg/backends/providers/registry/types" + "github.com/trickstercache/trickster/v2/pkg/observability/logging" "github.com/trickstercache/trickster/v2/pkg/observability/logging/logger" "github.com/trickstercache/trickster/v2/pkg/proxy/handlers/trickster/failures" "github.com/trickstercache/trickster/v2/pkg/proxy/headers" "github.com/trickstercache/trickster/v2/pkg/proxy/request" "github.com/trickstercache/trickster/v2/pkg/proxy/response/capture" "github.com/trickstercache/trickster/v2/pkg/proxy/response/merge" + "golang.org/x/sync/errgroup" ) const ( @@ -49,6 +51,7 @@ type handler struct { mergePaths []string // paths handled by the alb client that are enabled for tsmerge nonmergeHandler types.Mechanism // when methodology is tsmerge, this handler is for non-mergeable paths outputFormat string // the provider output format (e.g., "prometheus") + tsmOptions options.TimeSeriesMergeOptions } func RegistryEntry() types.RegistryEntry { @@ -57,7 +60,10 @@ func RegistryEntry() types.RegistryEntry { func New(o *options.Options, factories rt.Lookup) (types.Mechanism, error) { nmh, _ := rr.New(nil, nil) - out := &handler{nonmergeHandler: nmh} + out := &handler{ + nonmergeHandler: nmh, + tsmOptions: o.TSMOptions, + } // this validates the merge configuration for the ALB client as it sets it up // First, verify the output format is a support merge provider if !providers.IsSupportedTimeSeriesMergeProvider(o.OutputFormat) { @@ -140,16 +146,23 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Scatter/Gather section accumulator := merge.NewAccumulator() - var wg sync.WaitGroup - var statusCode int - var statusHeader string - var mu sync.Mutex // protects statusCode and statusHeader + var eg errgroup.Group + if limit := h.tsmOptions.ConcurrencyOptions.GetQueryConcurrencyLimit(); limit > 0 { + eg.SetLimit(limit) + } + + type result struct { + statusCode int + header http.Header + mergeFunc merge.RespondFunc + } + results := make([]result, l) for i := range l { if hl[i] == nil { continue } - wg.Go(func() { + eg.Go(func() error { r2, _ := request.Clone(r) rsc2 := &request.Resources{IsMergeMember: true, TSReqestOptions: rsc.TSReqestOptions} r2 = request.SetResources(r2, rsc2) @@ -157,8 +170,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { hl[i].Handler().ServeHTTP(crw, r2) rsc2 = request.GetResources(r2) if rsc2 == nil { - logger.Warn("tsm gather failed due to nil resources", nil) - return + return stderrors.New("tsm gather failed due to nil resources") } // ensure merge functions are set on cloned request @@ -170,27 +182,38 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if rsc2.MergeFunc != nil && rsc2.TS != nil { rsc2.MergeFunc(accumulator, rsc2.TS, i) } - // update status code and headers (take best status code) - mu.Lock() - if mrf == nil { - mrf = rsc2.MergeRespondFunc - } - if crw.StatusCode() > 0 { - if statusCode == 0 || crw.StatusCode() < statusCode { - statusCode = crw.StatusCode() - } + results[i] = result{ + statusCode: crw.StatusCode(), + header: crw.Header(), + mergeFunc: rsc2.MergeRespondFunc, } - if crw.Header() != nil { - headers.StripMergeHeaders(crw.Header()) - statusHeader = headers.MergeResultHeaderVals(statusHeader, - crw.Header().Get(headers.NameTricksterResult)) - } - mu.Unlock() + return nil }) } // wait for all fanout requests to complete - wg.Wait() + if err := eg.Wait(); err != nil { + logger.Warn("tsm gather failure", logging.Pairs{"error": err}) + } + + // Aggregate results sequentially - no mutex contention + var statusCode int + var statusHeader string + for _, res := range results { + if mrf == nil { + mrf = res.mergeFunc + } + if res.statusCode > 0 { + if statusCode == 0 || res.statusCode < statusCode { + statusCode = res.statusCode + } + } + if res.header != nil { + headers.StripMergeHeaders(res.header) + statusHeader = headers.MergeResultHeaderVals(statusHeader, + res.header.Get(headers.NameTricksterResult)) + } + } // set aggregated status header if statusHeader != "" { diff --git a/pkg/backends/alb/options/options.go b/pkg/backends/alb/options/options.go index 406c47e8a..676a2cdd8 100644 --- a/pkg/backends/alb/options/options.go +++ b/pkg/backends/alb/options/options.go @@ -19,6 +19,7 @@ package options import ( "errors" "fmt" + "runtime" "slices" "strings" @@ -47,6 +48,7 @@ type Options struct { // OutputFormat accompanies the tsmerge Mechanism to indicate the provider output format // options include any valid time seres backend like prometheus, influxdb or clickhouse OutputFormat string `yaml:"output_format,omitempty"` + // Deprecated: use fgr.status_codes instead of this top-level option // FGRStatusCodes provides an explicit list of status codes considered "good" when using // the First Good Response (fgr) methodology. By default, any code < 400 is good. FGRStatusCodes []int `yaml:"fgr_status_codes,omitempty"` @@ -55,6 +57,52 @@ type Options struct { // // synthetic values FgrCodesLookup sets.Set[int] `yaml:"-"` + + // mechanism-specific options + TSMOptions TimeSeriesMergeOptions `yaml:"tsm,omitempty"` + NLMOptions NewestLastModifiedOptions `yaml:"nlm,omitempty"` + FGROptions FirstGoodResponseOptions `yaml:"fgr,omitempty"` +} + +type FirstGoodResponseOptions struct { + // StatusCodes provides an explicit list of status codes considered "good" when using + // the First Good Response (fgr) methodology. By default, any code < 400 is good. + StatusCodes []int `yaml:"status_codes,omitempty"` + ConcurrencyOptions ConcurrencyOptions `yaml:",inline"` +} + +type TimeSeriesMergeOptions struct { + ConcurrencyOptions ConcurrencyOptions `yaml:",inline"` +} + +type NewestLastModifiedOptions struct { + ConcurrencyOptions ConcurrencyOptions `yaml:",inline"` +} + +// Common concurrency options to apply to ALB mechanisms +type ConcurrencyOptions struct { + // QueryConcurrencyLimit defines the concurrency limit while querying backends for the given mechanism. + // If set to 0, no limit is applied, if set to a positive integer, that number of queries can be performed concurrently. + // If the value is not set, it defaults to the number of logical CPUs available to the process (GOMAXPROCS). + // Default value is GOMAXPROCS. + QueryConcurrencyLimit *int `yaml:"query_concurrency_limit,omitempty"` + + // QueryConcurrencyMultiplier is a multiplier that can be applied to the default concurrency limit. + // This multiplier is applied to the query_concurrency_limit value to result in the overall concurrency limit for the given mechanism. + // Default and minimum value is 1. + QueryConcurrencyMultiplier *int `yaml:"query_concurrency_multiplier,omitempty"` +} + +func (o *ConcurrencyOptions) GetQueryConcurrencyLimit() int { + multiplier := 1 + if o != nil && o.QueryConcurrencyMultiplier != nil && *o.QueryConcurrencyMultiplier > 1 { + multiplier = *o.QueryConcurrencyMultiplier + } + limit := runtime.GOMAXPROCS(0) + if o != nil && o.QueryConcurrencyLimit != nil { + limit = *o.QueryConcurrencyLimit + } + return limit * multiplier } // InvalidALBOptionsError is an error type for invalid ALB Options @@ -116,9 +164,13 @@ func (o *Options) Initialize(_ string) error { } switch o.MechanismName { case names.MechanismFGR: - if len(o.FGRStatusCodes) > 0 { + // apply deprecated top-level FGRStatusCodes to new FROptions level + if len(o.FGRStatusCodes) > 0 && len(o.FGROptions.StatusCodes) == 0 { + o.FGROptions.StatusCodes = o.FGRStatusCodes + } + if len(o.FGROptions.StatusCodes) > 0 { o.FgrCodesLookup = sets.NewIntSet() - o.FgrCodesLookup.SetAll(o.FGRStatusCodes) + o.FgrCodesLookup.SetAll(o.FGROptions.StatusCodes) } case names.MechanismTSM: if o.OutputFormat == "" { diff --git a/pkg/backends/alb/options/options_data_test.go b/pkg/backends/alb/options/options_data_test.go index 3efaa31e2..43ccad9d3 100644 --- a/pkg/backends/alb/options/options_data_test.go +++ b/pkg/backends/alb/options/options_data_test.go @@ -52,5 +52,6 @@ backends: test: alb: mechanism: fgr - fgr_status_codes: [200, 201] + fgr: + status_codes: [200, 201] ` diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 6613dda38..8f29b4936 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -274,8 +274,11 @@ func TestSetStalenessInfo(t *testing.T) { func TestConfig_defaulting(t *testing.T) { // test the overall defaulting logic for the entire trickster config, using // existing documentation examples as input - // NOTE: if adding new config fields, run tests with the UPDATE_GOLDENS=true - // environment variable to update the golden files with the new default values. + t.Cleanup(func() { + if t.Failed() { + t.Log("Config defaulting test failed, if adding new config fields please run tests with UPDATE_GOLDENS=true to update the golden files with the new default values.") + } + }) entries, err := os.ReadDir("../../examples/conf") require.NoError(t, err)