Skip to content

Commit e9f1cae

Browse files
authored
Merge pull request #14740 from rabbitmq/fix-flake-prometheus
Use deliver v2 in stream test utils
2 parents 6515471 + 14e06cc commit e9f1cae

File tree

3 files changed

+34
-22
lines changed

3 files changed

+34
-22
lines changed

deps/rabbitmq_ct_helpers/src/stream_test_utils.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,13 @@ do_connect(StreamPort, PeerProperties) ->
4848
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
4949
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
5050
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
51-
{ok, Sock, C5}.
51+
52+
CmdVsns = [{deliver, 1, 2}],
53+
ExchCmdVsnsCmd = {exchange_command_versions, CmdVsns},
54+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 4, ExchCmdVsnsCmd})),
55+
{{response, 4, {exchange_command_versions, 1, _Cmds}}, C6} = receive_stream_commands(Sock, C5),
56+
57+
{ok, Sock, C6}.
5258

5359
close(Sock, C0) ->
5460
CloseReason = <<"OK">>,

deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -805,11 +805,13 @@ stream_pub_sub_metrics(Config) ->
805805
ct:pal("Initial metrics: ~p", [Metrics]),
806806

807807
Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1",
808-
MsgPerBatch1 = 2,
809-
{ok, S1, C1} = publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
808+
MsgPerBatch1 = 2, %% we'll publish 2 batches
809+
{ok, S1, C1, CmttdChkId1} = publish_via_stream_protocol(list_to_binary(Stream1),
810+
MsgPerBatch1, Config),
810811
Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2",
811-
MsgPerBatch2 = 3,
812-
{ok, S2, C2} = publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),
812+
MsgPerBatch2 = 3, %% we'll publish 2 batches
813+
{ok, S2, C2, CmttdChkId2} = publish_via_stream_protocol(list_to_binary(Stream2),
814+
MsgPerBatch2, Config),
813815

814816
%% aggregated metrics
815817

@@ -825,12 +827,14 @@ stream_pub_sub_metrics(Config) ->
825827

826828
%% per-object metrics
827829
{_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics",
828-
[], 200),
830+
[], 200),
829831
ParsedBody2 = parse_response(Body2),
830832
#{rabbitmq_detailed_stream_consumer_max_offset_lag := MaxOffsetLag} = ParsedBody2,
831833

832-
?assertEqual([{#{vhost => "/", queue => Stream1}, [2]},
833-
{#{vhost => "/", queue => Stream2}, [3]}],
834+
%% we published 2 batches and received a first chunk (consumer offset = 0)
835+
%% so the offset lag is the last committed chunk ID
836+
?assertEqual([{#{vhost => "/", queue => Stream1}, [CmttdChkId1]},
837+
{#{vhost => "/", queue => Stream2}, [CmttdChkId2]}],
834838
lists:sort(maps:to_list(MaxOffsetLag))),
835839
dispose_stream_connection(S1, C1, list_to_binary(Stream1)),
836840
dispose_stream_connection(S2, C2, list_to_binary(Stream2)),
@@ -926,8 +930,9 @@ publish_via_stream_protocol(Stream, MsgPerBatch, Config) ->
926930
{ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 0),
927931
ok = stream_test_utils:credit(S, SubscriptionId, 1),
928932
%% delivery of first batch of messages
929-
{{deliver, SubscriptionId, _Bin1}, C7} = stream_test_utils:receive_stream_commands(S, C6),
930-
{ok, S, C7}.
933+
{{deliver_v2, SubscriptionId, CommittedChunkId, _Bin1}, C7} =
934+
stream_test_utils:receive_stream_commands(S, C6),
935+
{ok, S, C7, CommittedChunkId}.
931936

932937
dispose_stream_connection(Sock, C0, Stream) ->
933938
{ok, C1} = stream_test_utils:delete_stream(Sock, C0, Stream),

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -361,26 +361,27 @@ test_publish_v2(Config) ->
361361
C0 = rabbit_stream_core:init(0),
362362
C1 = test_peer_properties(Transport, S, C0),
363363
C2 = test_authenticate(Transport, S, C1),
364-
C3 = test_create_stream(Transport, S, Stream, C2),
364+
C3 = test_exchange_command_versions(Transport, S, C2),
365+
C4 = test_create_stream(Transport, S, Stream, C3),
365366
PublisherId = 42,
366-
C4 = test_declare_publisher(Transport, S, PublisherId, Stream, C3),
367+
C5 = test_declare_publisher(Transport, S, PublisherId, Stream, C4),
367368
Body = <<"hello">>,
368-
C5 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
369-
publish_confirm, C4),
370369
C6 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
371370
publish_confirm, C5),
371+
C7 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
372+
publish_confirm, C6),
372373
SubscriptionId = 42,
373-
C7 = test_subscribe(Transport, S, SubscriptionId, Stream,
374+
C8 = test_subscribe(Transport, S, SubscriptionId, Stream,
374375
#{<<"filter.0">> => <<"foo">>},
375376
?RESPONSE_CODE_OK,
376-
C6),
377-
C8 = test_deliver(Transport, S, SubscriptionId, 0, Body, C7),
378-
C8b = test_deliver(Transport, S, SubscriptionId, 1, Body, C8),
377+
C7),
378+
C9 = test_deliver_v2(Transport, S, SubscriptionId, 0, Body, C8),
379+
C9b = test_deliver_v2(Transport, S, SubscriptionId, 1, Body, C9),
379380

380-
C9 = test_unsubscribe(Transport, S, SubscriptionId, C8b),
381+
C10 = test_unsubscribe(Transport, S, SubscriptionId, C9b),
381382

382-
C10 = test_delete_stream(Transport, S, Stream, C9),
383-
_C11 = test_close(Transport, S, C10),
383+
C11 = test_delete_stream(Transport, S, Stream, C10),
384+
_C12 = test_close(Transport, S, C11),
384385
closed = wait_for_socket_close(Transport, S, 10),
385386
ok.
386387

@@ -901,7 +902,7 @@ offset_lag_calculation(Config) ->
901902

902903
C03 = case ReceiveDeliver of
903904
true ->
904-
{{deliver, SubId, _}, C02} = receive_commands(S, C01),
905+
{{deliver_v2, SubId, _, _}, C02} = receive_commands(S, C01),
905906
C02;
906907
_ ->
907908
C01

0 commit comments

Comments
 (0)