Skip to content

Commit 484a859

Browse files
committed
Move client-side stream protocol test helpers to a separate module
So that they can be used from multiple test suites.
1 parent 8b55447 commit 484a859

File tree

2 files changed

+159
-121
lines changed

2 files changed

+159
-121
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
%% There is no open source Erlang RabbitMQ Stream client.
9+
%% Therefore, we have to build the Stream protocol commands manually.
10+
11+
-module(stream_test_utils).
12+
13+
-compile([export_all, nowarn_export_all]).
14+
15+
-include_lib("amqp10_common/include/amqp10_framing.hrl").
16+
17+
-define(RESPONSE_CODE_OK, 1).
18+
19+
connect(Config, Node) ->
20+
StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream),
21+
{ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
22+
23+
C0 = rabbit_stream_core:init(0),
24+
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
25+
ok = gen_tcp:send(Sock, PeerPropertiesFrame),
26+
{{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(Sock, C0),
27+
28+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 1, sasl_handshake})),
29+
{{response, _, {sasl_handshake, _, _}}, C2} = receive_stream_commands(Sock, C1),
30+
Username = <<"guest">>,
31+
Password = <<"guest">>,
32+
Null = 0,
33+
PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
34+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 2, {sasl_authenticate, <<"PLAIN">>, PlainSasl}})),
35+
{{response, 2, {sasl_authenticate, _}}, C3} = receive_stream_commands(Sock, C2),
36+
{{tune, DefaultFrameMax, _}, C4} = receive_stream_commands(Sock, C3),
37+
38+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
39+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
40+
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
41+
{ok, Sock, C5}.
42+
43+
create_stream(Sock, C0, Stream) ->
44+
CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
45+
ok = gen_tcp:send(Sock, CreateStreamFrame),
46+
{{response, 1, {create_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
47+
{ok, C1}.
48+
49+
declare_publisher(Sock, C0, Stream, PublisherId) ->
50+
DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
51+
ok = gen_tcp:send(Sock, DeclarePublisherFrame),
52+
{{response, 1, {declare_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
53+
{ok, C1}.
54+
55+
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
56+
SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}),
57+
ok = gen_tcp:send(Sock, SubscribeFrame),
58+
{{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
59+
{ok, C1}.
60+
61+
publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
62+
SeqIds = lists:seq(Sequence0, Sequence0 + length(Payloads) - 1),
63+
Messages = [simple_entry(Seq, P)
64+
|| {Seq, P} <- lists:zip(SeqIds, Payloads)],
65+
{ok, SeqIds, C1} = publish_entries(Sock, C0, PublisherId, length(Messages), Messages),
66+
{ok, C1}.
67+
68+
publish_entries(Sock, C0, PublisherId, MsgCount, Messages) ->
69+
PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, MsgCount, Messages}),
70+
ok = gen_tcp:send(Sock, PublishFrame1),
71+
{{publish_confirm, PublisherId, SeqIds}, C1} = receive_stream_commands(Sock, C0),
72+
{ok, SeqIds, C1}.
73+
74+
%% Streams contain AMQP 1.0 encoded messages.
75+
%% In this case, the AMQP 1.0 encoded message contains a single data section.
76+
simple_entry(Sequence, Body)
77+
when is_binary(Body) ->
78+
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
79+
DataSectSize = byte_size(DataSect),
80+
<<Sequence:64, 0:1, DataSectSize:31, DataSect:DataSectSize/binary>>.
81+
82+
%% Streams contain AMQP 1.0 encoded messages.
83+
%% In this case, the AMQP 1.0 encoded message consists of an application-properties section and a data section.
84+
simple_entry(Sequence, Body, AppProps)
85+
when is_binary(Body) ->
86+
AppPropsSect = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
87+
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
88+
Sects = <<AppPropsSect/binary, DataSect/binary>>,
89+
SectSize = byte_size(Sects),
90+
<<Sequence:64, 0:1, SectSize:31, Sects:SectSize/binary>>.
91+
92+
%% Here, each AMQP 1.0 encoded message consists of an application-properties section and a data section.
93+
%% All data sections are delivered uncompressed in 1 batch.
94+
sub_batch_entry_uncompressed(Sequence, Bodies) ->
95+
Batch = lists:foldl(fun(Body, Acc) ->
96+
AppProps = #'v1_0.application_properties'{
97+
content = [{{utf8, <<"my key">>}, {utf8, <<"my value">>}}]},
98+
Sect0 = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
99+
Sect1 = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
100+
Sect = <<Sect0/binary, Sect1/binary>>,
101+
<<Acc/binary, 0:1, (byte_size(Sect)):31, Sect/binary>>
102+
end, <<>>, Bodies),
103+
Size = byte_size(Batch),
104+
<<Sequence:64, 1:1, 0:3, 0:4, (length(Bodies)):16, Size:32, Size:32, Batch:Size/binary>>.
105+
106+
%% Here, each AMQP 1.0 encoded message contains a single data section.
107+
%% All data sections are delivered in 1 gzip compressed batch.
108+
sub_batch_entry_compressed(Sequence, Bodies) ->
109+
Uncompressed = lists:foldl(fun(Body, Acc) ->
110+
Bin = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
111+
<<Acc/binary, Bin/binary>>
112+
end, <<>>, Bodies),
113+
Compressed = zlib:gzip(Uncompressed),
114+
CompressedLen = byte_size(Compressed),
115+
<<Sequence:64, 1:1, 1:3, 0:4, (length(Bodies)):16, (byte_size(Uncompressed)):32,
116+
CompressedLen:32, Compressed:CompressedLen/binary>>.
117+
118+
receive_stream_commands(Sock, C0) ->
119+
case rabbit_stream_core:next_command(C0) of
120+
empty ->
121+
case gen_tcp:recv(Sock, 0, 5000) of
122+
{ok, Data} ->
123+
C1 = rabbit_stream_core:incoming_data(Data, C0),
124+
case rabbit_stream_core:next_command(C1) of
125+
empty ->
126+
{ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
127+
rabbit_stream_core:next_command(
128+
rabbit_stream_core:incoming_data(Data2, C1));
129+
Res ->
130+
Res
131+
end;
132+
{error, Err} ->
133+
ct:fail("error receiving stream data ~w", [Err])
134+
end;
135+
Res ->
136+
Res
137+
end.

deps/rabbitmq_stream/test/protocol_interop_SUITE.erl

Lines changed: 22 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -322,110 +322,32 @@ amqp_filter_expression(Config) ->
322322
%% -------------------------------------------------------------------
323323

324324
publish_via_stream_protocol(Stream, Config) ->
325-
%% There is no open source Erlang RabbitMQ Stream client.
326-
%% Therefore, we have to build the Stream protocol commands manually.
327-
328-
StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stream),
329-
{ok, S} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
330-
331-
C0 = rabbit_stream_core:init(0),
332-
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
333-
ok = gen_tcp:send(S, PeerPropertiesFrame),
334-
{{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(S, C0),
335-
336-
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 1, sasl_handshake})),
337-
{{response, _, {sasl_handshake, _, _}}, C2} = receive_stream_commands(S, C1),
338-
Username = <<"guest">>,
339-
Password = <<"guest">>,
340-
Null = 0,
341-
PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
342-
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 2, {sasl_authenticate, <<"PLAIN">>, PlainSasl}})),
343-
{{response, 2, {sasl_authenticate, _}}, C3} = receive_stream_commands(S, C2),
344-
{{tune, DefaultFrameMax, _}, C4} = receive_stream_commands(S, C3),
345-
346-
ok = gen_tcp:send(S, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
347-
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
348-
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(S, C4),
349-
350-
CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
351-
ok = gen_tcp:send(S, CreateStreamFrame),
352-
{{response, 1, {create_stream, _}}, C6} = receive_stream_commands(S, C5),
325+
{ok, S, C0} = stream_test_utils:connect(Config, 0),
326+
327+
{ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
353328

354329
PublisherId = 99,
355-
DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
356-
ok = gen_tcp:send(S, DeclarePublisherFrame),
357-
{{response, 1, {declare_publisher, _}}, C7} = receive_stream_commands(S, C6),
358-
359-
M1 = simple_entry(1, <<"m1">>),
360-
M2 = simple_entry(2, <<"m2">>, #'v1_0.application_properties'{
361-
content = [{{utf8, <<"my key">>},
362-
{utf8, <<"my value">>}}]}),
363-
M3 = simple_entry(3, <<"m3">>),
330+
{ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
331+
332+
M1 = stream_test_utils:simple_entry(1, <<"m1">>),
333+
M2 = stream_test_utils:simple_entry(2, <<"m2">>, #'v1_0.application_properties'{
334+
content = [{{utf8, <<"my key">>},
335+
{utf8, <<"my value">>}}]}),
336+
M3 = stream_test_utils:simple_entry(3, <<"m3">>),
364337
Messages1 = [M1, M2, M3],
365-
PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, length(Messages1), Messages1}),
366-
ok = gen_tcp:send(S, PublishFrame1),
367-
{{publish_confirm, PublisherId, _}, C8} = receive_stream_commands(S, C7),
368-
369-
UncompressedSubbatch = sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
370-
PublishFrame2 = rabbit_stream_core:frame({publish, PublisherId, 3, UncompressedSubbatch}),
371-
ok = gen_tcp:send(S, PublishFrame2),
372-
{{publish_confirm, PublisherId, _}, C9} = receive_stream_commands(S, C8),
373-
374-
CompressedSubbatch = sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),
375-
PublishFrame3 = rabbit_stream_core:frame({publish, PublisherId, 3, CompressedSubbatch}),
376-
ok = gen_tcp:send(S, PublishFrame3),
377-
{{publish_confirm, PublisherId, _}, C10} = receive_stream_commands(S, C9),
378-
379-
M10 = simple_entry(6, <<"m10">>),
380-
M11 = simple_entry(7, <<"m11">>),
338+
339+
{ok, _, C3} = stream_test_utils:publish_entries(S, C2, PublisherId, length(Messages1), Messages1),
340+
341+
UncompressedSubbatch = stream_test_utils:sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
342+
{ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 3, UncompressedSubbatch),
343+
344+
CompressedSubbatch = stream_test_utils:sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),
345+
{ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 3, CompressedSubbatch),
346+
347+
M10 = stream_test_utils:simple_entry(6, <<"m10">>),
348+
M11 = stream_test_utils:simple_entry(7, <<"m11">>),
381349
Messages2 = [M10, M11],
382-
PublishFrame4 = rabbit_stream_core:frame({publish, PublisherId, length(Messages2), Messages2}),
383-
ok = gen_tcp:send(S, PublishFrame4),
384-
{{publish_confirm, PublisherId, _}, _C11} = receive_stream_commands(S, C10).
385-
386-
%% Streams contain AMQP 1.0 encoded messages.
387-
%% In this case, the AMQP 1.0 encoded message contains a single data section.
388-
simple_entry(Sequence, Body)
389-
when is_binary(Body) ->
390-
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
391-
DataSectSize = byte_size(DataSect),
392-
<<Sequence:64, 0:1, DataSectSize:31, DataSect:DataSectSize/binary>>.
393-
394-
%% Streams contain AMQP 1.0 encoded messages.
395-
%% In this case, the AMQP 1.0 encoded message consists of an application-properties section and a data section.
396-
simple_entry(Sequence, Body, AppProps)
397-
when is_binary(Body) ->
398-
AppPropsSect = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
399-
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
400-
Sects = <<AppPropsSect/binary, DataSect/binary>>,
401-
SectSize = byte_size(Sects),
402-
<<Sequence:64, 0:1, SectSize:31, Sects:SectSize/binary>>.
403-
404-
%% Here, each AMQP 1.0 encoded message consists of an application-properties section and a data section.
405-
%% All data sections are delivered uncompressed in 1 batch.
406-
sub_batch_entry_uncompressed(Sequence, Bodies) ->
407-
Batch = lists:foldl(fun(Body, Acc) ->
408-
AppProps = #'v1_0.application_properties'{
409-
content = [{{utf8, <<"my key">>}, {utf8, <<"my value">>}}]},
410-
Sect0 = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
411-
Sect1 = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
412-
Sect = <<Sect0/binary, Sect1/binary>>,
413-
<<Acc/binary, 0:1, (byte_size(Sect)):31, Sect/binary>>
414-
end, <<>>, Bodies),
415-
Size = byte_size(Batch),
416-
<<Sequence:64, 1:1, 0:3, 0:4, (length(Bodies)):16, Size:32, Size:32, Batch:Size/binary>>.
417-
418-
%% Here, each AMQP 1.0 encoded message contains a single data section.
419-
%% All data sections are delivered in 1 gzip compressed batch.
420-
sub_batch_entry_compressed(Sequence, Bodies) ->
421-
Uncompressed = lists:foldl(fun(Body, Acc) ->
422-
Bin = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
423-
<<Acc/binary, Bin/binary>>
424-
end, <<>>, Bodies),
425-
Compressed = zlib:gzip(Uncompressed),
426-
CompressedLen = byte_size(Compressed),
427-
<<Sequence:64, 1:1, 1:3, 0:4, (length(Bodies)):16, (byte_size(Uncompressed)):32,
428-
CompressedLen:32, Compressed:CompressedLen/binary>>.
350+
{ok, _, _C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2).
429351

430352
connection_config(Config) ->
431353
Host = ?config(rmq_hostname, Config),
@@ -435,27 +357,6 @@ connection_config(Config) ->
435357
container_id => <<"my container">>,
436358
sasl => {plain, <<"guest">>, <<"guest">>}}.
437359

438-
receive_stream_commands(Sock, C0) ->
439-
case rabbit_stream_core:next_command(C0) of
440-
empty ->
441-
case gen_tcp:recv(Sock, 0, 5000) of
442-
{ok, Data} ->
443-
C1 = rabbit_stream_core:incoming_data(Data, C0),
444-
case rabbit_stream_core:next_command(C1) of
445-
empty ->
446-
{ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
447-
rabbit_stream_core:next_command(
448-
rabbit_stream_core:incoming_data(Data2, C1));
449-
Res ->
450-
Res
451-
end;
452-
{error, Err} ->
453-
ct:fail("error receiving stream data ~w", [Err])
454-
end;
455-
Res ->
456-
Res
457-
end.
458-
459360
receive_amqp_messages(Receiver, N) ->
460361
receive_amqp_messages0(Receiver, N, []).
461362

0 commit comments

Comments
 (0)