Skip to content

Commit b2fadc8

Browse files
committed
fixup! WIP Remove CQv1
1 parent 0a7c919 commit b2fadc8

File tree

2 files changed

+10
-17
lines changed

2 files changed

+10
-17
lines changed

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
-export([sync/1, needs_sync/1, flush/1,
2323
bounds/2, next_segment_boundary/1]).
2424

25+
%% Called by rabbit_vhost.
26+
-export([all_queue_directory_names/1]).
27+
2528
%% Shared with rabbit_classic_queue_store_v2.
2629
-export([queue_dir/2]).
2730

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,6 @@
229229

230230
-type msg_location() :: memory
231231
| rabbit_msg_store
232-
| rabbit_queue_index
233232
| rabbit_classic_queue_store_v2:msg_location().
234233
-export_type([msg_location/0]).
235234

@@ -239,7 +238,7 @@
239238
msg,
240239
is_persistent,
241240
is_delivered,
242-
msg_location, %% ?IN_SHARED_STORE | ?IN_QUEUE_STORE | ?IN_QUEUE_INDEX | ?IN_MEMORY
241+
msg_location, %% ?IN_SHARED_STORE | ?IN_QUEUE_STORE | ?IN_MEMORY
243242
index_on_disk,
244243
persist_to,
245244
msg_props
@@ -260,7 +259,6 @@
260259

261260
-define(IN_SHARED_STORE, rabbit_msg_store).
262261
-define(IN_QUEUE_STORE, {rabbit_classic_queue_store_v2, _, _}).
263-
-define(IN_QUEUE_INDEX, rabbit_queue_index).
264262
-define(IN_MEMORY, memory).
265263

266264
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -975,14 +973,10 @@ beta_msg_status({Msg, SeqId, MsgLocation, MsgProps, IsPersistent}) ->
975973
MS0#msg_status{msg_id = MsgId,
976974
msg = Msg,
977975
persist_to = case MsgLocation of
978-
rabbit_queue_index -> queue_index;
979976
{rabbit_classic_queue_store_v2, _, _} -> queue_store;
980977
rabbit_msg_store -> msg_store
981978
end,
982-
msg_location = case MsgLocation of
983-
rabbit_queue_index -> memory;
984-
_ -> MsgLocation
985-
end}.
979+
msg_location = MsgLocation}.
986980

987981
beta_msg_status0(SeqId, MsgProps, IsPersistent) ->
988982
#msg_status{seq_id = SeqId,
@@ -1396,7 +1390,6 @@ remove_from_disk(#msg_status {
13961390
{StoreState0, record_confirms(sets:add_element(MsgId, sets:new([{version,2}])), State)}
13971391
end;
13981392
?IN_QUEUE_STORE -> {rabbit_classic_queue_store_v2:remove(SeqId, StoreState0), State};
1399-
?IN_QUEUE_INDEX -> {StoreState0, State};
14001393
?IN_MEMORY -> {StoreState0, State}
14011394
end,
14021395
StoreState = rabbit_classic_queue_store_v2:delete_segments(DeletedSegments, StoreState1),
@@ -1407,7 +1400,7 @@ remove_from_disk(#msg_status {
14071400

14081401
%% This function exists as a way to improve dropwhile/2
14091402
%% performance. The idea of having this function is to optimise calls
1410-
%% to rabbit_queue_index by batching delivers and acks, instead of
1403+
%% to the queue index by batching delivers and acks, instead of
14111404
%% sending them one by one.
14121405
%%
14131406
%% Instead of removing every message as their are popped from the
@@ -1449,7 +1442,7 @@ remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) ->
14491442

14501443
%% This function exists as a way to improve fetchwhile/4
14511444
%% performance. The idea of having this function is to optimise calls
1452-
%% to rabbit_queue_index by batching delivers, instead of sending them
1445+
%% to the queue index by batching delivers, instead of sending them
14531446
%% one by one.
14541447
%%
14551448
%% Fun is the function passed to fetchwhile/4 that's
@@ -1472,7 +1465,7 @@ fetch_by_predicate(Pred, Fun, FetchAcc,
14721465

14731466
%% We try to do here the same as what remove(true, State) does but
14741467
%% processing several messages at the same time. The idea is to
1475-
%% optimize rabbit_queue_index:deliver/2 calls by sending a list of
1468+
%% optimize IndexMod:deliver/2 calls by sending a list of
14761469
%% SeqIds instead of one by one, thus process_queue_entries1 will
14771470
%% accumulate the required deliveries, will record_pending_ack for
14781471
%% each message, and will update stats, like remove/2 does.
@@ -1716,8 +1709,7 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
17161709
queue_store -> {MsgLocation, StoreState} = rabbit_classic_queue_store_v2:write(SeqId, prepare_to_store(Msg), Props, StoreState0),
17171710
{MsgStatus#msg_status{ msg_location = MsgLocation },
17181711
State#vqstate{ store_state = StoreState,
1719-
disk_write_count = Count + 1}};
1720-
queue_index -> {MsgStatus, State}
1712+
disk_write_count = Count + 1}}
17211713
end;
17221714
maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
17231715
{MsgStatus, State}.
@@ -1738,8 +1730,7 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
17381730
{MsgOrId, DiskWriteCount1} =
17391731
case persist_to(MsgStatus) of
17401732
msg_store -> {MsgId, DiskWriteCount};
1741-
queue_store -> {MsgId, DiskWriteCount};
1742-
queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
1733+
queue_store -> {MsgId, DiskWriteCount}
17431734
end,
17441735
IndexState2 = rabbit_classic_queue_index_v2:publish(
17451736
MsgOrId, SeqId, MsgLocation, MsgProps, IsPersistent,
@@ -1907,7 +1898,6 @@ accumulate_ack(#msg_status { seq_id = SeqId,
19071898
end,
19081899
case MsgLocation of
19091900
?IN_QUEUE_STORE -> [SeqId|SeqIdsInStore];
1910-
?IN_QUEUE_INDEX -> [SeqId|SeqIdsInStore];
19111901
_ -> SeqIdsInStore
19121902
end,
19131903
[MsgId | AllMsgIds]}.

0 commit comments

Comments
 (0)