Skip to content

Commit 773e754

Browse files
authored
Merge branch 'main' into lukebakken/ldap-validation-api
2 parents 9a4cb9c + ed05807 commit 773e754

File tree

57 files changed

+1547
-390
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+1547
-390
lines changed

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
attach_receiver_link/5,
2929
attach_receiver_link/6,
3030
attach_receiver_link/7,
31+
attach_receiver_link/8,
3132
attach_link/2,
3233
detach_link/1,
3334
send_msg/2,
@@ -277,7 +278,17 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
277278
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
278279
terminus_durability(), filter(), properties()) ->
279280
{ok, link_ref()}.
280-
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
281+
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) ->
282+
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, false).
283+
284+
%% @doc Attaches a receiver link to a source.
285+
%% This is asynchronous and will notify completion of the attach request to the
286+
%% caller using an amqp10_event of the following format:
287+
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
288+
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
289+
terminus_durability(), filter(), properties(), boolean()) ->
290+
{ok, link_ref()}.
291+
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, RawMode)
281292
when is_pid(Session) andalso
282293
is_binary(Name) andalso
283294
is_binary(Source) andalso
@@ -286,14 +297,16 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Prop
286297
SettleMode == mixed) andalso
287298
is_atom(Durability) andalso
288299
is_map(Filter) andalso
289-
is_map(Properties) ->
300+
is_map(Properties) andalso
301+
is_boolean(RawMode) ->
290302
AttachArgs = #{name => Name,
291303
role => {receiver, #{address => Source,
292304
durable => Durability}, self()},
293305
snd_settle_mode => SettleMode,
294306
rcv_settle_mode => first,
295307
filter => Filter,
296-
properties => Properties},
308+
properties => Properties,
309+
raw_mode => RawMode},
297310
amqp10_client_session:attach(Session, AttachArgs).
298311

299312
-spec attach_link(pid(), attach_args()) -> {ok, link_ref()}.
@@ -355,11 +368,16 @@ stop_receiver_link(#link_ref{role = receiver,
355368
%%% messages
356369

357370
%% @doc Send a message on a the link referred to be the 'LinkRef'.
358-
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg()) ->
371+
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg() | amqp10_raw_msg:amqp10_raw_msg()) ->
359372
ok | amqp10_client_session:transfer_error().
360373
send_msg(#link_ref{role = sender, session = Session,
361374
link_handle = Handle}, Msg0) ->
362-
Msg = amqp10_msg:set_handle(Handle, Msg0),
375+
Msg = case amqp10_raw_msg:is(Msg0) of
376+
true ->
377+
amqp10_raw_msg:set_handle(Handle, Msg0);
378+
false ->
379+
amqp10_msg:set_handle(Handle, Msg0)
380+
end,
363381
amqp10_client_session:transfer(Session, Msg, ?TIMEOUT).
364382

365383
%% @doc Accept a message on a the link referred to be the 'LinkRef'.

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@
100100
properties => amqp10_client_types:properties(),
101101
max_message_size => max_message_size(),
102102
handle => output_handle(),
103-
footer_opt => footer_opt()
103+
footer_opt => footer_opt(),
104+
raw_mode => boolean()
104105
}.
105106

106107
-type transfer_error() :: {error,
@@ -142,7 +143,8 @@
142143
auto_flow :: never | {RenewWhenBelow :: pos_integer(),
143144
Credit :: pos_integer()},
144145
incoming_unsettled = #{} :: #{delivery_number() => ok},
145-
footer_opt :: footer_opt() | undefined
146+
footer_opt :: footer_opt() | undefined,
147+
raw_mode = false :: boolean()
146148
}).
147149

