Skip to content

Commit b3b0940

Browse files
committed
Fix wait-for-confirms sequence in stream test utils
And refine the implementation and its usage.
1 parent c5edd60 commit b3b0940

File tree

4 files changed

+68
-10
lines changed

4 files changed

+68
-10
lines changed

deps/rabbitmq_ct_helpers/src/stream_test_utils.erl

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,24 +40,49 @@ connect(Config, Node) ->
4040
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
4141
{ok, Sock, C5}.
4242

43+
close(Sock, C0) ->
44+
CloseReason = <<"OK">>,
45+
CloseFrame = rabbit_stream_core:frame({request, 1, {close, ?RESPONSE_CODE_OK, CloseReason}}),
46+
ok = gen_tcp:send(Sock, CloseFrame),
47+
{{response, 1, {close, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
48+
{ok, C1}.
49+
4350
create_stream(Sock, C0, Stream) ->
4451
CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
4552
ok = gen_tcp:send(Sock, CreateStreamFrame),
4653
{{response, 1, {create_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
4754
{ok, C1}.
4855

56+
delete_stream(Sock, C0, Stream) ->
57+
DeleteStreamFrame = rabbit_stream_core:frame({request, 1, {delete_stream, Stream}}),
58+
ok = gen_tcp:send(Sock, DeleteStreamFrame),
59+
{{response, 1, {delete_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
60+
{ok, C1}.
61+
4962
declare_publisher(Sock, C0, Stream, PublisherId) ->
5063
DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
5164
ok = gen_tcp:send(Sock, DeclarePublisherFrame),
5265
{{response, 1, {declare_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
5366
{ok, C1}.
5467

68+
delete_publisher(Sock, C0, PublisherId) ->
69+
DeletePublisherFrame = rabbit_stream_core:frame({request, 1, {delete_publisher, PublisherId}}),
70+
ok = gen_tcp:send(Sock, DeletePublisherFrame),
71+
{{response, 1, {delete_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
72+
{ok, C1}.
73+
5574
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
5675
SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}),
5776
ok = gen_tcp:send(Sock, SubscribeFrame),
5877
{{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
5978
{ok, C1}.
6079

80+
unsubscribe(Sock, C0, SubscriptionId) ->
81+
UnsubscribeFrame = rabbit_stream_core:frame({request, 1, {unsubscribe, SubscriptionId}}),
82+
ok = gen_tcp:send(Sock, UnsubscribeFrame),
83+
{{response, 1, {unsubscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
84+
{ok, C1}.
85+
6186
publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
6287
SeqIds = lists:seq(Sequence0, Sequence0 + length(Payloads) - 1),
6388
Messages = [simple_entry(Seq, P)
@@ -68,8 +93,17 @@ publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
6893
publish_entries(Sock, C0, PublisherId, MsgCount, Messages) ->
6994
PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, MsgCount, Messages}),
7095
ok = gen_tcp:send(Sock, PublishFrame1),
71-
{{publish_confirm, PublisherId, SeqIds}, C1} = receive_stream_commands(Sock, C0),
72-
{ok, SeqIds, C1}.
96+
wait_for_confirms(Sock, C0, PublisherId, [], MsgCount).
97+
98+
wait_for_confirms(_, C, _, Acc, 0) ->
99+
{ok, Acc, C};
100+
wait_for_confirms(S, C0, PublisherId, Acc, Remaining) ->
101+
case receive_stream_commands(S, C0) of
102+
{{publish_confirm, PublisherId, SeqIds}, C1} ->
103+
wait_for_confirms(S, C1, PublisherId, Acc ++ SeqIds, Remaining - length(SeqIds));
104+
{Frame, C1} ->
105+
{unexpected_frame, Frame, C1}
106+
end.
73107

74108
%% Streams contain AMQP 1.0 encoded messages.
75109
%% In this case, the AMQP 1.0 encoded message contains a single data section.

deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -744,10 +744,10 @@ exchange_names_metric(Config) ->
744744
stream_pub_sub_metrics(Config) ->
745745
Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1",
746746
MsgPerBatch1 = 2,
747-
publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
747+
{ok, S1, C1} = publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
748748
Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2",
749749
MsgPerBatch2 = 3,
750-
publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),
750+
{ok, S2, C2} = publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),
751751

752752
%% aggregated metrics
753753

@@ -770,6 +770,8 @@ stream_pub_sub_metrics(Config) ->
770770
?assertEqual([{#{vhost => "/", queue => Stream1}, [2]},
771771
{#{vhost => "/", queue => Stream2}, [3]}],
772772
lists:sort(maps:to_list(MaxOffsetLag))),
773+
dispose_stream_connection(S1, C1, list_to_binary(Stream1)),
774+
dispose_stream_connection(S2, C2, list_to_binary(Stream2)),
773775
ok.
774776

775777
core_metrics_special_chars(Config) ->
@@ -839,8 +841,13 @@ publish_via_stream_protocol(Stream, MsgPerBatch, Config) ->
839841
SubscriptionId = 97,
840842
{ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1),
841843
%% delivery of first batch of messages
842-
{{deliver, SubscriptionId, _Bin1}, _C7} = stream_test_utils:receive_stream_commands(S, C6),
843-
ok.
844+
{{deliver, SubscriptionId, _Bin1}, C7} = stream_test_utils:receive_stream_commands(S, C6),
845+
{ok, S, C7}.
846+
847+
dispose_stream_connection(Sock, C0, Stream) ->
848+
{ok, C1} = stream_test_utils:delete_stream(Sock, C0, Stream),
849+
{_MetadataUpdateFrame, C2} = stream_test_utils:receive_stream_commands(Sock, C1),
850+
{ok, _} = stream_test_utils:close(Sock, C2).
844851

845852
http_get(Config, ReqHeaders, CodeExp) ->
846853
Path = proplists:get_value(prometheus_path, Config, "/metrics"),

deps/rabbitmq_stream/test/protocol_interop_SUITE.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -339,15 +339,16 @@ publish_via_stream_protocol(Stream, Config) ->
339339
{ok, _, C3} = stream_test_utils:publish_entries(S, C2, PublisherId, length(Messages1), Messages1),
340340

341341
UncompressedSubbatch = stream_test_utils:sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
342-
{ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 3, UncompressedSubbatch),
342+
{ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 1, UncompressedSubbatch),
343343

344344
CompressedSubbatch = stream_test_utils:sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),
345-
{ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 3, CompressedSubbatch),
345+
{ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 1, CompressedSubbatch),
346346

347347
M10 = stream_test_utils:simple_entry(6, <<"m10">>),
348348
M11 = stream_test_utils:simple_entry(7, <<"m11">>),
349349
Messages2 = [M10, M11],
350-
{ok, _, _C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2).
350+
{ok, _, C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2),
351+
{ok, _} = stream_test_utils:close(S, C6).
351352

352353
connection_config(Config) ->
353354
Host = ?config(rmq_hostname, Config),

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ groups() ->
6767
sasl_anonymous,
6868
test_publisher_with_too_long_reference_errors,
6969
test_consumer_with_too_long_reference_errors,
70-
subscribe_unsubscribe_should_create_events
70+
subscribe_unsubscribe_should_create_events,
71+
test_stream_test_utils
7172
]},
7273
%% Run `test_global_counters` on its own so the global metrics are
7374
%% initialised to 0 for each testcase
@@ -1053,6 +1054,21 @@ subscribe_unsubscribe_should_create_events(Config) ->
10531054
closed = wait_for_socket_close(Transport, S, 10),
10541055
ok.
10551056

1057+
test_stream_test_utils(Config) ->
1058+
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
1059+
{ok, S, C0} = stream_test_utils:connect(Config, 0),
1060+
{ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
1061+
PublisherId = 42,
1062+
{ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
1063+
MsgPerBatch = 100,
1064+
Payloads = lists:duplicate(MsgPerBatch, <<"m1">>),
1065+
SequenceFrom1 = 1,
1066+
{ok, C3} = stream_test_utils:publish(S, C2, PublisherId, SequenceFrom1, Payloads),
1067+
{ok, C4} = stream_test_utils:delete_publisher(S, C3, PublisherId),
1068+
{ok, C5} = stream_test_utils:delete_stream(S, C4, Stream),
1069+
{ok, _} = stream_test_utils:close(S, C5),
1070+
ok.
1071+
10561072
filtered_events(Config, EventType) ->
10571073
Events = rabbit_ct_broker_helpers:rpc(Config, 0,
10581074
gen_event,

0 commit comments

Comments
 (0)