Skip to content

Commit 7d9f297

Browse files
gomoripetimergify[bot]
authored andcommitted
Move client-side stream protocol test helpers to a separate module
So that they can be used from multiple test suites. (cherry picked from commit cf8a00c) (cherry picked from commit 9bb7530) # Conflicts: # deps/rabbitmq_ct_helpers/Makefile # deps/rabbitmq_stream/test/protocol_interop_SUITE.erl
1 parent bfb4ce8 commit 7d9f297

File tree

3 files changed

+175
-68
lines changed

3 files changed

+175
-68
lines changed

deps/rabbitmq_ct_helpers/Makefile

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,19 @@
11
PROJECT = rabbitmq_ct_helpers
22
PROJECT_DESCRIPTION = Common Test helpers for RabbitMQ
33

4+
<<<<<<< HEAD
45
DEPS = rabbit_common proper inet_tcp_proxy meck
56
TEST_DEPS = rabbit
7+
=======
8+
DEPS = rabbit_common amqp10_common rabbitmq_stream_common proper inet_tcp_proxy meck
9+
LOCAL_DEPS = common_test eunit inets
10+
#TEST_DEPS = rabbit
11+
12+
# We are calling one function from 'rabbit' so we need it in the PLT.
13+
# But really this should be a full dependency; or we don't use the
14+
# function anymore; or move it to rabbit_common. @todo
15+
dialyze: DEPS += rabbit
16+
>>>>>>> 9bb7530d04 (Move client-side stream protocol test helpers to a separate module)
617