148150
-record(state,
@@ -208,11 +210,18 @@ attach(Session, Args) ->
208210
detach(Session, Handle) ->
209211
gen_statem:call(Session, {detach, Handle}, ?TIMEOUT).
210212

211-
-spec transfer(pid(), amqp10_msg:amqp10_msg(), timeout()) ->
213+
-spec transfer(pid(), amqp10_msg:amqp10_msg() | amqp10_raw_msg:amqp10_raw_msg(), timeout()) ->
212214
ok | transfer_error().
213215
transfer(Session, Amqp10Msg, Timeout) ->
214-
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
215-
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout).
216+
case amqp10_raw_msg:is(Amqp10Msg) of
217+
true ->
218+
Transfer = amqp10_raw_msg:transfer(Amqp10Msg),
219+
Payload = amqp10_raw_msg:payload(Amqp10Msg),
220+
gen_statem:call(Session, {transfer, Transfer, {raw, Payload}}, Timeout);
221+
false ->
222+
[Transfer | Sections] = amqp10_msg:to_amqp_records(Amqp10Msg),
223+
gen_statem:call(Session, {transfer, Transfer, Sections}, Timeout)
224+
end.
216225

217226
-spec flow(pid(), non_neg_integer(), never | pos_integer()) -> ok.
218227
flow(Session, IncomingWindow, RenewWhenBelow) when
@@ -413,7 +422,8 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
413422
{ok, #link{target = {pid, TargetPid},
414423
ref = LinkRef,
415424
incoming_unsettled = Unsettled,
416-
footer_opt = FooterOpt
425+
footer_opt = FooterOpt,
426+
raw_mode = RawMode
417427
} = Link0} = find_link_by_input_handle(InHandle, State0),
418428

419429
{Transfer = #'v1_0.transfer'{settled = Settled,
@@ -428,7 +438,7 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
428438
%% then the settled flag MUST be interpreted as being false." [2.7.5]
429439
Link1#link{incoming_unsettled = Unsettled#{DeliveryId => ok}}
430440
end,
431-
case decode_as_msg(Transfer, Payload, FooterOpt) of
441+
case decode_as_msg(Transfer, Payload, FooterOpt, RawMode) of
432442
{ok, Msg} ->
433443
% link bookkeeping
434444
% notify when credit is exhausted (link_credit = 0)
@@ -619,6 +629,25 @@ send(Record, #state{socket = Socket} = State) ->
619629
Frame = encode_frame(Record, State),
620630
socket_send(Socket, Frame).
621631

632+
send_transfer(Transfer0, {raw, Payload}, _FooterOpt, MaxMessageSize,
633+
#state{socket = Socket,
634+
channel = Channel,
635+
connection_config = Config}) ->
636+
OutMaxFrameSize = maps:get(outgoing_max_frame_size, Config),
637+
Transfer = Transfer0#'v1_0.transfer'{more = false},
638+
TransferSize = iolist_size(amqp10_framing:encode_bin(Transfer)),
639+
if is_integer(MaxMessageSize) andalso
640+
MaxMessageSize > 0 andalso
641+
byte_size(Payload) > MaxMessageSize ->
642+
{error, message_size_exceeded};
643+
true ->
644+
% TODO: this does not take the extended header into account
645+
% see: 2.3
646+
MaxPayloadSize = OutMaxFrameSize - ?FRAME_HEADER_SIZE - TransferSize,
647+
Frames = build_frames(Channel, Transfer, Payload, MaxPayloadSize, []),
648+
ok = socket_send(Socket, Frames),
649+
{ok, length(Frames)}
650+
end;
622651
send_transfer(Transfer0, Sections0, FooterOpt, MaxMessageSize,
623652
#state{socket = Socket,
624653
channel = Channel,
@@ -918,7 +947,8 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
918947
target = LinkTarget,
919948
delivery_count = unpack(InitialDeliveryCount),
920949
max_message_size = unpack(MaxMessageSize),
921-
footer_opt = maps:get(footer_opt, Args, undefined)},
950+
footer_opt = maps:get(footer_opt, Args, undefined),
951+
raw_mode = maps:get(raw_mode, Args, false)},
922952

