111111 max_consumers , % taken from rabbit.consumer_max_per_channel
112112 % % defines how ofter gc will be executed
113113 writer_gc_threshold ,
114- msg_interceptor_ctx :: rabbit_message_interceptor :context ()
114+ msg_interceptor_ctx :: rabbit_msg_interceptor :context ()
115115 }).
116116
117117-record (pending_ack , {
@@ -662,13 +662,14 @@ handle_cast({deliver_reply, _K, _Del},
662662 noreply (State );
663663handle_cast ({deliver_reply , _K , _Msg }, State = # ch {reply_consumer = none }) ->
664664 noreply (State );
665- handle_cast ({deliver_reply , Key , Msg },
666- State = # ch {cfg = # conf {writer_pid = WriterPid },
665+ handle_cast ({deliver_reply , Key , Mc },
666+ State = # ch {cfg = # conf {writer_pid = WriterPid ,
667+ msg_interceptor_ctx = MsgIcptCtx },
667668 next_tag = DeliveryTag ,
668669 reply_consumer = {ConsumerTag , _Suffix , Key }}) ->
669- Content = mc :protocol_state ( mc : convert ( mc_amqpl , Msg ) ),
670- ExchName = mc :exchange ( Msg ),
671- [ RoutingKey | _ ] = mc : routing_keys ( Msg ),
670+ ExchName = mc :exchange ( Mc ),
671+ [ RoutingKey | _ ] = mc :routing_keys ( Mc ),
672+ Content = outgoing_content ( Mc , MsgIcptCtx ),
672673 ok = rabbit_writer :send_command (
673674 WriterPid ,
674675 # 'basic.deliver' {consumer_tag = ConsumerTag ,
@@ -1174,7 +1175,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
11741175 trace_state = TraceState ,
11751176 authz_context = AuthzContext ,
11761177 writer_gc_threshold = GCThreshold ,
1177- msg_interceptor_ctx = MsgInterceptorCtx
1178+ msg_interceptor_ctx = MsgIcptCtx
11781179 },
11791180 tx = Tx ,
11801181 confirm_enabled = ConfirmEnabled ,
@@ -1214,9 +1215,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12141215 {ok , Message0 } ->
12151216 check_write_permitted_on_topics (Exchange , User , Message0 , AuthzContext ),
12161217 check_user_id_header (Message0 , User ),
1217- Message = rabbit_message_interceptor :intercept (Message0 ,
1218- MsgInterceptorCtx ,
1219- incoming_message_interceptors ),
1218+ Message = rabbit_msg_interceptor :intercept_incoming (Message0 , MsgIcptCtx ),
12201219 QNames = rabbit_exchange :route (Exchange , Message , #{return_binding_keys => true }),
12211220 [deliver_reply (RK , Message ) || {virtual_reply_queue , RK } <- QNames ],
12221221 Queues = rabbit_amqqueue :lookup_many (QNames ),
@@ -2601,15 +2600,15 @@ handle_deliver(CTag, Ack, Msgs, State) when is_list(Msgs) ->
26012600 end , State , Msgs ).
26022601
26032602handle_deliver0 (ConsumerTag , AckRequired ,
2604- {QName , QPid , _MsgId , Redelivered , MsgCont0 } = Msg ,
2603+ {QName , QPid , _MsgId , Redelivered , Mc } = Msg ,
26052604 State = # ch {cfg = # conf {writer_pid = WriterPid ,
2606- writer_gc_threshold = GCThreshold },
2605+ writer_gc_threshold = GCThreshold ,
2606+ msg_interceptor_ctx = MsgIcptCtx },
26072607 next_tag = DeliveryTag ,
26082608 queue_states = Qs }) ->
2609- Exchange = mc :exchange (MsgCont0 ),
2610- [RoutingKey | _ ] = mc :routing_keys (MsgCont0 ),
2611- MsgCont = mc :convert (mc_amqpl , MsgCont0 ),
2612- Content = mc :protocol_state (MsgCont ),
2609+ Exchange = mc :exchange (Mc ),
2610+ [RoutingKey | _ ] = mc :routing_keys (Mc ),
2611+ Content = outgoing_content (Mc , MsgIcptCtx ),
26132612 Deliver = # 'basic.deliver' {consumer_tag = ConsumerTag ,
26142613 delivery_tag = DeliveryTag ,
26152614 redelivered = Redelivered ,
@@ -2630,12 +2629,11 @@ handle_deliver0(ConsumerTag, AckRequired,
26302629 record_sent (deliver , QueueType , ConsumerTag , AckRequired , Msg , State ).
26312630
26322631handle_basic_get (WriterPid , DeliveryTag , NoAck , MessageCount ,
2633- Msg0 = {_QName , _QPid , _MsgId , Redelivered , MsgCont0 },
2632+ Msg0 = {_QName , _QPid , _MsgId , Redelivered , Mc },
26342633 QueueType , State ) ->
2635- Exchange = mc :exchange (MsgCont0 ),
2636- [RoutingKey | _ ] = mc :routing_keys (MsgCont0 ),
2637- MsgCont = mc :convert (mc_amqpl , MsgCont0 ),
2638- Content = mc :protocol_state (MsgCont ),
2634+ Exchange = mc :exchange (Mc ),
2635+ [RoutingKey | _ ] = mc :routing_keys (Mc ),
2636+ Content = outgoing_content (Mc , State # ch .cfg # conf .msg_interceptor_ctx ),
26392637 ok = rabbit_writer :send_command (
26402638 WriterPid ,
26412639 # 'basic.get_ok' {delivery_tag = DeliveryTag ,
@@ -2646,6 +2644,11 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
26462644 Content ),
26472645 {noreply , record_sent (get , QueueType , DeliveryTag , not (NoAck ), Msg0 , State )}.
26482646
2647+ outgoing_content (Mc , MsgIcptCtx ) ->
2648+ Mc1 = mc :convert (mc_amqpl , Mc ),
2649+ Mc2 = rabbit_msg_interceptor :intercept_outgoing (Mc1 , MsgIcptCtx ),
2650+ mc :protocol_state (Mc2 ).
2651+
26492652init_tick_timer (State = # ch {tick_timer = undefined }) ->
26502653 {ok , Interval } = application :get_env (rabbit , channel_tick_interval ),
26512654 State # ch {tick_timer = erlang :send_after (Interval , self (), tick )};
0 commit comments