Skip to content

Commit 65c8456

Browse files
committed
Fix failing mixed version test
Prior to this commit, mixed version test classic_queue_on_old_node of amqp_client_SUITE was failing. Commit 02c29ac must make sure that the new (4.0) AMQP 1.0 classic queue client continues to convey RabbitMQ internal credit flow control information back to the old (3.13.1) classic queue server. Otherwise, the old classic queue server will stop sending more messages to the new classic queue client after exactly 200 messages, which caused this mixed version test to fail.
1 parent 697ffc2 commit 65c8456

File tree

3 files changed

+76
-22
lines changed

3 files changed

+76
-22
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@
133133
-record(pending_transfer, {
134134
frames :: iolist(),
135135
queue_ack_required :: boolean(),
136+
%% Queue that sent us this message.
137+
%% When feature flag credit_api_v2 becomes required, this field should be deleted.
138+
queue_pid :: pid() | credit_api_v2,
136139
delivery_id :: delivery_number(),
137140
outgoing_unsettled :: #outgoing_unsettled{}
138141
}).
@@ -1253,10 +1256,31 @@ send_pending(#state{remote_incoming_window = Space,
12531256
Flow = session_flow_fields(Flow0, State0),
12541257
rabbit_amqp_writer:send_command(WriterPid, Ch, Flow),
12551258
send_pending(State0#state{outgoing_pending = Buf});
1256-
{{value, #pending_transfer{frames = Frames} = Pending}, Buf1}
1259+
{{value, #pending_transfer{
1260+
frames = Frames,
1261+
queue_pid = QPid,
1262+
outgoing_unsettled = #outgoing_unsettled{queue_name = QName}
1263+
} = Pending}, Buf1}
12571264
when Space > 0 ->
1265+
SendFun = case QPid of
1266+
credit_api_v2 ->
1267+
send_fun(WriterPid, Ch);
1268+
_ ->
1269+
case rabbit_queue_type:module(QName, State0#state.queue_states) of
1270+
{ok, rabbit_classic_queue} ->
1271+
%% Classic queue client and classic queue process that
1272+
%% communicate via credit API v1 use RabbitMQ internal
1273+
%% credit flow control.
1274+
fun(Transfer, Sections) ->
1275+
rabbit_amqp_writer:send_command_and_notify(
1276+
WriterPid, Ch, QPid, self(), Transfer, Sections)
1277+
end;
1278+
{ok, _QType} ->
1279+
send_fun(WriterPid, Ch)
1280+
end
1281+
end,
12581282
{NumTransfersSent, Buf, State1} =
1259-
case send_frames(WriterPid, Ch, Frames, Space) of
1283+
case send_frames(SendFun, Frames, Space) of
12601284
{sent_all, SpaceLeft} ->
12611285
{Space - SpaceLeft,
12621286
Buf1,
@@ -1271,8 +1295,9 @@ send_pending(#state{remote_incoming_window = Space,
12711295
send_pending(State);
12721296
{{value, Pending = #pending_management_transfer{frames = Frames}}, Buf1}
12731297
when Space > 0 ->
1298+
SendFun = send_fun(WriterPid, Ch),
12741299
{NumTransfersSent, Buf} =
1275-
case send_frames(WriterPid, Ch, Frames, Space) of
1300+
case send_frames(SendFun, Frames, Space) of
12761301
{sent_all, SpaceLeft} ->
12771302
{Space - SpaceLeft, Buf1};
12781303
{sent_some, Rest} ->
@@ -1285,13 +1310,18 @@ send_pending(#state{remote_incoming_window = Space,
12851310
State0
12861311
end.
12871312

1288-
send_frames(_, _, [], SpaceLeft) ->
1313+
send_frames(_, [], SpaceLeft) ->
12891314
{sent_all, SpaceLeft};
1290-
send_frames(_, _, Rest, 0) ->
1315+
send_frames(_, Rest, 0) ->
12911316
{sent_some, Rest};
1292-
send_frames(Writer, Ch, [[Transfer, Sections] | Rest], SpaceLeft) ->
1293-
rabbit_amqp_writer:send_command(Writer, Ch, Transfer, Sections),
1294-
send_frames(Writer, Ch, Rest, SpaceLeft - 1).
1317+
send_frames(SendFun, [[Transfer, Sections] | Rest], SpaceLeft) ->
1318+
SendFun(Transfer, Sections),
1319+
send_frames(SendFun, Rest, SpaceLeft - 1).
1320+
1321+
send_fun(WriterPid, Ch) ->
1322+
fun(Transfer, Sections) ->
1323+
rabbit_amqp_writer:send_command(WriterPid, Ch, Transfer, Sections)
1324+
end.
12951325

12961326
record_outgoing_unsettled(#pending_transfer{queue_ack_required = true,
12971327
delivery_id = DeliveryId,
@@ -1532,7 +1562,7 @@ handle_queue_actions(Actions, State) ->
15321562
end, State, Actions).
15331563

15341564
handle_deliver(ConsumerTag, AckRequired,
1535-
Msg = {QName, _QPid, MsgId, Redelivered, Mc0},
1565+
Msg = {QName, QPid0, MsgId, Redelivered, Mc0},
15361566
State = #state{outgoing_pending = Pending,
15371567
outgoing_delivery_id = DeliveryId,
15381568
outgoing_links = OutgoingLinks0,
@@ -1565,20 +1595,23 @@ handle_deliver(ConsumerTag, AckRequired,
15651595
Frames = transfer_frames(Transfer, Sections, MaxFrameSize),
15661596
messages_delivered(Redelivered, QType),
15671597
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, Trace),
1568-
OutgoingLinks = case DelCount of
1569-
credit_api_v2 ->
1570-
OutgoingLinks0;
1571-
{credit_api_v1, C} ->
1572-
Link = Link0#outgoing_link{delivery_count = {credit_api_v1, add(C, 1)}},
1573-
maps:update(Handle, Link, OutgoingLinks0)
1574-
end,
1598+
{OutgoingLinks, QPid
1599+
} = case DelCount of
1600+
credit_api_v2 ->
1601+
{OutgoingLinks0, credit_api_v2};
1602+
{credit_api_v1, C} ->
1603+
Link = Link0#outgoing_link{delivery_count = {credit_api_v1, add(C, 1)}},
1604+
OutgoingLinks1 = maps:update(Handle, Link, OutgoingLinks0),
1605+
{OutgoingLinks1, QPid0}
1606+
end,
15751607
Del = #outgoing_unsettled{
15761608
msg_id = MsgId,
15771609
consumer_tag = ConsumerTag,
15781610
queue_name = QName},
15791611
PendingTransfer = #pending_transfer{
15801612
frames = Frames,
15811613
queue_ack_required = AckRequired,
1614+
queue_pid = QPid,
15821615
delivery_id = DeliveryId,
15831616
outgoing_unsettled = Del},
15841617
State#state{outgoing_pending = queue:in(PendingTransfer, Pending),

deps/rabbit/src/rabbit_amqp_writer.erl

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
send_command/3,
1616
send_command/4,
1717
send_command_sync/3,
18+
send_command_and_notify/6,
1819
internal_send_command/3]).
1920

2021
%% gen_server callbacks
@@ -70,6 +71,17 @@ send_command_sync(Writer, ChannelNum, MethodRecord) ->
7071
Request = {send_command, ChannelNum, MethodRecord},
7172
gen_server:call(Writer, Request, ?CALL_TIMEOUT).
7273

74+
%% Delete this function when feature flag credit_api_v2 becomes required.
75+
-spec send_command_and_notify(pid(),
76+
rabbit_types:channel_number(),
77+
pid(),
78+
pid(),
79+
rabbit_framing:amqp_method_record(),
80+
rabbit_types:content()) -> ok.
81+
send_command_and_notify(Writer, ChannelNum, QueuePid, SessionPid, MethodRecord, Content) ->
82+
Request = {send_command_and_notify, ChannelNum, QueuePid, SessionPid, MethodRecord, Content},
83+
gen_server:cast(Writer, Request).
84+
7385
-spec internal_send_command(rabbit_net:socket(),
7486
rabbit_framing:amqp_method_record(),
7587
amqp10_framing | rabbit_amqp_sasl) -> ok.
@@ -95,6 +107,11 @@ handle_cast({send_command, ChannelNum, MethodRecord}, State0) ->
95107
no_reply(State);
96108
handle_cast({send_command, ChannelNum, MethodRecord, Content}, State0) ->
97109
State = internal_send_command_async(ChannelNum, MethodRecord, Content, State0),
110+
no_reply(State);
111+
%% Delete below function clause when feature flag credit_api_v2 becomes required.
112+
handle_cast({send_command_and_notify, ChannelNum, QueuePid, SessionPid, MethodRecord, Content}, State0) ->
113+
State = internal_send_command_async(ChannelNum, MethodRecord, Content, State0),
114+
rabbit_amqqueue:notify_sent(QueuePid, SessionPid),
98115
no_reply(State).
99116

100117
handle_call({send_command, ChannelNum, MethodRecord}, _From, State0) ->
@@ -104,7 +121,11 @@ handle_call({send_command, ChannelNum, MethodRecord}, _From, State0) ->
104121

105122
handle_info(timeout, State0) ->
106123
State = flush(State0),
107-
{noreply, State}.
124+
{noreply, State};
125+
%% Delete below function clause when feature flag credit_api_v2 becomes required.
126+
handle_info({'DOWN', _MRef, process, QueuePid, _Reason}, State) ->
127+
rabbit_amqqueue:notify_sent_queue_down(QueuePid),
128+
no_reply(State).
108129

109130
format_status(Status) ->
110131
maps:update_with(

deps/rabbit/src/rabbit_queue_consumers.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
%% The limiter itself
4949
limiter,
5050
%% Internal flow control for queue -> writer
51-
unsent_message_count,
51+
unsent_message_count :: non_neg_integer(),
5252
link_states :: #{rabbit_types:ctag() => #link_state{}}
5353
}).
5454

@@ -271,11 +271,11 @@ deliver(FetchFun, QName, ConsumersChanged,
271271
end
272272
end.
273273

274-
deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) ->
275-
C = lookup_ch(ChPid),
274+
deliver_to_consumer(FetchFun,
275+
E = {ChPid, Consumer = #consumer{tag = CTag}},
276+
QName) ->
277+
C = #cr{link_states = LinkStates} = lookup_ch(ChPid),
276278
ChBlocked = is_ch_blocked(C),
277-
LinkStates = C#cr.link_states,
278-
CTag = Consumer#consumer.tag,
279279
case LinkStates of
280280
#{CTag := #link_state{delivery_count = DeliveryCount0,
281281
credit = Credit} = LinkState0} ->

0 commit comments

Comments
 (0)