@@ -442,3 +442,389 @@ func BenchmarkProducerThroughput(b *testing.B) {
442442 cancel ()
443443 testQueue .Close (true )
444444}
445+
446+ // ---------------------------------------------------------------------------
447+ // Behavioral equivalence tests: these verify the guarantees that callers
448+ // depend on. Every test here must pass on both the old (buffered pushChan +
449+ // per-event resp channel) and new (unbuffered pushChan, no resp) designs.
450+ // ---------------------------------------------------------------------------
451+
452+ // TestPublishReturnsOnlyAfterInsert verifies that when Publish returns true,
453+ // the event is already in the ring buffer (not just buffered in a channel).
454+ // We confirm this by immediately calling Get after Publish and verifying
455+ // the event is available.
456+ func TestPublishReturnsOnlyAfterInsert (t * testing.T ) {
457+ q := NewQueue (logp .NewNopLogger (), nil ,
458+ Settings {
459+ Events : 100 ,
460+ MaxGetRequest : 100 ,
461+ FlushTimeout : 0 , // no flush delay
462+ }, 0 , nil )
463+ defer q .Close (true )
464+
465+ p := q .Producer (queue.ProducerConfig {})
466+
467+ // Publish one event
468+ _ , ok := p .Publish ("event-1" )
469+ require .True (t , ok )
470+
471+ // The event must be immediately available for consumption
472+ batch , err := q .Get (1 )
473+ require .NoError (t , err )
474+ require .Equal (t , 1 , batch .Count ())
475+ batch .Done ()
476+ }
477+
478+ // TestBackpressureBlocksAtQueueCapacity verifies that Publish blocks when
479+ // the ring buffer is full (backpressure) and unblocks when space is freed.
480+ func TestBackpressureBlocksAtQueueCapacity (t * testing.T ) {
481+ const queueSize = 5
482+ q := NewQueue (logp .NewNopLogger (), nil ,
483+ Settings {
484+ Events : queueSize ,
485+ MaxGetRequest : queueSize ,
486+ FlushTimeout : 0 ,
487+ }, 0 , nil )
488+ defer q .Close (true )
489+
490+ p := q .Producer (queue.ProducerConfig {})
491+
492+ // Fill the queue to capacity
493+ for i := 0 ; i < queueSize ; i ++ {
494+ _ , ok := p .Publish (i )
495+ require .True (t , ok , "Publish %d should succeed" , i )
496+ }
497+
498+ // Next publish must block (queue is full).
499+ // Use a goroutine + timer to detect blocking.
500+ published := make (chan bool , 1 )
501+ go func () {
502+ _ , ok := p .Publish ("overflow" )
503+ published <- ok
504+ }()
505+
506+ select {
507+ case <- published :
508+ t .Fatal ("Publish should block when queue is full" )
509+ case <- time .After (50 * time .Millisecond ):
510+ // Expected: blocked
511+ }
512+
513+ // Free one slot by consuming and acknowledging
514+ batch , err := q .Get (1 )
515+ require .NoError (t , err )
516+ batch .Done ()
517+
518+ // The blocked publish should now succeed
519+ select {
520+ case ok := <- published :
521+ require .True (t , ok , "Publish should succeed after space freed" )
522+ case <- time .After (time .Second ):
523+ t .Fatal ("Publish did not unblock after freeing space" )
524+ }
525+ }
526+
527+ // TestShutdownUnblocksProducer verifies that a producer blocked on a full
528+ // queue is unblocked by queue.Close and Publish returns false.
529+ func TestShutdownUnblocksProducer (t * testing.T ) {
530+ const queueSize = 3
531+ q := NewQueue (logp .NewNopLogger (), nil ,
532+ Settings {
533+ Events : queueSize ,
534+ MaxGetRequest : queueSize ,
535+ FlushTimeout : 0 ,
536+ }, 0 , nil )
537+
538+ p := q .Producer (queue.ProducerConfig {})
539+
540+ // Fill the queue
541+ for i := 0 ; i < queueSize ; i ++ {
542+ _ , ok := p .Publish (i )
543+ require .True (t , ok )
544+ }
545+
546+ // Blocked publish in goroutine
547+ result := make (chan bool , 1 )
548+ go func () {
549+ _ , ok := p .Publish ("blocked" )
550+ result <- ok
551+ }()
552+
553+ // Give the goroutine time to block
554+ time .Sleep (20 * time .Millisecond )
555+
556+ // Close the queue — should unblock the producer with ok=false
557+ q .Close (false )
558+
559+ select {
560+ case ok := <- result :
561+ require .False (t , ok , "Publish should return false when queue is closing" )
562+ case <- time .After (time .Second ):
563+ t .Fatal ("Blocked producer was not unblocked by queue.Close" )
564+ }
565+ }
566+
567+ // TestPublishedEqualsAckedOnGracefulShutdown verifies that every event
568+ // for which Publish returned true is eventually acknowledged, across many
569+ // concurrent producers during a graceful shutdown. This is the key
570+ // correctness invariant that the pipeline depends on.
571+ func TestPublishedEqualsAckedOnGracefulShutdown (t * testing.T ) {
572+ // Run multiple times to stress-test the shutdown path
573+ for iter := 0 ; iter < 20 ; iter ++ {
574+ const queueSize = 500
575+ const publishWorkers = 20
576+ var ackedCount atomic.Int64
577+ var publishedCount atomic.Int64
578+
579+ testQueue := NewQueue (
580+ logp .NewNopLogger (), nil ,
581+ Settings {
582+ Events : queueSize ,
583+ MaxGetRequest : queueSize ,
584+ FlushTimeout : time .Millisecond ,
585+ }, 0 , nil )
586+
587+ var wg sync.WaitGroup
588+
589+ // Publish workers
590+ for range publishWorkers {
591+ wg .Add (1 )
592+ go func () {
593+ defer wg .Done ()
594+ producer := testQueue .Producer (
595+ queue.ProducerConfig {
596+ ACK : func (count int ) { ackedCount .Add (int64 (count )) },
597+ })
598+ for {
599+ _ , published := producer .Publish (0 )
600+ if published {
601+ publishedCount .Add (1 )
602+ } else {
603+ return
604+ }
605+ }
606+ }()
607+ }
608+
609+ // Consumer
610+ wg .Add (1 )
611+ go func () {
612+ defer wg .Done ()
613+ for {
614+ batch , err := testQueue .Get (queueSize )
615+ if err == nil {
616+ batch .Done ()
617+ } else {
618+ return
619+ }
620+ }
621+ }()
622+
623+ // Let events flow
624+ require .Eventually (
625+ t ,
626+ func () bool { return publishedCount .Load () > int64 (queueSize ) },
627+ time .Second ,
628+ time .Millisecond ,
629+ "iter %d: events are not flowing" , iter )
630+
631+ // Graceful shutdown
632+ testQueue .Close (false )
633+ select {
634+ case <- testQueue .Done ():
635+ case <- time .After (5 * time .Second ):
636+ require .Fail (t , "iter %d: queue did not shut down" , iter )
637+ }
638+ wg .Wait ()
639+ testQueue .wg .Wait ()
640+
641+ require .Equal (t , publishedCount .Load (), ackedCount .Load (),
642+ "iter %d: published (%d) != acked (%d)" ,
643+ iter , publishedCount .Load (), ackedCount .Load ())
644+ }
645+ }
646+
647+ // TestMultiProducerAllEventsDelivered verifies no events are lost when
648+ // multiple producers publish concurrently and all events are consumed.
649+ func TestMultiProducerAllEventsDelivered (t * testing.T ) {
650+ const (
651+ queueSize = 200
652+ numProducers = 10
653+ eventsPerProducer = 500
654+ totalEvents = numProducers * eventsPerProducer
655+ )
656+
657+ testQueue := NewQueue (logp .NewNopLogger (), nil ,
658+ Settings {
659+ Events : queueSize ,
660+ MaxGetRequest : 100 ,
661+ FlushTimeout : time .Millisecond ,
662+ }, 0 , nil )
663+
664+ var consumedCount atomic.Int64
665+
666+ // Consumer: read and acknowledge everything
667+ var wg sync.WaitGroup
668+ wg .Add (1 )
669+ go func () {
670+ defer wg .Done ()
671+ for {
672+ batch , err := testQueue .Get (100 )
673+ if err != nil {
674+ return
675+ }
676+ consumedCount .Add (int64 (batch .Count ()))
677+ batch .Done ()
678+ }
679+ }()
680+
681+ // Producers: each publishes exactly eventsPerProducer events
682+ var publishedCount atomic.Int64
683+ for i := 0 ; i < numProducers ; i ++ {
684+ wg .Add (1 )
685+ go func (producerID int ) {
686+ defer wg .Done ()
687+ p := testQueue .Producer (queue.ProducerConfig {})
688+ for j := 0 ; j < eventsPerProducer ; j ++ {
689+ _ , ok := p .Publish (fmt .Sprintf ("p%d-e%d" , producerID , j ))
690+ if ok {
691+ publishedCount .Add (1 )
692+ }
693+ }
694+ }(i )
695+ }
696+
697+ // Wait for all events to be consumed
698+ require .Eventually (
699+ t ,
700+ func () bool { return consumedCount .Load () == totalEvents },
701+ 10 * time .Second ,
702+ time .Millisecond ,
703+ "expected %d consumed events, got %d" , totalEvents , consumedCount .Load ())
704+
705+ testQueue .Close (false )
706+ <- testQueue .Done ()
707+ wg .Wait ()
708+
709+ require .Equal (t , int64 (totalEvents ), publishedCount .Load (),
710+ "all Publish calls should succeed" )
711+ require .Equal (t , int64 (totalEvents ), consumedCount .Load (),
712+ "all published events should be consumed" )
713+ }
714+
715+ // TestTryPublishDropsWhenFull verifies that TryPublish does not succeed
716+ // when the queue's ring buffer is at capacity.
717+ func TestTryPublishDropsWhenFull (t * testing.T ) {
718+ const queueSize = 3
719+ q := NewQueue (logp .NewNopLogger (), nil ,
720+ Settings {
721+ Events : queueSize ,
722+ MaxGetRequest : queueSize ,
723+ FlushTimeout : 0 ,
724+ }, 0 , nil )
725+ defer q .Close (true )
726+
727+ p := q .Producer (queue.ProducerConfig {})
728+
729+ // Fill the queue
730+ for i := 0 ; i < queueSize ; i ++ {
731+ _ , ok := p .Publish (i )
732+ require .True (t , ok )
733+ }
734+
735+ // TryPublish on a full queue should not succeed. On an unbuffered
736+ // pushChan it returns false immediately; on a buffered pushChan it may
737+ // block (the send succeeds into the buffer but the runLoop never reads
738+ // it because the ring buffer is full). Either way, we verify it does
739+ // not return true within a short window.
740+ result := make (chan bool , 1 )
741+ go func () {
742+ _ , ok := p .TryPublish ("overflow" )
743+ result <- ok
744+ }()
745+
746+ select {
747+ case ok := <- result :
748+ require .False (t , ok , "TryPublish must not return true when queue is full" )
749+ case <- time .After (100 * time .Millisecond ):
750+ // Acceptable: TryPublish is blocked, which means it did not succeed
751+ }
752+ }
753+
754+ // TestProducerCloseDoesNotBlockOnFullQueue verifies that closing a producer
755+ // does not deadlock even when the queue is full.
756+ func TestProducerCloseDoesNotBlockOnFullQueue (t * testing.T ) {
757+ const queueSize = 2
758+ q := NewQueue (logp .NewNopLogger (), nil ,
759+ Settings {
760+ Events : queueSize ,
761+ MaxGetRequest : queueSize ,
762+ FlushTimeout : 0 ,
763+ }, 0 , nil )
764+ defer q .Close (true )
765+
766+ p := q .Producer (queue.ProducerConfig {})
767+
768+ // Fill the queue
769+ for i := 0 ; i < queueSize ; i ++ {
770+ _ , ok := p .Publish (i )
771+ require .True (t , ok )
772+ }
773+
774+ // Close the producer — should not deadlock
775+ done := make (chan struct {})
776+ go func () {
777+ p .Close ()
778+ close (done )
779+ }()
780+
781+ select {
782+ case <- done :
783+ // Success
784+ case <- time .After (time .Second ):
785+ t .Fatal ("Producer.Close() deadlocked on full queue" )
786+ }
787+ }
788+
789+ // TestRapidCloseAfterPublish verifies that closing the queue immediately
790+ // after a successful Publish does not lose the event — it should still
791+ // be counted either as published (and acked) or the Publish returns false.
792+ func TestRapidCloseAfterPublish (t * testing.T ) {
793+ for iter := 0 ; iter < 50 ; iter ++ {
794+ var acked atomic.Int64
795+ q := NewQueue (logp .NewNopLogger (), nil ,
796+ Settings {
797+ Events : 10 ,
798+ MaxGetRequest : 10 ,
799+ FlushTimeout : 0 ,
800+ }, 0 , nil )
801+
802+ p := q .Producer (queue.ProducerConfig {
803+ ACK : func (count int ) { acked .Add (int64 (count )) },
804+ })
805+
806+ // Consumer
807+ go func () {
808+ for {
809+ batch , err := q .Get (10 )
810+ if err != nil {
811+ return
812+ }
813+ batch .Done ()
814+ }
815+ }()
816+
817+ var published int64
818+ _ , ok := p .Publish ("event-1" )
819+ if ok {
820+ published ++
821+ }
822+
823+ q .Close (false )
824+ <- q .Done ()
825+
826+ // Ensure we never ack more than we published
827+ require .GreaterOrEqual (t , published , acked .Load (),
828+ "iter %d: acked (%d) > published (%d)" , iter , acked .Load (), published )
829+ }
830+ }
0 commit comments