@@ -83,50 +83,78 @@ end_per_testcase(TestCase, Config) ->
8383 rabbit_ct_helpers :testcase_finished (Config , TestCase ).
8484
8585simple_sac_consumer_should_get_disconnected_on_partition (Config ) ->
86- T = ? TRSPT ,
8786 S = rabbit_data_coercion :to_binary (? FUNCTION_NAME ),
88- {ok , S0 , C0 } = stream_test_utils :connect (Config , 0 ),
89- {ok , S1 , C1 } = stream_test_utils :connect (Config , 1 ),
90- {ok , S2 , C2 } = stream_test_utils :connect (Config , 2 ),
87+ {ok , So0 , C0_00 } = stream_test_utils :connect (Config , 0 ),
88+ {ok , So1 , C1_00 } = stream_test_utils :connect (Config , 1 ),
89+ {ok , So2 , C2_00 } = stream_test_utils :connect (Config , 2 ),
9190
9291 create_stream (Config , S ),
93- wait_for_members (S0 , C0 , S , 3 ),
92+ wait_for_members (So0 , C0_00 , S , 3 ),
9493
95- C0_01 = register_sac (S0 , C0 , S ),
96- C0_02 = receive_consumer_update (S0 , C0_01 ),
94+ C0_01 = register_sac (So0 , C0_00 , S , 0 ),
95+ C0_02 = receive_consumer_update (So0 , C0_01 ),
9796
98- C1_01 = register_sac (S1 , C1 , S ),
99- C2_01 = register_sac (S2 , C2 , S ),
97+ C1_01 = register_sac (So1 , C1_00 , S , 1 ),
98+ C2_01 = register_sac (So2 , C2_00 , S , 2 ),
99+ SubIdToState0 = #{0 => {So0 , C0_02 },
100+ 1 => {So1 , C1_01 },
101+ 2 => {So2 , C2_01 }},
100102
101103 Members = stream_members (Config , S ),
102104 L = leader (Members ),
103105 [F1 , F2 ] = followers (Members ),
104106
105- Consumers = query_consumers (Config , S ),
106- assertSize (3 , Consumers ),
107- assertConsumersConnected (Consumers ),
107+ Consumers1 = query_consumers (Config , S ),
108+ assertSize (3 , Consumers1 ),
109+ assertConsumersConnected (Consumers1 ),
110+
111+ Isolated = F1 ,
112+ {value , DisconnectedConsumer } =
113+ lists :search (fun (# consumer {pid = ConnPid }) ->
114+ rpc (Config , erlang , node , [ConnPid ]) =:= Isolated
115+ end , Consumers1 ),
116+ # consumer {subscription_id = DiscSubId } = DisconnectedConsumer ,
108117
109- rabbit_ct_broker_helpers :block_traffic_between (F1 , L ),
110- rabbit_ct_broker_helpers :block_traffic_between (F1 , F2 ),
118+ rabbit_ct_broker_helpers :block_traffic_between (Isolated , L ),
119+ rabbit_ct_broker_helpers :block_traffic_between (Isolated , F2 ),
111120
112121 wait_for_disconnected_consumer (Config , S ),
113122 wait_for_forgotten_consumer (Config , S ),
114123
115- rabbit_ct_broker_helpers :allow_traffic_between (F1 , L ),
116- rabbit_ct_broker_helpers :allow_traffic_between (F1 , F2 ),
124+ rabbit_ct_broker_helpers :allow_traffic_between (Isolated , L ),
125+ rabbit_ct_broker_helpers :allow_traffic_between (Isolated , F2 ),
117126
118127 wait_for_all_consumers_connected (Config , S ),
119- assertConsumersConnected (query_consumers (Config , S )),
128+
129+ Consumers2 = query_consumers (Config , S ),
130+ % % the disconnected, then forgotten consumer is cancelled,
131+ % % because the stream member on its node has been restarted
132+ assertSize (2 , Consumers2 ),
133+ assertConsumersConnected (Consumers2 ),
134+ ? assertMatch ([DisconnectedConsumer ],
135+ Consumers1 -- Consumers2 ),
136+
137+ % % assert the cancelled consumer received a metadata update frame
138+ SubIdToState1 =
139+ maps :fold (fun (K , {S0 , C0 }, Acc ) when K == DiscSubId ->
140+ C1 = receive_metadata_update (S0 , C0 ),
141+ Acc #{K => {S0 , C1 }};
142+ (K , {S0 , C0 }, Acc ) ->
143+ Acc #{K => {S0 , C0 }}
144+ end , #{}, SubIdToState0 ),
120145
121146 delete_stream (Config , S ),
122147
123- {_ , _ } = receive_commands (T , S0 , C0 ),
124- {_ , _ } = receive_commands (T , S1 , C1 ),
125- {_ , _ } = receive_commands (T , S2 , C2 ),
148+ % % online consumers should receive a metadata update frame (stream deleted)
149+ % % we unqueue the this frame before closing the connection
150+ % % directly closing the connection of the cancelled consumer
151+ maps :foreach (fun (K , {S0 , C0 }) when K /= DiscSubId ->
152+ {_ , C1 } = receive_commands (S0 , C0 ),
153+ {ok , _ } = stream_test_utils :close (S0 , C1 );
154+ (_ , {S0 , C0 }) ->
155+ {ok , _ } = stream_test_utils :close (S0 , C0 )
156+ end , SubIdToState1 ),
126157
127- {ok , _ } = stream_test_utils :close (S0 , C0 ),
128- {ok , _ } = stream_test_utils :close (S1 , C1 ),
129- {ok , _ } = stream_test_utils :close (S2 , C2 ),
130158 ok .
131159
132160leader (Members ) ->
@@ -161,8 +189,8 @@ delete_stream(Config, St) ->
161189 {ok , C1 } = stream_test_utils :delete_stream (S , C0 , St ),
162190 {ok , _ } = stream_test_utils :close (S , C1 ).
163191
164- register_sac (S , C0 , St ) ->
165- SacSubscribeFrame = request ({subscribe , 0 , St ,
192+ register_sac (S , C0 , St , SubId ) ->
193+ SacSubscribeFrame = request ({subscribe , SubId , St ,
166194 first , 1 ,
167195 #{<<" single-active-consumer" >> => <<" true" >>,
168196 <<" name" >> => name ()}}),
@@ -174,11 +202,17 @@ register_sac(S, C0, St) ->
174202 C1 .
175203
176204receive_consumer_update (S , C0 ) ->
177- {Cmd , C1 } = receive_commands (? TRSPT , S , C0 ),
205+ {Cmd , C1 } = receive_commands (S , C0 ),
178206 ? assertMatch ({request , _CorrId , {consumer_update , _SubId , _Status }},
179207 Cmd ),
180208 C1 .
181209
210+ receive_metadata_update (S , C0 ) ->
211+ {Cmd , C1 } = receive_commands (S , C0 ),
212+ ? assertMatch ({metadata_update , _ , ? RESPONSE_CODE_STREAM_NOT_AVAILABLE },
213+ Cmd ),
214+ C1 .
215+
182216unsubscribe (S , C0 ) ->
183217 {ok , C1 } = stream_test_utils :unsubscribe (S , C0 , sub_id ()),
184218 C1 .
@@ -204,6 +238,9 @@ request(Cmd) ->
204238request (CorrId , Cmd ) ->
205239 rabbit_stream_core :frame ({request , CorrId , Cmd }).
206240
241+ receive_commands (S , C ) ->
242+ receive_commands (? TRSPT , S , C ).
243+
207244receive_commands (Transport , S , C ) ->
208245 stream_test_utils :receive_stream_commands (Transport , S , C ).
209246
0 commit comments