Skip to content

Commit 9db4317

Browse files
committed
Move client-side stream protocol test helpers to a separate module
So that they can be used from multiple test suites.
1 parent 28cc121 commit 9db4317

File tree

2 files changed

+161
-121
lines changed

2 files changed

+161
-121
lines changed

deps/rabbitmq_stream/test/protocol_interop_SUITE.erl

Lines changed: 22 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -322,110 +322,32 @@ amqp_filter_expression(Config) ->
322322
%% -------------------------------------------------------------------
323323

324324
publish_via_stream_protocol(Stream, Config) ->
325-
%% There is no open source Erlang RabbitMQ Stream client.
326-
%% Therefore, we have to build the Stream protocol commands manually.
327-
328-
StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stream),
329-
{ok, S} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
330-
331-
C0 = rabbit_stream_core:init(0),
332-
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
333-
ok = gen_tcp:send(S, PeerPropertiesFrame),
334-
{{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(S, C0),
335-
336-
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 1, sasl_handshake})),
337-
{{response, _, {sasl_handshake, _, _}}, C2} = receive_stream_commands(S, C1),
338-
Username = <<"guest">>,
339-
Password = <<"guest">>,
340-
Null = 0,
341-
PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
342-
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 2, {sasl_authenticate, <<"PLAIN">>, PlainSasl}})),
343-
{{response, 2, {sasl_authenticate, _}}, C3} = receive_stream_commands(S, C2),
344-
{{tune, DefaultFrameMax, _}, C4} = receive_stream_commands(S, C3),
345-
346-
ok = gen_tcp:send(S, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
347-
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
348-
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(S, C4),
349-
350-
CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
351-
ok = gen_tcp:send(S, CreateStreamFrame),
352-
{{response, 1, {create_stream, _}}, C6} = receive_stream_commands(S, C5),
325+
{ok, S, C0} = stream_test_utils:connect(Config, 0),
326+
327+
{ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
353328

354329
PublisherId = 99,
355-
DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
356-
ok = gen_tcp:send(S, DeclarePublisherFrame),
357-
{{response, 1, {declare_publisher, _}}, C7} = receive_stream_commands(S, C6),
358-
359-
M1 = simple_entry(1, <<"m1">>),
360-
M2 = simple_entry(2, <<"m2">>, #'v1_0.application_properties'{
361-
content = [{{utf8, <<"my key">>},
362-
{utf8, <<"my value">>}}]}),
363-
M3 = simple_entry(3, <<"m3">>),
330+
{ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
331+
332+
M1 = stream_test_utils:simple_entry(1, <<"m1">>),
333+
M2 = stream_test_utils:simple_entry(2, <<"m2">>, #'v1_0.application_properties'{
334+
content = [{{utf8, <<"my key">>},
335+
{utf8, <<"my value">>}}]}),
336+
M3 = stream_test_utils:simple_entry(3, <<"m3">>),
364337
Messages1 = [M1, M2, M3],
365-
PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, length(Messages1), Messages1}),
366-
ok = gen_tcp:send(S, PublishFrame1),
367-
{{publish_confirm, PublisherId, _}, C8} = receive_stream_commands(S, C7),
368-
369-
UncompressedSubbatch = sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
370-
PublishFrame2 = rabbit_stream_core:frame({publish, PublisherId, 3, UncompressedSubbatch}),
371-
ok = gen_tcp:send(S, PublishFrame2),
372-
{{publish_confirm, PublisherId, _}, C9} = receive_stream_commands(S, C8),
373-
374-
CompressedSubbatch = sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),
375-
PublishFrame3 = rabbit_stream_core:frame({publish, PublisherId, 3, CompressedSubbatch}),
376-
ok = gen_tcp:send(S, PublishFrame3),
377-
{{publish_confirm, PublisherId, _}, C10} = receive_stream_commands(S, C9),
378-
379-
M10 = simple_entry(6, <<"m10">>),
380-
M11 = simple_entry(7, <<"m11">>),
338+
339+
{ok, _, C3} = stream_test_utils:publish_entries(S, C2, PublisherId, length(Messages1), Messages1),
340+
341+
UncompressedSubbatch = stream_test_utils:sub_batch_entry_uncompressed(4, [<<"m4">>, <<"m5">>, <<"m6">>]),
342+
{ok, _, C4} = stream_test_utils:publish_entries(S, C3, PublisherId, 3, UncompressedSubbatch),
343+
344+
CompressedSubbatch = stream_test_utils:sub_batch_entry_compressed(5, [<<"m7">>, <<"m8">>, <<"m9">>]),
345+
{ok, _, C5} = stream_test_utils:publish_entries(S, C4, PublisherId, 3, CompressedSubbatch),
346+
347+
M10 = stream_test_utils:simple_entry(6, <<"m10">>),
348+
M11 = stream_test_utils:simple_entry(7, <<"m11">>),
381349
Messages2 = [M10, M11],
382-
PublishFrame4 = rabbit_stream_core:frame({publish, PublisherId, length(Messages2), Messages2}),
383-
ok = gen_tcp:send(S, PublishFrame4),
384-
{{publish_confirm, PublisherId, _}, _C11} = receive_stream_commands(S, C10).
385-
386-
%% Streams contain AMQP 1.0 encoded messages.
387-
%% In this case, the AMQP 1.0 encoded message contains a single data section.
388-
simple_entry(Sequence, Body)
389-
when is_binary(Body) ->
390-
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
391-
DataSectSize = byte_size(DataSect),
392-
<<Sequence:64, 0:1, DataSectSize:31, DataSect:DataSectSize/binary>>.
393-
394-
%% Streams contain AMQP 1.0 encoded messages.
395-
%% In this case, the AMQP 1.0 encoded message consists of an application-properties section and a data section.
396-
simple_entry(Sequence, Body, AppProps)
397-
when is_binary(Body) ->
398-
AppPropsSect = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
399-
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
400-
Sects = <<AppPropsSect/binary, DataSect/binary>>,
401-
SectSize = byte_size(Sects),
402-
<<Sequence:64, 0:1, SectSize:31, Sects:SectSize/binary>>.
403-
404-
%% Here, each AMQP 1.0 encoded message consists of an application-properties section and a data section.
405-
%% All data sections are delivered uncompressed in 1 batch.
406-
sub_batch_entry_uncompressed(Sequence, Bodies) ->
407-
Batch = lists:foldl(fun(Body, Acc) ->
408-
AppProps = #'v1_0.application_properties'{
409-
content = [{{utf8, <<"my key">>}, {utf8, <<"my value">>}}]},
410-
Sect0 = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
411-
Sect1 = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
412-
Sect = <<Sect0/binary, Sect1/binary>>,
413-
<<Acc/binary, 0:1, (byte_size(Sect)):31, Sect/binary>>
414-
end, <<>>, Bodies),
415-
Size = byte_size(Batch),
416-
<<Sequence:64, 1:1, 0:3, 0:4, (length(Bodies)):16, Size:32, Size:32, Batch:Size/binary>>.
417-
418-
%% Here, each AMQP 1.0 encoded message contains a single data section.
419-
%% All data sections are delivered in 1 gzip compressed batch.
420-
sub_batch_entry_compressed(Sequence, Bodies) ->
421-
Uncompressed = lists:foldl(fun(Body, Acc) ->
422-
Bin = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
423-
<<Acc/binary, Bin/binary>>
424-
end, <<>>, Bodies),
425-
Compressed = zlib:gzip(Uncompressed),
426-
CompressedLen = byte_size(Compressed),
427-
<<Sequence:64, 1:1, 1:3, 0:4, (length(Bodies)):16, (byte_size(Uncompressed)):32,
428-
CompressedLen:32, Compressed:CompressedLen/binary>>.
350+
{ok, _, _C6} = stream_test_utils:publish_entries(S, C5, PublisherId, length(Messages2), Messages2).
429351

