Skip to content

Commit 20173e8

Browse files
acogoluegnesmergify[bot]
authored andcommitted
Use deliver v2 in stream test utils
Deliver v2 contains the last committed ID (first offset in the last committed chunk), which can be useful to calculate the offset lag in some tests. This requires to use the exchange_command_versions command when opening the stream connection. (cherry picked from commit 14e06cc)
1 parent e895d78 commit 20173e8

File tree

3 files changed

+34
-22
lines changed

3 files changed

+34
-22
lines changed

deps/rabbitmq_ct_helpers/src/stream_test_utils.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,13 @@ do_connect(StreamPort, PeerProperties) ->
4848
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
4949
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
5050
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
51-
{ok, Sock, C5}.
51+
52+
CmdVsns = [{deliver, 1, 2}],
53+
ExchCmdVsnsCmd = {exchange_command_versions, CmdVsns},
54+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 4, ExchCmdVsnsCmd})),
55+
{{response, 4, {exchange_command_versions, 1, _Cmds}}, C6} = receive_stream_commands(Sock, C5),
56+
57+
{ok, Sock, C6}.
5258

5359
close(Sock, C0) ->
5460
CloseReason = <<"OK">>,

deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -805,11 +805,13 @@ stream_pub_sub_metrics(Config) ->
805805
ct:pal("Initial metrics: ~p", [Metrics]),
806806

807807
Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1",
808-
MsgPerBatch1 = 2,
809-
{ok, S1, C1} = publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
808+
MsgPerBatch1 = 2, %% we'll publish 2 batches
809+
{ok, S1, C1, CmttdChkId1} = publish_via_stream_protocol(list_to_binary(Stream1),
810+
MsgPerBatch1, Config),
810811
Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2",
811-
MsgPerBatch2 = 3,
812-
{ok, S2, C2} = publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),
812+
MsgPerBatch2 = 3, %% we'll publish 2 batches
813+
{ok, S2, C2, CmttdChkId2} = publish_via_stream_protocol(list_to_binary(Stream2),
814+
MsgPerBatch2, Config),
813815

814816
%% aggregated metrics
815817

@@ -825,12 +827,14 @@ stream_pub_sub_metrics(Config) ->
825827

826828
%% per-object metrics
827829
{_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics",
828-
[], 200),
830+
[], 200),
829831
ParsedBody2 = parse_response(Body2),
830832
#{rabbitmq_detailed_stream_consumer_max_offset_lag := MaxOffsetLag} = ParsedBody2,
831833

832-
?assertEqual([{#{vhost => "/", queue => Stream1}, [2]},
833-
{#{vhost => "/", queue => Stream2}, [3]}],
834+
%% we published 2 batches and received a first chunk (consumer offset = 0)
835+
%% so the offset lag is the last committed chunk ID
836+
?assertEqual([{#{vhost => "/", queue => Stream1}, [CmttdChkId1]},
837+
{#{vhost => "/", queue => Stream2}, [CmttdChkId2]}],
834838
lists:sort(maps:to_list(MaxOffsetLag))),
835839
dispose_stream_connection(S1, C1, list_to_binary(Stream1)),
836840
dispose_stream_connection(S2, C2, list_to_binary(Stream2)),
@@ -926,8 +930,9 @@ publish_via_stream_protocol(Stream, MsgPerBatch, Config) ->
926930
{ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 0),
927931
ok = stream_test_utils:credit(S, SubscriptionId, 1),
928932
%% delivery of first batch of messages
929-
{{deliver, SubscriptionId, _Bin1}, C7} = stream_test_utils:receive_stream_commands(S, C6),
930-
{ok, S, C7}.
933+
{{deliver_v2, SubscriptionId, CommittedChunkId, _Bin1}, C7} =
934+
stream_test_utils:receive_stream_commands(S, C6),
935+
{ok, S, C7, CommittedChunkId}.
931936

932937
dispose_stream_connection(Sock, C0, Stream) ->
933938
{ok, C1} = stream_test_utils:delete_stream(Sock, C0, Stream),

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -361,26 +361,27 @@ test_publish_v2(Config) ->
361361
C0 = rabbit_stream_core:init(0),
362362
C1 = test_peer_properties(Transport, S, C0),
363363
C2 = test_authenticate(Transport, S, C1),
364-
C3 = test_create_stream(Transport, S, Stream, C2),
364+
C3 = test_exchange_command_versions(Transport, S, C2),
365+
C4 = test_create_stream(Transport, S, Stream, C3),
365366
PublisherId = 42,
366-
C4 = test_declare_publisher(Transport, S, PublisherId, Stream, C3),
367+
C5 = test_declare_publisher(Transport, S, PublisherId, Stream, C4),
367368
Body = <<"hello">>,
368-
C5 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
369-
publish_confirm, C4),
370369
C6 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
371370
publish_confirm, C5),
371+
C7 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
372+
publish_confirm, C6),
372373
SubscriptionId = 42,
373-
C7 = test_subscribe(Transport, S, SubscriptionId, Stream,
374+
C8 = test_subscribe(Transport, S, SubscriptionId, Stream,
374375
#{<<"filter.0">> => <<"foo">>},
375376
?RESPONSE_CODE_OK,
376-
C6),
377-
C8 = test_deliver(Transport, S, SubscriptionId, 0, Body, C7),
378-
C8b = test_deliver(Transport, S, SubscriptionId, 1, Body, C8),
377+
C7),
378+
C9 = test_deliver_v2(Transport, S, SubscriptionId, 0, Body, C8),
379+
C9b = test_deliver_v2(Transport, S, SubscriptionId, 1, Body, C9),
379380

380-
C9 = test_unsubscribe(Transport, S, SubscriptionId, C8b),
381+
C10 = test_unsubscribe(Transport, S, SubscriptionId, C9b),
381382

382-
C10 = test_delete_stream(Transport, S, Stream, C9),
383-
_C11 = test_close(Transport, S, C10),
383+
C11 = test_delete_stream(Transport, S, Stream, C10),
384+
_C12 = test_close(Transport, S, C11),
384385
closed = wait_for_socket_close(Transport, S, 10),
385386
ok.
386387

@@ -901,7 +902,7 @@ offset_lag_calculation(Config) ->
901902

902903
C03 = case ReceiveDeliver of
903904
true ->
904-
{{deliver, SubId, _}, C02} = receive_commands(S, C01),
905+
{{deliver_v2, SubId, _, _}, C02} = receive_commands(S, C01),
905906
C02;
906907
_ ->
907908
C01

0 commit comments

Comments
 (0)