Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
attach_receiver_link/5,
attach_receiver_link/6,
attach_receiver_link/7,
attach_receiver_link/8,
attach_link/2,
detach_link/1,
send_msg/2,
Expand Down Expand Up @@ -277,7 +278,17 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
terminus_durability(), filter(), properties()) ->
{ok, link_ref()}.
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) ->
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, false).

%% @doc Attaches a receiver link to a source.
%% This is asynchronous and will notify completion of the attach request to the
%% caller using an amqp10_event of the following format:
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
terminus_durability(), filter(), properties(), boolean()) ->
{ok, link_ref()}.
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, RawMode)
when is_pid(Session) andalso
is_binary(Name) andalso
is_binary(Source) andalso
Expand All @@ -286,14 +297,16 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Prop
SettleMode == mixed) andalso
is_atom(Durability) andalso
is_map(Filter) andalso
is_map(Properties) ->
is_map(Properties) andalso
is_boolean(RawMode) ->
AttachArgs = #{name => Name,
role => {receiver, #{address => Source,
durable => Durability}, self()},
snd_settle_mode => SettleMode,
rcv_settle_mode => first,
filter => Filter,
properties => Properties},
properties => Properties,
raw_mode => RawMode},
amqp10_client_session:attach(Session, AttachArgs).

-spec attach_link(pid(), attach_args()) -> {ok, link_ref()}.
Expand Down Expand Up @@ -355,11 +368,16 @@ stop_receiver_link(#link_ref{role = receiver,
%%% messages

%% @doc Send a message on a the link referred to be the 'LinkRef'.
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg()) ->
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg() | amqp10_raw_msg:amqp10_raw_msg()) ->
ok | amqp10_client_session:transfer_error().
send_msg(#link_ref{role = sender, session = Session,
link_handle = Handle}, Msg0) ->
Msg = amqp10_msg:set_handle(Handle, Msg0),
Msg = case amqp10_raw_msg:is(Msg0) of
true ->
amqp10_raw_msg:set_handle(Handle, Msg0);
false ->
amqp10_msg:set_handle(Handle, Msg0)
end,
amqp10_client_session:transfer(Session, Msg, ?TIMEOUT).

%% @doc Accept a message on a the link referred to be the 'LinkRef'.
Expand Down
54 changes: 44 additions & 10 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@
properties => amqp10_client_types:properties(),
max_message_size => max_message_size(),
handle => output_handle(),
footer_opt => footer_opt()
footer_opt => footer_opt(),
raw_mode => boolean()
}.

-type transfer_error() :: {error,
Expand Down Expand Up @@ -142,7 +143,8 @@
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
Credit :: pos_integer()},
incoming_unsettled = #{} :: #{delivery_number() => ok},
footer_opt :: footer_opt() | undefined
footer_opt :: footer_opt() | undefined,
raw_mode = false :: boolean()
}).

-record(state,
Expand Down Expand Up @@ -208,11 +210,18 @@ attach(Session, Args) ->
detach(Session, Handle) ->
gen_statem:call(Session, {detach, Handle}, ?TIMEOUT).

-spec transfer(pid(), amqp10_msg:amqp10_msg(), timeout()) ->
-spec transfer(pid(), amqp10_msg:amqp10_msg() | amqp10_raw_msg:amqp10_raw_msg(), timeout()) ->
ok | transfer_error().
transfer(Session, Amqp10Msg, Timeout) ->
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout).
case amqp10_raw_msg:is(Amqp10Msg) of
true ->
Transfer = amqp10_raw_msg:transfer(Amqp10Msg),
Payload = amqp10_raw_msg:payload(Amqp10Msg),
gen_statem:call(Session, {transfer, Transfer, {raw, Payload}}, Timeout);
false ->
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout)
end.

