Skip to content

Commit 9a83513

Browse files
committed
Cleanup compat code
1 parent 738a548 commit 9a83513

13 files changed

+252
-769
lines changed

deps/rabbit/src/mc_amqpl.erl

Lines changed: 7 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,12 @@ init(#content{} = Content0) ->
5858
Content = strip_header(Content1, ?DELETED_HEADER),
5959
{Content, Anns}.
6060

61-
convert_from(mc_amqp, Sections, Env) ->
62-
{H, MAnn, Prop, AProp, BodyRev, Footer} =
61+
convert_from(mc_amqp, Sections, _Env) ->
62+
{H, MAnn, Prop, AProp, BodyRev} =
6363
lists:foldl(
6464
fun(#'v1_0.header'{} = S, Acc) ->
6565
setelement(1, Acc, S);
66-
(_Ignore = #'v1_0.delivery_annotations'{}, Acc) ->
66+
(#'v1_0.delivery_annotations'{}, Acc) ->
6767
Acc;
6868
(#'v1_0.message_annotations'{} = S, Acc) ->
6969
setelement(2, Acc, S);
@@ -81,10 +81,10 @@ convert_from(mc_amqp, Sections, Env) ->
8181
%% assertions
8282
[] = element(5, Acc),
8383
setelement(5, Acc, Body);
84-
(#'v1_0.footer'{} = S, Acc) ->
85-
setelement(6, Acc, S)
84+
(#'v1_0.footer'{}, Acc) ->
85+
Acc
8686
end,
87-
{undefined, undefined, undefined, undefined, [], undefined},
87+
{undefined, undefined, undefined, undefined, []},
8888
Sections),
8989

9090
{PFR, Type0} = case BodyRev of
@@ -181,42 +181,7 @@ convert_from(mc_amqp, Sections, Env) ->
181181
false
182182
end, MA),
183183
{Headers1, MsgId091} = message_id(MsgId, <<"x-message-id">>, Headers0),
184-
{Headers2, CorrId091} = message_id(CorrId, <<"x-correlation-id">>, Headers1),
185-
186-
Headers = case Env of
187-
#{'rabbitmq_4.0.0' := false} ->
188-
Headers3 = case AProp of
189-
undefined ->
190-
Headers2;
191-
#'v1_0.application_properties'{} ->
192-
APropBin = amqp_encoded_binary(AProp),
193-
[{?AMQP10_APP_PROPERTIES_HEADER, longstr, APropBin} | Headers2]
194-
end,
195-
Headers4 = case Prop of
196-
undefined ->
197-
Headers3;
198-
#'v1_0.properties'{} ->
199-
PropBin = amqp_encoded_binary(Prop),
200-
[{?AMQP10_PROPERTIES_HEADER, longstr, PropBin} | Headers3]
201-
end,
202-
Headers5 = case MAnn of
203-
undefined ->
204-
Headers4;
205-
#'v1_0.message_annotations'{} ->
206-
MAnnBin = amqp_encoded_binary(MAnn),
207-
[{?AMQP10_MESSAGE_ANNOTATIONS_HEADER, longstr, MAnnBin} | Headers4]
208-
end,
209-
Headers6 = case Footer of
210-
undefined ->
211-
Headers5;
212-
#'v1_0.footer'{} ->
213-
FootBin = amqp_encoded_binary(Footer),
214-
[{?AMQP10_FOOTER, longstr, FootBin} | Headers5]
215-
end,
216-
Headers6;
217-
_ ->
218-
Headers2
219-
end,
184+
{Headers, CorrId091} = message_id(CorrId, <<"x-correlation-id">>, Headers1),
220185

221186
UserId1 = unwrap(UserId0),
222187
%% user-id is a binary type so we need to validate

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 186 additions & 393 deletions
Large diffs are not rendered by default.

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1357,7 +1357,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
13571357
end;
13581358

13591359
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
1360-
ModeOrPrefetch, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser},
1360+
Mode, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser},
13611361
_From, State = #q{consumers = Consumers,
13621362
active_consumer = Holder,
13631363
single_active_consumer_on = SingleActiveConsumerOn}) ->
@@ -1369,7 +1369,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
13691369
false ->
13701370
Consumers1 = rabbit_queue_consumers:add(
13711371
ChPid, ConsumerTag, NoAck,
1372-
LimiterPid, LimiterActive, ModeOrPrefetch,
1372+
LimiterPid, LimiterActive, Mode,
13731373
Args, ActingUser, Consumers),
13741374
case Holder of
13751375
none ->
@@ -1388,7 +1388,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
13881388
ok ->
13891389
Consumers1 = rabbit_queue_consumers:add(
13901390
ChPid, ConsumerTag, NoAck,
1391-
LimiterPid, LimiterActive, ModeOrPrefetch,
1391+
LimiterPid, LimiterActive, Mode,
13921392
Args, ActingUser, Consumers),
13931393
ExclusiveConsumer =
13941394
if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1416,7 +1416,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
14161416
{false, _} ->
14171417
{true, up}
14181418
end,
1419-
PrefetchCount = rabbit_queue_consumers:parse_prefetch_count(ModeOrPrefetch),
1419+
PrefetchCount = rabbit_queue_consumers:parse_prefetch_count(Mode),
14201420
rabbit_core_metrics:consumer_created(
14211421
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
14221422
PrefetchCount, ConsumerIsActive, ActivityStatus, Args),
@@ -1427,13 +1427,6 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
14271427
reply(ok, run_message_queue(false, State1))
14281428
end;
14291429

1430-
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, From, State) ->
1431-
handle_call({stop_consumer, #{pid => ChPid,
1432-
consumer_tag => ConsumerTag,
1433-
ok_msg => OkMsg,
1434-
user => ActingUser}},
1435-
From, State);
1436-
14371430
handle_call({stop_consumer, #{pid := ChPid,
14381431
consumer_tag := ConsumerTag,
14391432
user := ActingUser} = Spec},
@@ -1597,16 +1590,6 @@ handle_cast({deactivate_limit, ChPid}, State) ->
15971590
noreply(possibly_unblock(rabbit_queue_consumers:deactivate_limit_fun(),
15981591
ChPid, State));
15991592

