Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/alb.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ This mechanism is useful in applications such as live internet television. Consi

#### Custom Good Status Codes List

By default, fgr will return the first response with a status code < 400. However, you can optionally provide an explicit list of good status codes using the `fgr_status_codes` configuration setting, as shown in the example below. When set, Trickster will return the first response to be returned that has a status code found in the configured list.
By default, fgr will return the first response with a status code < 400. However, you can optionally provide an explicit list of good status codes using the `fgr.status_codes` configuration setting, as shown in the example below. When set, Trickster will return the first response to be returned that has a status code found in the configured list.

#### First Good Response Configuration Example

Expand All @@ -216,10 +216,11 @@ backends:
provider: alb
alb:
mechanism: fgr # first good response
fgr_status_codes: [ 200, 201, 204 ] # only consider these codes when selecting a response
pool:
- node01
- node02
fgr:
status_codes: [ 200, 201, 204 ] # only consider these codes when selecting a response
```

Here is the visual representation of this configuration:
Expand Down
10 changes: 5 additions & 5 deletions examples/conf/example.full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -557,11 +557,11 @@ backends:
# # -1 includes all backends, regardless of reporting state
# # default is 0
# healthy_floor: 0

# # fgr_status_codes is a list of status codes considered 'good' when using the fgr mechanism
# # when this is not set, any response code < 400 is considered good. Use this setting to
# # provide an explicit list.
# fgr_status_codes: [ 200 ] # this would consider only 200 OK's good, and not 204, 302, etc.
# fgr: # First Good Response mechanism options, only applicable when mechanism is set to fgr
# # status_codes is a list of status codes considered 'good' when using the fgr mechanism
# # when this is not set, any response code < 400 is considered good. Use this setting to
# # provide an explicit list.
# status_codes: [ 200 ] # this would consider only 200 OK's good, and not 204, 302, etc.

# # Configuration Options for Request Routing Rules - see /docs/rule.md for more information

Expand Down
1 change: 0 additions & 1 deletion pkg/backends/alb/alb_502_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/stretchr/testify/require"

"github.com/trickstercache/trickster/v2/pkg/backends/alb/mech/tsm"
"github.com/trickstercache/trickster/v2/pkg/backends/alb/names"
ao "github.com/trickstercache/trickster/v2/pkg/backends/alb/options"
Expand Down
46 changes: 18 additions & 28 deletions pkg/backends/alb/mech/fr/first_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package fr
import (
"context"
"net/http"
"sync"
"sync/atomic"

"github.com/trickstercache/trickster/v2/pkg/backends/alb/mech/types"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/trickstercache/trickster/v2/pkg/proxy/request"
"github.com/trickstercache/trickster/v2/pkg/proxy/response/capture"
"github.com/trickstercache/trickster/v2/pkg/util/sets"
"golang.org/x/sync/errgroup"
)

const (
Expand All @@ -46,6 +46,7 @@ type handler struct {
pool pool.Pool
fgr bool
fgrCodes sets.Set[int]
options options.FirstGoodResponseOptions
}

func RegistryEntry() types.RegistryEntry {
Expand All @@ -60,6 +61,7 @@ func NewFGR(o *options.Options, _ rt.Lookup) (types.Mechanism, error) {
return &handler{
fgr: true,
fgrCodes: o.FgrCodesLookup,
options: o.FGROptions,
}, nil
}

Expand Down Expand Up @@ -109,13 +111,11 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// otherwise iterate the fanout
var claimed int64 = -1
contexts := make([]context.Context, l)
cancels := make([]context.CancelFunc, l)
for i := range l {
contexts[i], cancels[i] = context.WithCancel(r.Context())
}
captures := make([]*capture.CaptureResponseWriter, l)
var wg sync.WaitGroup
var eg errgroup.Group
if limit := h.options.ConcurrencyOptions.GetQueryConcurrencyLimit(); limit > 0 {
eg.SetLimit(limit)
}
responseWritten := make(chan struct{}, 1)

serve := func(crw *capture.CaptureResponseWriter) {
Expand All @@ -126,47 +126,37 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
responseWritten <- struct{}{}
}

serveAndCancelOthers := func(i int, crw *capture.CaptureResponseWriter) {
go func() {
// cancels all other contexts
for j, cancel := range cancels {
if j != i {
cancel()
}
}
}()
serve(crw)
}

// fanout to all healthy targets
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
for i := range l {
if hl[i] == nil {
continue
}
wg.Go(func() {
eg.Go(func() error {
r2, _ := request.Clone(r)
r2 = r2.WithContext(contexts[i])
r2 = r2.WithContext(ctx)
r2 = request.SetResources(r2, &request.Resources{Cancelable: true})
crw := capture.NewCaptureResponseWriter()
captures[i] = crw
hl[i].ServeHTTP(crw, r2)
statusCode := crw.StatusCode()
custom := h.fgr && len(h.fgrCodes) > 0
isGood := custom && h.fgrCodes.Contains(statusCode)
// this checks if the response qualifies as a client response
if (!h.fgr || (!custom && statusCode < 400) || isGood) &&
// this checks that the qualifying response is the first response
atomic.CompareAndSwapInt64(&claimed, -1, int64(i)) {
// this serves only the first qualifying response
serveAndCancelOthers(i, crw)

if (!h.fgr || (!custom && statusCode < 400) || isGood) && // this checks if the response qualifies as a client response
atomic.CompareAndSwapInt64(&claimed, -1, int64(i)) { // this checks that the qualifying response is the first response
serve(crw)
cancel()
}
return nil
})
}

// this is a fallback case for when no qualifying upstream response arrives,
// the first response is used, regardless of qualification
go func() {
wg.Wait()
eg.Wait()
// if claimed is still -1, the fallback case must be used
if atomic.CompareAndSwapInt64(&claimed, -1, -2) && r.Context().Err() == nil {
// this iterates the captures and serves the first non-nil response
Expand Down
21 changes: 14 additions & 7 deletions pkg/backends/alb/mech/nlm/newest_last_modified.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/trickstercache/trickster/v2/pkg/proxy/headers"
"github.com/trickstercache/trickster/v2/pkg/proxy/request"
"github.com/trickstercache/trickster/v2/pkg/proxy/response/capture"
"golang.org/x/sync/errgroup"
)

const (
Expand All @@ -38,15 +39,18 @@ const (
)

type handler struct {
pool pool.Pool
pool pool.Pool
options options.NewestLastModifiedOptions
}

func RegistryEntry() types.RegistryEntry {
return types.RegistryEntry{ID: ID, Name: Name, ShortName: names.MechanismNLM, New: New}
}

func New(_ *options.Options, _ rt.Lookup) (types.Mechanism, error) {
return &handler{}, nil
func New(o *options.Options, _ rt.Lookup) (types.Mechanism, error) {
return &handler{
options: o.NLMOptions,
}, nil
}

func (h *handler) SetPool(p pool.Pool) {
Expand Down Expand Up @@ -94,15 +98,17 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// Capture all responses
captures := make([]*capture.CaptureResponseWriter, l)
var wg sync.WaitGroup

var eg errgroup.Group
if limit := h.options.ConcurrencyOptions.GetQueryConcurrencyLimit(); limit > 0 {
eg.SetLimit(limit)
}
// Fanout to all healthy targets
for i := range l {
if hl[i] == nil {
continue
}
idx := i
wg.Go(func() {
eg.Go(func() error {
r2, _ := request.Clone(r)
r2 = request.ClearResources(r2.WithContext(ctx))
crw := capture.NewCaptureResponseWriter()
Expand All @@ -120,11 +126,12 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
mu.Unlock()
}
}
return nil
})
}

// Wait for all responses to complete
wg.Wait()
eg.Wait()

// Write the response with the newest Last-Modified
if newestIdx >= 0 && newestIdx < len(captures) && captures[newestIdx] != nil {
Expand Down
73 changes: 48 additions & 25 deletions pkg/backends/alb/mech/tsm/time_series_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package tsm

import (
stderrors "errors"
"net/http"
"strings"
"sync"

"github.com/trickstercache/trickster/v2/pkg/backends"
"github.com/trickstercache/trickster/v2/pkg/backends/alb/errors"
Expand All @@ -30,12 +30,14 @@ import (
"github.com/trickstercache/trickster/v2/pkg/backends/alb/pool"
"github.com/trickstercache/trickster/v2/pkg/backends/providers"
rt "github.com/trickstercache/trickster/v2/pkg/backends/providers/registry/types"
"github.com/trickstercache/trickster/v2/pkg/observability/logging"
"github.com/trickstercache/trickster/v2/pkg/observability/logging/logger"
"github.com/trickstercache/trickster/v2/pkg/proxy/handlers/trickster/failures"
"github.com/trickstercache/trickster/v2/pkg/proxy/headers"
"github.com/trickstercache/trickster/v2/pkg/proxy/request"
"github.com/trickstercache/trickster/v2/pkg/proxy/response/capture"
"github.com/trickstercache/trickster/v2/pkg/proxy/response/merge"
"golang.org/x/sync/errgroup"
)

const (
Expand All @@ -49,6 +51,7 @@ type handler struct {
mergePaths []string // paths handled by the alb client that are enabled for tsmerge
nonmergeHandler types.Mechanism // when methodology is tsmerge, this handler is for non-mergeable paths
outputFormat string // the provider output format (e.g., "prometheus")
tsmOptions options.TimeSeriesMergeOptions
}

func RegistryEntry() types.RegistryEntry {
Expand All @@ -57,7 +60,10 @@ func RegistryEntry() types.RegistryEntry {

func New(o *options.Options, factories rt.Lookup) (types.Mechanism, error) {
nmh, _ := rr.New(nil, nil)
out := &handler{nonmergeHandler: nmh}
out := &handler{
nonmergeHandler: nmh,
tsmOptions: o.TSMOptions,
}
// this validates the merge configuration for the ALB client as it sets it up
// First, verify the output format is a support merge provider
if !providers.IsSupportedTimeSeriesMergeProvider(o.OutputFormat) {
Expand Down Expand Up @@ -140,25 +146,31 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Scatter/Gather section

accumulator := merge.NewAccumulator()
var wg sync.WaitGroup
var statusCode int
var statusHeader string
var mu sync.Mutex // protects statusCode and statusHeader
var eg errgroup.Group
if limit := h.tsmOptions.ConcurrencyOptions.GetQueryConcurrencyLimit(); limit > 0 {
eg.SetLimit(limit)
}

type result struct {
statusCode int
header http.Header
mergeFunc merge.RespondFunc
}
results := make([]result, l)

for i := range l {
if hl[i] == nil {
continue
}
wg.Go(func() {
eg.Go(func() error {
r2, _ := request.Clone(r)
rsc2 := &request.Resources{IsMergeMember: true, TSReqestOptions: rsc.TSReqestOptions}
r2 = request.SetResources(r2, rsc2)
crw := capture.NewCaptureResponseWriter()
hl[i].Handler().ServeHTTP(crw, r2)
rsc2 = request.GetResources(r2)
if rsc2 == nil {
logger.Warn("tsm gather failed due to nil resources", nil)
return
return stderrors.New("tsm gather failed due to nil resources")
}

// ensure merge functions are set on cloned request
Expand All @@ -170,27 +182,38 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if rsc2.MergeFunc != nil && rsc2.TS != nil {
rsc2.MergeFunc(accumulator, rsc2.TS, i)
}
// update status code and headers (take best status code)
mu.Lock()
if mrf == nil {
mrf = rsc2.MergeRespondFunc
}
if crw.StatusCode() > 0 {
if statusCode == 0 || crw.StatusCode() < statusCode {
statusCode = crw.StatusCode()
}
results[i] = result{
statusCode: crw.StatusCode(),
header: crw.Header(),
mergeFunc: rsc2.MergeRespondFunc,
}
if crw.Header() != nil {
headers.StripMergeHeaders(crw.Header())
statusHeader = headers.MergeResultHeaderVals(statusHeader,
crw.Header().Get(headers.NameTricksterResult))
}
mu.Unlock()
return nil
})
}

// wait for all fanout requests to complete
wg.Wait()
if err := eg.Wait(); err != nil {
logger.Warn("tsm gather failure", logging.Pairs{"error": err})
}

// Aggregate results sequentially - no mutex contention
var statusCode int
var statusHeader string
for _, res := range results {
if mrf == nil {
mrf = res.mergeFunc
}
if res.statusCode > 0 {
if statusCode == 0 || res.statusCode < statusCode {
statusCode = res.statusCode
}
}
if res.header != nil {
headers.StripMergeHeaders(res.header)
statusHeader = headers.MergeResultHeaderVals(statusHeader,
res.header.Get(headers.NameTricksterResult))
}
}

// set aggregated status header
if statusHeader != "" {
Expand Down
Loading
Loading