@@ -7,6 +7,15 @@ import (
77 "time"
88)
99
10+ // MinWait is the absolute minimum wait time for the ticker. This is used to
11+ // prevent the ticker from firing too often and causing too small of a wait
12+ // time.
13+ const MinWait = time .Millisecond
14+
15+ // MinLife is the minimum life time for the scaler. This is used to prevent
16+ // the scaler from exiting too quickly, and causing too small of a lifetime.
17+ const MinLife = time .Millisecond
18+
1019// Scaler implements generic auto-scaling logic which starts with a net-zero
1120// set of processing routines (with the exception of the channel listener) and
1221// then scales up and down based on the CPU contention of a system and the speed
@@ -42,6 +51,10 @@ type Scaler[T, U any] struct {
4251 // that are CPU bound and need to scale up more/less quickly.
4352 WaitModifier DurationScaler
4453
54+ // Max is the maximum number of layer2 routines that will be spawned.
55+ // If Max is set to 0, then there is no limit.
56+ Max uint
57+
4558 wScale * DurationScaler
4659}
4760
@@ -51,7 +64,7 @@ var ErrFnRequired = fmt.Errorf("nil InterceptFunc, Fn is required")
5164// returns the output channel where the resulting data from the Fn function
5265// will be sent.
5366//
54- //nolint:funlen // This really can't be broken up any further
67+ //nolint:funlen,gocognit // This really can't be broken up any further
5568func (s Scaler [T , U ]) Exec (ctx context.Context , in <- chan T ) (<- chan U , error ) {
5669 ctx = _ctx (ctx )
5770
@@ -72,13 +85,13 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
7285 // because the caller did not specify a wait time. This means Scaler will
7386 // likely always scale up rather than waiting for an existing layer2 routine
7487 // to pick up data.
75- if s .Wait <= 0 {
76- s .Wait = time . Nanosecond
88+ if s .Wait <= MinWait {
89+ s .Wait = MinWait
7790 }
7891
7992 // Minimum life of a spawned layer2 should be 1ms
80- if s .Life < time . Microsecond {
81- s .Life = time . Microsecond
93+ if s .Life < MinLife {
94+ s .Life = MinLife
8295 }
8396
8497 go func () {
@@ -99,7 +112,16 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
99112 ticker := time .NewTicker (s .Wait )
100113 defer ticker .Stop ()
101114 step := 0
102- var stepMu sync.RWMutex
115+ stepMu := sync.RWMutex {}
116+
117+ var max chan struct {}
118+
119+ if s .Max > 0 {
120+ max = make (chan struct {}, s .Max )
121+ for i := uint (0 ); i < s .Max ; i ++ {
122+ max <- struct {}{}
123+ }
124+ }
103125
104126 scaleLoop:
105127 for {
@@ -117,6 +139,17 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
117139 case <- ctx .Done ():
118140 return
119141 case <- ticker .C :
142+ if max != nil {
143+ select {
144+ case <- ctx .Done ():
145+ return
146+ case <- max : // start a new layer2 routine
147+ default :
148+ // wait for a layer2 routine to finish
149+ continue l2loop
150+ }
151+ }
152+
120153 wgMu .Lock ()
121154 wg .Add (1 )
122155 wgMu .Unlock ()
@@ -129,6 +162,16 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
129162
130163 go func () {
131164 defer wg .Done ()
165+
166+ if s .Max > 0 {
167+ defer func () {
168+ select {
169+ case <- ctx .Done ():
170+ case max <- struct {}{}:
171+ }
172+ }()
173+ }
174+
132175 if ! s .WaitModifier .inactive () {
133176 defer func () {
134177 stepMu .Lock ()
@@ -144,11 +187,16 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
144187 }
145188 }
146189
147- stepMu .RLock ()
190+ stepN := 0
191+ if ! s .WaitModifier .inactive () {
192+ stepMu .RLock ()
193+ stepN = step
194+ stepMu .RUnlock ()
195+ }
196+
148197 // Reset the ticker so that it does not immediately trip the
149198 // case statement on loop.
150- ticker .Reset (s .wScale .scaledDuration (s .Wait , step ))
151- stepMu .RUnlock ()
199+ ticker .Reset (s .wScale .scaledDuration (s .Wait , stepN ))
152200 }
153201 }
154202 }()
@@ -261,6 +309,10 @@ func (t *DurationScaler) scaledDuration(
261309 dur time.Duration ,
262310 currentInterval int ,
263311) time.Duration {
312+ if dur < MinWait {
313+ dur = MinWait
314+ }
315+
264316 if t .inactive () {
265317 return dur
266318 }
@@ -272,7 +324,12 @@ func (t *DurationScaler) scaledDuration(
272324
273325 if currentInterval % t .Interval == 0 {
274326 t .lastInterval = currentInterval
275- return dur + time .Duration (float64 (t .originalDuration )* mod )
327+ out := dur + time .Duration (float64 (t .originalDuration )* mod )
328+ if out < MinWait {
329+ return MinWait
330+ }
331+
332+ return out
276333 }
277334
278335 return dur
0 commit comments