1600-
handle_cast({credit, SessionPid, CTag, Credit, Drain},
1601-
#q{q = Q,
1602-
backing_queue = BQ,
1603-
backing_queue_state = BQS0} = State) ->
1604-
%% Credit API v1.
1605-
%% Delete this function clause when feature flag rabbitmq_4.0.0 becomes required.
1606-
%% Behave like non-native AMQP 1.0: Send send_credit_reply before deliveries.
1607-
rabbit_classic_queue:send_credit_reply_credit_api_v1(
1608-
SessionPid, amqqueue:get_name(Q), BQ:len(BQS0)),
1609-
handle_cast({credit, SessionPid, CTag, credit_api_v1, Credit, Drain}, State);
16101593
handle_cast({credit, SessionPid, CTag, DeliveryCountRcv, Credit, Drain},
16111594
#q{consumers = Consumers0,
16121595
q = Q} = State0) ->
@@ -1622,25 +1605,15 @@ handle_cast({credit, SessionPid, CTag, DeliveryCountRcv, Credit, Drain},
16221605
run_message_queue(true, State1)
16231606
end,
16241607
case rabbit_queue_consumers:get_link_state(SessionPid, CTag) of
1625-
{credit_api_v1, PostCred}
1626-
when Drain andalso
1627-
is_integer(PostCred) andalso PostCred > 0 ->
1628-
%% credit API v1
1629-
rabbit_queue_consumers:drained(credit_api_v1, SessionPid, CTag),
1630-
rabbit_classic_queue:send_drained_credit_api_v1(SessionPid, QName, CTag, PostCred);
16311608
{PostDeliveryCountSnd, PostCred}
1632-
when is_integer(PostDeliveryCountSnd) andalso
1633-
Drain andalso
1609+
when Drain andalso
16341610
is_integer(PostCred) andalso PostCred > 0 ->
1635-
%% credit API v2
16361611
AdvancedDeliveryCount = serial_number:add(PostDeliveryCountSnd, PostCred),
16371612
rabbit_queue_consumers:drained(AdvancedDeliveryCount, SessionPid, CTag),
16381613
Avail = BQ:len(PostBQS),
16391614
rabbit_classic_queue:send_credit_reply(
16401615
SessionPid, QName, CTag, AdvancedDeliveryCount, 0, Avail, Drain);
1641-
{PostDeliveryCountSnd, PostCred}
1642-
when is_integer(PostDeliveryCountSnd) ->
1643-
%% credit API v2
1616+
{PostDeliveryCountSnd, PostCred} ->
16441617
Avail = BQ:len(PostBQS),
16451618
rabbit_classic_queue:send_credit_reply(
16461619
SessionPid, QName, CTag, PostDeliveryCountSnd, PostCred, Avail, Drain);

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 5 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
supports_stateful_delivery/0,
5656
deliver/3,
5757
settle/5,
58-
credit_v1/5,
5958
credit/6,
6059
dequeue/5,
6160
info/2,
@@ -72,8 +71,6 @@
7271
-export([confirm_to_sender/3,
7372
send_rejection/3,
7473
deliver_to_consumer/5,
75-
send_credit_reply_credit_api_v1/3,
76-
send_drained_credit_api_v1/4,
7774
send_credit_reply/7]).
7875

7976
-export([policy_apply_to_name/0,
@@ -303,14 +300,13 @@ consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) ->
303300
mode := Mode,
304301
consumer_tag := ConsumerTag,
305302
exclusive_consume := ExclusiveConsume,
306-
args := Args0,
303+
args := Args,
307304
ok_msg := OkMsg,
308305
acting_user := ActingUser} = Spec,
309-
{ModeOrPrefetch, Args} = consume_backwards_compat(Mode, Args0),
310306
case delegate:invoke(QPid,
311307
{gen_server2, call,
312308
[{basic_consume, NoAck, ChPid, LimiterPid,
313-
LimiterActive, ModeOrPrefetch, ConsumerTag,
309+
LimiterActive, Mode, ConsumerTag,
314310
ExclusiveConsume, Args, OkMsg, ActingUser},
315311
infinity]}) of
316312
ok ->
@@ -324,34 +320,9 @@ consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) ->
324320
[rabbit_misc:rs(QRef), Reason]}
325321
end.
326322

