diff --git a/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl b/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl index 7cb78dff7c24..75e3ac6ebd6f 100644 --- a/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl +++ b/deps/rabbitmq_ct_helpers/src/stream_test_utils.erl @@ -48,7 +48,13 @@ do_connect(StreamPort, PeerProperties) -> ok = gen_tcp:send(Sock, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})), ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})), {{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4), - {ok, Sock, C5}. + + CmdVsns = [{deliver, 1, 2}], + ExchCmdVsnsCmd = {exchange_command_versions, CmdVsns}, + ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 4, ExchCmdVsnsCmd})), + {{response, 4, {exchange_command_versions, 1, _Cmds}}, C6} = receive_stream_commands(Sock, C5), + + {ok, Sock, C6}. close(Sock, C0) -> CloseReason = <<"OK">>, diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 359b206d12a7..5417ba32163e 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -805,11 +805,13 @@ stream_pub_sub_metrics(Config) -> ct:pal("Initial metrics: ~p", [Metrics]), Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1", - MsgPerBatch1 = 2, - {ok, S1, C1} = publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config), + MsgPerBatch1 = 2, %% we'll publish 2 batches + {ok, S1, C1, CmttdChkId1} = publish_via_stream_protocol(list_to_binary(Stream1), + MsgPerBatch1, Config), Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2", - MsgPerBatch2 = 3, - {ok, S2, C2} = publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config), + MsgPerBatch2 = 3, %% we'll publish 2 batches + {ok, S2, C2, CmttdChkId2} = publish_via_stream_protocol(list_to_binary(Stream2), + MsgPerBatch2, Config), %% aggregated metrics @@ -825,12 +827,14 @@ stream_pub_sub_metrics(Config) -> %% per-object metrics {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics", - [], 200), + [], 200), ParsedBody2 = parse_response(Body2), #{rabbitmq_detailed_stream_consumer_max_offset_lag := MaxOffsetLag} = ParsedBody2, - ?assertEqual([{#{vhost => "/", queue => Stream1}, [2]}, - {#{vhost => "/", queue => Stream2}, [3]}], + %% we published 2 batches and received a first chunk (consumer offset = 0) + %% so the offset lag is the last committed chunk ID + ?assertEqual([{#{vhost => "/", queue => Stream1}, [CmttdChkId1]}, + {#{vhost => "/", queue => Stream2}, [CmttdChkId2]}], lists:sort(maps:to_list(MaxOffsetLag))), dispose_stream_connection(S1, C1, list_to_binary(Stream1)), dispose_stream_connection(S2, C2, list_to_binary(Stream2)), @@ -926,8 +930,9 @@ publish_via_stream_protocol(Stream, MsgPerBatch, Config) -> {ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 0), ok = stream_test_utils:credit(S, SubscriptionId, 1), %% delivery of first batch of messages - {{deliver, SubscriptionId, _Bin1}, C7} = stream_test_utils:receive_stream_commands(S, C6), - {ok, S, C7}. + {{deliver_v2, SubscriptionId, CommittedChunkId, _Bin1}, C7} = + stream_test_utils:receive_stream_commands(S, C6), + {ok, S, C7, CommittedChunkId}. dispose_stream_connection(Sock, C0, Stream) -> {ok, C1} = stream_test_utils:delete_stream(Sock, C0, Stream), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 14d712e42d56..0a716004e1f9 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -361,26 +361,27 @@ test_publish_v2(Config) -> C0 = rabbit_stream_core:init(0), C1 = test_peer_properties(Transport, S, C0), C2 = test_authenticate(Transport, S, C1), - C3 = test_create_stream(Transport, S, Stream, C2), + C3 = test_exchange_command_versions(Transport, S, C2), + C4 = test_create_stream(Transport, S, Stream, C3), PublisherId = 42, - C4 = test_declare_publisher(Transport, S, PublisherId, Stream, C3), + C5 = test_declare_publisher(Transport, S, PublisherId, Stream, C4), Body = <<"hello">>, - C5 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body, - publish_confirm, C4), C6 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body, publish_confirm, C5), + C7 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body, + publish_confirm, C6), SubscriptionId = 42, - C7 = test_subscribe(Transport, S, SubscriptionId, Stream, + C8 = test_subscribe(Transport, S, SubscriptionId, Stream, #{<<"filter.0">> => <<"foo">>}, ?RESPONSE_CODE_OK, - C6), - C8 = test_deliver(Transport, S, SubscriptionId, 0, Body, C7), - C8b = test_deliver(Transport, S, SubscriptionId, 1, Body, C8), + C7), + C9 = test_deliver_v2(Transport, S, SubscriptionId, 0, Body, C8), + C9b = test_deliver_v2(Transport, S, SubscriptionId, 1, Body, C9), - C9 = test_unsubscribe(Transport, S, SubscriptionId, C8b), + C10 = test_unsubscribe(Transport, S, SubscriptionId, C9b), - C10 = test_delete_stream(Transport, S, Stream, C9), - _C11 = test_close(Transport, S, C10), + C11 = test_delete_stream(Transport, S, Stream, C10), + _C12 = test_close(Transport, S, C11), closed = wait_for_socket_close(Transport, S, 10), ok. @@ -901,7 +902,7 @@ offset_lag_calculation(Config) -> C03 = case ReceiveDeliver of true -> - {{deliver, SubId, _}, C02} = receive_commands(S, C01), + {{deliver_v2, SubId, _, _}, C02} = receive_commands(S, C01), C02; _ -> C01