Skip to content

Commit 351ec1b

Browse files
Merge pull request #14593 from rabbitmq/shovel-raw
Shovels: Optimise amqp10 client messages for shovel usage
2 parents cd8029e + 897260c commit 351ec1b

File tree

5 files changed

+169
-31
lines changed

5 files changed

+169
-31
lines changed

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
attach_receiver_link/5,
2929
attach_receiver_link/6,
3030
attach_receiver_link/7,
31+
attach_receiver_link/8,
3132
attach_link/2,
3233
detach_link/1,
3334
send_msg/2,
@@ -277,7 +278,17 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
277278
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
278279
terminus_durability(), filter(), properties()) ->
279280
{ok, link_ref()}.
280-
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
281+
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) ->
282+
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, false).
283+
284+
%% @doc Attaches a receiver link to a source.
285+
%% This is asynchronous and will notify completion of the attach request to the
286+
%% caller using an amqp10_event of the following format:
287+
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
288+
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
289+
terminus_durability(), filter(), properties(), boolean()) ->
290+
{ok, link_ref()}.
291+
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, RawMode)
281292
when is_pid(Session) andalso
282293
is_binary(Name) andalso
283294
is_binary(Source) andalso
@@ -286,14 +297,16 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Prop
286297
SettleMode == mixed) andalso
287298
is_atom(Durability) andalso
288299
is_map(Filter) andalso
289-
is_map(Properties) ->
300+
is_map(Properties) andalso
301+
is_boolean(RawMode) ->
290302
AttachArgs = #{name => Name,
291303
role => {receiver, #{address => Source,
292304
durable => Durability}, self()},
293305
snd_settle_mode => SettleMode,
294306
rcv_settle_mode => first,
295307
filter => Filter,
296-
properties => Properties},
308+
properties => Properties,
309+
raw_mode => RawMode},
297310
amqp10_client_session:attach(Session, AttachArgs).
298311

299312
-spec attach_link(pid(), attach_args()) -> {ok, link_ref()}.
@@ -355,11 +368,16 @@ stop_receiver_link(#link_ref{role = receiver,
355368
%%% messages
356369

357370
%% @doc Send a message on a the link referred to be the 'LinkRef'.
358-
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg()) ->
371+
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg() | amqp10_raw_msg:amqp10_raw_msg()) ->
359372
ok | amqp10_client_session:transfer_error().
360373
send_msg(#link_ref{role = sender, session = Session,
361374
link_handle = Handle}, Msg0) ->
362-
Msg = amqp10_msg:set_handle(Handle, Msg0),
375+
Msg = case amqp10_raw_msg:is(Msg0) of
376+
true ->
377+
amqp10_raw_msg:set_handle(Handle, Msg0);
378+
false ->
379+
amqp10_msg:set_handle(Handle, Msg0)
380+
end,
363381
amqp10_client_session:transfer(Session, Msg, ?TIMEOUT).
364382

365383
%% @doc Accept a message on a the link referred to be the 'LinkRef'.

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@
100100
properties => amqp10_client_types:properties(),
101101
max_message_size => max_message_size(),
102102
handle => output_handle(),
103-
footer_opt => footer_opt()
103+
footer_opt => footer_opt(),
104+
raw_mode => boolean()
104105
}.
105106

106107
-type transfer_error() :: {error,
@@ -142,7 +143,8 @@
142143
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
143144
Credit :: pos_integer()},
144145
incoming_unsettled = #{} :: #{delivery_number() => ok},
145-
footer_opt :: footer_opt() | undefined
146+
footer_opt :: footer_opt() | undefined,
147+
raw_mode = false :: boolean()
146148
}).
147149

148150
-record(state,
@@ -208,11 +210,18 @@ attach(Session, Args) ->
208210
detach(Session, Handle) ->
209211
gen_statem:call(Session, {detach, Handle}, ?TIMEOUT).
210212

211-
-spec transfer(pid(), amqp10_msg:amqp10_msg(), timeout()) ->
213+
-spec transfer(pid(), amqp10_msg:amqp10_msg() | amqp10_raw_msg:amqp10_raw_msg(), timeout()) ->
212214
ok | transfer_error().
213215
transfer(Session, Amqp10Msg, Timeout) ->
214-
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
215-
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout).
216+
case amqp10_raw_msg:is(Amqp10Msg) of
217+
true ->
218+
Transfer = amqp10_raw_msg:transfer(Amqp10Msg),
219+
Payload = amqp10_raw_msg:payload(Amqp10Msg),
220+
gen_statem:call(Session, {transfer, Transfer, {raw, Payload}}, Timeout);
221+
false ->
222+
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
223+
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout)
224+
end.
216225

