100100 properties => amqp10_client_types :properties (),
101101 max_message_size => max_message_size (),
102102 handle => output_handle (),
103- footer_opt => footer_opt ()
103+ footer_opt => footer_opt (),
104+ raw_mode => boolean ()
104105 }.
105106
106107-type transfer_error () :: {error ,
142143 auto_flow :: never | {RenewWhenBelow :: pos_integer (),
143144 Credit :: pos_integer ()},
144145 incoming_unsettled = #{} :: #{delivery_number () => ok },
145- footer_opt :: footer_opt () | undefined
146+ footer_opt :: footer_opt () | undefined ,
147+ raw_mode = false :: boolean ()
146148 }).
147149
148150-record (state ,
@@ -208,11 +210,18 @@ attach(Session, Args) ->
208210detach (Session , Handle ) ->
209211 gen_statem :call (Session , {detach , Handle }, ? TIMEOUT ).
210212
211- -spec transfer (pid (), amqp10_msg :amqp10_msg (), timeout ()) ->
213+ -spec transfer (pid (), amqp10_msg :amqp10_msg () | amqp10_raw_msg : amqp10_raw_msg () , timeout ()) ->
212214 ok | transfer_error ().
213215transfer (Session , Amqp10Msg , Timeout ) ->
214- [Transfer | Sections ] = amqp10_msg :to_amqp_records (Amqp10Msg ),
215- gen_statem :call (Session , {transfer , Transfer , Sections }, Timeout ).
216+ case amqp10_raw_msg :is (Amqp10Msg ) of
217+ true ->
218+ Transfer = amqp10_raw_msg :transfer (Amqp10Msg ),
219+ Payload = amqp10_raw_msg :payload (Amqp10Msg ),
220+ gen_statem :call (Session , {transfer , Transfer , {raw , Payload }}, Timeout );
221+ false ->
222+ [Transfer | Sections ] = amqp10_msg :to_amqp_records (Amqp10Msg ),
223+ gen_statem :call (Session , {transfer , Transfer , Sections }, Timeout )
224+ end .
216225
217226-spec flow (pid (), non_neg_integer (), never | pos_integer ()) -> ok .
218227flow (Session , IncomingWindow , RenewWhenBelow ) when
@@ -413,7 +422,8 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
413422 {ok , # link {target = {pid , TargetPid },
414423 ref = LinkRef ,
415424 incoming_unsettled = Unsettled ,
416- footer_opt = FooterOpt
425+ footer_opt = FooterOpt ,
426+ raw_mode = RawMode
417427 } = Link0 } = find_link_by_input_handle (InHandle , State0 ),
418428
419429 {Transfer = # 'v1_0.transfer' {settled = Settled ,
@@ -428,7 +438,7 @@ mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
428438 % % then the settled flag MUST be interpreted as being false." [2.7.5]
429439 Link1 # link {incoming_unsettled = Unsettled #{DeliveryId => ok }}
430440 end ,
431- case decode_as_msg (Transfer , Payload , FooterOpt ) of
441+ case decode_as_msg (Transfer , Payload , FooterOpt , RawMode ) of
432442 {ok , Msg } ->
433443 % link bookkeeping
434444 % notify when credit is exhausted (link_credit = 0)
@@ -619,6 +629,25 @@ send(Record, #state{socket = Socket} = State) ->
619629 Frame = encode_frame (Record , State ),
620630 socket_send (Socket , Frame ).
621631
632+ send_transfer (Transfer0 , {raw , Payload }, _FooterOpt , MaxMessageSize ,
633+ # state {socket = Socket ,
634+ channel = Channel ,
635+ connection_config = Config }) ->
636+ OutMaxFrameSize = maps :get (outgoing_max_frame_size , Config ),
637+ Transfer = Transfer0 # 'v1_0.transfer' {more = false },
638+ TransferSize = iolist_size (amqp10_framing :encode_bin (Transfer )),
639+ if is_integer (MaxMessageSize ) andalso
640+ MaxMessageSize > 0 andalso
641+ byte_size (Payload ) > MaxMessageSize ->
642+ {error , message_size_exceeded };
643+ true ->
644+ % TODO: this does not take the extended header into account
645+ % see: 2.3
646+ MaxPayloadSize = OutMaxFrameSize - ? FRAME_HEADER_SIZE - TransferSize ,
647+ Frames = build_frames (Channel , Transfer , Payload , MaxPayloadSize , []),
648+ ok = socket_send (Socket , Frames ),
649+ {ok , length (Frames )}
650+ end ;
622651send_transfer (Transfer0 , Sections0 , FooterOpt , MaxMessageSize ,
623652 # state {socket = Socket ,
624653 channel = Channel ,
@@ -918,7 +947,8 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
918947 target = LinkTarget ,
919948 delivery_count = unpack (InitialDeliveryCount ),
920949 max_message_size = unpack (MaxMessageSize ),
921- footer_opt = maps :get (footer_opt , Args , undefined )},
950+ footer_opt = maps :get (footer_opt , Args , undefined ),
951+ raw_mode = maps :get (raw_mode , Args , false )},
922952
923953 {State # state {links = Links #{OutHandle => Link },
924954 next_link_handle = NextLinkHandle ,
@@ -1199,10 +1229,14 @@ complete_partial_transfer(_Transfer, Payload,
11991229 {T , iolist_to_binary (lists :reverse ([Payload | Payloads ])),
12001230 Link # link {partial_transfers = undefined }}.
12011231
1202- decode_as_msg (Transfer , Payload , undefined ) ->
1232+ decode_as_msg (# 'v1_0.transfer' {settled = Settled ,
1233+ delivery_id = {uint , DeliveryId }},
1234+ Payload , _ , true ) ->
1235+ {ok , amqp10_raw_msg :new (Settled , DeliveryId , Payload )};
1236+ decode_as_msg (Transfer , Payload , undefined , _ ) ->
12031237 Sections = amqp10_framing :decode_bin (Payload ),
12041238 {ok , amqp10_msg :from_amqp_records ([Transfer | Sections ])};
1205- decode_as_msg (Transfer , Payload , FooterOpt ) ->
1239+ decode_as_msg (Transfer , Payload , FooterOpt , _ ) ->
12061240 PosSections = decode_sections ([], Payload , byte_size (Payload ), 0 ),
12071241 Sections = lists :map (fun ({_Pos , S }) -> S end , PosSections ),
12081242 Msg = amqp10_msg :from_amqp_records ([Transfer | Sections ]),
0 commit comments