Skip to content

Commit a4d424a

Browse files
authored
Merge pull request #14769 from rabbitmq/loic-remove-cqv1
4.3.0: remove CQv1-era code (now that CQv2 is the default in 4.2.x)
2 parents 8553314 + a918ce5 commit a4d424a

18 files changed

+537
-2727
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -950,7 +950,6 @@ declare_args() ->
950950
{<<"x-max-in-memory-bytes">>, fun check_non_neg_int_arg/2},
951951
{<<"x-max-priority">>, fun check_max_priority_arg/2},
952952
{<<"x-overflow">>, fun check_overflow/2},
953-
{<<"x-queue-mode">>, fun check_queue_mode/2},
954953
{<<"x-queue-version">>, fun check_queue_version/2},
955954
{<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2},
956955
{<<"x-queue-type">>, fun check_queue_type/2},
@@ -1167,32 +1166,14 @@ check_stream_offset_arg(Val, _Args) ->
11671166
{error, {invalid_stream_offset_arg, Val}}
11681167
end.
11691168

1170-
-define(KNOWN_QUEUE_MODES, [<<"default">>, <<"lazy">>]).
1171-
check_queue_mode({longstr, Val}, _Args) ->
1172-
case lists:member(Val, ?KNOWN_QUEUE_MODES) of
1173-
true -> ok;
1174-
false -> {error, rabbit_misc:format("unsupported queue mode '~ts'", [Val])}
1175-
end;
1176-
check_queue_mode({Type, _}, _Args) ->
1177-
{error, {unacceptable_type, Type}};
1178-
check_queue_mode(Val, _Args) when is_binary(Val) ->
1179-
case lists:member(Val, ?KNOWN_QUEUE_MODES) of
1180-
true -> ok;
1181-
false -> {error, rabbit_misc:format("unsupported queue mode '~ts'", [Val])}
1182-
end;
1183-
check_queue_mode(_Val, _Args) ->
1184-
{error, invalid_queue_mode}.
1185-
11861169
check_queue_version({Type, Val}, Args) ->
11871170
case check_non_neg_int_arg({Type, Val}, Args) of
1188-
ok when Val == 1 -> ok;
11891171
ok when Val == 2 -> ok;
11901172
ok -> {error, rabbit_misc:format("unsupported queue version '~b'", [Val])};
11911173
Error -> Error
11921174
end;
11931175
check_queue_version(Val, Args) ->
11941176
case check_non_neg_int_arg(Val, Args) of
1195-
ok when Val == 1 -> ok;
11961177
ok when Val == 2 -> ok;
11971178
ok -> {error, rabbit_misc:format("unsupported queue version '~b'", [Val])};
11981179
Error -> Error

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -490,9 +490,7 @@ process_args_policy(State = #q{q = Q,
490490
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
491491
{<<"max-length">>, fun res_min/2, fun init_max_length/2},
492492
{<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2},
493-
{<<"overflow">>, fun res_arg/2, fun init_overflow/2},
494-
{<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2},
495-
{<<"queue-version">>, fun res_arg/2, fun init_queue_version/2}],
493+
{<<"overflow">>, fun res_arg/2, fun init_overflow/2}],
496494
drop_expired_msgs(
497495
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
498496
Fun(rabbit_queue_type_util:args_policy_lookup(Name, Resolve, Q), StateN)
@@ -543,22 +541,6 @@ init_overflow(Overflow, State) ->
543541
State#q{overflow = OverflowVal}
544542
end.
545543

546-
init_queue_mode(undefined, State) ->
547-
State;
548-
init_queue_mode(Mode, State = #q {backing_queue = BQ,
549-
backing_queue_state = BQS}) ->
550-
BQS1 = BQ:set_queue_mode(binary_to_existing_atom(Mode, utf8), BQS),
551-
State#q{backing_queue_state = BQS1}.
552-
553-
init_queue_version(Version0, State = #q {backing_queue = BQ,
554-
backing_queue_state = BQS}) ->
555-
Version = case Version0 of
556-
undefined -> 2;
557-
_ -> Version0
558-
end,
559-
BQS1 = BQ:set_queue_version(Version, BQS),
560-
State#q{backing_queue_state = BQS1}.
561-
562544
reply(Reply, NewState) ->
563545
{NewState1, Timeout} = next_state(NewState),
564546
{reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.

deps/rabbit/src/rabbit_backing_queue.erl

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@
1414
message_bytes, message_bytes_ready,
1515
message_bytes_unacknowledged, message_bytes_ram,
1616
message_bytes_persistent, head_message_timestamp,
17-
disk_reads, disk_writes, backing_queue_status,
18-
messages_paged_out, message_bytes_paged_out]).
17+
disk_reads, disk_writes, backing_queue_status]).
1918

2019
%% We can't specify a per-queue ack/state with callback signatures
2120
-type ack() :: any().
@@ -37,9 +36,6 @@
3736
-type msg_fun(A) :: fun ((mc:state(), ack(), A) -> A).
3837
-type msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean()).
3938

40-
-type queue_mode() :: atom().
41-
-type queue_version() :: pos_integer().
42-
4339
%% Called on startup with a vhost and a list of durable queue names on this vhost.
4440
%% The queues aren't being started at this point, but this call allows the
4541
%% backing queue to perform any checking necessary for the consistency
@@ -173,13 +169,6 @@
173169
%% each message, its ack tag, and an accumulator.
174170
-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
175171

176-
%% Fold over all the messages in a queue and return the accumulated
177-
%% results, leaving the queue undisturbed.
178-
-callback fold(fun((mc:state(),
179-
rabbit_types:message_properties(),
180-
boolean(), A) -> {('stop' | 'cont'), A}),
181-
A, state()) -> {A, state()}.
182-
183172
%% How long is my queue?
184173
-callback len(state()) -> non_neg_integer().
185174

@@ -223,10 +212,6 @@
223212
%% or discarded previously).
224213
-callback is_duplicate(mc:state(), state()) -> {boolean(), state()}.
225214

226-
-callback set_queue_mode(queue_mode(), state()) -> state().
227-
228-
-callback set_queue_version(queue_version(), state()) -> state().
229-
230215
-callback zip_msgs_and_acks([delivered_publish()],
231216
[ack()], Acc, state())
232217
-> Acc.

0 commit comments

Comments
 (0)