Skip to content

Commit 738a548

Browse files
committed
Remove RabbitMQ internal flow control for classic queue
Remove RabbitMQ internal flow control between AMQP writer and classic queue for credit API v1
1 parent 01cd169 commit 738a548

File tree

3 files changed

+4
-45
lines changed

3 files changed

+4
-45
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1829,38 +1829,18 @@ echo(Echo, HandleInt, DeliveryCount, LinkCredit, Available, State) ->
18291829

18301830
send_pending_delivery(#pending_delivery{
18311831
frames = Frames,
1832-
queue_pid = QPid,
1833-
outgoing_unsettled = #outgoing_unsettled{consumer_tag = Ctag,
1834-
queue_name = QName}
1832+
outgoing_unsettled = #outgoing_unsettled{consumer_tag = Ctag}
18351833
} = Pending,
18361834
Buf0,
18371835
#state{remote_incoming_window = Space,
18381836
outgoing_links = OutgoingLinks,
1839-
queue_states = QStates,
18401837
cfg = #cfg{writer_pid = WriterPid,
18411838
channel_num = Ch}
18421839
} = State0) ->
18431840
Handle = ctag_to_handle(Ctag),
18441841
case is_map_key(Handle, OutgoingLinks) of
18451842
true ->
1846-
SendFun = case QPid of
1847-
credit_api_v2 ->
1848-
send_fun(WriterPid, Ch);
1849-
_ ->
1850-
case rabbit_queue_type:module(QName, QStates) of
1851-
{ok, rabbit_classic_queue} ->
1852-
%% Classic queue client and classic queue process that
1853-
%% communicate via credit API v1 use RabbitMQ internal
1854-
%% credit flow control.
1855-
fun(Transfer, Sections) ->
1856-
rabbit_amqp_writer:send_command_and_notify(
1857-
WriterPid, QPid, Ch, Transfer, Sections)
1858-
end;
1859-
{ok, _QType} ->
1860-
send_fun(WriterPid, Ch)
1861-
end
1862-
end,
1863-
case send_frames(SendFun, Frames, Space) of
1843+
case send_frames(send_fun(WriterPid, Ch), Frames, Space) of
18641844
{sent_all, SpaceLeft} ->
18651845
State1 = State0#state{outgoing_pending = Buf0},
18661846
State = sent_pending_delivery(Pending, Handle, State1),

deps/rabbit/src/rabbit_amqp_writer.erl

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
send_command/3,
1616
send_command/4,
1717
send_command_sync/3,
18-
send_command_and_notify/5,
1918
assemble_frame/3]).
2019

2120
%% gen_server callbacks
@@ -75,16 +74,6 @@ send_command_sync(Writer, ChannelNum, Performative) ->
7574
Request = {send_command, ChannelNum, Performative},
7675
gen_server:call(Writer, Request, ?CALL_TIMEOUT).
7776

78-
%% Delete this function when feature flag rabbitmq_4.0.0 becomes required.
79-
-spec send_command_and_notify(pid(),
80-
pid(),
81-
rabbit_types:channel_number(),
82-
performative(),
83-
payload()) -> ok | {error, blocked}.
84-
send_command_and_notify(Writer, QueuePid, ChannelNum, Performative, Payload) ->
85-
Request = {send_command_and_notify, QueuePid, self(), ChannelNum, Performative, Payload},
86-
maybe_send(Writer, Request).
87-
8877
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
8978
%%% gen_server callbacks %%%
9079
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -105,12 +94,6 @@ handle_cast({send_command, ChannelNum, Performative}, State0) ->
10594
handle_cast({send_command, SessionPid, ChannelNum, Performative, Payload}, State0) ->
10695
State1 = internal_send_command_async(ChannelNum, Performative, Payload, State0),
10796
State = credit_flow_ack(SessionPid, State1),
108-
no_reply(State);
109-
%% Delete below function clause when feature flag rabbitmq_4.0.0 becomes required.
110-
handle_cast({send_command_and_notify, QueuePid, SessionPid, ChannelNum, Performative, Payload}, State0) ->
111-
State1 = internal_send_command_async(ChannelNum, Performative, Payload, State0),
112-
State = credit_flow_ack(SessionPid, State1),
113-
rabbit_amqqueue:notify_sent(QueuePid, SessionPid),
11497
no_reply(State).
11598

11699
handle_call({send_command, ChannelNum, Performative}, _From, State0) ->
@@ -132,10 +115,6 @@ handle_info({{'DOWN', session}, _MRef, process, SessionPid, _Reason},
132115
State0 = #state{monitored_sessions = Sessions}) ->
133116
credit_flow:peer_down(SessionPid),
134117
State = State0#state{monitored_sessions = maps:remove(SessionPid, Sessions)},
135-
no_reply(State);
136-
%% Delete below function clause when feature flag rabbitmq_4.0.0 becomes required.
137-
handle_info({'DOWN', _MRef, process, QueuePid, _Reason}, State) ->
138-
rabbit_amqqueue:notify_sent_queue_down(QueuePid),
139118
no_reply(State).
140119

141120
format_status(Status) ->

deps/rabbit/src/rabbit_ff_controller.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -782,10 +782,10 @@ get_stable_feature_flags(#{feature_flags := FeatureFlags}) ->
782782
%% There are two ways to specify that list:
783783
%% <ol>
784784
%% <li>Using the `$RABBITMQ_FEATURE_FLAGS' environment variable; for
785-
%% instance `RABBITMQ_FEATURE_FLAGS=rabbitmq_4.0.0,khepri_db'.</li>
785+
%% instance `RABBITMQ_FEATURE_FLAGS=rabbitmq_4.2.0,khepri_db'.</li>
786786
%% <li>Using the `forced_feature_flags_on_init' configuration parameter;
787787
%% for instance
788-
%% `{rabbit, [{forced_feature_flags_on_init, [rabbitmq_4.0.0, khepri_db]}]}'.</li>
788+
%% `{rabbit, [{forced_feature_flags_on_init, [rabbitmq_4.2.0, khepri_db]}]}'.</li>
789789
%% </ol>
790790
%%
791791
%% There's also a way to enable specific flags and skip others:

0 commit comments

Comments
 (0)