Skip to content

Commit 798bbfb

Browse files
committed
Fix bug
Fixes following regression: ``` make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" PLUGINS="rabbitmq_management” FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3" deps/rabbitmq_management/bin/rabbitmqadmin declare queue name=s1 queue_type=stream durable=true quiver //host.docker.internal//queue/s1 --durable -d 30s --credit 260 --body-size 12 ``` This command just stopped receiving after a few messages when executed the 2nd time because only 256 messages were sent by the writer proc although 260 delivery action were emitted by rabbit_stream_queue. 4 messages in the outgoing_pending queue were wrongly overwritten.
1 parent 4557811 commit 798bbfb

File tree

1 file changed

+24
-24
lines changed

1 file changed

+24
-24
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,22 +1380,22 @@ send_pending(#state{remote_incoming_window = RemoteIncomingWindow,
13801380
{{value, #'v1_0.flow'{} = Flow0}, Buf} ->
13811381
#cfg{writer_pid = WriterPid,
13821382
channel_num = Ch} = State#state.cfg,
1383-
Flow = session_flow_fields(Flow0, State),
1383+
State1 = State#state{outgoing_pending = Buf},
1384+
Flow = session_flow_fields(Flow0, State1),
13841385
rabbit_amqp_writer:send_command(WriterPid, Ch, Flow),
1385-
send_pending(State#state{outgoing_pending = Buf});
1386-
{{value, Delivery}, Buf1} ->
1386+
send_pending(State1);
1387+
{{value, Delivery}, Buf} ->
13871388
case RemoteIncomingWindow =:= 0 orelse
13881389
credit_flow:blocked() of
13891390
true ->
13901391
State;
13911392
false ->
1392-
{NewRemoteIncomingWindow, Buf, State1} =
1393-
send_pending_delivery(Delivery, Buf1, State),
1393+
{NewRemoteIncomingWindow, State1} =
1394+
send_pending_delivery(Delivery, Buf, State),
13941395
NumTransfersSent = RemoteIncomingWindow - NewRemoteIncomingWindow,
13951396
State2 = session_flow_control_sent_transfers(NumTransfersSent, State1),
1396-
State3 = State2#state{outgoing_pending = Buf},
13971397
%% Recurse to possibly send FLOW frames.
1398-
send_pending(State3)
1398+
send_pending(State2)
13991399
end
14001400
end.
14011401

@@ -1605,13 +1605,13 @@ send_pending_delivery(#pending_delivery{
16051605
outgoing_unsettled = #outgoing_unsettled{consumer_tag = Ctag,
16061606
queue_name = QName}
16071607
} = Pending,
1608-
Buf,
1608+
Buf0,
16091609
#state{remote_incoming_window = Space,
16101610
outgoing_links = OutgoingLinks,
16111611
queue_states = QStates,
16121612
cfg = #cfg{writer_pid = WriterPid,
16131613
channel_num = Ch}
1614-
} = State) ->
1614+
} = State0) ->
16151615
Handle = ctag_to_handle(Ctag),
16161616
case is_map_key(Handle, OutgoingLinks) of
16171617
true ->
@@ -1634,37 +1634,37 @@ send_pending_delivery(#pending_delivery{
16341634
end,
16351635
case send_frames(SendFun, Frames, Space) of
16361636
{sent_all, SpaceLeft} ->
1637-
{SpaceLeft,
1638-
Buf,
1639-
sent_pending_delivery(Pending, Handle, State)};
1637+
State1 = State0#state{outgoing_pending = Buf0},
1638+
State = sent_pending_delivery(Pending, Handle, State1),
1639+
{SpaceLeft, State};
16401640
{sent_some, SpaceLeft, Rest} ->
1641-
{SpaceLeft,
1642-
queue:in_r(Pending#pending_delivery{frames = Rest}, Buf),
1643-
State}
1641+
Buf = queue:in_r(Pending#pending_delivery{frames = Rest}, Buf0),
1642+
State = State0#state{outgoing_pending = Buf},
1643+
{SpaceLeft, State}
16441644
end;
16451645
false ->
16461646
%% Link got detached. Either the client closed the link in which case the queue
16471647
%% already requeued all checked out messages or the queue doesn't exist anymore
16481648
%% in which case there is no point in requeuing this message.
16491649
%% Therefore, ignore (drop) this delivery.
1650-
{Space, Buf, State}
1650+
State = State0#state{outgoing_pending = Buf0},
1651+
{Space, State}
16511652
end;
16521653
send_pending_delivery(#pending_management_delivery{frames = Frames} = Pending,
1653-
Buf,
1654+
Buf0,
16541655
#state{remote_incoming_window = Space,
16551656
cfg = #cfg{writer_pid = WriterPid,
16561657
channel_num = Ch}
1657-
} = State) ->
1658+
} = State0) ->
16581659
SendFun = send_fun(WriterPid, Ch),
16591660
case send_frames(SendFun, Frames, Space) of
16601661
{sent_all, SpaceLeft} ->
1661-
{SpaceLeft,
1662-
Buf,
1663-
State};
1662+
State = State0#state{outgoing_pending = Buf0},
1663+
{SpaceLeft, State};
16641664
{sent_some, SpaceLeft, Rest} ->
1665-
{SpaceLeft,
1666-
queue:in_r(Pending#pending_management_delivery{frames = Rest}, Buf),
1667-
State}
1665+
Buf = queue:in_r(Pending#pending_management_delivery{frames = Rest}, Buf0),
1666+
State = State0#state{outgoing_pending = Buf},
1667+
{SpaceLeft, State}
16681668
end.
16691669

16701670
send_frames(_, [], SpaceLeft) ->

0 commit comments

Comments
 (0)