-spec flow(pid(), non_neg_integer(), never | pos_integer()) -> ok.
flow(Session, IncomingWindow, RenewWhenBelow) when
Expand Down Expand Up @@ -413,7 +422,8 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
{ok, #link{target = {pid, TargetPid},
ref = LinkRef,
incoming_unsettled = Unsettled,
footer_opt = FooterOpt
footer_opt = FooterOpt,
raw_mode = RawMode
} = Link0} = find_link_by_input_handle(InHandle, State0),

{Transfer = #'v1_0.transfer'{settled = Settled,
Expand All @@ -428,7 +438,7 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
%% then the settled flag MUST be interpreted as being false." [2.7.5]
Link1#link{incoming_unsettled = Unsettled#{DeliveryId => ok}}
end,
case decode_as_msg(Transfer, Payload, FooterOpt) of
case decode_as_msg(Transfer, Payload, FooterOpt, RawMode) of
{ok, Msg} ->
% link bookkeeping
% notify when credit is exhausted (link_credit = 0)
Expand Down Expand Up @@ -619,6 +629,25 @@ send(Record, #state{socket = Socket} = State) ->
Frame = encode_frame(Record, State),
socket_send(Socket, Frame).

send_transfer(Transfer0, {raw, Payload}, _FooterOpt, MaxMessageSize,
#state{socket = Socket,
channel = Channel,
connection_config = Config}) ->
OutMaxFrameSize = maps:get(outgoing_max_frame_size, Config),
Transfer = Transfer0#'v1_0.transfer'{more = false},
TransferSize = iolist_size(amqp10_framing:encode_bin(Transfer)),
if is_integer(MaxMessageSize) andalso
MaxMessageSize > 0 andalso
byte_size(Payload) > MaxMessageSize ->
{error, message_size_exceeded};
true ->
% TODO: this does not take the extended header into account
% see: 2.3
MaxPayloadSize = OutMaxFrameSize - ?FRAME_HEADER_SIZE - TransferSize,
Frames = build_frames(Channel, Transfer, Payload, MaxPayloadSize, []),
ok = socket_send(Socket, Frames),
{ok, length(Frames)}
end;
send_transfer(Transfer0, Sections0, FooterOpt, MaxMessageSize,
#state{socket = Socket,
channel = Channel,
Expand Down Expand Up @@ -918,7 +947,8 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
target = LinkTarget,
delivery_count = unpack(InitialDeliveryCount),
max_message_size = unpack(MaxMessageSize),
footer_opt = maps:get(footer_opt, Args, undefined)},
footer_opt = maps:get(footer_opt, Args, undefined),
raw_mode = maps:get(raw_mode, Args, false)},