217226
-spec flow(pid(), non_neg_integer(), never | pos_integer()) -> ok.
218227
flow(Session, IncomingWindow, RenewWhenBelow) when
@@ -413,7 +422,8 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
413422
{ok, #link{target = {pid, TargetPid},
414423
ref = LinkRef,
415424
incoming_unsettled = Unsettled,
416-
footer_opt = FooterOpt
425+
footer_opt = FooterOpt,
426+
raw_mode = RawMode
417427
} = Link0} = find_link_by_input_handle(InHandle, State0),
418428

419429
{Transfer = #'v1_0.transfer'{settled = Settled,
@@ -428,7 +438,7 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
428438
%% then the settled flag MUST be interpreted as being false." [2.7.5]
429439
Link1#link{incoming_unsettled = Unsettled#{DeliveryId => ok}}
430440
end,
431-
case decode_as_msg(Transfer, Payload, FooterOpt) of
441+
case decode_as_msg(Transfer, Payload, FooterOpt, RawMode) of
432442
{ok, Msg} ->
433443
% link bookkeeping
434444
% notify when credit is exhausted (link_credit = 0)
@@ -619,6 +629,25 @@ send(Record, #state{socket = Socket} = State) ->
619629
Frame = encode_frame(Record, State),
620630
socket_send(Socket, Frame).
621631

632+
send_transfer(Transfer0, {raw, Payload}, _FooterOpt, MaxMessageSize,
633+
#state{socket = Socket,
634+
channel = Channel,
635+
connection_config = Config}) ->
636+
OutMaxFrameSize = maps:get(outgoing_max_frame_size, Config),
637+
Transfer = Transfer0#'v1_0.transfer'{more = false},
638+
TransferSize = iolist_size(amqp10_framing:encode_bin(Transfer)),
639+
if is_integer(MaxMessageSize) andalso
640+
MaxMessageSize > 0 andalso
641+
byte_size(Payload) > MaxMessageSize ->
642+
{error, message_size_exceeded};
643+
true ->
644+
% TODO: this does not take the extended header into account
645+
% see: 2.3
646+
MaxPayloadSize = OutMaxFrameSize - ?FRAME_HEADER_SIZE - TransferSize,
647+
Frames = build_frames(Channel, Transfer, Payload, MaxPayloadSize, []),
648+
ok = socket_send(Socket, Frames),
649+
{ok, length(Frames)}
650+
end;
622651
send_transfer(Transfer0, Sections0, FooterOpt, MaxMessageSize,
623652
#state{socket = Socket,
624653
channel = Channel,
@@ -918,7 +947,8 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
918947
target = LinkTarget,
919948
delivery_count = unpack(InitialDeliveryCount),
920949
max_message_size = unpack(MaxMessageSize),
921-
footer_opt = maps:get(footer_opt, Args, undefined)},
950+
footer_opt = maps:get(footer_opt, Args, undefined),
951+
raw_mode = maps:get(raw_mode, Args, false)},
922952

