@@ -726,14 +726,18 @@ func TestCircuitBreaker(t *testing.T) {
726726
727727 require .Equal (t , failedTries , employeesCalls .Load ())
728728
729+ // Ensure all previous subscriptions are fully cleaned up before
730+ // waiting for the circuit to reset, to prevent leftover subscription
731+ // cleanup from interfering with the half-open circuit state.
732+ xEnv .WaitForSubscriptionCount (0 , time .Second * 5 )
733+
729734 // Wait for current bucket to be cleaned up
730735 time .Sleep (breaker .RollingDuration * 3 + time .Millisecond * 1000 )
731736
732737 // ====
733738 // Verify a success case with messages validated from here onwards
734739 // ====
735740
736- // Sending a complete must stop the subscription
737741 conn := xEnv .InitGraphQLWebSocketConnection (nil , nil , nil )
738742 err := testenv .WSWriteJSON (t , conn , & testenv.WebSocketMessage {
739743 ID : "1" ,
@@ -744,6 +748,12 @@ func TestCircuitBreaker(t *testing.T) {
744748
745749 _ , message , err := testenv .WSReadMessage (t , conn )
746750 require .NoError (t , err )
751+
752+ t .Logf ("employeesCalls after recovery: %d" , employeesCalls .Load ())
753+ t .Logf ("received message: %s" , string (message ))
754+ t .Logf ("circuit breaker status changed count: %d" , xEnv .Observer ().FilterMessage ("Circuit breaker status changed" ).Len ())
755+ t .Logf ("circuit breaker open count: %d" , xEnv .Observer ().FilterMessage ("Circuit breaker open, request callback did not execute" ).Len ())
756+
747757 require .JSONEq (t , timestampMessage , string (message ))
748758
749759 err = testenv .WSWriteJSON (t , conn , & testenv.WebSocketMessage {ID : "1" , Type : "complete" })
0 commit comments