718
XREF_IGNORE = [ \
819
{'Elixir.OptionParser',split,1}, \
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
%% There is no open source Erlang RabbitMQ Stream client.
9+
%% Therefore, we have to build the Stream protocol commands manually.
10+
11+
-module(stream_test_utils).
12+
13+
-compile([export_all, nowarn_export_all]).
14+
15+
-include_lib("amqp10_common/include/amqp10_framing.hrl").
16+
17+
-define(RESPONSE_CODE_OK, 1).
18+
19+
connect(Config, Node) ->
20+
StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream),
21+
{ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
22+
23+
C0 = rabbit_stream_core:init(0),
24+
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
25+
ok = gen_tcp:send(Sock, PeerPropertiesFrame),
26+
{{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(Sock, C0),
27+
28+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 1, sasl_handshake})),
29+
{{response, _, {sasl_handshake, _, _}}, C2} = receive_stream_commands(Sock, C1),
30+
Username = <<"guest">>,
31+
Password = <<"guest">>,
32+
Null = 0,
33+
PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
34+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 2, {sasl_authenticate, <<"PLAIN">>, PlainSasl}})),
35+
{{response, 2, {sasl_authenticate, _}}, C3} = receive_stream_commands(Sock, C2),
36+
{{tune, DefaultFrameMax, _}, C4} = receive_stream_commands(Sock, C3),
37+
38+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
39+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
40+
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
41+
{ok, Sock, C5}.
42+
43+
create_stream(Sock, C0, Stream) ->
44+
CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
45+
ok = gen_tcp:send(Sock, CreateStreamFrame),
46+
{{response, 1, {create_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
47+
{ok, C1}.
48+
49+
declare_publisher(Sock, C0, Stream, PublisherId) ->
50+
DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
51+
ok = gen_tcp:send(Sock, DeclarePublisherFrame),
52+
{{response, 1, {declare_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
53+
{ok, C1}.
54+
55+
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
56+
SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}),
57+
ok = gen_tcp:send(Sock, SubscribeFrame),
58+
{{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
59+
{ok, C1}.
60+
61+
publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
62+
SeqIds = lists:seq(Sequence0, Sequence0 + length(Payloads) - 1),
63+
Messages = [simple_entry(Seq, P)
64+
|| {Seq, P} <- lists:zip(SeqIds, Payloads)],
65+
{ok, SeqIds, C1} = publish_entries(Sock, C0, PublisherId, length(Messages), Messages),
66+
{ok, C1}.
67+
68+
publish_entries(Sock, C0, PublisherId, MsgCount, Messages) ->
69+
PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, MsgCount, Messages}),
70+
ok = gen_tcp:send(Sock, PublishFrame1),
71+
{{publish_confirm, PublisherId, SeqIds}, C1} = receive_stream_commands(Sock, C0),
72+
{ok, SeqIds, C1}.
73+
74+
%% Streams contain AMQP 1.0 encoded messages.
75+
%% In this case, the AMQP 1.0 encoded message contains a single data section.
76+
simple_entry(Sequence, Body)
77+
when is_binary(Body) ->
78+
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
79+
DataSectSize = byte_size(DataSect),
80+
<<Sequence:64, 0:1, DataSectSize:31, DataSect:DataSectSize/binary>>.
81+
82+
%% Streams contain AMQP 1.0 encoded messages.
83+
%% In this case, the AMQP 1.0 encoded message consists of an application-properties section and a data section.
84+
simple_entry(Sequence, Body, AppProps)
85+
when is_binary(Body) ->
86+
AppPropsSect = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
87+
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
88+
Sects = <<AppPropsSect/binary, DataSect/binary>>,
89+
SectSize = byte_size(Sects),
90+
<<Sequence:64, 0:1, SectSize:31, Sects:SectSize/binary>>.
91+
92+
%% Here, each AMQP 1.0 encoded message consists of an application-properties section and a data section.
93+
%% All data sections are delivered uncompressed in 1 batch.
94+
sub_batch_entry_uncompressed(Sequence, Bodies) ->
95+
Batch = lists:foldl(fun(Body, Acc) ->
96+
AppProps = #'v1_0.application_properties'{
97+
content = [{{utf8, <<"my key">>}, {utf8, <<"my value">>}}]},
98+
Sect0 = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
99+
Sect1 = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
100+
Sect = <<Sect0/binary, Sect1/binary>>,
101+
<<Acc/binary, 0:1, (byte_size(Sect)):31, Sect/binary>>
102+
end, <<>>, Bodies),
103+
Size = byte_size(Batch),
104+
<<Sequence:64, 1:1, 0:3, 0:4, (length(Bodies)):16, Size:32, Size:32, Batch:Size/binary>>.
105+
106+
%% Here, each AMQP 1.0 encoded message contains a single data section.
107+
%% All data sections are delivered in 1 gzip compressed batch.
108+
sub_batch_entry_compressed(Sequence, Bodies) ->
109+
Uncompressed = lists:foldl(fun(Body, Acc) ->
110+
Bin = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
111+
<<Acc/binary, Bin/binary>>
112+
end, <<>>, Bodies),
113+
Compressed = zlib:gzip(Uncompressed),
114+
CompressedLen = byte_size(Compressed),
115+
<<Sequence:64, 1:1, 1:3, 0:4, (length(Bodies)):16, (byte_size(Uncompressed)):32,
116+
CompressedLen:32, Compressed:CompressedLen/binary>>.
117+
118+
receive_stream_commands(Sock, C0) ->
119+
case rabbit_stream_core:next_command(C0) of
120+
empty ->
121+
case gen_tcp:recv(Sock, 0, 5000) of
122+
{ok, Data} ->
123+
C1 = rabbit_stream_core:incoming_data(Data, C0),
124+
case rabbit_stream_core:next_command(C1) of
125+
empty ->
126+
{ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
127+
rabbit_stream_core:next_command(
128+
rabbit_stream_core:incoming_data(Data2, C1));
129+
Res ->
130+
Res
131+
end;
132+
{error, Err} ->
133+
ct:fail("error receiving stream data ~w", [Err])
134+
end;
135+
Res ->
136+
Res
137+
end.

deps/rabbitmq_stream/test/protocol_interop_SUITE.erl

Lines changed: 27 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -275,61 +275,38 @@ amqp_attach_sub_batch(Config) ->
275275
%% -------------------------------------------------------------------
276276

277277
publish_via_stream_protocol(Stream, Config) ->
278-
%% There is no open source Erlang RabbitMQ Stream client.
279-
%% Therefore, we have to build the Stream protocol commands manually.
280-
281-
StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stream),
282-
{ok, S} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
283-
284-
C0 = rabbit_stream_core:init(0),
285-
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
286-
ok = gen_tcp:send(S, PeerPropertiesFrame),
287-
{{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(S, C0),
288-
289-
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 1, sasl_handshake})),
290-
{{response, _, {sasl_handshake, _, _}}, C2} = receive_stream_commands(S, C1),
291-
Username = <<"guest">>,
292-
Password = <<"guest">>,
293-
Null = 0,
294-
PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
295-
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 2, {sasl_authenticate, <<"PLAIN">>, PlainSasl}})),
296-
{{response, 2, {sasl_authenticate, _}}, C3} = receive_stream_commands(S, C2),
297-
{{tune, DefaultFrameMax, _}, C4} = receive_stream_commands(S, C3),
298-
299-
ok = gen_tcp:send(S, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
300-
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
301-
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(S, C4),
302-
303-
CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
304-
ok = gen_tcp:send(S, CreateStreamFrame),
305-
{{response, 1, {create_stream, _}}, C6} = receive_stream_commands(S, C5),
278+
{ok, S, C0} = stream_test_utils:connect(Config, 0),
279+
280+
{ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
306281

307282
PublisherId = 99,
308-
DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
309-
ok = gen_tcp:send(S, DeclarePublisherFrame),
310-
{{response, 1, {declare_publisher, _}}, C7} = receive_stream_commands(S, C6),
283+
{ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
311284

285+
<<<<<<< HEAD
312286
M1 = simple_entry(1, <<"m1">>),
313287
M2 = simple_entry(2, <<"m2">>),
314288
M3 = simple_entry(3, <<"m3">>),
289+
=======
290+
M1 = stream_test_utils:simple_entry(1, <<"m1">>),
291+
M2 = stream_test_utils:simple_entry(2, <<"m2">>, #'v1_0.application_properties'{
292+
content = [{{utf8, <<"my key">>},
293+
{utf8, <<"my value">>}}]}),
294+
M3 = stream_test_utils:simple_entry(3, <<"m3">>),
295+
>>>>>>> 9bb7530d04 (Move client-side stream protocol test helpers to a separate module)
315296
Messages1 = [M1, M2, M3],
316-
PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, length(Messages1), Messages1}),
317-
ok = gen_tcp:send(S, PublishFrame1),
318-
{{publish_confirm, PublisherId, _}, C8} = receive_stream_commands(S, C7),
319-
320-
UncompressedSubbatch = sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
321-
PublishFrame2 = rabbit_stream_core:frame({publish, PublisherId, 3, UncompressedSubbatch}),
322-
ok = gen_tcp:send(S, PublishFrame2),
323-
{{publish_confirm, PublisherId, _}, C9} = receive_stream_commands(S, C8),
324-
325-
CompressedSubbatch = sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),
326-
PublishFrame3 = rabbit_stream_core:frame({publish, PublisherId, 3, CompressedSubbatch}),
327-
ok = gen_tcp:send(S, PublishFrame3),
328-
{{publish_confirm, PublisherId, _}, C10} = receive_stream_commands(S, C9),
329-
330-
M10 = simple_entry(6, <<"m10">>),
331-
M11 = simple_entry(7, <<"m11">>),
297+
298+
{ok, _, C3} = stream_test_utils:publish_entries(S, C2, PublisherId, length(Messages1), Messages1),
299+
300+
UncompressedSubbatch = stream_test_utils:sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
301+
{ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 3, UncompressedSubbatch),
302+
303+
CompressedSubbatch = stream_test_utils:sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),
304+
{ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 3, CompressedSubbatch),
305+
306+
M10 = stream_test_utils:simple_entry(6, <<"m10">>),
307+
M11 = stream_test_utils:simple_entry(7, <<"m11">>),
332308
Messages2 = [M10, M11],
309+
<<<<<<< HEAD
333310
PublishFrame4 = rabbit_stream_core:frame({publish, PublisherId, length(Messages2), Messages2}),
334311
ok = gen_tcp:send(S, PublishFrame4),
335312
{{publish_confirm, PublisherId, _}, _C11} = receive_stream_commands(S, C10).
@@ -363,6 +340,9 @@ sub_batch_entry_compressed(Sequence, Bodies) ->
363340
CompressedLen = byte_size(Compressed),
364341
<<Sequence:64, 1:1, 1:3, 0:4, (length(Bodies)):16, (byte_size(Uncompressed)):32,
365342
CompressedLen:32, Compressed:CompressedLen/binary>>.
343+
=======
344+
{ok, _, _C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2).
345+
>>>>>>> 9bb7530d04 (Move client-side stream protocol test helpers to a separate module)
366346

367347
connection_config(Config) ->
368348
Host = ?config(rmq_hostname, Config),
@@ -372,27 +352,6 @@ connection_config(Config) ->
372352
container_id => <<"my container">>,
373353
sasl => {plain, <<"guest">>, <<"guest">>}}.
374354

375-
receive_stream_commands(Sock, C0) ->
376-
case rabbit_stream_core:next_command(C0) of
377-
empty ->
378-
case gen_tcp:recv(Sock, 0, 5000) of
379-
{ok, Data} ->
380-
C1 = rabbit_stream_core:incoming_data(Data, C0),
381-
case rabbit_stream_core:next_command(C1) of
382-
empty ->
383-
{ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
384-
rabbit_stream_core:next_command(
385-
rabbit_stream_core:incoming_data(Data2, C1));
386-
Res ->
387-
Res
388-
end;
389-
{error, Err} ->
390-
ct:fail("error receiving stream data ~w", [Err])
391-
end;
392-
Res ->
393-
Res
394-
end.
395-
396355
receive_amqp_messages(Receiver, N) ->
397356
receive_amqp_messages0(Receiver, N, []).
398357

0 commit comments

Comments
 (0)