From 41cc7ee432009d42665490b948f1de2bc570c11f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Thu, 16 Oct 2025 17:19:59 +0200 Subject: [PATCH] Use deliver v2 in stream test utils Deliver v2 contains the last committed ID (first offset in the last committed chunk), which can be useful to calculate the offset lag in some tests. This requires to use the exchange_command_versions command when opening the stream connection. (cherry picked from commit 14e06cc6192dafc16370271690163188ca2c54f1) --- .../src/stream_test_utils.erl | 8 +++++- .../test/rabbit_prometheus_http_SUITE.erl | 23 ++++++++++------- .../test/rabbit_stream_SUITE.erl | 25 ++++++++++--------- 3 files changed, 34 insertions(+), 22 deletions(-) diff --git a/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl b/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl index faab0c7ed482..aed1f083d90c 100644 --- a/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl +++ b/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl @@ -48,7 +48,13 @@ do_connect(StreamPort, PeerProperties) -> ok = gen_tcp:send(Sock, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})), ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})), {{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4), - {ok, Sock, C5}. + + CmdVsns = [{deliver, 1, 2}], + ExchCmdVsnsCmd = {exchange_command_versions, CmdVsns}, + ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 4, ExchCmdVsnsCmd})), + {{response, 4, {exchange_command_versions, 1, _Cmds}}, C6} = receive_stream_commands(Sock, C5), + + {ok, Sock, C6}. close(Sock, C0) -> CloseReason = <<"OK">>, diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 535edd940e6a..e1a8188cafe3 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -787,11 +787,13 @@ stream_pub_sub_metrics(Config) -> ct:pal("Initial metrics: ~p", [Metrics]), Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1", - MsgPerBatch1 = 2, - {ok, S1, C1} = publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config), + MsgPerBatch1 = 2, %% we'll publish 2 batches + {ok, S1, C1, CmttdChkId1} = publish_via_stream_protocol(list_to_binary(Stream1), + MsgPerBatch1, Config), Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2", - MsgPerBatch2 = 3, - {ok, S2, C2} = publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config), + MsgPerBatch2 = 3, %% we'll publish 2 batches + {ok, S2, C2, CmttdChkId2} = publish_via_stream_protocol(list_to_binary(Stream2), + MsgPerBatch2, Config), %% aggregated metrics @@ -807,12 +809,14 @@ stream_pub_sub_metrics(Config) -> %% per-object metrics {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics", - [], 200), + [], 200), ParsedBody2 = parse_response(Body2), #{rabbitmq_detailed_stream_consumer_max_offset_lag := MaxOffsetLag} = ParsedBody2, - ?assertEqual([{#{vhost => "/", queue => Stream1}, [2]}, - {#{vhost => "/", queue => Stream2}, [3]}], + %% we published 2 batches and received a first chunk (consumer offset = 0) + %% so the offset lag is the last committed chunk ID + ?assertEqual([{#{vhost => "/", queue => Stream1}, [CmttdChkId1]}, + {#{vhost => "/", queue => Stream2}, [CmttdChkId2]}], lists:sort(maps:to_list(MaxOffsetLag))), dispose_stream_connection(S1, C1, list_to_binary(Stream1)), dispose_stream_connection(S2, C2, list_to_binary(Stream2)), @@ -887,8 +891,9 @@ publish_via_stream_protocol(Stream, MsgPerBatch, Config) -> {ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 0), ok = stream_test_utils:credit(S, SubscriptionId, 1), %% delivery of first batch of messages - {{deliver, SubscriptionId, _Bin1}, C7} = stream_test_utils:receive_stream_commands(S, C6), - {ok, S, C7}. + {{deliver_v2, SubscriptionId, CommittedChunkId, _Bin1}, C7} = + stream_test_utils:receive_stream_commands(S, C6), + {ok, S, C7, CommittedChunkId}. dispose_stream_connection(Sock, C0, Stream) -> {ok, C1} = stream_test_utils:delete_stream(Sock, C0, Stream), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index cb47db8d716d..6cf4e3906b73 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -360,26 +360,27 @@ test_publish_v2(Config) -> C0 = rabbit_stream_core:init(0), C1 = test_peer_properties(Transport, S, C0), C2 = test_authenticate(Transport, S, C1), - C3 = test_create_stream(Transport, S, Stream, C2), + C3 = test_exchange_command_versions(Transport, S, C2), + C4 = test_create_stream(Transport, S, Stream, C3), PublisherId = 42, - C4 = test_declare_publisher(Transport, S, PublisherId, Stream, C3), + C5 = test_declare_publisher(Transport, S, PublisherId, Stream, C4), Body = <<"hello">>, - C5 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body, - publish_confirm, C4), C6 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body, publish_confirm, C5), + C7 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body, + publish_confirm, C6), SubscriptionId = 42, - C7 = test_subscribe(Transport, S, SubscriptionId, Stream, + C8 = test_subscribe(Transport, S, SubscriptionId, Stream, #{<<"filter.0">> => <<"foo">>}, ?RESPONSE_CODE_OK, - C6), - C8 = test_deliver(Transport, S, SubscriptionId, 0, Body, C7), - C8b = test_deliver(Transport, S, SubscriptionId, 1, Body, C8), + C7), + C9 = test_deliver_v2(Transport, S, SubscriptionId, 0, Body, C8), + C9b = test_deliver_v2(Transport, S, SubscriptionId, 1, Body, C9), - C9 = test_unsubscribe(Transport, S, SubscriptionId, C8b), + C10 = test_unsubscribe(Transport, S, SubscriptionId, C9b), - C10 = test_delete_stream(Transport, S, Stream, C9), - _C11 = test_close(Transport, S, C10), + C11 = test_delete_stream(Transport, S, Stream, C10), + _C12 = test_close(Transport, S, C11), closed = wait_for_socket_close(Transport, S, 10), ok. @@ -900,7 +901,7 @@ offset_lag_calculation(Config) -> C03 = case ReceiveDeliver of true -> - {{deliver, SubId, _}, C02} = receive_commands(S, C01), + {{deliver_v2, SubId, _, _}, C02} = receive_commands(S, C01), C02; _ -> C01