99
1010-behaviour (rabbit_shovel_behaviour ).
1111
12+ -include_lib (" rabbit/include/mc.hrl" ).
1213-include (" rabbit_shovel.hrl" ).
1314
1415-export ([
3031 ack /3 ,
3132 nack /3 ,
3233 status /1 ,
33- forward /4
34+ forward /3
3435 ]).
3536
3637-import (rabbit_misc , [pget /2 , pget /3 ]).
@@ -184,10 +185,12 @@ dest_endpoint(#{shovel_type := dynamic,
184185
185186-spec handle_source (Msg :: any (), state ()) ->
186187 not_handled | state () | {stop , any ()}.
187- handle_source ({amqp10_msg , _LinkRef , Msg }, State ) ->
188- Tag = amqp10_msg :delivery_id (Msg ),
189- Payload = amqp10_msg :body_bin (Msg ),
190- rabbit_shovel_behaviour :forward (Tag , #{}, Payload , State );
188+ handle_source ({amqp10_msg , _LinkRef , Msg0 }, State ) ->
189+ Tag = amqp10_msg :delivery_id (Msg0 ),
190+ [_ | Rest ] = amqp10_msg :to_amqp_records (Msg0 ),
191+ Bin = iolist_to_binary ([amqp10_framing :encode_bin (D ) || D <- Rest ]),
192+ Msg = mc :init (mc_amqp , Bin , #{}),
193+ rabbit_shovel_behaviour :forward (Tag , Msg , State );
191194handle_source ({amqp10_event , {connection , Conn , opened }},
192195 State = #{source := #{current := #{conn := Conn }}}) ->
193196 State ;
@@ -260,8 +263,8 @@ handle_dest({amqp10_event, {link, Link, credited}},
260263 % % we have credit so can begin to forward
261264 State = State0 #{dest => Dst #{link_state => credited ,
262265 pending => []}},
263- lists :foldl (fun ({A , B , C }, S ) ->
264- forward (A , B , C , S )
266+ lists :foldl (fun ({A , B }, S ) ->
267+ forward (A , B , S )
265268 end , State , lists :reverse (Pend ));
266269handle_dest ({amqp10_event , {link , Link , _Evt }},
267270 State = #{dest := #{current := #{link := Link }}}) ->
@@ -315,27 +318,27 @@ status(_) ->
315318 % % Destination not yet connected
316319 ignore .
317320
318- -spec forward (Tag :: tag (), Props :: #{atom () => any ()},
319- Payload :: binary (), state ()) ->
321+ -spec forward (Tag :: tag (), Mc :: mc :state (), state ()) ->
320322 state () | {stop , any ()}.
321- forward (_Tag , _Props , _Payload ,
323+ forward (_Tag , _Mc ,
322324 #{source := #{remaining_unacked := 0 }} = State ) ->
323325 State ;
324- forward (Tag , Props , Payload ,
326+ forward (Tag , Mc ,
325327 #{dest := #{current := #{link_state := attached },
326328 pending := Pend0 } = Dst } = State ) ->
327329 % % simply cache the forward oo
328- Pend = [{Tag , Props , Payload } | Pend0 ],
330+ Pend = [{Tag , Mc } | Pend0 ],
329331 State #{dest => Dst #{pending => {Pend }}};
330- forward (Tag , Props , Payload ,
332+ forward (Tag , Msg0 ,
331333 #{dest := #{current := #{link := Link },
332334 unacked := Unacked } = Dst ,
333335 ack_mode := AckMode } = State ) ->
334336 OutTag = rabbit_data_coercion :to_binary (Tag ),
335- Msg0 = new_message (OutTag , Payload , State ),
336- Msg = add_timestamp_header (
337- State , set_message_properties (
338- Props , add_forward_headers (State , Msg0 ))),
337+ Msg1 = mc :protocol_state (mc :convert (mc_amqp , Msg0 )),
338+ Records = [amqp10_framing :decode_bin (iolist_to_binary (S )) || S <- Msg1 ],
339+ Rs = amqp10_msg :to_amqp_records (amqp10_msg :new (OutTag , Records , AckMode =/= on_confirm )),
340+ Msg2 = amqp10_msg :from_amqp_records (Rs ),
341+ Msg = update_amqp10_message (Msg2 , mc :exchange (Msg0 ), mc :routing_keys (Msg0 ), State ),
339342 case send_msg (Link , Msg ) of
340343 ok ->
341344 rabbit_shovel_behaviour :decr_remaining_unacked (
@@ -364,73 +367,25 @@ send_msg(Link, Msg) ->
364367 end
365368 end .
366369
367- new_message (Tag , Payload , #{ack_mode := AckMode ,
368- dest := #{properties := Props ,
369- application_properties := AppProps ,
370- message_annotations := MsgAnns }}) ->
371- Msg0 = amqp10_msg :new (Tag , Payload , AckMode =/= on_confirm ),
370+ update_amqp10_message (Msg0 , Exchange , RK , #{dest := #{properties := Props ,
371+ application_properties := AppProps0 ,
372+ message_annotations := MsgAnns }} = State ) ->
372373 Msg1 = amqp10_msg :set_properties (Props , Msg0 ),
373- Msg = amqp10_msg :set_message_annotations (MsgAnns , Msg1 ),
374- amqp10_msg :set_application_properties (AppProps , Msg ).
374+ Msg2 = amqp10_msg :set_message_annotations (MsgAnns , Msg1 ),
375+ AppProps = AppProps0 #{<<" exchange" >> => Exchange ,
376+ <<" routing_key" >> => RK },
377+ Msg = amqp10_msg :set_application_properties (AppProps , Msg2 ),
378+ add_timestamp_header (State , add_forward_headers (State , Msg )).
375379
376380add_timestamp_header (#{dest := #{add_timestamp_header := true }}, Msg ) ->
377381 P = #{creation_time => os :system_time (milli_seconds )},
378382 amqp10_msg :set_properties (P , Msg );
379383add_timestamp_header (_ , Msg ) -> Msg .
380384
381385add_forward_headers (#{dest := #{cached_forward_headers := Props }}, Msg ) ->
382- amqp10_msg :set_application_properties (Props , Msg );
386+ amqp10_msg :set_application_properties (Props , Msg );
383387add_forward_headers (_ , Msg ) -> Msg .
384388
385- set_message_properties (Props , Msg ) ->
386- % % this is effectively special handling properties from amqp 0.9.1
387- maps :fold (
388- fun (content_type , Ct , M ) ->
389- amqp10_msg :set_properties (
390- #{content_type => to_binary (Ct )}, M );
391- (content_encoding , Ct , M ) ->
392- amqp10_msg :set_properties (
393- #{content_encoding => to_binary (Ct )}, M );
394- (delivery_mode , 2 , M ) ->
395- amqp10_msg :set_headers (#{durable => true }, M );
396- (delivery_mode , 1 , M ) ->
397- % by default the durable flag is false
398- M ;
399- (priority , P , M ) when is_integer (P ) ->
400- amqp10_msg :set_headers (#{priority => P }, M );
401- (correlation_id , Ct , M ) ->
402- amqp10_msg :set_properties (#{correlation_id => to_binary (Ct )}, M );
403- (reply_to , Ct , M ) ->
404- amqp10_msg :set_properties (#{reply_to => to_binary (Ct )}, M );
405- (message_id , Ct , M ) ->
406- amqp10_msg :set_properties (#{message_id => to_binary (Ct )}, M );
407- (timestamp , Ct , M ) ->
408- amqp10_msg :set_properties (#{creation_time => Ct }, M );
409- (user_id , Ct , M ) ->
410- amqp10_msg :set_properties (#{user_id => Ct }, M );
411- (headers , Headers0 , M ) when is_list (Headers0 ) ->
412- % % AMPQ 0.9.1 are added as applicatin properties
413- % % TODO: filter headers to make safe
414- Headers = lists :foldl (
415- fun ({K , _T , V }, Acc ) ->
416- case is_amqp10_compat (V ) of
417- true ->
418- Acc #{to_binary (K ) => V };
419- false ->
420- Acc
421- end
422- end , #{}, Headers0 ),
423- amqp10_msg :set_application_properties (Headers , M );
424- (Key , Value , M ) ->
425- case is_amqp10_compat (Value ) of
426- true ->
427- amqp10_msg :set_application_properties (
428- #{to_binary (Key ) => Value }, M );
429- false ->
430- M
431- end
432- end , Msg , Props ).
433-
434389gen_unique_name (Pre0 , Post0 ) ->
435390 Pre = to_binary (Pre0 ),
436391 Post = to_binary (Post0 ),
@@ -441,8 +396,3 @@ bin_to_hex(Bin) ->
441396 <<<<if N >= 10 -> N - 10 + $a ;
442397 true -> N + $0 end >>
443398 || <<N :4 >> <= Bin >>.
444-
445- is_amqp10_compat (T ) ->
446- is_binary (T ) orelse
447- is_number (T ) orelse
448- is_boolean (T ).
0 commit comments