327-
%% Delete this function when feature flag rabbitmq_4.0.0 becomes required.
328-
consume_backwards_compat({simple_prefetch, PrefetchCount} = Mode, Args) ->
329-
case rabbit_feature_flags:is_enabled('rabbitmq_4.0.0') of
330-
true -> {Mode, Args};
331-
false -> {PrefetchCount, Args}
332-
end;
333-
consume_backwards_compat({credited, InitialDeliveryCount} = Mode, Args)
334-
when is_integer(InitialDeliveryCount) ->
335-
%% credit API v2
336-
{Mode, Args};
337-
consume_backwards_compat({credited, credit_api_v1}, Args) ->
338-
%% credit API v1
339-
{_PrefetchCount = 0,
340-
[{<<"x-credit">>, table, [{<<"credit">>, long, 0},
341-
{<<"drain">>, bool, false}]} | Args]}.
342-
343323
cancel(Q, Spec, State) ->
344-
%% Cancel API v2 reuses feature flag rabbitmq_4.0.0.
345-
Request = case rabbit_feature_flags:is_enabled('rabbitmq_4.0.0') of
346-
true ->
347-
{stop_consumer, Spec#{pid => self()}};
348-
false ->
349-
#{consumer_tag := ConsumerTag,
350-
user := ActingUser} = Spec,
351-
OkMsg = maps:get(ok_msg, Spec, undefined),
352-
{basic_cancel, self(), ConsumerTag, OkMsg, ActingUser}
353-
end,
354324
Pid = amqqueue:get_pid(Q),
325+
Request = {stop_consumer, Spec#{pid => self()}},
355326
case delegate:invoke(Pid, {gen_server2, call, [Request, infinity]}) of
356327
ok ->
357328
{ok, State#?STATE{pid = Pid}};
@@ -381,11 +352,6 @@ settle(_QName, Op, _CTag, MsgIds, State = #?STATE{pid = Pid}) ->
381352
delegate:invoke_no_result(Pid, {gen_server2, cast, [Arg]}),
382353
{State, []}.
383354

384-
credit_v1(_QName, Ctag, LinkCreditSnd, Drain, #?STATE{pid = QPid} = State) ->
385-
Request = {credit, self(), Ctag, LinkCreditSnd, Drain},
386-
delegate:invoke_no_result(QPid, {gen_server2, cast, [Request]}),
387-
{State, []}.
388-
389355
credit(_QName, Ctag, DeliveryCountRcv, LinkCreditRcv, Drain, #?STATE{pid = QPid} = State) ->
390356
Request = {credit, self(), Ctag, DeliveryCountRcv, LinkCreditRcv, Drain},
391357
delegate:invoke_no_result(QPid, {gen_server2, cast, [Request]}),
@@ -448,11 +414,6 @@ handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored,
448414
end;
449415
handle_event(_QName, Action, State)
450416
when element(1, Action) =:= credit_reply ->
451-
{ok, State, [Action]};
452-
handle_event(_QName, {send_drained, {Ctag, Credit}}, State) ->
453-
%% This function clause should be deleted when feature flag
454-
%% rabbitmq_4.0.0 becomes required.
455-
Action = {credit_reply_v1, Ctag, Credit, _Available = 0, _Drain = true},
456417
{ok, State, [Action]}.
457418

458419
settlement_action(_Type, _QRef, [], Acc) ->
@@ -661,11 +622,8 @@ capabilities() ->
661622
<<"x-dead-letter-routing-key">>, <<"x-max-length">>,
662623
<<"x-max-length-bytes">>, <<"x-max-priority">>,
663624
<<"x-overflow">>, <<"x-queue-mode">>, <<"x-queue-version">>,
664-
<<"x-single-active-consumer">>, <<"x-queue-type">>, <<"x-queue-master-locator">>]
665-
++ case rabbit_feature_flags:is_enabled('rabbitmq_4.0.0') of
666-
true -> [<<"x-queue-leader-locator">>];
667-
false -> []
668-
end,
625+
<<"x-single-active-consumer">>, <<"x-queue-type">>,
626+
<<"x-queue-master-locator">>, <<"x-queue-leader-locator">>],
669627
consumer_arguments => [<<"x-priority">>],
670628
server_named => true,
671629
rebalance_module => undefined,
@@ -742,16 +700,6 @@ deliver_to_consumer(Pid, QName, CTag, AckRequired, Message) ->
742700
Evt = {deliver, CTag, AckRequired, [Message]},
743701
send_queue_event(Pid, QName, Evt).
744702

745-
%% Delete this function when feature flag rabbitmq_4.0.0 becomes required.
746-
send_credit_reply_credit_api_v1(Pid, QName, Available) ->
747-
Evt = {send_credit_reply, Available},
748-
send_queue_event(Pid, QName, Evt).
749-
750-
%% Delete this function when feature flag rabbitmq_4.0.0 becomes required.
751-
send_drained_credit_api_v1(Pid, QName, Ctag, Credit) ->
752-
Evt = {send_drained, {Ctag, Credit}},
753-
send_queue_event(Pid, QName, Evt).
754-
755703
send_credit_reply(Pid, QName, Ctag, DeliveryCount, Credit, Available, Drain) ->
756704
Evt = {credit_reply, Ctag, DeliveryCount, Credit, Available, Drain},
757705
send_queue_event(Pid, QName, Evt).

0 commit comments

Comments
 (0)