Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,20 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
delete_stream(stream_port(Config, 0), S),

%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue the this frame before closing the connection
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
%% Edge case:
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
%% This is because the active consumer is removed from the group and this triggers
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
%% stream is deleted, so the rebalancing does not take place.
%% We just tolerate an extra frame when closing their respective connections.
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
{ok, _} = close_connection(S0, C1);
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
Expand Down Expand Up @@ -290,12 +296,18 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
%% Edge case:
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
%% This is because the active consumer is removed from the group and this triggers
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
%% stream is deleted, so the rebalancing does not take place.
%% We just tolerate an extra frame when closing their respective connections.
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
{ok, _} = close_connection(S0, C1);
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
Expand Down Expand Up @@ -395,12 +407,18 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
%% Edge case:
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
%% This is because the active consumer is removed from the group and this triggers
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
%% stream is deleted, so the rebalancing does not take place.
%% We just tolerate an extra frame when closing their respective connections.
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
{ok, _} = close_connection(S0, C1);
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
Expand Down Expand Up @@ -516,12 +534,18 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
%% online consumers should receive a metadata update frame (stream deleted)
%% we unqueue this frame before closing the connection
%% directly closing the connection of the cancelled consumer
%% Edge case:
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
%% This is because the active consumer is removed from the group and this triggers
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
%% stream is deleted, so the rebalancing does not take place.
%% We just tolerate an extra frame when closing their respective connections.
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
log("Expecting frame in consumer ~p", [K]),
{Cmd1, C1} = receive_commands(S0, C0),
log("Received ~p", [Cmd1]),
log("Closing"),
{ok, _} = stream_test_utils:close(S0, C1);
{ok, _} = close_connection(S0, C1);
(K, {S0, C0}) ->
log("Closing ~p", [K]),
{ok, _} = stream_test_utils:close(S0, C0)
Expand Down Expand Up @@ -858,3 +882,19 @@ log(Format) ->

log(Format, Args) ->
ct:pal(Format, Args).

close_connection(Sock, C) ->
CloseReason = <<"OK">>,
CloseFrame = rabbit_stream_core:frame({request, 1, {close, ?RESPONSE_CODE_OK, CloseReason}}),
ok = gen_tcp:send(Sock, CloseFrame),
pump_until_close(Sock, C, 10).

pump_until_close(_, _, 0) ->
ct:fail("did not get close response");
pump_until_close(Sock, C0, N) ->
case stream_test_utils:receive_stream_commands(Sock, C0) of
{{response, 1, {close, ?RESPONSE_CODE_OK}}, C1} ->
{ok, C1};
{_Cmd, C1} ->
pump_until_close(Sock, C1, N - 1)
end.
Loading