923953
{State#state{links = Links#{OutHandle => Link},
924954
next_link_handle = NextLinkHandle,
@@ -1199,10 +1229,14 @@ complete_partial_transfer(_Transfer, Payload,
11991229
{T, iolist_to_binary(lists:reverse([Payload | Payloads])),
12001230
Link#link{partial_transfers = undefined}}.
12011231

1202-
decode_as_msg(Transfer, Payload, undefined) ->
1232+
decode_as_msg(#'v1_0.transfer'{settled = Settled,
1233+
delivery_id = {uint, DeliveryId}},
1234+
Payload, _, true) ->
1235+
{ok, amqp10_raw_msg:new(Settled, DeliveryId, Payload)};
1236+
decode_as_msg(Transfer, Payload, undefined, _) ->
12031237
Sections = amqp10_framing:decode_bin(Payload),
12041238
{ok, amqp10_msg:from_amqp_records([Transfer | Sections])};
1205-
decode_as_msg(Transfer, Payload, FooterOpt) ->
1239+
decode_as_msg(Transfer, Payload, FooterOpt, _) ->
12061240
PosSections = decode_sections([], Payload, byte_size(Payload), 0),
12071241
Sections = lists:map(fun({_Pos, S}) -> S end, PosSections),
12081242
Msg = amqp10_msg:from_amqp_records([Transfer | Sections]),
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
-module(amqp10_raw_msg).
8+
9+
-include_lib("amqp10_common/include/amqp10_framing.hrl").
10+
-include_lib("amqp10_common/include/amqp10_types.hrl").
11+
12+
%% Just for AMQP10 shovel usage. It avoids the binary <-> amqp10 <-> mc
13+
%% conversion, with all the unnecessary encoding/decoding steps.
14+
%% It allows for just binary <-> mc conversion, as the payload is stored as is
15+
16+
-export([new/3,
17+
settled/1,
18+
delivery_tag/1,
19+
payload/1,
20+
handle/1,
21+
set_handle/2,
22+
transfer/1,
23+
is/1]).
24+
25+
-record(amqp10_raw_msg,
26+
{settled :: boolean(),
27+
delivery_tag :: non_neg_integer(),
28+
payload :: binary(),
29+
handle :: non_neg_integer() | undefined
30+
}).
31+
32+
-opaque amqp10_raw_msg() :: #amqp10_raw_msg{}.
33+
34+
-export_type([amqp10_raw_msg/0]).
35+
36+
-spec new(boolean(), non_neg_integer(), binary()) ->
37+
amqp10_raw_msg().
38+
new(Settled, DeliveryTag, Payload) ->
39+
#amqp10_raw_msg{settled = Settled,
40+
delivery_tag = DeliveryTag,
41+
payload = Payload}.
42+
43+
-spec settled(amqp10_raw_msg()) -> boolean().
44+
settled(#amqp10_raw_msg{settled = Settled}) ->
45+
Settled.
46+
47+
-spec delivery_tag(amqp10_raw_msg()) -> non_neg_integer().
48+
delivery_tag(#amqp10_raw_msg{delivery_tag = DeliveryTag}) ->
49+
DeliveryTag.
50+
51+
-spec payload(amqp10_raw_msg()) -> binary().
52+
payload(#amqp10_raw_msg{payload = Payload}) ->
53+
Payload.
54+
55+
-spec handle(amqp10_raw_msg()) -> non_neg_integer().
56+
handle(#amqp10_raw_msg{handle = Handle}) ->
57+
Handle.
58+
59+
-spec set_handle(non_neg_integer(), amqp10_raw_msg()) ->
60+
amqp10_raw_msg().
61+
set_handle(Handle, #amqp10_raw_msg{} = Msg) ->
62+
Msg#amqp10_raw_msg{handle = Handle}.
63+
64+
-spec transfer(amqp10_raw_msg()) -> #'v1_0.transfer'{}.
65+
transfer(#amqp10_raw_msg{settled = Settled,
66+
delivery_tag = DeliveryTag,
67+
handle = Handle}) ->
68+
#'v1_0.transfer'{
69+
delivery_tag = {binary, rabbit_data_coercion:to_binary(DeliveryTag)},
70+
settled = Settled,
71+
handle = {uint, Handle},
72+
message_format = {uint, ?MESSAGE_FORMAT}}.
73+
74+
-spec is(term()) -> boolean().
75+
is(Record) ->
76+
is_record(Record, amqp10_raw_msg).

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
-include_lib("rabbit/include/mc.hrl").
1313
-include("rabbit_shovel.hrl").
14+
-include_lib("amqp10_common/include/amqp10_framing.hrl").
1415

1516
-export([
1617
parse/2,
@@ -81,7 +82,9 @@ connect_source(State = #{name := Name,
8182
on_publish -> unsettled;
8283
on_confirm -> unsettled
8384
end,
84-
AttachFun = fun amqp10_client:attach_receiver_link/5,
85+
AttachFun = fun(S, L, A, SSM, D) ->
86+
amqp10_client:attach_receiver_link(S, L, A, SSM, D, #{}, #{}, true)
87+
end,
8588
{Conn, Sess, LinkRef} = connect(Name, SndSettleMode, Uri, "receiver", Addr, Src,
8689
AttachFun),
8790
State#{source => Src#{current => #{conn => Conn,
@@ -128,6 +131,7 @@ connect(Name, SndSettleMode, Uri, Postfix, Addr, Map, AttachFun) ->
128131
% needs to be sync, i.e. awaits the 'attach' event as
129132
% else we may try to use the link before it is ready
130133
Durability = maps:get(durability, Map, unsettled_state),
134+
%% Attach in raw mode
131135
{ok, LinkRef} = AttachFun(Sess, LinkName, Addr,
132136
SndSettleMode,
133137
Durability),
@@ -181,11 +185,10 @@ dest_endpoint(#{shovel_type := dynamic,
181185

182186
-spec handle_source(Msg :: any(), state()) ->
183187
not_handled | state() | {stop, any()}.
184-
handle_source({amqp10_msg, _LinkRef, Msg0}, State) ->
185-
Tag = amqp10_msg:delivery_id(Msg0),
186-
[_ | Rest] = amqp10_msg:to_amqp_records(Msg0),
187-
Bin = iolist_to_binary([amqp10_framing:encode_bin(D) || D <- Rest]),
188-
Msg = mc:init(mc_amqp, Bin, #{}),
188+
handle_source({amqp10_msg, _LinkRef, RawMsg}, State) ->
189+
Tag = amqp10_raw_msg:delivery_tag(RawMsg),
190+
Payload = amqp10_raw_msg:payload(RawMsg),
191+
Msg = mc:init(mc_amqp, Payload, #{}),
189192
rabbit_shovel_behaviour:forward(Tag, Msg, State);
190193
handle_source({amqp10_event, {connection, Conn, opened}},
191194
State = #{source := #{current := #{conn := Conn}}}) ->
@@ -333,11 +336,10 @@ forward(Tag, Msg0,
333336
unacked := Unacked} = Dst,
334337
ack_mode := AckMode} = State) ->
335338
OutTag = rabbit_data_coercion:to_binary(Tag),
336-
Msg1 = mc:protocol_state(mc:convert(mc_amqp, Msg0)),
337-
Records = lists:flatten([amqp10_framing:decode_bin(iolist_to_binary(S)) || S <- Msg1]),
338-
Msg2 = amqp10_msg:new(OutTag, Records, AckMode =/= on_confirm),
339-
Msg = add_timestamp_header(State, add_forward_headers(State, Msg2)),
340-
case send_msg(Link, Msg) of
339+
Msg1 = add_timestamp_header(State, add_forward_headers(State, Msg0)),
340+
Msg2 = mc:protocol_state(mc:convert(mc_amqp, Msg1)),
341+
Msg3 = amqp10_raw_msg:new(AckMode =/= on_confirm, Tag, iolist_to_binary(Msg2)),
342+
case send_msg(Link, Msg3) of
341343
ok ->
342344
rabbit_shovel_behaviour:decr_remaining_unacked(
343345
case AckMode of
@@ -366,10 +368,13 @@ send_msg(Link, Msg) ->
366368
end.
367369

368370
add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) ->
369-
Anns = #{<<"x-opt-shovelled-timestamp">> => os:system_time(milli_seconds)},
370-
amqp10_msg:set_message_annotations(Anns, Msg);
371+
mc:set_annotation(
372+
<<"x-opt-shovelled-timestamp">>, os:system_time(milli_seconds),
373+
Msg);
371374
add_timestamp_header(_, Msg) -> Msg.
372375

373376
add_forward_headers(#{dest := #{cached_forward_headers := Anns}}, Msg) ->
374-
amqp10_msg:set_message_annotations(Anns, Msg);
377+
maps:fold(fun(K, V, Acc) ->
378+
mc:set_annotation(K, V, Acc)
379+
end, Msg, Anns);
375380
add_forward_headers(_, Msg) -> Msg.

deps/rabbitmq_shovel/test/amqp10_shovel_SUITE.erl

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ amqp_encoded_data_list(_Config) ->
7474
#'v1_0.data'{content = <<"one">>},
7575
#'v1_0.data'{content = <<"two">>}
7676
],
77-
Msg = amqp10_msg:new(55, Body),
77+
[_Transfer | Sections] = amqp10_msg:to_amqp_records(amqp10_msg:new(<<"55">>, Body)),
78+
Bin = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]),
79+
Msg = amqp10_raw_msg:new(true, 55, Bin),
7880
rabbit_amqp10_shovel:handle_source({amqp10_msg, linkref, Msg}, State),
7981

8082
?assert(meck:validate(rabbit_shovel_behaviour)),
@@ -91,8 +93,11 @@ amqp_encoded_amqp_value(_Config) ->
9193
State = #{source => #{},
9294
dest => #{module => rabbit_amqp10_shovel},
9395
ack_mode => no_ack},
96+
9497
Body = #'v1_0.amqp_value'{content = {utf8, <<"hi">>}},
95-
Msg = amqp10_msg:new(55, Body),
98+
[_Transfer | Sections] = amqp10_msg:to_amqp_records(amqp10_msg:new(<<"55">>, Body)),
99+
Bin = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]),
100+
Msg = amqp10_raw_msg:new(true, 55, Bin),
96101
rabbit_amqp10_shovel:handle_source({amqp10_msg, linkref, Msg}, State),
97102

98103
?assert(meck:validate(rabbit_shovel_behaviour)),

0 commit comments

Comments
 (0)