Skip to content

Commit 0b1aefd

Browse files
committed
Local shovels: move unacked_message_q inside source config
1 parent 8c9f79f commit 0b1aefd

File tree

1 file changed

+14
-12
lines changed

1 file changed

+14
-12
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ connect_source(State = #{source := Src = #{resource_decl := {M, F, MFArgs},
103103
State#{source => Src#{current => #{queue_states => QState,
104104
next_tag => 1,
105105
user => User,
106-
vhost => VHost},
107-
queue => QName},
108-
unacked_message_q => ?QUEUE:new()}.
106+
vhost => VHost,
107+
unacked_message_q => ?QUEUE:new()},
108+
queue => QName}}.
109109

110110
connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs},
111111
uris := [Uri | _]
@@ -452,10 +452,11 @@ handle_dest_queue_actions(Actions, State) ->
452452

453453
record_pending(false, _DeliveryTag, _MsgId, State) ->
454454
State;
455-
record_pending(true, DeliveryTag, MsgId, #{unacked_message_q := UAMQ0} = State) ->
455+
record_pending(true, DeliveryTag, MsgId,
456+
#{source := Src = #{current := Current = #{unacked_message_q := UAMQ0}}} = State) ->
456457
UAMQ = ?QUEUE:in(#pending_ack{delivery_tag = DeliveryTag,
457458
msg_id = MsgId}, UAMQ0),
458-
State#{unacked_message_q => UAMQ}.
459+
State#{source => Src#{current => Current#{unacked_message_q => UAMQ}}}.
459460

460461
remaining(_Q, #{source := #{delete_after := never}}) ->
461462
unlimited;
@@ -567,18 +568,19 @@ get_user_vhost_from_amqp_param(Uri) ->
567568
exit({shutdown, {access_refused, Username}})
568569
end.
569570

570-
settle(Op, DeliveryTag, Multiple, #{unacked_message_q := UAMQ0,
571-
source := #{queue := Queue,
572-
current := Current = #{queue_states := QState0,
573-
consumer_tag := CTag,
574-
vhost := VHost}} = Src} = State0) ->
571+
settle(Op, DeliveryTag, Multiple,
572+
#{source := #{queue := Queue,
573+
current := Current = #{queue_states := QState0,
574+
consumer_tag := CTag,
575+
unacked_message_q := UAMQ0,
576+
vhost := VHost}} = Src} = State0) ->
575577
{Acked, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
576578
QRef = rabbit_misc:r(VHost, queue, Queue),
577579
MsgIds = [Ack#pending_ack.msg_id || Ack <- Acked],
578580
case rabbit_queue_type:settle(QRef, Op, CTag, MsgIds, QState0) of
579581
{ok, QState1, Actions} ->
580-
State = State0#{source => Src#{current => Current#{queue_states => QState1}},
581-
unacked_message_q => UAMQ},
582+
State = State0#{source => Src#{current => Current#{queue_states => QState1,
583+
unacked_message_q => UAMQ}}},
582584
handle_queue_actions(Actions, State);
583585
{'protocol_error', Type, Reason, Args} ->
584586
?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",

0 commit comments

Comments
 (0)