430352
connection_config(Config) ->
431353
Host = ?config(rmq_hostname, Config),
@@ -435,27 +357,6 @@ connection_config(Config) ->
435357
container_id => <<"my container">>,
436358
sasl => {plain, <<"guest">>, <<"guest">>}}.
437359

438-
receive_stream_commands(Sock, C0) ->
439-
case rabbit_stream_core:next_command(C0) of
440-
empty ->
441-
case gen_tcp:recv(Sock, 0, 5000) of
442-
{ok, Data} ->
443-
C1 = rabbit_stream_core:incoming_data(Data, C0),
444-
case rabbit_stream_core:next_command(C1) of
445-
empty ->
446-
{ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
447-
rabbit_stream_core:next_command(
448-
rabbit_stream_core:incoming_data(Data2, C1));
449-
Res ->
450-
Res
451-
end;
452-
{error, Err} ->
453-
ct:fail("error receiving stream data ~w", [Err])
454-
end;
455-
Res ->
456-
Res
457-
end.
458-
459360
receive_amqp_messages(Receiver, N) ->
460361
receive_amqp_messages0(Receiver, N, []).
461362

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
%% FIXME this clause is copied from protocol_interop_SUITE.erl
2+
3+
%% This Source Code Form is subject to the terms of the Mozilla Public
4+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
5+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
6+
%%
7+
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
8+
%%
9+
10+
%% There is no open source Erlang RabbitMQ Stream client.
11+
%% Therefore, we have to build the Stream protocol commands manually.
12+
13+
-module(stream_test_utils).
14+
15+
-compile([export_all, nowarn_export_all]).
16+
17+
-include_lib("amqp10_common/include/amqp10_framing.hrl").
18+
19+
-define(RESPONSE_CODE_OK, 1).
20+
21+
connect(Config, Node) ->
22+
StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream),
23+
{ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
24+
25+
C0 = rabbit_stream_core:init(0),
26+
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
27+
ok = gen_tcp:send(Sock, PeerPropertiesFrame),
28+
{{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(Sock, C0),
29+
30+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 1, sasl_handshake})),
31+
{{response, _, {sasl_handshake, _, _}}, C2} = receive_stream_commands(Sock, C1),
32+
Username = <<"guest">>,
33+
Password = <<"guest">>,
34+
Null = 0,
35+
PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
36+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 2, {sasl_authenticate, <<"PLAIN">>, PlainSasl}})),
37+
{{response, 2, {sasl_authenticate, _}}, C3} = receive_stream_commands(Sock, C2),
38+
{{tune, DefaultFrameMax, _}, C4} = receive_stream_commands(Sock, C3),
39+
40+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
41+
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
42+
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
43+
{ok, Sock, C5}.
44+
45+
create_stream(Sock, C0, Stream) ->
46+
CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
47+
ok = gen_tcp:send(Sock, CreateStreamFrame),
48+
{{response, 1, {create_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
49+
{ok, C1}.
50+
51+
declare_publisher(Sock, C0, Stream, PublisherId) ->
52+
DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
53+
ok = gen_tcp:send(Sock, DeclarePublisherFrame),
54+
{{response, 1, {declare_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
55+
{ok, C1}.
56+
57+
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
58+
SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}),
59+
ok = gen_tcp:send(Sock, SubscribeFrame),
60+
{{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
61+
{ok, C1}.
62+
63+
publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
64+
SeqIds = lists:seq(Sequence0, Sequence0 + length(Payloads) - 1),
65+
Messages = [simple_entry(Seq, P)
66+
|| {Seq, P} <- lists:zip(SeqIds, Payloads)],
67+
{ok, SeqIds, C1} = publish_entries(Sock, C0, PublisherId, length(Messages), Messages),
68+
{ok, C1}.
69+
70+
publish_entries(Sock, C0, PublisherId, MsgCount, Messages) ->
71+
PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, MsgCount, Messages}),
72+
ok = gen_tcp:send(Sock, PublishFrame1),
73+
{{publish_confirm, PublisherId, SeqIds}, C1} = receive_stream_commands(Sock, C0),
74+
{ok, SeqIds, C1}.
75+
76+
%% Streams contain AMQP 1.0 encoded messages.
77+
%% In this case, the AMQP 1.0 encoded message contains a single data section.
78+
simple_entry(Sequence, Body)
79+
when is_binary(Body) ->
80+
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
81+
DataSectSize = byte_size(DataSect),
82+
<<Sequence:64, 0:1, DataSectSize:31, DataSect:DataSectSize/binary>>.
83+
84+
%% Streams contain AMQP 1.0 encoded messages.
85+
%% In this case, the AMQP 1.0 encoded message consists of an application-properties section and a data section.
86+
simple_entry(Sequence, Body, AppProps)
87+
when is_binary(Body) ->
88+
AppPropsSect = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
89+
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
90+
Sects = <<AppPropsSect/binary, DataSect/binary>>,
91+
SectSize = byte_size(Sects),
92+
<<Sequence:64, 0:1, SectSize:31, Sects:SectSize/binary>>.
93+
94+
%% Here, each AMQP 1.0 encoded message consists of an application-properties section and a data section.
95+
%% All data sections are delivered uncompressed in 1 batch.
96+
sub_batch_entry_uncompressed(Sequence, Bodies) ->
97+
Batch = lists:foldl(fun(Body, Acc) ->
98+
AppProps = #'v1_0.application_properties'{
99+
content = [{{utf8, <<"my key">>}, {utf8, <<"my value">>}}]},
100+
Sect0 = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
101+
Sect1 = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
102+
Sect = <<Sect0/binary, Sect1/binary>>,
103+
<<Acc/binary, 0:1, (byte_size(Sect)):31, Sect/binary>>
104+
end, <<>>, Bodies),
105+
Size = byte_size(Batch),
106+
<<Sequence:64, 1:1, 0:3, 0:4, (length(Bodies)):16, Size:32, Size:32, Batch:Size/binary>>.
107+
108+
%% Here, each AMQP 1.0 encoded message contains a single data section.
109+
%% All data sections are delivered in 1 gzip compressed batch.
110+
sub_batch_entry_compressed(Sequence, Bodies) ->
111+
Uncompressed = lists:foldl(fun(Body, Acc) ->
112+
Bin = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
113+
<<Acc/binary, Bin/binary>>
114+
end, <<>>, Bodies),
115+
Compressed = zlib:gzip(Uncompressed),
116+
CompressedLen = byte_size(Compressed),
117+
<<Sequence:64, 1:1, 1:3, 0:4, (length(Bodies)):16, (byte_size(Uncompressed)):32,
118+
CompressedLen:32, Compressed:CompressedLen/binary>>.
119+
120+
receive_stream_commands(Sock, C0) ->
121+
case rabbit_stream_core:next_command(C0) of
122+
empty ->
123+
case gen_tcp:recv(Sock, 0, 5000) of
124+
{ok, Data} ->
125+
C1 = rabbit_stream_core:incoming_data(Data, C0),
126+
case rabbit_stream_core:next_command(C1) of
127+
empty ->
128+
{ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
129+
rabbit_stream_core:next_command(
130+
rabbit_stream_core:incoming_data(Data2, C1));
131+
Res ->
132+
Res
133+
end;
134+
{error, Err} ->
135+
ct:fail("error receiving stream data ~w", [Err])
136+
end;
137+
Res ->
138+
Res
139+
end.

0 commit comments

Comments
 (0)