Skip to content

Commit 9a659dd

Browse files
authored
feat: [alb] add concurrency limit options (#918)
Signed-off-by: Chris Randles <randles.chris@gmail.com>
1 parent 0c64902 commit 9a659dd

File tree

9 files changed

+149
-73
lines changed

9 files changed

+149
-73
lines changed

docs/alb.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ This mechanism is useful in applications such as live internet television. Consi
193193

194194
#### Custom Good Status Codes List
195195

196-
By default, fgr will return the first response with a status code < 400. However, you can optionally provide an explicit list of good status codes using the `fgr_status_codes` configuration setting, as shown in the example below. When set, Trickster will return the first response to be returned that has a status code found in the configured list.
196+
By default, fgr returns the first response with a status code < 400. However, you can optionally provide an explicit list of good status codes using the `fgr.status_codes` configuration setting, as shown in the example below. When set, Trickster returns the first response received whose status code is in the configured list.
197197

198198
#### First Good Response Configuration Example
199199

@@ -216,10 +216,11 @@ backends:
216216
provider: alb
217217
alb:
218218
mechanism: fgr # first good response
219-
fgr_status_codes: [ 200, 201, 204 ] # only consider these codes when selecting a response
220219
pool:
221220
- node01
222221
- node02
222+
fgr:
223+
status_codes: [ 200, 201, 204 ] # only consider these codes when selecting a response
223224
```
224225

225226
Here is the visual representation of this configuration:

examples/conf/example.full.yaml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -557,11 +557,11 @@ backends:
557557
# # -1 includes all backends, regardless of reporting state
558558
# # default is 0
559559
# healthy_floor: 0
560-
561-
# # fgr_status_codes is a list of status codes considered 'good' when using the fgr mechanism
562-
# # when this is not set, any response code < 400 is considered good. Use this setting to
563-
# # provide an explicit list.
564-
# fgr_status_codes: [ 200 ] # this would consider only 200 OK's good, and not 204, 302, etc.
560+
# fgr: # First Good Response mechanism options, only applicable when mechanism is set to fgr
561+
# # status_codes is a list of status codes considered 'good' when using the fgr mechanism
562+
# # when this is not set, any response code < 400 is considered good. Use this setting to
563+
# # provide an explicit list.
564+
# status_codes: [ 200 ] # this would consider only 200 OK responses good, and not 204, 302, etc.
565565

566566
# # Configuration Options for Request Routing Rules - see /docs/rule.md for more information
567567

pkg/backends/alb/alb_502_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"time"
2424

2525
"github.com/stretchr/testify/require"
26-
2726
"github.com/trickstercache/trickster/v2/pkg/backends/alb/mech/tsm"
2827
"github.com/trickstercache/trickster/v2/pkg/backends/alb/names"
2928
ao "github.com/trickstercache/trickster/v2/pkg/backends/alb/options"

pkg/backends/alb/mech/fr/first_response.go

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package fr
1919
import (
2020
"context"
2121
"net/http"
22-
"sync"
2322
"sync/atomic"
2423

2524
"github.com/trickstercache/trickster/v2/pkg/backends/alb/mech/types"
@@ -32,6 +31,7 @@ import (
3231
"github.com/trickstercache/trickster/v2/pkg/proxy/request"
3332
"github.com/trickstercache/trickster/v2/pkg/proxy/response/capture"
3433
"github.com/trickstercache/trickster/v2/pkg/util/sets"
34+
"golang.org/x/sync/errgroup"
3535
)
3636

3737
const (
@@ -46,6 +46,7 @@ type handler struct {
4646
pool pool.Pool
4747
fgr bool
4848
fgrCodes sets.Set[int]
49+
options options.FirstGoodResponseOptions
4950
}
5051

5152
func RegistryEntry() types.RegistryEntry {
@@ -60,6 +61,7 @@ func NewFGR(o *options.Options, _ rt.Lookup) (types.Mechanism, error) {
6061
return &handler{
6162
fgr: true,
6263
fgrCodes: o.FgrCodesLookup,
64+
options: o.FGROptions,
6365
}, nil
6466
}
6567

@@ -109,13 +111,11 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
109111
}
110112
// otherwise iterate the fanout
111113
var claimed int64 = -1
112-
contexts := make([]context.Context, l)
113-
cancels := make([]context.CancelFunc, l)
114-
for i := range l {
115-
contexts[i], cancels[i] = context.WithCancel(r.Context())
116-
}
117114
captures := make([]*capture.CaptureResponseWriter, l)
118-
var wg sync.WaitGroup
115+
var eg errgroup.Group
116+
if limit := h.options.ConcurrencyOptions.GetQueryConcurrencyLimit(); limit > 0 {
117+
eg.SetLimit(limit)
118+
}
119119
responseWritten := make(chan struct{}, 1)
120120

121121
serve := func(crw *capture.CaptureResponseWriter) {
@@ -126,47 +126,37 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
126126
responseWritten <- struct{}{}
127127
}
128128

129-
serveAndCancelOthers := func(i int, crw *capture.CaptureResponseWriter) {
130-
go func() {
131-
// cancels all other contexts
132-
for j, cancel := range cancels {
133-
if j != i {
134-
cancel()
135-
}
136-
}
137-
}()
138-
serve(crw)
139-
}
140-
141129
// fanout to all healthy targets
130+
ctx, cancel := context.WithCancel(r.Context())
131+
defer cancel()
142132
for i := range l {
143133
if hl[i] == nil {
144134
continue
145135
}
146-
wg.Go(func() {
136+
eg.Go(func() error {
147137
r2, _ := request.Clone(r)
148-
r2 = r2.WithContext(contexts[i])
138+
r2 = r2.WithContext(ctx)
149139
r2 = request.SetResources(r2, &request.Resources{Cancelable: true})
150140
crw := capture.NewCaptureResponseWriter()
151141
captures[i] = crw
152142
hl[i].ServeHTTP(crw, r2)
153143
statusCode := crw.StatusCode()
154144
custom := h.fgr && len(h.fgrCodes) > 0
155145
isGood := custom && h.fgrCodes.Contains(statusCode)
156-
// this checks if the response qualifies as a client response
157-
if (!h.fgr || (!custom && statusCode < 400) || isGood) &&
158-
// this checks that the qualifying response is the first response
159-
atomic.CompareAndSwapInt64(&claimed, -1, int64(i)) {
160-
// this serves only the first qualifying response
161-
serveAndCancelOthers(i, crw)
146+
147+
if (!h.fgr || (!custom && statusCode < 400) || isGood) && // this checks if the response qualifies as a client response
148+
atomic.CompareAndSwapInt64(&claimed, -1, int64(i)) { // this checks that the qualifying response is the first response
149+
serve(crw)
150+
cancel()
162151
}
152+
return nil
163153
})
164154
}
165155

166156
// this is a fallback case for when no qualifying upstream response arrives,
167157
// the first response is used, regardless of qualification
168158
go func() {
169-
wg.Wait()
159+
eg.Wait()
170160
// if claimed is still -1, the fallback case must be used
171161
if atomic.CompareAndSwapInt64(&claimed, -1, -2) && r.Context().Err() == nil {
172162
// this iterates the captures and serves the first non-nil response

pkg/backends/alb/mech/nlm/newest_last_modified.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/trickstercache/trickster/v2/pkg/proxy/headers"
3131
"github.com/trickstercache/trickster/v2/pkg/proxy/request"
3232
"github.com/trickstercache/trickster/v2/pkg/proxy/response/capture"
33+
"golang.org/x/sync/errgroup"
3334
)
3435

3536
const (
@@ -38,15 +39,18 @@ const (
3839
)
3940

4041
type handler struct {
41-
pool pool.Pool
42+
pool pool.Pool
43+
options options.NewestLastModifiedOptions
4244
}
4345

4446
func RegistryEntry() types.RegistryEntry {
4547
return types.RegistryEntry{ID: ID, Name: Name, ShortName: names.MechanismNLM, New: New}
4648
}
4749

48-
func New(_ *options.Options, _ rt.Lookup) (types.Mechanism, error) {
49-
return &handler{}, nil
50+
func New(o *options.Options, _ rt.Lookup) (types.Mechanism, error) {
51+
return &handler{
52+
options: o.NLMOptions,
53+
}, nil
5054
}
5155

5256
func (h *handler) SetPool(p pool.Pool) {
@@ -94,15 +98,17 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
9498

9599
// Capture all responses
96100
captures := make([]*capture.CaptureResponseWriter, l)
97-
var wg sync.WaitGroup
98-
101+
var eg errgroup.Group
102+
if limit := h.options.ConcurrencyOptions.GetQueryConcurrencyLimit(); limit > 0 {
103+
eg.SetLimit(limit)
104+
}
99105
// Fanout to all healthy targets
100106
for i := range l {
101107
if hl[i] == nil {
102108
continue
103109
}
104110
idx := i
105-
wg.Go(func() {
111+
eg.Go(func() error {
106112
r2, _ := request.Clone(r)
107113
r2 = request.ClearResources(r2.WithContext(ctx))
108114
crw := capture.NewCaptureResponseWriter()
@@ -120,11 +126,12 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
120126
mu.Unlock()
121127
}
122128
}
129+
return nil
123130
})
124131
}
125132

126133
// Wait for all responses to complete
127-
wg.Wait()
134+
eg.Wait()
128135

129136
// Write the response with the newest Last-Modified
130137
if newestIdx >= 0 && newestIdx < len(captures) && captures[newestIdx] != nil {

pkg/backends/alb/mech/tsm/time_series_merge.go

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
package tsm
1818

1919
import (
20+
stderrors "errors"
2021
"net/http"
2122
"strings"
22-
"sync"
2323

2424
"github.com/trickstercache/trickster/v2/pkg/backends"
2525
"github.com/trickstercache/trickster/v2/pkg/backends/alb/errors"
@@ -30,12 +30,14 @@ import (
3030
"github.com/trickstercache/trickster/v2/pkg/backends/alb/pool"
3131
"github.com/trickstercache/trickster/v2/pkg/backends/providers"
3232
rt "github.com/trickstercache/trickster/v2/pkg/backends/providers/registry/types"
33+
"github.com/trickstercache/trickster/v2/pkg/observability/logging"
3334
"github.com/trickstercache/trickster/v2/pkg/observability/logging/logger"
3435
"github.com/trickstercache/trickster/v2/pkg/proxy/handlers/trickster/failures"
3536
"github.com/trickstercache/trickster/v2/pkg/proxy/headers"
3637
"github.com/trickstercache/trickster/v2/pkg/proxy/request"
3738
"github.com/trickstercache/trickster/v2/pkg/proxy/response/capture"
3839
"github.com/trickstercache/trickster/v2/pkg/proxy/response/merge"
40+
"golang.org/x/sync/errgroup"
3941
)
4042

4143
const (
@@ -49,6 +51,7 @@ type handler struct {
4951
mergePaths []string // paths handled by the alb client that are enabled for tsmerge
5052
nonmergeHandler types.Mechanism // when methodology is tsmerge, this handler is for non-mergeable paths
5153
outputFormat string // the provider output format (e.g., "prometheus")
54+
tsmOptions options.TimeSeriesMergeOptions
5255
}
5356

5457
func RegistryEntry() types.RegistryEntry {
@@ -57,7 +60,10 @@ func RegistryEntry() types.RegistryEntry {
5760

5861
func New(o *options.Options, factories rt.Lookup) (types.Mechanism, error) {
5962
nmh, _ := rr.New(nil, nil)
60-
out := &handler{nonmergeHandler: nmh}
63+
out := &handler{
64+
nonmergeHandler: nmh,
65+
tsmOptions: o.TSMOptions,
66+
}
6167
// this validates the merge configuration for the ALB client as it sets it up
6268
// First, verify the output format is a supported merge provider
6369
if !providers.IsSupportedTimeSeriesMergeProvider(o.OutputFormat) {
@@ -140,25 +146,31 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
140146
// Scatter/Gather section
141147

142148
accumulator := merge.NewAccumulator()
143-
var wg sync.WaitGroup
144-
var statusCode int
145-
var statusHeader string
146-
var mu sync.Mutex // protects statusCode and statusHeader
149+
var eg errgroup.Group
150+
if limit := h.tsmOptions.ConcurrencyOptions.GetQueryConcurrencyLimit(); limit > 0 {
151+
eg.SetLimit(limit)
152+
}
153+
154+
type result struct {
155+
statusCode int
156+
header http.Header
157+
mergeFunc merge.RespondFunc
158+
}
159+
results := make([]result, l)
147160

148161
for i := range l {
149162
if hl[i] == nil {
150163
continue
151164
}
152-
wg.Go(func() {
165+
eg.Go(func() error {
153166
r2, _ := request.Clone(r)
154167
rsc2 := &request.Resources{IsMergeMember: true, TSReqestOptions: rsc.TSReqestOptions}
155168
r2 = request.SetResources(r2, rsc2)
156169
crw := capture.NewCaptureResponseWriter()
157170
hl[i].Handler().ServeHTTP(crw, r2)
158171
rsc2 = request.GetResources(r2)
159172
if rsc2 == nil {
160-
logger.Warn("tsm gather failed due to nil resources", nil)
161-
return
173+
return stderrors.New("tsm gather failed due to nil resources")
162174
}
163175

164176
// ensure merge functions are set on cloned request
@@ -170,27 +182,38 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
170182
if rsc2.MergeFunc != nil && rsc2.TS != nil {
171183
rsc2.MergeFunc(accumulator, rsc2.TS, i)
172184
}
173-
// update status code and headers (take best status code)
174-
mu.Lock()
175-
if mrf == nil {
176-
mrf = rsc2.MergeRespondFunc
177-
}
178-
if crw.StatusCode() > 0 {
179-
if statusCode == 0 || crw.StatusCode() < statusCode {
180-
statusCode = crw.StatusCode()
181-
}
185+
results[i] = result{
186+
statusCode: crw.StatusCode(),
187+
header: crw.Header(),
188+
mergeFunc: rsc2.MergeRespondFunc,
182189
}
183-
if crw.Header() != nil {
184-
headers.StripMergeHeaders(crw.Header())
185-
statusHeader = headers.MergeResultHeaderVals(statusHeader,
186-
crw.Header().Get(headers.NameTricksterResult))
187-
}
188-
mu.Unlock()
190+
return nil
189191
})
190192
}
191193

192194
// wait for all fanout requests to complete
193-
wg.Wait()
195+
if err := eg.Wait(); err != nil {
196+
logger.Warn("tsm gather failure", logging.Pairs{"error": err})
197+
}
198+
199+
// Aggregate results sequentially - no mutex contention
200+
var statusCode int
201+
var statusHeader string
202+
for _, res := range results {
203+
if mrf == nil {
204+
mrf = res.mergeFunc
205+
}
206+
if res.statusCode > 0 {
207+
if statusCode == 0 || res.statusCode < statusCode {
208+
statusCode = res.statusCode
209+
}
210+
}
211+
if res.header != nil {
212+
headers.StripMergeHeaders(res.header)
213+
statusHeader = headers.MergeResultHeaderVals(statusHeader,
214+
res.header.Get(headers.NameTricksterResult))
215+
}
216+
}
194217

195218
// set aggregated status header
196219
if statusHeader != "" {

0 commit comments

Comments
 (0)