Skip to content

Commit 41cc7ee

Browse files
acogoluegnesmergify[bot]
authored andcommitted
Use deliver v2 in stream test utils
Deliver v2 contains the last committed ID (first offset in the last committed chunk), which can be useful to calculate the offset lag in some tests. This requires to use the exchange_command_versions command when opening the stream connection. (cherry picked from commit 14e06cc)
1 parent 5886af9 commit 41cc7ee

File tree

3 files changed

+34
-22
lines changed

3 files changed

+34
-22
lines changed

deps/rabbitmq_ct_helpers/src/stream_test_utils.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,13 @@ do_connect(StreamPort, PeerProperties) ->
4848
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
4949
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
5050
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
51-
{ok, Sock, C5}.
51+
52+
CmdVsns = [{deliver, 1, 2}],
53+
ExchCmdVsnsCmd = {exchange_command_versions, CmdVsns},
54+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 4, ExchCmdVsnsCmd})),
55+
{{response, 4, {exchange_command_versions, 1, _Cmds}}, C6} = receive_stream_commands(Sock, C5),
56+
57+
{ok, Sock, C6}.
5258

5359
close(Sock, C0) ->
5460
CloseReason = <<"OK">>,

deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -787,11 +787,13 @@ stream_pub_sub_metrics(Config) ->
787787
ct:pal("Initial metrics: ~p", [Metrics]),
788788

789789
Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1",
790-
MsgPerBatch1 = 2,
791-
{ok, S1, C1} = publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
790+
MsgPerBatch1 = 2, %% we'll publish 2 batches
791+
{ok, S1, C1, CmttdChkId1} = publish_via_stream_protocol(list_to_binary(Stream1),
792+
MsgPerBatch1, Config),
792793
Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2",
793-
MsgPerBatch2 = 3,
794-
{ok, S2, C2} = publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),
794+
MsgPerBatch2 = 3, %% we'll publish 2 batches
795+
{ok, S2, C2, CmttdChkId2} = publish_via_stream_protocol(list_to_binary(Stream2),
796+
MsgPerBatch2, Config),
795797

796798
%% aggregated metrics
797799

@@ -807,12 +809,14 @@ stream_pub_sub_metrics(Config) ->
807809

808810
%% per-object metrics
809811
{_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics",
810-
[], 200),
812+
[], 200),
811813
ParsedBody2 = parse_response(Body2),
812814
#{rabbitmq_detailed_stream_consumer_max_offset_lag := MaxOffsetLag} = ParsedBody2,
813815

814-
?assertEqual([{#{vhost => "/", queue => Stream1}, [2]},
815-
{#{vhost => "/", queue => Stream2}, [3]}],
816+
%% we published 2 batches and received a first chunk (consumer offset = 0)
817+
%% so the offset lag is the last committed chunk ID
818+
?assertEqual([{#{vhost => "/", queue => Stream1}, [CmttdChkId1]},
819+
{#{vhost => "/", queue => Stream2}, [CmttdChkId2]}],
816820
lists:sort(maps:to_list(MaxOffsetLag))),
817821
dispose_stream_connection(S1, C1, list_to_binary(Stream1)),
818822
dispose_stream_connection(S2, C2, list_to_binary(Stream2)),
@@ -887,8 +891,9 @@ publish_via_stream_protocol(Stream, MsgPerBatch, Config) ->
887891
{ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 0),
888892
ok = stream_test_utils:credit(S, SubscriptionId, 1),
889893
%% delivery of first batch of messages
890-
{{deliver, SubscriptionId, _Bin1}, C7} = stream_test_utils:receive_stream_commands(S, C6),
891-
{ok, S, C7}.
894+
{{deliver_v2, SubscriptionId, CommittedChunkId, _Bin1}, C7} =
895+
stream_test_utils:receive_stream_commands(S, C6),
896+
{ok, S, C7, CommittedChunkId}.
892897

893898
dispose_stream_connection(Sock, C0, Stream) ->
894899
{ok, C1} = stream_test_utils:delete_stream(Sock, C0, Stream),

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -360,26 +360,27 @@ test_publish_v2(Config) ->
360360
C0 = rabbit_stream_core:init(0),
361361
C1 = test_peer_properties(Transport, S, C0),
362362
C2 = test_authenticate(Transport, S, C1),
363-
C3 = test_create_stream(Transport, S, Stream, C2),
363+
C3 = test_exchange_command_versions(Transport, S, C2),
364+
C4 = test_create_stream(Transport, S, Stream, C3),
364365
PublisherId = 42,
365-
C4 = test_declare_publisher(Transport, S, PublisherId, Stream, C3),
366+
C5 = test_declare_publisher(Transport, S, PublisherId, Stream, C4),
366367
Body = <<"hello">>,
367-
C5 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
368-
publish_confirm, C4),
369368
C6 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
370369
publish_confirm, C5),
370+
C7 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
371+
publish_confirm, C6),
371372
SubscriptionId = 42,
372-
C7 = test_subscribe(Transport, S, SubscriptionId, Stream,
373+
C8 = test_subscribe(Transport, S, SubscriptionId, Stream,
373374
#{<<"filter.0">> => <<"foo">>},
374375
?RESPONSE_CODE_OK,
375-
C6),
376-
C8 = test_deliver(Transport, S, SubscriptionId, 0, Body, C7),
377-
C8b = test_deliver(Transport, S, SubscriptionId, 1, Body, C8),
376+
C7),
377+
C9 = test_deliver_v2(Transport, S, SubscriptionId, 0, Body, C8),
378+
C9b = test_deliver_v2(Transport, S, SubscriptionId, 1, Body, C9),
378379

379-
C9 = test_unsubscribe(Transport, S, SubscriptionId, C8b),
380+
C10 = test_unsubscribe(Transport, S, SubscriptionId, C9b),
380381

381-
C10 = test_delete_stream(Transport, S, Stream, C9),
382-
_C11 = test_close(Transport, S, C10),
382+
C11 = test_delete_stream(Transport, S, Stream, C10),
383+
_C12 = test_close(Transport, S, C11),
383384
closed = wait_for_socket_close(Transport, S, 10),
384385
ok.
385386

@@ -900,7 +901,7 @@ offset_lag_calculation(Config) ->
900901

901902
C03 = case ReceiveDeliver of
902903
true ->
903-
{{deliver, SubId, _}, C02} = receive_commands(S, C01),
904+
{{deliver_v2, SubId, _, _}, C02} = receive_commands(S, C01),
904905
C02;
905906
_ ->
906907
C01

0 commit comments

Comments
 (0)