{State#state{links = Links#{OutHandle => Link},
next_link_handle = NextLinkHandle,
Expand Down Expand Up @@ -1199,10 +1229,14 @@ complete_partial_transfer(_Transfer, Payload,
{T, iolist_to_binary(lists:reverse([Payload | Payloads])),
Link#link{partial_transfers = undefined}}.

decode_as_msg(Transfer, Payload, undefined) ->
decode_as_msg(#'v1_0.transfer'{settled = Settled,
delivery_id = {uint, DeliveryId}},
Payload, _, true) ->
{ok, amqp10_raw_msg:new(Settled, DeliveryId, Payload)};
decode_as_msg(Transfer, Payload, undefined, _) ->
Sections = amqp10_framing:decode_bin(Payload),
{ok, amqp10_msg:from_amqp_records([Transfer | Sections])};
decode_as_msg(Transfer, Payload, FooterOpt) ->
decode_as_msg(Transfer, Payload, FooterOpt, _) ->
PosSections = decode_sections([], Payload, byte_size(Payload), 0),
Sections = lists:map(fun({_Pos, S}) -> S end, PosSections),
Msg = amqp10_msg:from_amqp_records([Transfer | Sections]),
Expand Down
76 changes: 76 additions & 0 deletions deps/amqp10_client/src/amqp10_raw_msg.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(amqp10_raw_msg).

-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").

%% Just for AMQP10 shovel usage. It avoids the binary <-> amqp10 <-> mc
%% conversion, with all the unnecessary encoding/decoding steps.
%% It allows for just binary <-> mc conversion, as the payload is stored as is

-export([new/3,
settled/1,
delivery_tag/1,
payload/1,
handle/1,
set_handle/2,
transfer/1,
is/1]).

-record(amqp10_raw_msg,
{settled :: boolean(),
delivery_tag :: non_neg_integer(),
payload :: binary(),
handle :: non_neg_integer() | undefined
}).

-opaque amqp10_raw_msg() :: #amqp10_raw_msg{}.

-export_type([amqp10_raw_msg/0]).

-spec new(boolean(), non_neg_integer(), binary()) ->
amqp10_raw_msg().
new(Settled, DeliveryTag, Payload) ->
#amqp10_raw_msg{settled = Settled,
delivery_tag = DeliveryTag,
payload = Payload}.

-spec settled(amqp10_raw_msg()) -> boolean().
settled(#amqp10_raw_msg{settled = Settled}) ->
Settled.

-spec delivery_tag(amqp10_raw_msg()) -> non_neg_integer().
delivery_tag(#amqp10_raw_msg{delivery_tag = DeliveryTag}) ->
DeliveryTag.

-spec payload(amqp10_raw_msg()) -> binary().
payload(#amqp10_raw_msg{payload = Payload}) ->
Payload.

-spec handle(amqp10_raw_msg()) -> non_neg_integer().
handle(#amqp10_raw_msg{handle = Handle}) ->
Handle.

-spec set_handle(non_neg_integer(), amqp10_raw_msg()) ->
amqp10_raw_msg().
set_handle(Handle, #amqp10_raw_msg{} = Msg) ->
Msg#amqp10_raw_msg{handle = Handle}.

-spec transfer(amqp10_raw_msg()) -> #'v1_0.transfer'{}.
transfer(#amqp10_raw_msg{settled = Settled,
delivery_tag = DeliveryTag,
handle = Handle}) ->
#'v1_0.transfer'{
delivery_tag = {binary, rabbit_data_coercion:to_binary(DeliveryTag)},
settled = Settled,
handle = {uint, Handle},
message_format = {uint, ?MESSAGE_FORMAT}}.

-spec is(term()) -> boolean().
is(Record) ->
is_record(Record, amqp10_raw_msg).
33 changes: 19 additions & 14 deletions deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

-include_lib("rabbit/include/mc.hrl").
-include("rabbit_shovel.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").

-export([
parse/2,
Expand Down Expand Up @@ -81,7 +82,9 @@ connect_source(State = #{name := Name,
on_publish -> unsettled;
on_confirm -> unsettled
end,
AttachFun = fun amqp10_client:attach_receiver_link/5,
AttachFun = fun(S, L, A, SSM, D) ->
amqp10_client:attach_receiver_link(S, L, A, SSM, D, #{}, #{}, true)
end,
{Conn, Sess, LinkRef} = connect(Name, SndSettleMode, Uri, "receiver", Addr, Src,
AttachFun),
State#{source => Src#{current => #{conn => Conn,
Expand Down Expand Up @@ -128,6 +131,7 @@ connect(Name, SndSettleMode, Uri, Postfix, Addr, Map, AttachFun) ->
% needs to be sync, i.e. awaits the 'attach' event as
% else we may try to use the link before it is ready
Durability = maps:get(durability, Map, unsettled_state),
%% Attach in raw mode
{ok, LinkRef} = AttachFun(Sess, LinkName, Addr,
SndSettleMode,
Durability),
Expand Down Expand Up @@ -181,11 +185,10 @@ dest_endpoint(#{shovel_type := dynamic,

-spec handle_source(Msg :: any(), state()) ->
not_handled | state() | {stop, any()}.
handle_source({amqp10_msg, _LinkRef, Msg0}, State) ->
Tag = amqp10_msg:delivery_id(Msg0),
[_ | Rest] = amqp10_msg:to_amqp_records(Msg0),
Bin = iolist_to_binary([amqp10_framing:encode_bin(D) || D <- Rest]),
Msg = mc:init(mc_amqp, Bin, #{}),
handle_source({amqp10_msg, _LinkRef, RawMsg}, State) ->
Tag = amqp10_raw_msg:delivery_tag(RawMsg),
Payload = amqp10_raw_msg:payload(RawMsg),
Msg = mc:init(mc_amqp, Payload, #{}),
rabbit_shovel_behaviour:forward(Tag, Msg, State);
handle_source({amqp10_event, {connection, Conn, opened}},
State = #{source := #{current := #{conn := Conn}}}) ->
Expand Down Expand Up @@ -333,11 +336,10 @@ forward(Tag, Msg0,
unacked := Unacked} = Dst,
ack_mode := AckMode} = State) ->
OutTag = rabbit_data_coercion:to_binary(Tag),
Msg1 = mc:protocol_state(mc:convert(mc_amqp, Msg0)),
Records = lists:flatten([amqp10_framing:decode_bin(iolist_to_binary(S)) || S <- Msg1]),
Msg2 = amqp10_msg:new(OutTag, Records, AckMode =/= on_confirm),
Msg = add_timestamp_header(State, add_forward_headers(State, Msg2)),
case send_msg(Link, Msg) of
Msg1 = add_timestamp_header(State, add_forward_headers(State, Msg0)),
Msg2 = mc:protocol_state(mc:convert(mc_amqp, Msg1)),
Msg3 = amqp10_raw_msg:new(AckMode =/= on_confirm, Tag, iolist_to_binary(Msg2)),
case send_msg(Link, Msg3) of
ok ->
rabbit_shovel_behaviour:decr_remaining_unacked(
case AckMode of
Expand Down Expand Up @@ -366,10 +368,13 @@ send_msg(Link, Msg) ->
end.

add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) ->
Anns = #{<<"x-opt-shovelled-timestamp">> => os:system_time(milli_seconds)},
amqp10_msg:set_message_annotations(Anns, Msg);
mc:set_annotation(
<<"x-opt-shovelled-timestamp">>, os:system_time(milli_seconds),
Msg);
add_timestamp_header(_, Msg) -> Msg.

add_forward_headers(#{dest := #{cached_forward_headers := Anns}}, Msg) ->
amqp10_msg:set_message_annotations(Anns, Msg);
maps:fold(fun(K, V, Acc) ->
mc:set_annotation(K, V, Acc)
end, Msg, Anns);
add_forward_headers(_, Msg) -> Msg.
9 changes: 7 additions & 2 deletions deps/rabbitmq_shovel/test/amqp10_shovel_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ amqp_encoded_data_list(_Config) ->
#'v1_0.data'{content = <<"one">>},
#'v1_0.data'{content = <<"two">>}
],
Msg = amqp10_msg:new(55, Body),
[_Transfer | Sections] = amqp10_msg:to_amqp_records(amqp10_msg:new(<<"55">>, Body)),
Bin = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]),
Msg = amqp10_raw_msg:new(true, 55, Bin),
rabbit_amqp10_shovel:handle_source({amqp10_msg, linkref, Msg}, State),

?assert(meck:validate(rabbit_shovel_behaviour)),
Expand All @@ -91,8 +93,11 @@ amqp_encoded_amqp_value(_Config) ->
State = #{source => #{},
dest => #{module => rabbit_amqp10_shovel},
ack_mode => no_ack},

Body = #'v1_0.amqp_value'{content = {utf8, <<"hi">>}},
Msg = amqp10_msg:new(55, Body),
[_Transfer | Sections] = amqp10_msg:to_amqp_records(amqp10_msg:new(<<"55">>, Body)),
Bin = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]),
Msg = amqp10_raw_msg:new(true, 55, Bin),
rabbit_amqp10_shovel:handle_source({amqp10_msg, linkref, Msg}, State),

?assert(meck:validate(rabbit_shovel_behaviour)),
Expand Down
Loading