@@ -3,6 +3,7 @@ package stream
33import (
44 "context"
55 "fmt"
6+ "sync"
67 "testing"
78 "time"
89
@@ -406,7 +407,6 @@ func FuzzTick(f *testing.F) {
406407}
407408
408409func FuzzScaler (f * testing.F ) {
409- // Define InterceptFunc
410410 interceptFunc := func (ctx context.Context , t int ) (string , bool ) {
411411 return fmt .Sprintf ("%d" , t ), true
412412 }
@@ -464,3 +464,129 @@ func FuzzScaler(f *testing.F) {
464464 }
465465 })
466466}
467+
468+ func Test_Scaler_Max (t * testing.T ) {
469+ tests := map [string ]struct {
470+ max uint
471+ send int
472+ expected int
473+ }{
474+ "max 0" : {
475+ max : 0 ,
476+ send : 1000 ,
477+ expected : 1000 ,
478+ },
479+ "max 1" : {
480+ max : 1 ,
481+ send : 10 ,
482+ expected : 10 ,
483+ },
484+ "max 2" : {
485+ max : 2 ,
486+ send : 10 ,
487+ expected : 10 ,
488+ },
489+ "max 3" : {
490+ max : 3 ,
491+ send : 10 ,
492+ expected : 10 ,
493+ },
494+ "max 4" : {
495+ max : 4 ,
496+ send : 100 ,
497+ expected : 100 ,
498+ },
499+ "max 1000" : {
500+ max : 1000 ,
501+ send : 10000 ,
502+ expected : 10000 ,
503+ },
504+ }
505+
506+ for name , test := range tests {
507+ t .Run (name , func (t * testing.T ) {
508+ ctx , cancel := context .WithCancel (context .Background ())
509+ defer cancel ()
510+
511+ inited := 0
512+ initedMu := sync.Mutex {}
513+ release := make (chan struct {})
514+
515+ interceptFunc := func (ctx context.Context , t int ) (int , bool ) {
516+ defer func () {
517+ initedMu .Lock ()
518+ defer initedMu .Unlock ()
519+ inited --
520+ }()
521+
522+ initedMu .Lock ()
523+ inited ++
524+ initedMu .Unlock ()
525+
526+ <- release
527+
528+ return t , true
529+ }
530+
531+ // Initialize Scaler
532+ scaler := Scaler [int , int ]{
533+ Wait : time .Millisecond ,
534+ Life : time .Millisecond ,
535+ Fn : interceptFunc ,
536+ Max : test .max ,
537+ }
538+
539+ // Create a simple input channel
540+ input := make (chan int , test .send )
541+ defer close (input )
542+
543+ for i := 0 ; i < test .send ; i ++ {
544+ input <- i
545+ }
546+
547+ // Execute the Scaler
548+ out , err := scaler .Exec (ctx , input )
549+ if err != nil {
550+ t .Errorf ("Scaler Exec failed: %v" , err )
551+ t .Fail ()
552+ }
553+
554+ recv := 1
555+
556+ tloop:
557+ for {
558+ select {
559+ case <- ctx .Done ():
560+ t .Errorf ("Scaler Exec timed out" )
561+ case _ , ok := <- out :
562+ if ! ok {
563+ break tloop
564+ }
565+
566+ recv ++
567+ t .Logf ("received %d" , recv )
568+ if recv >= test .expected {
569+ break tloop
570+ }
571+ default :
572+ time .Sleep (time .Millisecond )
573+
574+ initedMu .Lock ()
575+ if test .max > 0 && inited > int (test .max ) {
576+ t .Errorf ("Scaler Exec failed: expected %d, got %d" , test .max , inited )
577+ t .Fail ()
578+ }
579+ initedMu .Unlock ()
580+
581+ // Release one goroutine
582+ release <- struct {}{}
583+ }
584+ }
585+
586+ if recv != test .expected {
587+ t .Errorf ("Scaler Exec failed: expected %d, got %d" , test .expected , recv )
588+ t .Fail ()
589+ }
590+ })
591+ }
592+ }
0 commit comments