Skip to content

Commit 00d2244

Browse files
acogoluegnesmergify[bot]
authored andcommitted
Fix wait-for-confirms sequence in stream test utils
And refine the implementation and its usage. (cherry picked from commit b3b0940)
1 parent 70b04ae commit 00d2244

File tree

4 files changed

+68
-10
lines changed

4 files changed

+68
-10
lines changed

deps/rabbitmq_ct_helpers/src/stream_test_utils.erl

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,24 +40,49 @@ connect(Config, Node) ->
4040
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
4141
{ok, Sock, C5}.
4242

43+
close(Sock, C0) ->
44+
CloseReason = <<"OK">>,
45+
CloseFrame = rabbit_stream_core:frame({request, 1, {close, ?RESPONSE_CODE_OK, CloseReason}}),
46+
ok = gen_tcp:send(Sock, CloseFrame),
47+
{{response, 1, {close, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
48+
{ok, C1}.
49+
4350
create_stream(Sock, C0, Stream) ->
4451
CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
4552
ok = gen_tcp:send(Sock, CreateStreamFrame),
4653
{{response, 1, {create_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
4754
{ok, C1}.
4855

56+
delete_stream(Sock, C0, Stream) ->
57+
DeleteStreamFrame = rabbit_stream_core:frame({request, 1, {delete_stream, Stream}}),
58+
ok = gen_tcp:send(Sock, DeleteStreamFrame),
59+
{{response, 1, {delete_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
60+
{ok, C1}.
61+
4962
declare_publisher(Sock, C0, Stream, PublisherId) ->
5063
DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
5164
ok = gen_tcp:send(Sock, DeclarePublisherFrame),
5265
{{response, 1, {declare_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
5366
{ok, C1}.
5467

68+
delete_publisher(Sock, C0, PublisherId) ->
69+
DeletePublisherFrame = rabbit_stream_core:frame({request, 1, {delete_publisher, PublisherId}}),
70+
ok = gen_tcp:send(Sock, DeletePublisherFrame),
71+
{{response, 1, {delete_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
72+
{ok, C1}.
73+
5574
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
5675
SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}),
5776
ok = gen_tcp:send(Sock, SubscribeFrame),
5877
{{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
5978
{ok, C1}.
6079

80+
unsubscribe(Sock, C0, SubscriptionId) ->
81+
UnsubscribeFrame = rabbit_stream_core:frame({request, 1, {unsubscribe, SubscriptionId}}),
82+
ok = gen_tcp:send(Sock, UnsubscribeFrame),
83+
{{response, 1, {unsubscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
84+
{ok, C1}.
85+
6186
publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
6287
SeqIds = lists:seq(Sequence0, Sequence0 + length(Payloads) - 1),
6388
Messages = [simple_entry(Seq, P)
@@ -68,8 +93,17 @@ publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
6893
publish_entries(Sock, C0, PublisherId, MsgCount, Messages) ->
6994
PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, MsgCount, Messages}),
7095
ok = gen_tcp:send(Sock, PublishFrame1),
71-
{{publish_confirm, PublisherId, SeqIds}, C1} = receive_stream_commands(Sock, C0),
72-
{ok, SeqIds, C1}.
96+
wait_for_confirms(Sock, C0, PublisherId, [], MsgCount).
97+
98+
wait_for_confirms(_, C, _, Acc, 0) ->
99+
{ok, Acc, C};
100+
wait_for_confirms(S, C0, PublisherId, Acc, Remaining) ->
101+
case receive_stream_commands(S, C0) of
102+
{{publish_confirm, PublisherId, SeqIds}, C1} ->
103+
wait_for_confirms(S, C1, PublisherId, Acc ++ SeqIds, Remaining - length(SeqIds));
104+
{Frame, C1} ->
105+
{unexpected_frame, Frame, C1}
106+
end.
73107

74108
%% Streams contain AMQP 1.0 encoded messages.
75109
%% In this case, the AMQP 1.0 encoded message contains a single data section.

deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -713,10 +713,10 @@ exchange_names_metric(Config) ->
713713
stream_pub_sub_metrics(Config) ->
714714
Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1",
715715
MsgPerBatch1 = 2,
716-
publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
716+
{ok, S1, C1} = publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
717717
Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2",
718718
MsgPerBatch2 = 3,
719-
publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),
719+
{ok, S2, C2} = publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),
720720

721721
%% aggregated metrics
722722

@@ -739,6 +739,8 @@ stream_pub_sub_metrics(Config) ->
739739
?assertEqual([{#{vhost => "/", queue => Stream1}, [2]},
740740
{#{vhost => "/", queue => Stream2}, [3]}],
741741
lists:sort(maps:to_list(MaxOffsetLag))),
742+
dispose_stream_connection(S1, C1, list_to_binary(Stream1)),
743+
dispose_stream_connection(S2, C2, list_to_binary(Stream2)),
742744
ok.
743745

744746
core_metrics_special_chars(Config) ->
@@ -808,8 +810,13 @@ publish_via_stream_protocol(Stream, MsgPerBatch, Config) ->
808810
SubscriptionId = 97,
809811
{ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1),
810812
%% delivery of first batch of messages
811-
{{deliver, SubscriptionId, _Bin1}, _C7} = stream_test_utils:receive_stream_commands(S, C6),
812-
ok.
813+
{{deliver, SubscriptionId, _Bin1}, C7} = stream_test_utils:receive_stream_commands(S, C6),
814+
{ok, S, C7}.
815+
816+
dispose_stream_connection(Sock, C0, Stream) ->
817+
{ok, C1} = stream_test_utils:delete_stream(Sock, C0, Stream),
818+
{_MetadataUpdateFrame, C2} = stream_test_utils:receive_stream_commands(Sock, C1),
819+
{ok, _} = stream_test_utils:close(Sock, C2).
813820

814821
http_get(Config, ReqHeaders, CodeExp) ->
815822
Path = proplists:get_value(prometheus_path, Config, "/metrics"),

deps/rabbitmq_stream/test/protocol_interop_SUITE.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,15 +290,16 @@ publish_via_stream_protocol(Stream, Config) ->
290290
{ok, _, C3} = stream_test_utils:publish_entries(S, C2, PublisherId, length(Messages1), Messages1),
291291

292292
UncompressedSubbatch = stream_test_utils:sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
293-
{ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 3, UncompressedSubbatch),
293+
{ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 1, UncompressedSubbatch),
294294

295295
CompressedSubbatch = stream_test_utils:sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),
296-
{ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 3, CompressedSubbatch),
296+
{ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 1, CompressedSubbatch),
297297

298298
M10 = stream_test_utils:simple_entry(6, <<"m10">>),
299299
M11 = stream_test_utils:simple_entry(7, <<"m11">>),
300300
Messages2 = [M10, M11],
301-
{ok, _, _C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2).
301+
{ok, _, C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2),
302+
{ok, _} = stream_test_utils:close(S, C6).
302303

303304
connection_config(Config) ->
304305
Host = ?config(rmq_hostname, Config),

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ groups() ->
6767
sasl_anonymous,
6868
test_publisher_with_too_long_reference_errors,
6969
test_consumer_with_too_long_reference_errors,
70-
subscribe_unsubscribe_should_create_events
70+
subscribe_unsubscribe_should_create_events,
71+
test_stream_test_utils
7172
]},
7273
%% Run `test_global_counters` on its own so the global metrics are
7374
%% initialised to 0 for each testcase
@@ -1053,6 +1054,21 @@ subscribe_unsubscribe_should_create_events(Config) ->
10531054
closed = wait_for_socket_close(Transport, S, 10),
10541055
ok.
10551056

1057+
test_stream_test_utils(Config) ->
1058+
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
1059+
{ok, S, C0} = stream_test_utils:connect(Config, 0),
1060+
{ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
1061+
PublisherId = 42,
1062+
{ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
1063+
MsgPerBatch = 100,
1064+
Payloads = lists:duplicate(MsgPerBatch, <<"m1">>),
1065+
SequenceFrom1 = 1,
1066+
{ok, C3} = stream_test_utils:publish(S, C2, PublisherId, SequenceFrom1, Payloads),
1067+
{ok, C4} = stream_test_utils:delete_publisher(S, C3, PublisherId),
1068+
{ok, C5} = stream_test_utils:delete_stream(S, C4, Stream),
1069+
{ok, _} = stream_test_utils:close(S, C5),
1070+
ok.
1071+
10561072
filtered_events(Config, EventType) ->
10571073
Events = rabbit_ct_broker_helpers:rpc(Config, 0,
10581074
gen_event,

0 commit comments

Comments
 (0)