Skip to content

Commit 5a05544

Browse files
authored
Merge pull request #14749 from rabbitmq/mergify/bp/v4.1.x/pr-14740
Use deliver v2 in stream test utils (backport #14740)
2 parents 5886af9 + 41cc7ee commit 5a05544

File tree

3 files changed

+34
-22
lines changed

3 files changed

+34
-22
lines changed

deps/rabbitmq_ct_helpers/src/stream_test_utils.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,13 @@ do_connect(StreamPort, PeerProperties) ->
4848
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
4949
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
5050
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
51-
{ok, Sock, C5}.
51+
52+
CmdVsns = [{deliver, 1, 2}],
53+
ExchCmdVsnsCmd = {exchange_command_versions, CmdVsns},
54+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 4, ExchCmdVsnsCmd})),
55+
{{response, 4, {exchange_command_versions, 1, _Cmds}}, C6} = receive_stream_commands(Sock, C5),
56+
57+
{ok, Sock, C6}.
5258

5359
close(Sock, C0) ->
5460
CloseReason = <<"OK">>,

deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -787,11 +787,13 @@ stream_pub_sub_metrics(Config) ->
787787
ct:pal("Initial metrics: ~p", [Metrics]),
788788

789789
Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1",
790-
MsgPerBatch1 = 2,
791-
{ok, S1, C1} = publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
790+
MsgPerBatch1 = 2, %% we'll publish 2 batches
791+
{ok, S1, C1, CmttdChkId1} = publish_via_stream_protocol(list_to_binary(Stream1),
792+
MsgPerBatch1, Config),
792793
Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2",
793-
MsgPerBatch2 = 3,
794-
{ok, S2, C2} = publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),
794+
MsgPerBatch2 = 3, %% we'll publish 2 batches
795+
{ok, S2, C2, CmttdChkId2} = publish_via_stream_protocol(list_to_binary(Stream2),
796+
MsgPerBatch2, Config),
795797

796798
%% aggregated metrics
797799

@@ -807,12 +809,14 @@ stream_pub_sub_metrics(Config) ->
807809

808810
%% per-object metrics
809811
{_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics",
810-
[], 200),
812+
[], 200),
811813
ParsedBody2 = parse_response(Body2),
812814
#{rabbitmq_detailed_stream_consumer_max_offset_lag := MaxOffsetLag} = ParsedBody2,
813815

814-
?assertEqual([{#{vhost => "/", queue => Stream1}, [2]},
815-
{#{vhost => "/", queue => Stream2}, [3]}],
816+
%% we published 2 batches and received a first chunk (consumer offset = 0)
817+
%% so the offset lag is the last committed chunk ID
818+
?assertEqual([{#{vhost => "/", queue => Stream1}, [CmttdChkId1]},
819+
{#{vhost => "/", queue => Stream2}, [CmttdChkId2]}],
816820
lists:sort(maps:to_list(MaxOffsetLag))),
817821
dispose_stream_connection(S1, C1, list_to_binary(Stream1)),
818822
dispose_stream_connection(S2, C2, list_to_binary(Stream2)),
@@ -887,8 +891,9 @@ publish_via_stream_protocol(Stream, MsgPerBatch, Config) ->
887891
{ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 0),
888892
ok = stream_test_utils:credit(S, SubscriptionId, 1),
889893
%% delivery of first batch of messages
890-
{{deliver, SubscriptionId, _Bin1}, C7} = stream_test_utils:receive_stream_commands(S, C6),
891-
{ok, S, C7}.
894+
{{deliver_v2, SubscriptionId, CommittedChunkId, _Bin1}, C7} =
895+
stream_test_utils:receive_stream_commands(S, C6),
896+
{ok, S, C7, CommittedChunkId}.
892897

893898
dispose_stream_connection(Sock, C0, Stream) ->
894899
{ok, C1} = stream_test_utils:delete_stream(Sock, C0, Stream),

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -360,26 +360,27 @@ test_publish_v2(Config) ->
360360
C0 = rabbit_stream_core:init(0),
361361
C1 = test_peer_properties(Transport, S, C0),
362362
C2 = test_authenticate(Transport, S, C1),
363-
C3 = test_create_stream(Transport, S, Stream, C2),
363+
C3 = test_exchange_command_versions(Transport, S, C2),
364+
C4 = test_create_stream(Transport, S, Stream, C3),
364365
PublisherId = 42,
365-
C4 = test_declare_publisher(Transport, S, PublisherId, Stream, C3),
366+
C5 = test_declare_publisher(Transport, S, PublisherId, Stream, C4),
366367
Body = <<"hello">>,
367-
C5 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
368-
publish_confirm, C4),
369368
C6 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
370369
publish_confirm, C5),
370+
C7 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
371+
publish_confirm, C6),
371372
SubscriptionId = 42,
372-
C7 = test_subscribe(Transport, S, SubscriptionId, Stream,
373+
C8 = test_subscribe(Transport, S, SubscriptionId, Stream,
373374
#{<<"filter.0">> => <<"foo">>},
374375
?RESPONSE_CODE_OK,
375-
C6),
376-
C8 = test_deliver(Transport, S, SubscriptionId, 0, Body, C7),
377-
C8b = test_deliver(Transport, S, SubscriptionId, 1, Body, C8),
376+
C7),
377+
C9 = test_deliver_v2(Transport, S, SubscriptionId, 0, Body, C8),
378+
C9b = test_deliver_v2(Transport, S, SubscriptionId, 1, Body, C9),
378379

379-
C9 = test_unsubscribe(Transport, S, SubscriptionId, C8b),
380+
C10 = test_unsubscribe(Transport, S, SubscriptionId, C9b),
380381

381-
C10 = test_delete_stream(Transport, S, Stream, C9),
382-
_C11 = test_close(Transport, S, C10),
382+
C11 = test_delete_stream(Transport, S, Stream, C10),
383+
_C12 = test_close(Transport, S, C11),
383384
closed = wait_for_socket_close(Transport, S, 10),
384385
ok.
385386

@@ -900,7 +901,7 @@ offset_lag_calculation(Config) ->
900901

901902
C03 = case ReceiveDeliver of
902903
true ->
903-
{{deliver, SubId, _}, C02} = receive_commands(S, C01),
904+
{{deliver_v2, SubId, _, _}, C02} = receive_commands(S, C01),
904905
C02;
905906
_ ->
906907
C01

0 commit comments

Comments
 (0)