923953
{State#state{links = Links#{OutHandle => Link},
924954
next_link_handle = NextLinkHandle,
@@ -1199,10 +1229,14 @@ complete_partial_transfer(_Transfer, Payload,
11991229
{T, iolist_to_binary(lists:reverse([Payload | Payloads])),
12001230
Link#link{partial_transfers = undefined}}.
12011231

1202-
decode_as_msg(Transfer, Payload, undefined) ->
1232+
decode_as_msg(#'v1_0.transfer'{settled = Settled,
1233+
delivery_id = {uint, DeliveryId}},
1234+
Payload, _, true) ->
1235+
{ok, amqp10_raw_msg:new(Settled, DeliveryId, Payload)};
1236+
decode_as_msg(Transfer, Payload, undefined, _) ->
12031237
Sections = amqp10_framing:decode_bin(Payload),
12041238
{ok, amqp10_msg:from_amqp_records([Transfer | Sections])};
1205-
decode_as_msg(Transfer, Payload, FooterOpt) ->
1239+
decode_as_msg(Transfer, Payload, FooterOpt, _) ->
12061240
PosSections = decode_sections([], Payload, byte_size(Payload), 0),
12071241
Sections = lists:map(fun({_Pos, S}) -> S end, PosSections),
12081242
Msg = amqp10_msg:from_amqp_records([Transfer | Sections]),
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
-module(amqp10_raw_msg).
8+
9+
-include_lib("amqp10_common/include/amqp10_framing.hrl").
10+
-include_lib("amqp10_common/include/amqp10_types.hrl").
11+
12+
%% Just for AMQP10 shovel usage. It avoids the binary <-> amqp10 <-> mc
13+
%% conversion, with all the unnecessary encoding/decoding steps.
14+
%% It allows for just binary <-> mc conversion, as the payload is stored as is
15+
16+
-export([new/3,
17+
settled/1,
18+
delivery_tag/1,
19+
payload/1,
20+
handle/1,
21+
set_handle/2,
22+
transfer/1,
23+
is/1]).
24+
25+
-record(amqp10_raw_msg,
26+
{settled :: boolean(),
27+
delivery_tag :: non_neg_integer(),
28+
payload :: binary(),
29+
handle :: non_neg_integer() | undefined
30+
}).
31+
32+
-opaque amqp10_raw_msg() :: #amqp10_raw_msg{}.
33+
34+
-export_type([amqp10_raw_msg/0]).
35+
36+
-spec new(boolean(), non_neg_integer(), binary()) ->
37+
amqp10_raw_msg().
38+
new(Settled, DeliveryTag, Payload) ->
39+
#amqp10_raw_msg{settled = Settled,
40+
delivery_tag = DeliveryTag,
41+
payload = Payload}.
42+
43+
-spec settled(amqp10_raw_msg()) -> boolean().
44+
settled(#amqp10_raw_msg{settled = Settled}) ->
45+
Settled.
46+
47+
-spec delivery_tag(amqp10_raw_msg()) -> non_neg_integer().
48+
delivery_tag(#amqp10_raw_msg{delivery_tag = DeliveryTag}) ->
49+
DeliveryTag.
50+
51+
-spec payload(amqp10_raw_msg()) -> binary().
52+
payload(#amqp10_raw_msg{payload = Payload}) ->
53+
Payload.
54+
55+
-spec handle(amqp10_raw_msg()) -> non_neg_integer().
56+
handle(#amqp10_raw_msg{handle = Handle}) ->
57+
Handle.
58+
59+
-spec set_handle(non_neg_integer(), amqp10_raw_msg()) ->
60+
amqp10_raw_msg().
61+
set_handle(Handle, #amqp10_raw_msg{} = Msg) ->
62+
Msg#amqp10_raw_msg{handle = Handle}.
63+
64+
-spec transfer(amqp10_raw_msg()) -> #'v1_0.transfer'{}.
65+
transfer(#amqp10_raw_msg{settled = Settled,
66+
delivery_tag = DeliveryTag,
67+
handle = Handle}) ->
68+
#'v1_0.transfer'{
69+
delivery_tag = {binary, rabbit_data_coercion:to_binary(DeliveryTag)},
70+
settled = Settled,
71+
handle = {uint, Handle},
72+
message_format = {uint, ?MESSAGE_FORMAT}}.
73+
74+
-spec is(term()) -> boolean().
75+
is(Record) ->
76+
is_record(Record, amqp10_raw_msg).

0 commit comments

Comments
 (0)