@@ -86,7 +86,7 @@ simple_sac_test(_) ->
8686 State1 ,
8787 {ok , Active1 }, Effects1 } = ? MOD :apply (Command0 , State0 ),
8888 ? assert (Active1 ),
89- ? assertEqual ([csr (ConnectionPid , 0 , active )], Consumers1 ),
89+ assertCsrsEqual ([csr (ConnectionPid , 0 , active )], Consumers1 ),
9090 assertSendMessageActivateEffect (ConnectionPid , 0 , Stream , ConsumerName , true , Effects1 ),
9191
9292 Command1 =
@@ -95,9 +95,9 @@ simple_sac_test(_) ->
9595 State2 ,
9696 {ok , Active2 }, Effects2 } = ? MOD :apply (Command1 , State1 ),
9797 ? assertNot (Active2 ),
98- ? assertEqual ([csr (ConnectionPid , 0 , active ),
99- csr (ConnectionPid , 1 , waiting )],
100- Consumers2 ),
98+ assertCsrsEqual ([csr (ConnectionPid , 0 , active ),
99+ csr (ConnectionPid , 1 , waiting )],
100+ Consumers2 ),
101101 assertEmpty (Effects2 ),
102102
103103 Command2 =
@@ -106,28 +106,28 @@ simple_sac_test(_) ->
106106 State3 ,
107107 {ok , Active3 }, Effects3 } = ? MOD :apply (Command2 , State2 ),
108108 ? assertNot (Active3 ),
109- ? assertEqual ([csr (ConnectionPid , 0 , active ),
110- csr (ConnectionPid , 1 , waiting ),
111- csr (ConnectionPid , 2 , waiting )],
112- Consumers3 ),
109+ assertCsrsEqual ([csr (ConnectionPid , 0 , active ),
110+ csr (ConnectionPid , 1 , waiting ),
111+ csr (ConnectionPid , 2 , waiting )],
112+ Consumers3 ),
113113 assertEmpty (Effects3 ),
114114
115115 Command3 =
116116 unregister_consumer_command (Stream , ConsumerName , ConnectionPid , 0 ),
117117 {#? STATE {groups = #{GroupId := # group {consumers = Consumers4 }}} =
118118 State4 ,
119119 ok , Effects4 } = ? MOD :apply (Command3 , State3 ),
120- ? assertEqual ([csr (ConnectionPid , 1 , active ),
121- csr (ConnectionPid , 2 , waiting )],
122- Consumers4 ),
120+ assertCsrsEqual ([csr (ConnectionPid , 1 , active ),
121+ csr (ConnectionPid , 2 , waiting )],
122+ Consumers4 ),
123123 assertSendMessageActivateEffect (ConnectionPid , 1 , Stream , ConsumerName , true , Effects4 ),
124124
125125 Command4 =
126126 unregister_consumer_command (Stream , ConsumerName , ConnectionPid , 1 ),
127127 {#? STATE {groups = #{GroupId := # group {consumers = Consumers5 }}} =
128128 State5 ,
129129 ok , Effects5 } = ? MOD :apply (Command4 , State4 ),
130- ? assertEqual ([csr (ConnectionPid , 2 , active )], Consumers5 ),
130+ assertCsrsEqual ([csr (ConnectionPid , 2 , active )], Consumers5 ),
131131 assertSendMessageActivateEffect (ConnectionPid , 2 , Stream , ConsumerName , true , Effects5 ),
132132
133133 Command5 =
@@ -150,7 +150,7 @@ super_stream_partition_sac_test(_) ->
150150 State1 ,
151151 {ok , Active1 }, Effects1 } = ? MOD :apply (Command0 , State0 ),
152152 ? assert (Active1 ),
153- ? assertEqual ([csr (ConnectionPid , 0 , active )], Consumers1 ),
153+ assertCsrsEqual ([csr (ConnectionPid , 0 , active )], Consumers1 ),
154154 assertSendMessageActivateEffect (ConnectionPid , 0 , Stream , ConsumerName , true , Effects1 ),
155155
156156 Command1 =
@@ -161,9 +161,9 @@ super_stream_partition_sac_test(_) ->
161161 % % never active on registration
162162 ? assertNot (Active2 ),
163163 % % all consumers inactive, until the former active one steps down and activates the new consumer
164- ? assertEqual ([csr (ConnectionPid , 0 , deactivating ),
165- csr (ConnectionPid , 1 , waiting )],
166- Consumers2 ),
164+ assertCsrsEqual ([csr (ConnectionPid , 0 , deactivating ),
165+ csr (ConnectionPid , 1 , waiting )],
166+ Consumers2 ),
167167 assertSendMessageSteppingDownEffect (ConnectionPid , 0 , Stream , ConsumerName , Effects2 ),
168168
169169 Command2 = activate_consumer_command (Stream , ConsumerName ),
@@ -172,9 +172,9 @@ super_stream_partition_sac_test(_) ->
172172 ok , Effects3 } = ? MOD :apply (Command2 , State2 ),
173173
174174 % % 1 (partition index) % 2 (consumer count) = 1 (active consumer index)
175- ? assertEqual ([csr (ConnectionPid , 0 , waiting ),
176- csr (ConnectionPid , 1 , active )],
177- Consumers3 ),
175+ assertCsrsEqual ([csr (ConnectionPid , 0 , waiting ),
176+ csr (ConnectionPid , 1 , active )],
177+ Consumers3 ),
178178 assertSendMessageActivateEffect (ConnectionPid , 1 , Stream , ConsumerName , true , Effects3 ),
179179
180180 Command3 =
@@ -186,10 +186,10 @@ super_stream_partition_sac_test(_) ->
186186 ? assertNot (Active4 ),
187187 % % 1 (partition index) % 3 (consumer count) = 1 (active consumer index)
188188 % % the active consumer stays the same
189- ? assertEqual ([csr (ConnectionPid , 0 , waiting ),
190- csr (ConnectionPid , 1 , active ),
191- csr (ConnectionPid , 2 , waiting )],
192- Consumers4 ),
189+ assertCsrsEqual ([csr (ConnectionPid , 0 , waiting ),
190+ csr (ConnectionPid , 1 , active ),
191+ csr (ConnectionPid , 2 , waiting )],
192+ Consumers4 ),
193193 assertEmpty (Effects4 ),
194194
195195 Command4 =
@@ -199,9 +199,9 @@ super_stream_partition_sac_test(_) ->
199199 ok , Effects5 } = ? MOD :apply (Command4 , State4 ),
200200 % % 1 (partition index) % 2 (consumer count) = 1 (active consumer index)
201201 % % the active consumer will move from sub 1 to sub 2
202- ? assertEqual ([csr (ConnectionPid , 1 , deactivating ),
203- csr (ConnectionPid , 2 , waiting )],
204- Consumers5 ),
202+ assertCsrsEqual ([csr (ConnectionPid , 1 , deactivating ),
203+ csr (ConnectionPid , 2 , waiting )],
204+ Consumers5 ),
205205
206206 assertSendMessageSteppingDownEffect (ConnectionPid , 1 , Stream , ConsumerName , Effects5 ),
207207
@@ -210,17 +210,17 @@ super_stream_partition_sac_test(_) ->
210210 State6 ,
211211 ok , Effects6 } = ? MOD :apply (Command5 , State5 ),
212212
213- ? assertEqual ([csr (ConnectionPid , 1 , waiting ),
214- csr (ConnectionPid , 2 , active )],
215- Consumers6 ),
213+ assertCsrsEqual ([csr (ConnectionPid , 1 , waiting ),
214+ csr (ConnectionPid , 2 , active )],
215+ Consumers6 ),
216216 assertSendMessageActivateEffect (ConnectionPid , 2 , Stream , ConsumerName , true , Effects6 ),
217217
218218 Command6 =
219219 unregister_consumer_command (Stream , ConsumerName , ConnectionPid , 1 ),
220220 {#? STATE {groups = #{GroupId := # group {consumers = Consumers7 }}} =
221221 State7 ,
222222 ok , Effects7 } = ? MOD :apply (Command6 , State6 ),
223- ? assertEqual ([csr (ConnectionPid , 2 , active )], Consumers7 ),
223+ assertCsrsEqual ([csr (ConnectionPid , 2 , active )], Consumers7 ),
224224 assertEmpty (Effects7 ),
225225
226226 Command7 =
@@ -1641,6 +1641,11 @@ assertHasGroup(GroupId,
16411641 Groups ) ->
16421642 #{GroupId := # group {partition_index = CurrentPI , consumers = CurrentCs }} = Groups ,
16431643 ? assertEqual (ExpectedPI , CurrentPI ),
1644+ assertCsrsEqual (ExpectedCs , CurrentCs ).
1645+
1646+ assertCsrsEqual ([Expected ], [Current ]) ->
1647+ assertCsrEqual (Expected , Current );
1648+ assertCsrsEqual (ExpectedCs , CurrentCs ) ->
16441649 assertSize (length (ExpectedCs ), CurrentCs ),
16451650 lists :foreach (fun (N ) ->
16461651 Expected = lists :nth (N , ExpectedCs ),
0 commit comments