@@ -177,14 +177,20 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
177177 delete_stream (stream_port (Config , 0 ), S ),
178178
179179 % % online consumers should receive a metadata update frame (stream deleted)
180- % % we unqueue the this frame before closing the connection
180+ % % we unqueue this frame before closing the connection
181181 % % directly closing the connection of the cancelled consumer
182+ % % Edge case:
183+ % % the waiting consumer can get 2 frames: consumer_update then metadata_update.
184+ % % This is because the active consumer is removed from the group and this triggers
185+ % % a rebalancing. The 2 remaining consumers are most of the time cancelled when the
186+ % % stream is deleted, so the rebalancing does not take place.
187+ % % We just tolerate an extra frame when closing their respective connections.
182188 maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
183189 log (" Expecting frame in consumer ~p " , [K ]),
184190 {Cmd1 , C1 } = receive_commands (S0 , C0 ),
185191 log (" Received ~p " , [Cmd1 ]),
186192 log (" Closing" ),
187- {ok , _ } = stream_test_utils : close (S0 , C1 );
193+ {ok , _ } = close_connection (S0 , C1 );
188194 (K , {S0 , C0 }) ->
189195 log (" Closing ~p " , [K ]),
190196 {ok , _ } = stream_test_utils :close (S0 , C0 )
@@ -290,12 +296,18 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
290296 % % online consumers should receive a metadata update frame (stream deleted)
291297 % % we unqueue this frame before closing the connection
292298 % % directly closing the connection of the cancelled consumer
299+ % % Edge case:
300+ % % the waiting consumer can get 2 frames: consumer_update then metadata_update.
301+ % % This is because the active consumer is removed from the group and this triggers
302+ % % a rebalancing. The 2 remaining consumers are most of the time cancelled when the
303+ % % stream is deleted, so the rebalancing does not take place.
304+ % % We just tolerate an extra frame when closing their respective connections.
293305 maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
294306 log (" Expecting frame in consumer ~p " , [K ]),
295307 {Cmd1 , C1 } = receive_commands (S0 , C0 ),
296308 log (" Received ~p " , [Cmd1 ]),
297309 log (" Closing" ),
298- {ok , _ } = stream_test_utils : close (S0 , C1 );
310+ {ok , _ } = close_connection (S0 , C1 );
299311 (K , {S0 , C0 }) ->
300312 log (" Closing ~p " , [K ]),
301313 {ok , _ } = stream_test_utils :close (S0 , C0 )
@@ -395,12 +407,18 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
395407 % % online consumers should receive a metadata update frame (stream deleted)
396408 % % we unqueue this frame before closing the connection
397409 % % directly closing the connection of the cancelled consumer
410+ % % Edge case:
411+ % % the waiting consumer can get 2 frames: consumer_update then metadata_update.
412+ % % This is because the active consumer is removed from the group and this triggers
413+ % % a rebalancing. The 2 remaining consumers are most of the time cancelled when the
414+ % % stream is deleted, so the rebalancing does not take place.
415+ % % We just tolerate an extra frame when closing their respective connections.
398416 maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
399417 log (" Expecting frame in consumer ~p " , [K ]),
400418 {Cmd1 , C1 } = receive_commands (S0 , C0 ),
401419 log (" Received ~p " , [Cmd1 ]),
402420 log (" Closing" ),
403- {ok , _ } = stream_test_utils : close (S0 , C1 );
421+ {ok , _ } = close_connection (S0 , C1 );
404422 (K , {S0 , C0 }) ->
405423 log (" Closing ~p " , [K ]),
406424 {ok , _ } = stream_test_utils :close (S0 , C0 )
@@ -516,12 +534,18 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
516534 % % online consumers should receive a metadata update frame (stream deleted)
517535 % % we unqueue this frame before closing the connection
518536 % % directly closing the connection of the cancelled consumer
537+ % % Edge case:
538+ % % the waiting consumer can get 2 frames: consumer_update then metadata_update.
539+ % % This is because the active consumer is removed from the group and this triggers
540+ % % a rebalancing. The 2 remaining consumers are most of the time cancelled when the
541+ % % stream is deleted, so the rebalancing does not take place.
542+ % % We just tolerate an extra frame when closing their respective connections.
519543 maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
520544 log (" Expecting frame in consumer ~p " , [K ]),
521545 {Cmd1 , C1 } = receive_commands (S0 , C0 ),
522546 log (" Received ~p " , [Cmd1 ]),
523547 log (" Closing" ),
524- {ok , _ } = stream_test_utils : close (S0 , C1 );
548+ {ok , _ } = close_connection (S0 , C1 );
525549 (K , {S0 , C0 }) ->
526550 log (" Closing ~p " , [K ]),
527551 {ok , _ } = stream_test_utils :close (S0 , C0 )
@@ -858,3 +882,19 @@ log(Format) ->
858882
859883log (Format , Args ) ->
860884 ct :pal (Format , Args ).
885+
886+ close_connection (Sock , C ) ->
887+ CloseReason = <<" OK" >>,
888+ CloseFrame = rabbit_stream_core :frame ({request , 1 , {close , ? RESPONSE_CODE_OK , CloseReason }}),
889+ ok = gen_tcp :send (Sock , CloseFrame ),
890+ pump_until_close (Sock , C , 10 ).
891+
892+ pump_until_close (_ , _ , 0 ) ->
893+ ct :fail (" did not get close response" );
894+ pump_until_close (Sock , C0 , N ) ->
895+ case stream_test_utils :receive_stream_commands (Sock , C0 ) of
896+ {{response , 1 , {close , ? RESPONSE_CODE_OK }}, C1 } ->
897+ {ok , C1 };
898+ {_Cmd , C1 } ->
899+ pump_until_close (Sock , C1 , N - 1 )
900+ end .
0 commit comments