@@ -805,11 +805,13 @@ stream_pub_sub_metrics(Config) ->
805805 ct :pal (" Initial metrics: ~p " , [Metrics ]),
806806
807807 Stream1 = atom_to_list (? FUNCTION_NAME ) ++ " 1" ,
808- MsgPerBatch1 = 2 ,
809- {ok , S1 , C1 } = publish_via_stream_protocol (list_to_binary (Stream1 ), MsgPerBatch1 , Config ),
808+ MsgPerBatch1 = 2 , % % we'll publish 2 batches
809+ {ok , S1 , C1 , MsgCount1 } = publish_via_stream_protocol (list_to_binary (Stream1 ),
810+ MsgPerBatch1 , Config ),
810811 Stream2 = atom_to_list (? FUNCTION_NAME ) ++ " 2" ,
811- MsgPerBatch2 = 3 ,
812- {ok , S2 , C2 } = publish_via_stream_protocol (list_to_binary (Stream2 ), MsgPerBatch2 , Config ),
812+ MsgPerBatch2 = 3 , % % we'll publish 2 batches
813+ {ok , S2 , C2 , MsgCount2 } = publish_via_stream_protocol (list_to_binary (Stream2 ),
814+ MsgPerBatch2 , Config ),
813815
814816 % % aggregated metrics
815817
@@ -825,12 +827,15 @@ stream_pub_sub_metrics(Config) ->
825827
826828 % % per-object metrics
827829 {_ , Body2 } = http_get_with_pal (Config , " /metrics/detailed?family=stream_consumer_metrics" ,
828- [], 200 ),
830+ [], 200 ),
829831 ParsedBody2 = parse_response (Body2 ),
830832 #{rabbitmq_detailed_stream_consumer_max_offset_lag := MaxOffsetLag } = ParsedBody2 ,
831833
832- ? assertEqual ([{#{vhost => " /" , queue => Stream1 }, [2 ]},
833- {#{vhost => " /" , queue => Stream2 }, [3 ]}],
834+ % % we published 2 batches and received a first chunk
835+ ExpectedLag1 = MsgPerBatch1 * 2 - MsgCount1 ,
836+ ExpectedLag2 = MsgPerBatch2 * 2 - MsgCount2 ,
837+ ? assertEqual ([{#{vhost => " /" , queue => Stream1 }, [ExpectedLag1 ]},
838+ {#{vhost => " /" , queue => Stream2 }, [ExpectedLag2 ]}],
834839 lists :sort (maps :to_list (MaxOffsetLag ))),
835840 dispose_stream_connection (S1 , C1 , list_to_binary (Stream1 )),
836841 dispose_stream_connection (S2 , C2 , list_to_binary (Stream2 )),
@@ -926,8 +931,13 @@ publish_via_stream_protocol(Stream, MsgPerBatch, Config) ->
926931 {ok , C6 } = stream_test_utils :subscribe (S , C5 , Stream , SubscriptionId , _InitialCredit = 0 ),
927932 ok = stream_test_utils :credit (S , SubscriptionId , 1 ),
928933 % % delivery of first batch of messages
929- {{deliver , SubscriptionId , _Bin1 }, C7 } = stream_test_utils :receive_stream_commands (S , C6 ),
930- {ok , S , C7 }.
934+ {{deliver , SubscriptionId , Bin1 }, C7 } = stream_test_utils :receive_stream_commands (S , C6 ),
935+ <<5 :4 /unsigned ,
936+ 0 :4 /unsigned ,
937+ _ChType :8 /unsigned ,
938+ NumEntries :16 /unsigned ,
939+ _Rest /binary >> = Bin1 ,
940+ {ok , S , C7 , NumEntries }.
931941
932942dispose_stream_connection (Sock , C0 , Stream ) ->
933943 {ok , C1 } = stream_test_utils :delete_stream (Sock , C0 , Stream ),
0 commit comments