diff --git a/deps/rabbit/src/rabbit_backing_queue.erl b/deps/rabbit/src/rabbit_backing_queue.erl index 5bae9eef606..37bd7d4967e 100644 --- a/deps/rabbit/src/rabbit_backing_queue.erl +++ b/deps/rabbit/src/rabbit_backing_queue.erl @@ -173,13 +173,6 @@ %% each message, its ack tag, and an accumulator. -callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}. -%% Fold over all the messages in a queue and return the accumulated -%% results, leaving the queue undisturbed. --callback fold(fun((mc:state(), - rabbit_types:message_properties(), - boolean(), A) -> {('stop' | 'cont'), A}), - A, state()) -> {A, state()}. - %% How long is my queue? -callback len(state()) -> non_neg_integer(). diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl index 087e8e35591..67922c57885 100644 --- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl @@ -7,9 +7,9 @@ -module(rabbit_classic_queue_index_v2). --export([erase/1, init/3, reset_state/1, recover/7, +-export([erase/1, init/2, reset_state/1, recover/5, terminate/3, delete_and_terminate/1, - info/1, publish/7, publish/8, ack/2, read/3]). + info/1, publish/7, ack/2, read/3]). %% Recovery. Unlike other functions in this module, these %% apply to all queues all at once. @@ -18,14 +18,12 @@ %% rabbit_queue_index/rabbit_variable_queue-specific functions. %% Implementation details from the queue index leaking into the %% queue implementation itself. --export([pre_publish/7, flush_pre_publish_cache/2, - sync/1, needs_sync/1, flush/1, +%% @todo TODO +-export([sync/1, needs_sync/1, bounds/2, next_segment_boundary/1]). -%% Used to upgrade/downgrade from/to the v1 index. --export([init_for_conversion/3]). --export([init_args/1]). --export([delete_segment_file_for_seq_id/2]). +%% Called by rabbit_vhost. +-export([all_queue_directory_names/1]). %% Shared with rabbit_classic_queue_store_v2. -export([queue_dir/2]). @@ -151,17 +149,11 @@ %% This fun must be called when messages that expect %% confirms have either an ack or their entry %% written to disk and file:sync/1 has been called. - on_sync :: on_sync_fun(), - - %% This fun is never called. It is kept so that we - %% can downgrade the queue back to v1. - on_sync_msg :: fun() + on_sync :: on_sync_fun() }). -type state() :: #qi{}. -%% Types copied from rabbit_queue_index. - -type on_sync_fun() :: fun ((sets:set()) -> ok). -type contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean()). -type shutdown_terms() :: list() | 'non_clean_shutdown'. @@ -176,37 +168,24 @@ erase(#resource{ virtual_host = VHost } = Name) -> Dir = queue_dir(VHostDir, Name), erase_index_dir(Dir). --spec init(rabbit_amqqueue:name(), - on_sync_fun(), on_sync_fun()) -> state(). +-spec init(rabbit_amqqueue:name(), on_sync_fun()) -> state(). %% We do not embed messages and as a result never need the OnSyncMsgFun. -init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) -> - ?DEBUG("~0p ~0p ~0p", [Name, OnSyncFun, OnSyncMsgFun]), +init(#resource{ virtual_host = VHost } = Name, OnSyncFun) -> + ?DEBUG("~0p ~0p", [Name, OnSyncFun]), VHostDir = rabbit_vhost:msg_store_dir_path(VHost), Dir = queue_dir(VHostDir, Name), false = rabbit_file:is_file(Dir), %% is_file == is file or dir - init1(Name, Dir, OnSyncFun, OnSyncMsgFun). - -init_args(#qi{ queue_name = QueueName, - on_sync = OnSyncFun, - on_sync_msg = OnSyncMsgFun }) -> - {QueueName, OnSyncFun, OnSyncMsgFun}.
- -init_for_conversion(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) -> - ?DEBUG("~0p ~0p ~0p", [Name, OnSyncFun, OnSyncMsgFun]), - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), - Dir = queue_dir(VHostDir, Name), - init1(Name, Dir, OnSyncFun, OnSyncMsgFun). + init1(Name, Dir, OnSyncFun). -init1(Name, Dir, OnSyncFun, OnSyncMsgFun) -> +init1(Name, Dir, OnSyncFun) -> ensure_queue_name_stub_file(Name, Dir), DirBin = rabbit_file:filename_to_binary(Dir), #qi{ queue_name = Name, dir = << DirBin/binary, "/" >>, - on_sync = OnSyncFun, - on_sync_msg = OnSyncMsgFun + on_sync = OnSyncFun }. ensure_queue_name_stub_file(#resource{virtual_host = VHost, name = QName}, Dir) -> @@ -219,16 +198,14 @@ ensure_queue_name_stub_file(#resource{virtual_host = VHost, name = QName}, Dir) reset_state(State = #qi{ queue_name = Name, dir = Dir, - on_sync = OnSyncFun, - on_sync_msg = OnSyncMsgFun }) -> + on_sync = OnSyncFun }) -> ?DEBUG("~0p", [State]), _ = delete_and_terminate(State), - init1(Name, rabbit_file:binary_to_filename(Dir), OnSyncFun, OnSyncMsgFun). + init1(Name, rabbit_file:binary_to_filename(Dir), OnSyncFun). -spec recover(rabbit_amqqueue:name(), shutdown_terms(), boolean(), contains_predicate(), - on_sync_fun(), on_sync_fun(), - main | convert) -> + on_sync_fun()) -> {'undefined' | non_neg_integer(), 'undefined' | non_neg_integer(), state()}. @@ -241,12 +218,12 @@ reset_state(State = #qi{ queue_name = Name, -define(RECOVER_COUNTER_SIZE, 6). recover(#resource{ virtual_host = VHost, name = QueueName } = Name, Terms, - IsMsgStoreClean, ContainsCheckFun, OnSyncFun, OnSyncMsgFun, Context) -> - ?DEBUG("~0p ~0p ~0p ~0p ~0p ~0p", [Name, Terms, IsMsgStoreClean, - ContainsCheckFun, OnSyncFun, OnSyncMsgFun]), + IsMsgStoreClean, ContainsCheckFun, OnSyncFun) -> + ?DEBUG("~0p ~0p ~0p ~0p ~0p", [Name, Terms, IsMsgStoreClean, + ContainsCheckFun, OnSyncFun]), VHostDir = rabbit_vhost:msg_store_dir_path(VHost), Dir = queue_dir(VHostDir, Name), - State0 = init1(Name, Dir, OnSyncFun, OnSyncMsgFun), + State0 = init1(Name, Dir, OnSyncFun), %% We go over all segments if either the index or the %% message store has/had to recover. Otherwise we just %% take our state from Terms. @@ -254,10 +231,6 @@ recover(#resource{ virtual_host = VHost, name = QueueName } = Name, Terms, case IsIndexClean andalso IsMsgStoreClean of true -> State = case proplists:get_value(v2_index_state, Terms, undefined) of - %% We are recovering a queue that was using the v1 index. - undefined when Context =:= main -> - recover_index_v1_clean(State0, Terms, IsMsgStoreClean, - ContainsCheckFun, OnSyncFun, OnSyncMsgFun); {?VERSION, Segments} -> State0#qi{ segments = Segments } end, @@ -268,9 +241,7 @@ recover(#resource{ virtual_host = VHost, name = QueueName } = Name, Terms, State}; false -> CountersRef = counters:new(?RECOVER_COUNTER_SIZE, []), - State = recover_segments(State0, Terms, IsMsgStoreClean, - ContainsCheckFun, OnSyncFun, OnSyncMsgFun, - CountersRef, Context), + State = recover_segments(State0, ContainsCheckFun, CountersRef), ?LOG_WARNING("Queue ~ts in vhost ~ts dropped ~b/~b/~b persistent messages " "and ~b transient messages after unclean shutdown", [QueueName, VHost, @@ -283,11 +254,11 @@ recover(#resource{ virtual_host = VHost, name = QueueName } = Name, Terms, State} end. 
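The dirty branch of recover/5 above tallies recovered and dropped messages with Erlang's counters module before emitting the warning log line. A minimal sketch of that API, with a literal 6 standing in for ?RECOVER_COUNTER_SIZE and slot 1 standing in for one of the ?RECOVER_* positions:

    CRef = counters:new(6, []),     %% one slot per recovery statistic
    ok = counters:add(CRef, 1, 1),  %% bump slot 1, e.g. for a recovered message
    1 = counters:get(CRef, 1).      %% read it back when building the log line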
-recover_segments(State0 = #qi { queue_name = Name, dir = DirBin }, Terms, IsMsgStoreClean, - ContainsCheckFun, OnSyncFun, OnSyncMsgFun, CountersRef, Context) -> +recover_segments(State0 = #qi { queue_name = Name, dir = DirBin }, + ContainsCheckFun, CountersRef) -> Dir = rabbit_file:binary_to_filename(DirBin), SegmentFiles = rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir), - State = case SegmentFiles of + case SegmentFiles of %% No segments found. [] -> State0; @@ -298,26 +269,9 @@ recover_segments(State0 = #qi { queue_name = Name, dir = DirBin }, Terms, IsMsgS || F <- SegmentFiles]), %% We use a temporary store state to check that messages do exist. StoreState0 = rabbit_classic_queue_store_v2:init(Name), - {State1, StoreState} = recover_segments(State0, ContainsCheckFun, StoreState0, CountersRef, Segments), + {State, StoreState} = recover_segments(State0, ContainsCheckFun, StoreState0, CountersRef, Segments), _ = rabbit_classic_queue_store_v2:terminate(StoreState), - State1 - end, - case Context of - convert -> - State; - main -> - %% We try to see if there are segment files from the v1 index. - case rabbit_file:wildcard(".*\\.idx", Dir) of - %% We are recovering a dirty queue that was using the v1 index or in - %% the process of converting from v1 to v2. - [_|_] -> - recover_index_v1_dirty(State, Terms, IsMsgStoreClean, - ContainsCheckFun, OnSyncFun, OnSyncMsgFun, - CountersRef); - %% Otherwise keep default values. - [] -> - State - end + State end. recover_segments(State, _, StoreState, _, []) -> @@ -449,89 +403,6 @@ recover_segment(State, ContainsCheckFun, StoreState0, CountersRef, Fd, Unacked - (SegmentEntryCount - ThisEntry), LocBytes0) end. -recover_index_v1_clean(State0 = #qi{ queue_name = Name }, Terms, IsMsgStoreClean, - ContainsCheckFun, OnSyncFun, OnSyncMsgFun) -> - #resource{virtual_host = VHost, name = QName} = Name, - ?LOG_INFO("Converting queue ~ts in vhost ~ts from v1 to v2 after clean shutdown", [QName, VHost]), - {_, _, V1State} = rabbit_queue_index:recover(Name, Terms, IsMsgStoreClean, - ContainsCheckFun, OnSyncFun, OnSyncMsgFun, - convert), - %% We will ignore the counter results because on clean shutdown - %% we do not need to calculate the values again. This lets us - %% share code with dirty recovery. - CountersRef = counters:new(?RECOVER_COUNTER_SIZE, []), - State = recover_index_v1_common(State0, V1State, CountersRef), - ?LOG_INFO("Queue ~ts in vhost ~ts converted ~b total messages from v1 to v2", - [QName, VHost, counters:get(CountersRef, ?RECOVER_COUNT)]), - State. - -recover_index_v1_dirty(State0 = #qi{ queue_name = Name }, Terms, IsMsgStoreClean, - ContainsCheckFun, OnSyncFun, OnSyncMsgFun, - CountersRef) -> - #resource{virtual_host = VHost, name = QName} = Name, - ?LOG_INFO("Converting queue ~ts in vhost ~ts from v1 to v2 after unclean shutdown", [QName, VHost]), - %% We ignore the count and bytes returned here because we cannot trust - %% rabbit_queue_index: it has a bug that may lead to more bytes being - %% returned than it really has. - %% - %% On top of that some messages may also be in both the v1 and v2 indexes - %% after a crash. - {_, _, V1State} = rabbit_queue_index:recover(Name, Terms, IsMsgStoreClean, - ContainsCheckFun, OnSyncFun, OnSyncMsgFun, - convert), - State = recover_index_v1_common(State0, V1State, CountersRef), - ?LOG_INFO("Queue ~ts in vhost ~ts converted ~b total messages from v1 to v2", - [QName, VHost, counters:get(CountersRef, ?RECOVER_COUNT)]), - State. 
- -%% At this point all messages are persistent because transient messages -%% were dropped during the v1 index recovery. -recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = DirBin }, - V1State, CountersRef) -> - Dir = rabbit_file:binary_to_filename(DirBin), - %% Use a temporary per-queue store state to store embedded messages. - StoreState0 = rabbit_classic_queue_store_v2:init(Name), - %% Go through the v1 index and publish messages to the v2 index. - {LoSeqId, HiSeqId, _} = rabbit_queue_index:bounds(V1State), - %% When resuming after a crash we need to double check the messages that are both - %% in the v1 and v2 index (effectively the messages below the upper bound of the - %% v2 index that are about to be written to it). - {_, V2HiSeqId, _} = bounds(State0, undefined), - SkipFun = fun - (SeqId, FunState0) when SeqId < V2HiSeqId -> - case read(SeqId, SeqId + 1, FunState0) of - %% Message already exists, skip. - {[_], FunState} -> - {skip, FunState}; - %% Message doesn't exist, write. - {[], FunState} -> - {write, FunState} - end; - %% Message is out of bounds of the v1 index. - (_, FunState) -> - {write, FunState} - end, - %% We use a common function also used with conversion on policy change. - {State1, StoreState} = rabbit_variable_queue:convert_from_v1_to_v2_loop(Name, V1State, State0, StoreState0, - {CountersRef, ?RECOVER_COUNT, ?RECOVER_BYTES}, - LoSeqId, HiSeqId, SkipFun), - %% Terminate the v2 store client. - _ = rabbit_classic_queue_store_v2:terminate(StoreState), - %% Close the v1 index journal handle if any. - JournalHdl = element(4, V1State), - ok = case JournalHdl of - undefined -> ok; - _ -> file_handle_cache:close(JournalHdl) - end, - %% Delete the v1 index files. - OldFiles = ["journal.jif"|rabbit_file:wildcard(".*\\.idx", Dir)], - _ = [rabbit_file:delete(filename:join(Dir, F)) || F <- OldFiles], - %% Ensure that everything in the v2 index is written to disk. - State = flush(State1), - %% Clean up all the garbage that we have surely been creating. - garbage_collect(), - State. - -spec terminate(rabbit_types:vhost(), [any()], State) -> State when State::state(). terminate(VHost, Terms, State0 = #qi { dir = Dir, @@ -577,17 +448,14 @@ info(#qi{ write_buffer = WriteBuffer, write_buffer_updates = NumUpdates }) -> -spec publish(rabbit_types:msg_id(), rabbit_variable_queue:seq_id(), rabbit_variable_queue:msg_location(), rabbit_types:message_properties(), boolean(), - non_neg_integer() | infinity, State) -> State when State::state(). - -publish(MsgId, SeqId, Location, Props, IsPersistent, TargetRamCount, State) -> - publish(MsgId, SeqId, Location, Props, IsPersistent, true, TargetRamCount, State). + boolean(), State) -> State when State::state(). %% Because we always persist to the msg_store, the Msg(Or)Id argument %% here is always a binary, never a record. -publish(MsgId, SeqId, Location, Props, IsPersistent, ShouldConfirm, TargetRamCount, +publish(MsgId, SeqId, Location, Props, IsPersistent, ShouldConfirm, State0 = #qi { write_buffer = WriteBuffer0, segments = Segments }) -> - ?DEBUG("~0p ~0p ~0p ~0p ~0p ~0p ~0p", [MsgId, SeqId, Location, Props, IsPersistent, TargetRamCount, State0]), + ?DEBUG("~0p ~0p ~0p ~0p ~0p ~0p", [MsgId, SeqId, Location, Props, IsPersistent, State0]), %% Add the entry to the write buffer. WriteBuffer = WriteBuffer0#{SeqId => {MsgId, SeqId, Location, Props, IsPersistent}}, State1 = State0#qi{ write_buffer = WriteBuffer }, @@ -1080,26 +948,48 @@ needs_sync(State = #qi{ confirms = Confirms }) -> false -> confirms end. 
--spec flush(State) -> State when State::state(). +%% ---- -flush(State) -> - ?DEBUG("~0p", [State]), - %% Flushing to disk is the same operation as sync - %% except it is called before hibernating or when - %% reducing memory use. - sync(State). +-type walker(A) :: fun ((A) -> 'finished' | + {rabbit_types:msg_id(), non_neg_integer(), A}). -%% ---- -%% -%% Defer to rabbit_queue_index for recovery for the time being. -%% We can move the functions here when the v1 index is removed. +-spec start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}. start(VHost, DurableQueueNames) -> ?DEBUG("~0p ~0p", [VHost, DurableQueueNames]), - %% We replace the queue_index_walker function with our own. - %% Everything else remains the same. - {OrderedTerms, {_QueueIndexWalkerFun, FunState}} = rabbit_queue_index:start(VHost, DurableQueueNames), - {OrderedTerms, {fun queue_index_walker/1, FunState}}. + {ok, RecoveryTermsPid} = rabbit_recovery_terms:start(VHost), + rabbit_vhost_sup_sup:save_vhost_recovery_terms(VHost, RecoveryTermsPid), + {DurableTerms, DurableDirectories} = + lists:foldl( + fun(QName, {RecoveryTerms, ValidDirectories}) -> + DirName = queue_name_to_dir_name(QName), + RecoveryInfo = case rabbit_recovery_terms:read(VHost, DirName) of + {error, _} -> non_clean_shutdown; + {ok, Terms} -> Terms + end, + {[RecoveryInfo | RecoveryTerms], + sets:add_element(DirName, ValidDirectories)} + end, {[], sets:new()}, DurableQueueNames), + %% Any queue directory we've not been asked to recover is considered garbage + ToDelete = [filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues", Dir]) + || Dir <- lists:subtract(all_queue_directory_names(VHost), + sets:to_list(DurableDirectories))], + ?LOG_DEBUG("Deleting unknown files/folders: ~p", [ToDelete]), + _ = rabbit_file:recursive_delete(ToDelete), + rabbit_recovery_terms:clear(VHost), + %% The backing queue interface requires that the queue recovery terms + %% which come back from start/1 are in the same order as DurableQueueNames + OrderedTerms = lists:reverse(DurableTerms), + {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. + +all_queue_directory_names(VHost) -> + VHostQueuesPath = filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues"]), + case filelib:is_dir(VHostQueuesPath) of + true -> + {ok, Dirs} = file:list_dir(VHostQueuesPath), + Dirs; + false -> [] + end. queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) -> ?DEBUG("~0p", [{start, DurableQueues}]), @@ -1120,9 +1010,6 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> empty -> ok = gatherer:stop(Gatherer), finished; - %% From v1 index walker. @todo Remove when no longer possible to convert from v1. - {value, {MsgId, Count}} -> - {MsgId, Count, {next, Gatherer}}; {value, MsgIds} -> {MsgIds, {next, Gatherer}} end. @@ -1133,16 +1020,7 @@ queue_index_walker_reader(#resource{ virtual_host = VHost } = Name, Gatherer) -> Dir = queue_dir(VHostDir, Name), SegmentFiles = rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir), _ = [queue_index_walker_segment(filename:join(Dir, F), Gatherer) || F <- SegmentFiles], - %% When there are files belonging to the v1 index, we go through - %% the v1 index walker function as well. - case rabbit_file:wildcard(".*\\.(idx|jif)", Dir) of - [_|_] -> - %% This function will call gatherer:finish/1, we do not - %% need to call it here. - rabbit_queue_index:queue_index_walker_reader(Name, Gatherer); - [] -> - ok = gatherer:finish(Gatherer) - end. + ok = gatherer:finish(Gatherer). 
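The tuple returned by start/2 hands the caller a walker fun plus its initial state; the consumer keeps applying the fun until it reports 'finished'. A sketch of such a driver loop, assuming the {MsgIds, NextState} | 'finished' shape produced by queue_index_walker/1 above; walk/3 is a hypothetical helper, not part of this patch:

    walk(Walker, WalkerState, Acc) ->
        case Walker(WalkerState) of
            finished -> Acc;
            {MsgIds, NextState} -> walk(Walker, NextState, [MsgIds | Acc])
        end.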
queue_index_walker_segment(F, Gatherer) -> ?DEBUG("~0p ~0p", [F, Gatherer]), @@ -1180,27 +1058,11 @@ queue_index_walker_segment(Fd, Gatherer, N, Total, Acc) -> stop(VHost) -> ?DEBUG("~0p", [VHost]), - rabbit_queue_index:stop(VHost). + rabbit_recovery_terms:stop(VHost). %% ---- %% -%% These functions either call the normal functions or are no-ops. -%% They relate to specific optimizations of rabbit_queue_index and -%% rabbit_variable_queue. -%% -%% @todo The way pre_publish works is still fairly puzzling. -%% When the v1 index gets removed we can just drop -%% these functions. - -pre_publish(MsgOrId, SeqId, Location, Props, IsPersistent, TargetRamCount, State) -> - ?DEBUG("~0p ~0p ~0p ~0p ~0p ~0p ~0p", [MsgOrId, SeqId, Location, Props, IsPersistent, TargetRamCount, State]), - publish(MsgOrId, SeqId, Location, Props, IsPersistent, false, TargetRamCount, State). - -flush_pre_publish_cache(TargetRamCount, State) -> - ?DEBUG("~0p ~0p", [TargetRamCount, State]), - State. - -%% See comment in rabbit_queue_index:bounds/1. We do not need to be +%% Technical leftover from CQv1. We do not need to be %% accurate about these values because they are simply used as lowest %% and highest possible bounds. In fact we HAVE to be inaccurate for %% the test suite to pass. This can probably be made more accurate @@ -1237,15 +1099,6 @@ next_segment_boundary(SeqId) -> SegmentEntryCount = segment_entry_count(), (1 + (SeqId div SegmentEntryCount)) * SegmentEntryCount. -%% This function is only used when downgrading to the v1 index. -%% We potentially close the relevant fd and then delete the -%% segment file. -delete_segment_file_for_seq_id(SeqId, State0) -> - SegmentEntryCount = segment_entry_count(), - Segment = SeqId div SegmentEntryCount, - State = delete_segment(Segment, State0), - {[Segment], State}. - %% ---- %% %% Internal. diff --git a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl index 7c28ceb7a37..354f4e1189a 100644 --- a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl @@ -147,8 +147,8 @@ info(#qs{ write_buffer = WriteBuffer }) -> %% @todo I think we can disable the old message store at the same %% place where we create MsgId. If many queues receive the -%% message, then we create an MsgId. If not, we don't. But -%% we can only do this after removing support for v1. +%% message, then we create an MsgId. If not, we don't until +%% strictly necessary (large messages). write(SeqId, Msg, Props, State0 = #qs{ write_buffer = WriteBuffer0, write_buffer_size = WriteBufferSize }) -> ?DEBUG("~0p ~0p ~0p ~0p", [SeqId, Msg, Props, State0]), diff --git a/deps/rabbit/src/rabbit_guid.erl b/deps/rabbit/src/rabbit_guid.erl index d33081c8d86..fd525e5606a 100644 --- a/deps/rabbit/src/rabbit_guid.erl +++ b/deps/rabbit/src/rabbit_guid.erl @@ -31,6 +31,7 @@ -spec start_link() -> rabbit_types:ok_pid_or_error(). +%% @todo Serial can be in persistent_term instead of process. start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [update_disk_serial()], []). 
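The @todo on rabbit_guid:start_link/0 suggests keeping the serial outside the gen_server state. A minimal sketch of that idea using persistent_term; the key and helper names are hypothetical, not part of this patch:

    %% Publish the serial once at startup; later reads are cheap
    %% and do not go through the rabbit_guid process.
    store_serial(Serial) when is_integer(Serial) ->
        persistent_term:put({rabbit_guid, serial}, Serial).

    serial() ->
        persistent_term:get({rabbit_guid, serial}).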
diff --git a/deps/rabbit/src/rabbit_priority_queue.erl b/deps/rabbit/src/rabbit_priority_queue.erl index 6777ec31bc6..ef60d05ee6e 100644 --- a/deps/rabbit/src/rabbit_priority_queue.erl +++ b/deps/rabbit/src/rabbit_priority_queue.erl @@ -29,7 +29,7 @@ purge/1, purge_acks/1, publish/5, publish_delivered/4, discard/3, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, - ackfold/4, fold/3, len/1, is_empty/1, depth/1, + ackfold/4, len/1, is_empty/1, depth/1, update_rates/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2, @@ -302,11 +302,6 @@ ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags) -> ackfold(MsgFun, Acc, State = #passthrough{bq = BQ, bqs = BQS}, AckTags) -> ?passthrough2(ackfold(MsgFun, Acc, BQS, AckTags)). -fold(Fun, Acc, State = #state{bq = BQ}) -> - fold2(fun (_P, BQSN, AccN) -> BQ:fold(Fun, AccN, BQSN) end, Acc, State); -fold(Fun, Acc, State = #passthrough{bq = BQ, bqs = BQS}) -> - ?passthrough2(fold(Fun, Acc, BQS)). - len(#state{bq = BQ, bqss = BQSs}) -> add0(fun (_P, BQSN) -> BQ:len(BQSN) end, BQSs); len(#passthrough{bq = BQ, bqs = BQS}) -> diff --git a/deps/rabbit/src/rabbit_queue_index.erl b/deps/rabbit/src/rabbit_queue_index.erl deleted file mode 100644 index c8a084bd414..00000000000 --- a/deps/rabbit/src/rabbit_queue_index.erl +++ /dev/null @@ -1,1417 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -%% - --module(rabbit_queue_index). - --compile({inline, [segment_entry_count/0]}). - --export([erase/1, init/3, reset_state/1, recover/7, - terminate/3, delete_and_terminate/1, info/1, - pre_publish/7, flush_pre_publish_cache/2, - publish/7, publish/8, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, - read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]). - -%% Used by rabbit_vhost to set the segment_entry_count. --export([all_queue_directory_names/1]). - -%% Used by rabbit_classic_queue_index_v2 when upgrading -%% after a non-clean shutdown. --export([queue_index_walker_reader/2]). - -%% Used to upgrade/downgrade to/from the v2 index. --export([init_args/1]). --export([init_for_conversion/3]). --export([delete_segment_file_for_seq_id/2]). --export([delete_journal/1]). - --define(CLEAN_FILENAME, "clean.dot"). - -%%---------------------------------------------------------------------------- - -%% The queue index is responsible for recording the order of messages -%% within a queue on disk. As such it contains records of messages -%% being published, delivered and acknowledged. The publish record -%% includes the sequence ID, message ID and a small quantity of -%% metadata about the message; the delivery and acknowledgement -%% records just contain the sequence ID. A publish record may also -%% contain the complete message if provided to publish/5; this allows -%% the message store to be avoided altogether for small messages. In -%% either case the publish record is stored in memory in the same -%% serialised format it will take on disk. -%% -%% Because of the fact that the queue can decide at any point to send -%% a queue entry to disk, you can not rely on publishes appearing in -%% order. 
The only thing you can rely on is a message being published, -%% then delivered, then ack'd. -%% -%% In order to be able to clean up ack'd messages, we write to segment -%% files. These files have a fixed number of entries: segment_entry_count() -%% publishes, delivers and acknowledgements. They are numbered, and so -%% it is known that the 0th segment contains messages 0 -> -%% segment_entry_count() - 1, the 1st segment contains messages -%% segment_entry_count() -> 2*segment_entry_count() - 1 and so on. As -%% such, in the segment files, we only refer to message sequence ids -%% by the LSBs as SeqId rem segment_entry_count(). This gives them a -%% fixed size. -%% -%% However, transient messages which are not sent to disk at any point -%% will cause gaps to appear in segment files. Therefore, we delete a -%% segment file whenever the number of publishes == number of acks -%% (note that although it is not fully enforced, it is assumed that a -%% message will never be ackd before it is delivered, thus this test -%% also implies == number of delivers). In practise, this does not -%% cause disk churn in the pathological case because of the journal -%% and caching (see below). -%% -%% Because of the fact that publishes, delivers and acks can occur all -%% over, we wish to avoid lots of seeking. Therefore we have a fixed -%% sized journal to which all actions are appended. When the number of -%% entries in this journal reaches max_journal_entries, the journal -%% entries are scattered out to their relevant files, and the journal -%% is truncated to zero size. Note that entries in the journal must -%% carry the full sequence id, thus the format of entries in the -%% journal is different to that in the segments. -%% -%% The journal is also kept fully in memory, pre-segmented: the state -%% contains a mapping from segment numbers to state-per-segment (this -%% state is held for all segments which have been "seen": thus a -%% segment which has been read but has no pending entries in the -%% journal is still held in this mapping. Also note that a map is -%% used for this mapping, not an array because with an array, you will -%% always have entries from 0). Actions are stored directly in this -%% state. Thus at the point of flushing the journal, firstly no -%% reading from disk is necessary, but secondly if the known number of -%% acks and publishes in a segment are equal, given the known state of -%% the segment file combined with the journal, no writing needs to be -%% done to the segment file either (in fact it is deleted if it exists -%% at all). This is safe given that the set of acks is a subset of the -%% set of publishes. When it is necessary to sync messages, it is -%% sufficient to fsync on the journal: when entries are distributed -%% from the journal to segment files, those segments appended to are -%% fsync'd prior to the journal being truncated. -%% -%% This module is also responsible for scanning the queue index files -%% and seeding the message store on start up. -%% -%% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{IsPersistent, Bin, MsgBin}), -%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly -%% necessary for most operations. However, for startup, and to ensure -%% the safe and correct combination of journal entries with entries -%% read from the segment on disk, this richer representation vastly -%% simplifies and clarifies the code. 
-%% -%% For notes on Clean Shutdown and startup, see documentation in -%% rabbit_variable_queue. -%% -%% v2 UPDATE: The queue index is still keeping track of delivers -%% as noted in the above comment. However the queue will immediately -%% mark messages as delivered, because it now keeps track of delivers -%% at the queue level. The index still needs to keep track of deliver -%% entries because of its pub->del->ack logic. -%% -%%---------------------------------------------------------------------------- - -%% ---- Journal details ---- - --define(JOURNAL_FILENAME, "journal.jif"). --define(QUEUE_NAME_STUB_FILE, ".queue_name"). - --define(PUB_PERSIST_JPREFIX, 2#00). --define(PUB_TRANS_JPREFIX, 2#01). --define(DEL_JPREFIX, 2#10). --define(ACK_JPREFIX, 2#11). --define(JPREFIX_BITS, 2). --define(SEQ_BYTES, 8). --define(SEQ_BITS, ((?SEQ_BYTES * 8) - ?JPREFIX_BITS)). - -%% ---- Segment details ---- - --define(SEGMENT_EXTENSION, ".idx"). - -%% TODO: The segment size would be configurable, but deriving all the -%% other values is quite hairy and quite possibly noticeably less -%% efficient, depending on how clever the compiler is when it comes to -%% binary generation/matching with constant vs variable lengths. - --define(REL_SEQ_BITS, 14). - -%% seq only is binary 01 followed by 14 bits of rel seq id -%% (range: 0 - 16383) --define(REL_SEQ_ONLY_PREFIX, 01). --define(REL_SEQ_ONLY_PREFIX_BITS, 2). --define(REL_SEQ_ONLY_RECORD_BYTES, 2). - -%% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, 64 bits for message expiry, 32 bits of -%% size and then 128 bits of md5sum msg id. --define(PUB_PREFIX, 1). --define(PUB_PREFIX_BITS, 1). - --define(EXPIRY_BYTES, 8). --define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)). --define(NO_EXPIRY, 0). - --define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes --define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). - -%% This is the size of the message body content, for stats --define(SIZE_BYTES, 4). --define(SIZE_BITS, (?SIZE_BYTES * 8)). - -%% This is the size of the message record embedded in the queue -%% index. If 0, the message can be found in the message store. --define(EMBEDDED_SIZE_BYTES, 4). --define(EMBEDDED_SIZE_BITS, (?EMBEDDED_SIZE_BYTES * 8)). - -%% 16 bytes for md5sum + 8 for expiry --define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)). -%% + 4 for size --define(PUB_RECORD_SIZE_BYTES, (?PUB_RECORD_BODY_BYTES + ?EMBEDDED_SIZE_BYTES)). - -%% + 2 for seq, bits and prefix --define(PUB_RECORD_PREFIX_BYTES, 2). - -%% ---- misc ---- - --define(PUB, {_, _, _}). %% {IsPersistent, Bin, MsgBin} - --define(READ_MODE, [binary, raw, read]). --define(WRITE_MODE, [write | ?READ_MODE]). - -%%---------------------------------------------------------------------------- - --record(qistate, { - %% queue directory where segment and journal files are stored - dir, - %% map of #segment records - segments, - %% journal file handle obtained from/used by file_handle_cache - journal_handle, - %% how many not yet flushed entries are there - dirty_count, - %% this many not yet flushed journal entries will force a flush - max_journal_entries, - %% callback function invoked when a message is "handled" - %% by the index and potentially can be confirmed to the publisher - on_sync, - on_sync_msg, - %% set of IDs of unconfirmed [to publishers] messages - unconfirmed, - unconfirmed_msg, - %% optimisation - pre_publish_cache, - %% optimisation - delivered_cache, - %% queue name resource record - queue_name}). 
- --record(segment, { - %% segment ID (an integer) - num, - %% segment file path (see also ?SEGMENT_EXTENSION) - path, - %% index operation log entries in this segment - journal_entries, - entries_to_segment, - %% counter of unacknowledged messages - unacked -}). - --include_lib("rabbit_common/include/rabbit.hrl"). --include_lib("kernel/include/logger.hrl"). - -%%---------------------------------------------------------------------------- - --type hdl() :: ('undefined' | any()). --type segment() :: ('undefined' | - #segment { num :: non_neg_integer(), - path :: file:filename(), - journal_entries :: array:array(), - entries_to_segment :: array:array(), - unacked :: non_neg_integer() - }). --type seg_map() :: {map(), [segment()]}. --type on_sync_fun() :: fun ((sets:set()) -> ok). --type qistate() :: #qistate { dir :: file:filename(), - segments :: 'undefined' | seg_map(), - journal_handle :: hdl(), - dirty_count :: integer(), - max_journal_entries :: non_neg_integer(), - on_sync :: on_sync_fun(), - on_sync_msg :: on_sync_fun(), - unconfirmed :: sets:set(), - unconfirmed_msg :: sets:set(), - pre_publish_cache :: list(), - delivered_cache :: list() - }. --type contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean()). --type walker(A) :: fun ((A) -> 'finished' | - {rabbit_types:msg_id(), non_neg_integer(), A}). --type shutdown_terms() :: [term()] | 'non_clean_shutdown'. - -%%---------------------------------------------------------------------------- -%% public API -%%---------------------------------------------------------------------------- - --spec erase(rabbit_amqqueue:name()) -> 'ok'. - -erase(#resource{ virtual_host = VHost } = Name) -> - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), - #qistate { dir = Dir } = blank_state(VHostDir, Name), - erase_index_dir(Dir). - -%% used during variable queue purge when there are no pending acks - --spec reset_state(qistate()) -> qistate(). - -reset_state(#qistate{ queue_name = Name, - dir = Dir, - on_sync = OnSyncFun, - on_sync_msg = OnSyncMsgFun, - journal_handle = JournalHdl }) -> - ok = case JournalHdl of - undefined -> ok; - _ -> file_handle_cache:close(JournalHdl) - end, - ok = erase_index_dir(Dir), - blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun). - --spec init(rabbit_amqqueue:name(), - on_sync_fun(), on_sync_fun()) -> qistate(). - -init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) -> - #{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost), - put(segment_entry_count, SegmentEntryCount), - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), - State = #qistate { dir = Dir } = blank_state(VHostDir, Name), - false = rabbit_file:is_file(Dir), %% is_file == is file or dir - State#qistate{on_sync = OnSyncFun, - on_sync_msg = OnSyncMsgFun}. - -init_args(#qistate{ queue_name = QueueName, - on_sync = OnSyncFun, - on_sync_msg = OnSyncMsgFun }) -> - {QueueName, OnSyncFun, OnSyncMsgFun}. - -init_for_conversion(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) -> - #{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost), - put(segment_entry_count, SegmentEntryCount), - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), - State = blank_state(VHostDir, Name), - State#qistate{on_sync = OnSyncFun, - on_sync_msg = OnSyncMsgFun}. 
- --spec recover(rabbit_amqqueue:name(), shutdown_terms(), boolean(), - contains_predicate(), - on_sync_fun(), on_sync_fun(), - main | convert) -> - {'undefined' | non_neg_integer(), - 'undefined' | non_neg_integer(), qistate()}. - -recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered, - ContainsCheckFun, OnSyncFun, OnSyncMsgFun, - %% We only allow using this module when converting to v2. - convert) -> - #{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost), - put(segment_entry_count, SegmentEntryCount), - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), - State = blank_state(VHostDir, Name), - State1 = State #qistate{on_sync = OnSyncFun, - on_sync_msg = OnSyncMsgFun}, - CleanShutdown = Terms /= non_clean_shutdown, - case CleanShutdown andalso MsgStoreRecovered of - true -> case proplists:get_value(segments, Terms, non_clean_shutdown) of - non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1); - RecoveredCounts -> init_clean(RecoveredCounts, State1) - end; - false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) - end. - --spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate(). - -terminate(VHost, Terms, State = #qistate { dir = Dir }) -> - {SegmentCounts, State1} = terminate(State), - _ = rabbit_recovery_terms:store(VHost, filename:basename(Dir), - [{segments, SegmentCounts} | Terms]), - State1. - --spec delete_and_terminate(qistate()) -> qistate(). - -delete_and_terminate(State) -> - {_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), - ok = rabbit_file:recursive_delete([Dir]), - State1. - --spec info(qistate()) -> []. - -%% No info is implemented for v1 at this time. -info(_) -> []. - -pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint, - State = #qistate{pre_publish_cache = PPC, - delivered_cache = DC}) -> - State1 = maybe_needs_confirming(MsgProps, MsgOrId, State), - - {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps), - - PPC1 = - [[<<(case IsPersistent of - true -> ?PUB_PERSIST_JPREFIX; - false -> ?PUB_TRANS_JPREFIX - end):?JPREFIX_BITS, - SeqId:?SEQ_BITS, Bin/binary, - (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | PPC], - - DC1 = - case IsDelivered of - true -> - [SeqId | DC]; - false -> - DC - end, - - State2 = add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1), - maybe_flush_pre_publish_cache( - JournalSizeHint, - State2#qistate{pre_publish_cache = PPC1, - delivered_cache = DC1}). - -%% pre_publish_cache is the entry with most elements when compared to -%% delivered_cache so we only check the former in the guard. -maybe_flush_pre_publish_cache(JournalSizeHint, - #qistate{pre_publish_cache = PPC} = State) -> - case length(PPC) >= segment_entry_count() of - true -> flush_pre_publish_cache(JournalSizeHint, State); - false -> State - end. - -flush_pre_publish_cache(JournalSizeHint, State) -> - State1 = flush_pre_publish_cache(State), - State2 = flush_delivered_cache(State1), - maybe_flush_journal(JournalSizeHint, State2). - -flush_pre_publish_cache(#qistate{pre_publish_cache = []} = State) -> - State; -flush_pre_publish_cache(State = #qistate{pre_publish_cache = PPC}) -> - {JournalHdl, State1} = get_journal_handle(State), - ok = file_handle_cache:append(JournalHdl, lists:reverse(PPC)), - State1#qistate{pre_publish_cache = []}. 
- -flush_delivered_cache(#qistate{delivered_cache = []} = State) -> - State; -flush_delivered_cache(State = #qistate{delivered_cache = DC}) -> - State1 = deliver(lists:reverse(DC), State), - State1#qistate{delivered_cache = []}. - -publish(MsgOrId, SeqId, _Location, MsgProps, IsPersistent, JournalSizeHint, State) -> - {JournalHdl, State1} = - get_journal_handle( - maybe_needs_confirming(MsgProps, MsgOrId, State)), - {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps), - ok = file_handle_cache:append( - JournalHdl, [<<(case IsPersistent of - true -> ?PUB_PERSIST_JPREFIX; - false -> ?PUB_TRANS_JPREFIX - end):?JPREFIX_BITS, - SeqId:?SEQ_BITS, Bin/binary, - (byte_size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]), - maybe_flush_journal( - JournalSizeHint, - add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)). - -publish(MsgOrId, SeqId, Location, MsgProps, IsPersistent, _, JournalSizeHint, State) -> - publish(MsgOrId, SeqId, Location, MsgProps, IsPersistent, JournalSizeHint, State). - -maybe_needs_confirming(MsgProps, MsgOrId, - State = #qistate{unconfirmed = UC, - unconfirmed_msg = UCM}) -> - MsgId = case MsgOrId of - Id when is_binary(Id) -> Id; - Msg -> - mc:get_annotation(id, Msg) - end, - ?MSG_ID_BYTES = byte_size(MsgId), - case {MsgProps#message_properties.needs_confirming, MsgOrId} of - {true, MsgId} -> UC1 = sets:add_element(MsgId, UC), - State#qistate{unconfirmed = UC1}; - {true, _} -> UCM1 = sets:add_element(MsgId, UCM), - State#qistate{unconfirmed_msg = UCM1}; - {false, _} -> State - end. - --spec deliver([rabbit_variable_queue:seq_id()], qistate()) -> qistate(). - -deliver(SeqIds, State) -> - deliver_or_ack(del, SeqIds, State). - --spec ack([rabbit_variable_queue:seq_id()], qistate()) -> {[], qistate()}. - -ack(SeqIds, State) -> - {[], deliver_or_ack(ack, SeqIds, State)}. - -%% This is called when there are outstanding confirms or when the -%% queue is idle and the journal needs syncing (see needs_sync/1). - --spec sync(qistate()) -> qistate(). - -sync(State = #qistate { journal_handle = undefined }) -> - State; -sync(State = #qistate { journal_handle = JournalHdl }) -> - ok = file_handle_cache:sync(JournalHdl), - notify_sync(State). - --spec needs_sync(qistate()) -> 'confirms' | 'other' | 'false'. - -needs_sync(#qistate{journal_handle = undefined}) -> - false; -needs_sync(#qistate{journal_handle = JournalHdl, - unconfirmed = UC, - unconfirmed_msg = UCM}) -> - case sets:is_empty(UC) andalso sets:is_empty(UCM) of - true -> case file_handle_cache:needs_sync(JournalHdl) of - true -> other; - false -> false - end; - false -> confirms - end. - --spec flush(qistate()) -> qistate(). - -flush(State = #qistate { dirty_count = 0 }) -> State; -flush(State) -> flush_journal(State). - --spec read(rabbit_variable_queue:seq_id(), - rabbit_variable_queue:seq_id(), - qistate()) -> - {[{rabbit_types:msg_id(), rabbit_variable_queue:seq_id(), - rabbit_variable_queue:msg_location(), - rabbit_types:message_properties(), - boolean()}], qistate()}. - -read(StartEnd, StartEnd, State) -> - {[], State}; -read(Start, End, State = #qistate { segments = Segments, - dir = Dir }) when Start =< End -> - %% Start is inclusive, End is exclusive. - LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), - UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1), - {Messages, Segments1} = - lists:foldr(fun (Seg, Acc) -> - read_bounded_segment(Seg, LowerB, UpperB, Acc, Dir) - end, {[], Segments}, lists:seq(StartSeg, EndSeg)), - {Messages, State #qistate { segments = Segments1 }}. 
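A worked example of the bounds computed by read/3 above (Start inclusive, End exclusive), assuming segment_entry_count() returns 16384:

    %% read(16380, 16390, State) scans segments 0 and 1:
    {0, 16380} = {16380 div 16384, 16380 rem 16384},              %% lower bound from Start
    {1, 5} = {(16390 - 1) div 16384, (16390 - 1) rem 16384}.      %% upper bound uses End - 1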
- --spec next_segment_boundary(rabbit_variable_queue:seq_id()) -> rabbit_variable_queue:seq_id(). - -next_segment_boundary(SeqId) -> - {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - reconstruct_seq_id(Seg + 1, 0). - --spec bounds(qistate()) -> - {non_neg_integer(), non_neg_integer(), qistate()}. - -bounds(State = #qistate { segments = Segments }) -> - %% This is not particularly efficient, but only gets invoked on - %% queue initialisation. - SegNums = lists:sort(segment_nums(Segments)), - %% Don't bother trying to figure out the lowest seq_id, merely the - %% seq_id of the start of the lowest segment. That seq_id may not - %% actually exist, but that's fine. The important thing is that - %% the segment exists and the seq_id reported is on a segment - %% boundary. - %% - %% We also don't really care about the max seq_id. Just start the - %% next segment: it makes life much easier. - %% - %% SegNums is sorted, ascending. - {LowSeqId, NextSeqId} = - case SegNums of - [] -> {0, 0}; - [MinSeg|_] -> {reconstruct_seq_id(MinSeg, 0), - reconstruct_seq_id(1 + lists:last(SegNums), 0)} - end, - {LowSeqId, NextSeqId, State}. - --spec start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}. - -start(VHost, DurableQueueNames) -> - {ok, RecoveryTermsPid} = rabbit_recovery_terms:start(VHost), - rabbit_vhost_sup_sup:save_vhost_recovery_terms(VHost, RecoveryTermsPid), - {DurableTerms, DurableDirectories} = - lists:foldl( - fun(QName, {RecoveryTerms, ValidDirectories}) -> - DirName = queue_name_to_dir_name(QName), - RecoveryInfo = case rabbit_recovery_terms:read(VHost, DirName) of - {error, _} -> non_clean_shutdown; - {ok, Terms} -> Terms - end, - {[RecoveryInfo | RecoveryTerms], - sets:add_element(DirName, ValidDirectories)} - end, {[], sets:new()}, DurableQueueNames), - %% Any queue directory we've not been asked to recover is considered garbage - ToDelete = [filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues", Dir]) - || Dir <- lists:subtract(all_queue_directory_names(VHost), - sets:to_list(DurableDirectories))], - ?LOG_DEBUG("Deleting unknown files/folders: ~p", [ToDelete]), - _ = rabbit_file:recursive_delete(ToDelete), - - rabbit_recovery_terms:clear(VHost), - - %% The backing queue interface requires that the queue recovery terms - %% which come back from start/1 are in the same order as DurableQueueNames - OrderedTerms = lists:reverse(DurableTerms), - {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. - - -stop(VHost) -> rabbit_recovery_terms:stop(VHost). - -all_queue_directory_names(VHost) -> - VHostQueuesPath = filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues"]), - case filelib:is_dir(VHostQueuesPath) of - true -> - {ok, Dirs} = file:list_dir(VHostQueuesPath), - Dirs; - false -> [] - end. - -%%---------------------------------------------------------------------------- -%% startup and shutdown -%%---------------------------------------------------------------------------- - -erase_index_dir(Dir) -> - case rabbit_file:is_dir(Dir) of - true -> rabbit_file:recursive_delete([Dir]); - false -> ok - end. - -blank_state(VHostDir, QueueName) -> - Dir = queue_dir(VHostDir, QueueName), - blank_state_name_dir_funs(QueueName, - Dir, - fun (_) -> ok end, - fun (_) -> ok end). - -queue_dir(VHostDir, QueueName) -> - %% Queue directory is - %% {node_database_dir}/msg_stores/vhosts/{vhost}/queues/{queue} - QueueDir = queue_name_to_dir_name(QueueName), - filename:join([VHostDir, "queues", QueueDir]). 
- -queue_name_to_dir_name(#resource { kind = queue, - virtual_host = VHost, - name = QName }) -> - <<Num:128>> = erlang:md5(<<"queue", VHost/binary, QName/binary>>), - rabbit_misc:format("~.36B", [Num]). - -blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun) -> - {ok, MaxJournal} = - application:get_env(rabbit, queue_index_max_journal_entries), - #qistate { dir = Dir, - segments = segments_new(), - journal_handle = undefined, - dirty_count = 0, - max_journal_entries = MaxJournal, - on_sync = OnSyncFun, - on_sync_msg = OnSyncMsgFun, - unconfirmed = sets:new([{version,2}]), - unconfirmed_msg = sets:new([{version,2}]), - pre_publish_cache = [], - delivered_cache = [], - queue_name = Name }. - -init_clean(RecoveredCounts, State) -> - %% Load the journal. Since this is a clean recovery this (almost) - %% gets us back to where we were on shutdown. - State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State), - %% The journal loading only creates records for segments touched - %% by the journal, and the counts are based on the journal entries - %% only. We need *complete* counts for *all* segments. By an - %% amazing coincidence we stored that information on shutdown. - Segments1 = - lists:foldl( - fun ({Seg, UnackedCount}, SegmentsN) -> - Segment = segment_find_or_new(Seg, Dir, SegmentsN), - segment_store(Segment #segment { unacked = UnackedCount }, - SegmentsN) - end, Segments, RecoveredCounts), - %% the counts above include transient messages, which would be the - %% wrong thing to return - {undefined, undefined, State1 # qistate { segments = Segments1 }}. - --define(RECOVER_COUNT, 1). --define(RECOVER_BYTES, 2). --define(RECOVER_COUNTER_SIZE, 2). - -init_dirty(CleanShutdown, ContainsCheckFun, State) -> - %% Recover the journal completely. This will also load segments - %% which have entries in the journal and remove duplicates. The - %% counts will correctly reflect the combination of the segment - %% and the journal. - State1 = #qistate { dir = Dir, segments = Segments } = - recover_journal(State), - {Segments1, Count, Bytes, DirtyCount} = - %% Load each segment in turn and filter out messages that are - %% not in the msg_store, by adding acks to the journal. These - %% acks only go to the RAM journal as it doesn't matter if we - %% lose them. Also mark delivered if not clean shutdown. Also - %% find the number of unacked messages. Also accumulate the - %% dirty count here, so we can call maybe_flush_journal below - %% and avoid unnecessary file system operations. - lists:foldl( - fun (Seg, {Segments2, CountAcc, BytesAcc, DirtyCount}) -> - {{Segment = #segment { unacked = UnackedCount }, Dirty}, - UnackedBytes} = - recover_segment(ContainsCheckFun, CleanShutdown, - segment_find_or_new(Seg, Dir, Segments2), - State1#qistate.max_journal_entries), - {segment_store(Segment, Segments2), - CountAcc + UnackedCount, - BytesAcc + UnackedBytes, DirtyCount + Dirty} - end, {Segments, 0, 0, 0}, all_segment_nums(State1)), - %% We force flush the journal to avoid getting into a bad state - %% when the node gets shut down immediately after init. It takes - %% a few restarts for the problem to materialize itself, with - %% at least one message published, followed by the process crashing, - %% followed by a recovery that is dirty due to term mismatch in the - %% message store, followed by two clean recoveries. This last - %% recovery fails with a crash. - State2 = flush_journal(State1 #qistate { segments = Segments1, - dirty_count = DirtyCount }), - {Count, Bytes, State2}.
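The hashed directory naming from queue_name_to_dir_name/1 earlier in this block can be reproduced in a shell; the vhost and queue name here are made up:

    %% 128-bit md5 of "queue" ++ vhost ++ queue name, rendered in base 36.
    <<Num:128>> = erlang:md5(<<"queue", "/", "my-queue">>),
    DirName = rabbit_misc:format("~.36B", [Num]).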
- -terminate(State = #qistate { journal_handle = JournalHdl, - segments = Segments }) -> - ok = case JournalHdl of - undefined -> ok; - _ -> file_handle_cache:close(JournalHdl) - end, - SegmentCounts = - segment_fold( - fun (#segment { num = Seg, unacked = UnackedCount }, Acc) -> - [{Seg, UnackedCount} | Acc] - end, [], Segments), - {SegmentCounts, State #qistate { journal_handle = undefined, - segments = undefined }}. - -recover_segment(ContainsCheckFun, CleanShutdown, - Segment = #segment { journal_entries = JEntries }, MaxJournal) -> - {SegEntries, UnackedCount} = load_segment(false, Segment), - {SegEntries1, UnackedCountDelta} = - segment_plus_journal(SegEntries, JEntries), - array:sparse_foldl( - fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, no_ack}, - {SegmentAndDirtyCount, Bytes}) -> - {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin), - {recover_message(ContainsCheckFun(MsgOrId), CleanShutdown, - Del, RelSeq, SegmentAndDirtyCount, MaxJournal), - %% @todo If the message is dropped we shouldn't add the size? - Bytes + case IsPersistent of - true -> MsgProps#message_properties.size; - false -> 0 - end} - end, - {{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0}, - SegEntries1). - -recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) -> - SegmentAndDirtyCount; -recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) -> - SegmentAndDirtyCount; -recover_message( true, false, no_del, RelSeq, {Segment, _DirtyCount}, MaxJournal) -> - %% force to flush the segment - {add_to_journal(RelSeq, del, Segment), MaxJournal + 1}; -recover_message(false, _, del, RelSeq, {Segment, DirtyCount}, _MaxJournal) -> - {add_to_journal(RelSeq, ack, Segment), DirtyCount + 1}; -recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}, _MaxJournal) -> - {add_to_journal(RelSeq, ack, - add_to_journal(RelSeq, del, Segment)), - DirtyCount + 2}. - -%%---------------------------------------------------------------------------- -%% msg store startup delta function -%%---------------------------------------------------------------------------- - -queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) -> - {ok, Gatherer} = gatherer:start_link(), - [begin - ok = gatherer:fork(Gatherer), - ok = worker_pool:submit_async( - fun () -> link(Gatherer), - ok = queue_index_walker_reader(QueueName, Gatherer), - unlink(Gatherer), - ok - end) - end || QueueName <- DurableQueues], - queue_index_walker({next, Gatherer}); - -queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> - case gatherer:out(Gatherer) of - empty -> - ok = gatherer:stop(Gatherer), - finished; - {value, {MsgId, Count}} -> - {MsgId, Count, {next, Gatherer}} - end. - -queue_index_walker_reader(QueueName, Gatherer) -> - ok = scan_queue_segments( - fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) - when is_binary(MsgId) -> - gatherer:sync_in(Gatherer, {MsgId, 1}); - (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, - _IsAcked, Acc) -> - Acc - end, ok, QueueName), - ok = gatherer:finish(Gatherer). - -scan_queue_segments(Fun, Acc, #resource{ virtual_host = VHost } = QueueName) -> - %% Set the segment_entry_count for this worker process. - #{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost), - put(segment_entry_count, SegmentEntryCount), - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), - scan_queue_segments(Fun, Acc, VHostDir, QueueName). 
- -scan_queue_segments(Fun, Acc, VHostDir, QueueName) -> - State = #qistate { segments = Segments, dir = Dir } = - recover_journal(blank_state(VHostDir, QueueName)), - Result = lists:foldr( - fun (Seg, AccN) -> - segment_entries_foldr( - fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, - IsDelivered, IsAcked}, AccM) -> - Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps, - IsPersistent, IsDelivered, IsAcked, AccM) - end, AccN, segment_find_or_new(Seg, Dir, Segments)) - end, Acc, all_segment_nums(State)), - {_SegmentCounts, _State} = terminate(State), - Result. - -%%---------------------------------------------------------------------------- -%% expiry/binary manipulation -%%---------------------------------------------------------------------------- - -create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry, - size = Size }) -> - ExpiryBin = expiry_to_binary(Expiry), - case MsgOrId of - MsgId when is_binary(MsgId) -> - {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, <<>>}; - Msg -> - MsgId = mc:get_annotation(id, Msg), - MsgBin = term_to_binary(MsgOrId), - {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, MsgBin} - end. - -expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; -expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. - -parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS>>, MsgBin) -> - %% work around for binary data fragmentation. See - %% rabbit_msg_file:read_next/2 - <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, - Props = #message_properties{expiry = case Expiry of - ?NO_EXPIRY -> undefined; - X -> X - end, - size = Size}, - case MsgBin of - <<>> -> {MsgId, Props}; - _ -> - Msg = binary_to_term(MsgBin), - %% assertion - MsgId = mc:get_annotation(id, Msg), - {Msg, Props} - end. - -%%---------------------------------------------------------------------------- -%% journal manipulation -%%---------------------------------------------------------------------------- - -add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount, - segments = Segments, - dir = Dir }) -> - {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - Segment = segment_find_or_new(Seg, Dir, Segments), - Segment1 = add_to_journal(RelSeq, Action, Segment), - State #qistate { dirty_count = DCount + 1, - segments = segment_store(Segment1, Segments) }; - -add_to_journal(RelSeq, Action, - Segment = #segment { journal_entries = JEntries, - entries_to_segment = EToSeg, - unacked = UnackedCount }) -> - - {Fun, Entry} = action_to_entry(RelSeq, Action, JEntries), - - {JEntries1, EToSeg1} = - case Fun of - set -> - {array:set(RelSeq, Entry, JEntries), - array:set(RelSeq, entry_to_segment(RelSeq, Entry, []), - EToSeg)}; - reset -> - {array:reset(RelSeq, JEntries), - array:reset(RelSeq, EToSeg)} - end, - - Segment #segment { - journal_entries = JEntries1, - entries_to_segment = EToSeg1, - unacked = UnackedCount + case Action of - ?PUB -> +1; - del -> 0; - ack -> -1 - end}. - -action_to_entry(RelSeq, Action, JEntries) -> - case array:get(RelSeq, JEntries) of - undefined -> - {set, - case Action of - ?PUB -> {Action, no_del, no_ack}; - del -> {no_pub, del, no_ack}; - ack -> {no_pub, no_del, ack} - end}; - ({Pub, no_del, no_ack}) when Action == del -> - {set, {Pub, del, no_ack}}; - ({no_pub, del, no_ack}) when Action == ack -> - {set, {no_pub, del, ack}}; - ({?PUB, del, no_ack}) when Action == ack -> - {reset, none}; - %% Special case, missing del - %% See journal_minus_segment1/2 - ({?PUB, no_del, no_ack}) when Action == ack -> - {reset, none} - end. - -maybe_flush_journal(State) -> - maybe_flush_journal(infinity, State).
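A round-trip sketch for the pub record body helpers above; the message id and size are made-up values, and needs_confirming is left at its record default on both sides:

    MsgId = <<0:128>>,  %% 16 bytes, i.e. ?MSG_ID_BYTES
    Props = #message_properties{expiry = undefined, size = 123},
    {Bin, <<>>} = create_pub_record_body(MsgId, Props),
    {MsgId, Props} = parse_pub_record_body(Bin, <<>>).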
- -maybe_flush_journal(Hint, State = #qistate { dirty_count = DCount, - max_journal_entries = MaxJournal }) - when DCount > MaxJournal orelse (Hint =/= infinity andalso DCount > Hint) -> - flush_journal(State); -maybe_flush_journal(_Hint, State) -> - State. - -flush_journal(State = #qistate { segments = Segments }) -> - Segments1 = - segment_fold( - fun (#segment { unacked = 0, path = Path }, SegmentsN) -> - case rabbit_file:is_file(Path) of - true -> ok = rabbit_file:delete(Path); - false -> ok - end, - SegmentsN; - (#segment {} = Segment, SegmentsN) -> - segment_store(append_journal_to_segment(Segment), SegmentsN) - end, segments_new(), Segments), - {JournalHdl, State1} = - get_journal_handle(State #qistate { segments = Segments1 }), - ok = file_handle_cache:clear(JournalHdl), - notify_sync(State1 #qistate { dirty_count = 0 }). - -append_journal_to_segment(#segment { journal_entries = JEntries, - entries_to_segment = EToSeg, - path = Path } = Segment) -> - case array:sparse_size(JEntries) of - 0 -> Segment; - _ -> - {ok, Hdl} = file_handle_cache:open_with_absolute_path( - Path, ?WRITE_MODE, - [{write_buffer, infinity}]), - %% the file_handle_cache also does a list reverse, so this - %% might not be required here, but before we were doing a - %% sparse_foldr, a lists:reverse/1 seems to be the correct - %% thing to do for now. - _ = file_handle_cache:append(Hdl, lists:reverse(array:to_list(EToSeg))), - ok = file_handle_cache:close(Hdl), - Segment #segment { journal_entries = array_new(), - entries_to_segment = array_new([]) } - end. - -get_journal_handle(State = #qistate { journal_handle = undefined, - dir = Dir, - queue_name = Name }) -> - Path = filename:join(Dir, ?JOURNAL_FILENAME), - ok = rabbit_file:ensure_dir(Path), - ok = ensure_queue_name_stub_file(Dir, Name), - {ok, Hdl} = file_handle_cache:open_with_absolute_path( - Path, ?WRITE_MODE, [{write_buffer, infinity}]), - {Hdl, State #qistate { journal_handle = Hdl }}; -get_journal_handle(State = #qistate { journal_handle = Hdl }) -> - {Hdl, State}. - -%% Loading Journal. This isn't idempotent and will mess up the counts -%% if you call it more than once on the same state. Assumes the counts -%% are 0 to start with. -load_journal(State = #qistate { dir = Dir }) -> - Path = filename:join(Dir, ?JOURNAL_FILENAME), - case rabbit_file:is_file(Path) of - true -> {JournalHdl, State1} = get_journal_handle(State), - Size = rabbit_file:file_size(Path), - {ok, 0} = file_handle_cache:position(JournalHdl, 0), - {ok, JournalBin} = file_handle_cache:read(JournalHdl, Size), - parse_journal_entries(JournalBin, State1); - false -> State - end. - -%% ditto -recover_journal(State) -> - State1 = #qistate { segments = Segments } = load_journal(State), - Segments1 = - segment_map( - fun (Segment = #segment { journal_entries = JEntries, - entries_to_segment = EToSeg, - unacked = UnackedCountInJournal }) -> - %% We want to keep ack'd entries in so that we can - %% remove them if duplicates are in the journal. The - %% counts here are purely from the segment itself. - {SegEntries, UnackedCountInSeg} = load_segment(true, Segment), - {JEntries1, EToSeg1, UnackedCountDuplicates} = - journal_minus_segment(JEntries, EToSeg, SegEntries), - Segment #segment { journal_entries = JEntries1, - entries_to_segment = EToSeg1, - unacked = (UnackedCountInJournal + - UnackedCountInSeg - - UnackedCountDuplicates) } - end, Segments), - State1 #qistate { segments = Segments1 }. 
-
-parse_journal_entries(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
-                        Rest/binary>>, State) ->
-    parse_journal_entries(Rest, add_to_journal(SeqId, del, State));
-
-parse_journal_entries(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
-                        Rest/binary>>, State) ->
-    parse_journal_entries(Rest, add_to_journal(SeqId, ack, State));
-parse_journal_entries(<<0:?JPREFIX_BITS, 0:?SEQ_BITS,
-                        0:?PUB_RECORD_SIZE_BYTES/unit:8, _/binary>>, State) ->
-    %% Journal entry composed only of zeroes was probably
-    %% produced during a dirty shutdown so stop reading
-    State;
-parse_journal_entries(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
-                        Bin:?PUB_RECORD_BODY_SIZE/binary,
-                        MsgSize:?EMBEDDED_SIZE_BITS, MsgBin:MsgSize/binary,
-                        Rest/binary>>, State) ->
-    IsPersistent = case Prefix of
-                       ?PUB_PERSIST_JPREFIX -> true;
-                       ?PUB_TRANS_JPREFIX   -> false
-                   end,
-    parse_journal_entries(
-      Rest, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State));
-parse_journal_entries(_ErrOrEoF, State) ->
-    State.
-
-deliver_or_ack(_Kind, [], State) ->
-    State;
-deliver_or_ack(Kind, SeqIds, State) ->
-    JPrefix = case Kind of ack -> ?ACK_JPREFIX; del -> ?DEL_JPREFIX end,
-    {JournalHdl, State1} = get_journal_handle(State),
-    ok = file_handle_cache:append(
-           JournalHdl,
-           [<<JPrefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>> || SeqId <- SeqIds]),
-    maybe_flush_journal(lists:foldl(fun (SeqId, StateN) ->
-                                            add_to_journal(SeqId, Kind, StateN)
-                                    end, State1, SeqIds)).
-
-notify_sync(State = #qistate{unconfirmed     = UC,
-                             unconfirmed_msg = UCM,
-                             on_sync         = OnSyncFun,
-                             on_sync_msg     = OnSyncMsgFun}) ->
-    State1 = case sets:is_empty(UC) of
-                 true  -> State;
-                 false -> OnSyncFun(UC),
-                          State#qistate{unconfirmed = sets:new([{version,2}])}
-             end,
-    case sets:is_empty(UCM) of
-        true  -> State1;
-        false -> OnSyncMsgFun(UCM),
-                 State1#qistate{unconfirmed_msg = sets:new([{version,2}])}
-    end.
-
-%%----------------------------------------------------------------------------
-%% segment manipulation
-%%----------------------------------------------------------------------------
-
-seq_id_to_seg_and_rel_seq_id(SeqId) ->
-    SegmentEntryCount = segment_entry_count(),
-    { SeqId div SegmentEntryCount, SeqId rem SegmentEntryCount }.
-
-reconstruct_seq_id(Seg, RelSeq) ->
-    (Seg * segment_entry_count()) + RelSeq.
-
-all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
-    lists:sort(
-      sets:to_list(
-        lists:foldl(
-          fun (SegName, Set) ->
-                  sets:add_element(
-                    list_to_integer(
-                      lists:takewhile(fun (C) -> $0 =< C andalso C =< $9 end,
-                                      SegName)), Set)
-          end, sets:from_list(segment_nums(Segments)),
-          rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)))).
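The seq id mapping above is a plain div/rem split, with reconstruct_seq_id/2 as its inverse. Worked example in the shell, assuming 16384 entries per segment (the v1 default):

1> SegmentEntryCount = 16384.
16384
2> {40000 div SegmentEntryCount, 40000 rem SegmentEntryCount}.
{2,7232}
3> 2 * SegmentEntryCount + 7232.
40000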
-
-segment_find_or_new(Seg, Dir, Segments) ->
-    case segment_find(Seg, Segments) of
-        {ok, Segment} -> Segment;
-        error         -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION,
-                         Path = filename:join(Dir, SegName),
-                         #segment { num                = Seg,
-                                    path               = Path,
-                                    journal_entries    = array_new(),
-                                    entries_to_segment = array_new([]),
-                                    unacked            = 0 }
-    end.
-
-segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) ->
-    {ok, Segment}; %% 1 or (2, matches head)
-segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }]}) ->
-    {ok, Segment}; %% 2, matches tail
-segment_find(Seg, {Segments, _}) -> %% no match
-    maps:find(Seg, Segments).
-
-segment_store(Segment = #segment { num = Seg }, %% 1 or (2, matches head)
-              {Segments, [#segment { num = Seg } | Tail]}) ->
-    {Segments, [Segment | Tail]};
-segment_store(Segment = #segment { num = Seg }, %% 2, matches tail
-              {Segments, [SegmentA, #segment { num = Seg }]}) ->
-    {Segments, [Segment, SegmentA]};
-segment_store(Segment = #segment { num = Seg }, {Segments, []}) ->
-    {maps:remove(Seg, Segments), [Segment]};
-segment_store(Segment = #segment { num = Seg }, {Segments, [SegmentA]}) ->
-    {maps:remove(Seg, Segments), [Segment, SegmentA]};
-segment_store(Segment = #segment { num = Seg },
-              {Segments, [SegmentA, SegmentB]}) ->
-    {maps:put(SegmentB#segment.num, SegmentB, maps:remove(Seg, Segments)),
-     [Segment, SegmentA]}.
-
-segment_fold(Fun, Acc, {Segments, CachedSegments}) ->
-    maps:fold(fun (_Seg, Segment, Acc1) -> Fun(Segment, Acc1) end,
-              lists:foldl(Fun, Acc, CachedSegments), Segments).
-
-segment_map(Fun, {Segments, CachedSegments}) ->
-    {maps:map(fun (_Seg, Segment) -> Fun(Segment) end, Segments),
-     lists:map(Fun, CachedSegments)}.
-
-segment_nums({Segments, CachedSegments}) ->
-    lists:map(fun (#segment { num = Num }) -> Num end, CachedSegments) ++
-        maps:keys(Segments).
-
-segments_new() ->
-    {#{}, []}.
-
-entry_to_segment(_RelSeq, {?PUB, del, ack}, Initial) ->
-    Initial;
-entry_to_segment(RelSeq, {Pub, Del, Ack}, Initial) ->
-    %% NB: we are assembling the segment in reverse order here, so
-    %% del/ack comes first.
-    Buf1 = case {Del, Ack} of
-               {no_del, no_ack} ->
-                   Initial;
-               _ ->
-                   Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
-                              RelSeq:?REL_SEQ_BITS>>,
-                   case {Del, Ack} of
-                       {del, ack} -> [[Binary, Binary] | Initial];
-                       _          -> [Binary | Initial]
-                   end
-           end,
-    case Pub of
-        no_pub ->
-            Buf1;
-        {IsPersistent, Bin, MsgBin} ->
-            [[<<?PUB_PREFIX:?PUB_PREFIX_BITS,
-                (bool_to_int(IsPersistent)):1,
-                RelSeq:?REL_SEQ_BITS, Bin/binary,
-                (byte_size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf1]
-    end.
-
-read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
-                     {Messages, Segments}, Dir) ->
-    Segment = segment_find_or_new(Seg, Dir, Segments),
-    {segment_entries_foldr(
-       fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, _IsDelivered, no_ack},
-            Acc)
-             when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
-                  (Seg < EndSeg   orelse EndRelSeq   >= RelSeq) ->
-               MsgLocation = case is_tuple(MsgOrId) of
-                                 true  -> rabbit_queue_index;
-                                 false -> rabbit_msg_store
-                             end,
-               [{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgLocation, MsgProps,
-                 IsPersistent} | Acc];
-           (_RelSeq, _Value, Acc) ->
-               Acc
-       end, Messages, Segment),
-     segment_store(Segment, Segments)}.
-
-segment_entries_foldr(Fun, Init,
-                      Segment = #segment { journal_entries = JEntries }) ->
-    {SegEntries, _UnackedCount} = load_segment(false, Segment),
-    {SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries),
-    array:sparse_foldr(
-      fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, Ack}, Acc) ->
-              {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
-              Fun(RelSeq, {{MsgOrId, MsgProps, IsPersistent}, Del, Ack}, Acc)
-      end, Init, SegEntries1).
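segment_find/2 and segment_store/2 above implement a tiny most-recently-used cache: up to two segments ride in a list in front of the map, so the hot path (touching the same one or two segments repeatedly) avoids map operations entirely. A simplified, hypothetical rendering of the same idea:

%% {Map, [MostRecent, SecondMostRecent]}: at most two cached entries.
cache_find(Key, {_Map, [{Key, _} = Hit | _]}) -> {ok, Hit};   %% head hit
cache_find(Key, {_Map, [_, {Key, _} = Hit]})  -> {ok, Hit};   %% tail hit
cache_find(Key, {Map, _})                     -> maps:find(Key, Map).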
-
-%% Loading segments
-%%
-%% Does not do any combining with the journal at all.
-load_segment(KeepAcked, #segment { path = Path }) ->
-    Empty = {array_new(), 0},
-    case rabbit_file:is_file(Path) of
-        false -> Empty;
-        true  -> Size = rabbit_file:file_size(Path),
-                 {ok, Hdl} = file_handle_cache:open_with_absolute_path(
-                               Path, ?READ_MODE, []),
-                 {ok, 0} = file_handle_cache:position(Hdl, bof),
-                 {ok, SegBin} = file_handle_cache:read(Hdl, Size),
-                 ok = file_handle_cache:close(Hdl),
-                 %% We check if the file is full of 0s. I do not know why this can happen
-                 %% but this happens AT LEAST during v2->v1 conversion when resuming after
-                 %% a crash has happened. Since the file is invalid, we delete it and
-                 %% return no entries instead of just crashing (just like if the file
-                 %% was missing above). We also log some information.
-                 case SegBin of
-                     <<0:Size/unit:8>> ->
-                         ?LOG_WARNING("Deleting invalid v1 segment file ~ts (file only contains NUL bytes)",
-                                      [Path]),
-                         _ = rabbit_file:delete(Path),
-                         Empty;
-                     _ ->
-                         Res = parse_segment_entries(SegBin, KeepAcked, Empty),
-                         Res
-                 end
-    end.
-
-parse_segment_entries(<<?PUB_PREFIX:?PUB_PREFIX_BITS,
-                        IsPersistNum:1, RelSeq:?REL_SEQ_BITS, Rest/binary>>,
-                      KeepAcked, Acc) ->
-    parse_segment_publish_entry(
-      Rest, 1 == IsPersistNum, RelSeq, KeepAcked, Acc);
-parse_segment_entries(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
-                        RelSeq:?REL_SEQ_BITS, Rest/binary>>, KeepAcked, Acc) ->
-    parse_segment_entries(
-      Rest, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc));
-parse_segment_entries(<<>>, _KeepAcked, Acc) ->
-    Acc.
-
-parse_segment_publish_entry(<<Bin:?PUB_RECORD_BODY_SIZE/binary,
-                              MsgSize:?EMBEDDED_SIZE_BITS,
-                              MsgBin:MsgSize/binary, Rest/binary>>,
-                            IsPersistent, RelSeq, KeepAcked,
-                            {SegEntries, Unacked}) ->
-    Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack},
-    SegEntries1 = array:set(RelSeq, Obj, SegEntries),
-    parse_segment_entries(Rest, KeepAcked, {SegEntries1, Unacked + 1});
-parse_segment_publish_entry(Rest, _IsPersistent, _RelSeq, KeepAcked, Acc) ->
-    parse_segment_entries(Rest, KeepAcked, Acc).
-
-add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
-    case array:get(RelSeq, SegEntries) of
-        {Pub, no_del, no_ack} ->
-            {array:set(RelSeq, {Pub, del, no_ack}, SegEntries), Unacked};
-        {Pub, del, no_ack} when KeepAcked ->
-            {array:set(RelSeq, {Pub, del, ack}, SegEntries), Unacked - 1};
-        {_Pub, del, no_ack} ->
-            {array:reset(RelSeq, SegEntries), Unacked - 1}
-    end.
-
-array_new() ->
-    array_new(undefined).
-
-array_new(Default) ->
-    array:new([{default, Default}, fixed, {size, segment_entry_count()}]).
-
-segment_entry_count() ->
-    get(segment_entry_count).
-
-bool_to_int(true ) -> 1;
-bool_to_int(false) -> 0.
-
-%%----------------------------------------------------------------------------
-%% journal & segment combination
-%%----------------------------------------------------------------------------
-
-%% Combine what we have just read from a segment file with what we're
-%% holding for that segment in memory. There must be no duplicates.
-segment_plus_journal(SegEntries, JEntries) ->
-    array:sparse_foldl(
-      fun (RelSeq, JObj, {SegEntriesOut, AdditionalUnacked}) ->
-              SegEntry = array:get(RelSeq, SegEntriesOut),
-              {Obj, AdditionalUnackedDelta} =
-                  segment_plus_journal1(SegEntry, JObj),
-              {case Obj of
-                   undefined -> array:reset(RelSeq, SegEntriesOut);
-                   _         -> array:set(RelSeq, Obj, SegEntriesOut)
-               end,
-               AdditionalUnacked + AdditionalUnackedDelta}
-      end, {SegEntries, 0}, JEntries).
-
-%% Here, the result is a tuple with the first element containing the
-%% item which we may be adding to (for items only in the journal),
-%% modifying in (bits in both), or, when returning 'undefined',
-%% erasing from (ack in journal, not segment) the segment array. The
-%% other element of the tuple is the delta for AdditionalUnacked.
-
-segment_plus_journal1(undefined, {?PUB, no_del, no_ack} = Obj) ->
-    {Obj, 1};
-segment_plus_journal1(undefined, {?PUB, del, no_ack} = Obj) ->
-    {Obj, 1};
-segment_plus_journal1(undefined, {?PUB, del, ack}) ->
-    {undefined, 0};
-
-segment_plus_journal1({?PUB = Pub, no_del, no_ack}, {no_pub, del, no_ack}) ->
-    {{Pub, del, no_ack}, 0};
-segment_plus_journal1({?PUB, no_del, no_ack}, {no_pub, del, ack}) ->
-    {undefined, -1};
-segment_plus_journal1({?PUB, del, no_ack}, {no_pub, no_del, ack}) ->
-    {undefined, -1};
-
-%% Special case, missing del
-%% See journal_minus_segment1/2
-segment_plus_journal1({?PUB, no_del, no_ack}, {no_pub, no_del, ack}) ->
-    {undefined, -1}.
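Spot-checks of segment_plus_journal1/2 above (a hypothetical test; ?PUB matches any {IsPersistent, Bin, MsgBin} publish record):

segment_plus_journal1_examples() ->
    Pub = {true, <<>>, <<>>},
    %% a journal-only publish adds one unacked entry
    {{Pub, no_del, no_ack}, 1} =
        segment_plus_journal1(undefined, {Pub, no_del, no_ack}),
    %% pub+del+ack wholly in the journal cancels out entirely
    {undefined, 0} = segment_plus_journal1(undefined, {Pub, del, ack}),
    %% an ack in the journal erases a pub+del already in the segment
    {undefined, -1} =
        segment_plus_journal1({Pub, del, no_ack}, {no_pub, no_del, ack}),
    ok.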
-
-%% Remove from the journal entries for a segment, items that are
-%% duplicates of entries found in the segment itself. Used on start up
-%% to clean up the journal.
-%%
-%% We need to update the entries_to_segment since they are just a
-%% cache of what's on the journal.
-journal_minus_segment(JEntries, EToSeg, SegEntries) ->
-    array:sparse_foldl(
-      fun (RelSeq, JObj, {JEntriesOut, EToSegOut, UnackedRemoved}) ->
-              SegEntry = array:get(RelSeq, SegEntries),
-              {Obj, UnackedRemovedDelta} =
-                  journal_minus_segment1(JObj, SegEntry),
-              {JEntriesOut1, EToSegOut1} =
-                  case Obj of
-                      keep ->
-                          {JEntriesOut, EToSegOut};
-                      undefined ->
-                          {array:reset(RelSeq, JEntriesOut),
-                           array:reset(RelSeq, EToSegOut)};
-                      _ ->
-                          {array:set(RelSeq, Obj, JEntriesOut),
-                           array:set(RelSeq, entry_to_segment(RelSeq, Obj, []),
-                                     EToSegOut)}
-                  end,
-              {JEntriesOut1, EToSegOut1, UnackedRemoved + UnackedRemovedDelta}
-      end, {JEntries, EToSeg, 0}, JEntries).
-
-%% Here, the result is a tuple with the first element containing the
-%% item we are adding to or modifying in the (initially fresh) journal
-%% array. If the item is 'undefined' we leave the journal array
-%% alone. The other element of the tuple is the deltas for
-%% UnackedRemoved.
-
-%% Both the same. Must be at least the publish
-journal_minus_segment1({?PUB, _Del, no_ack} = Obj, Obj) ->
-    {undefined, 1};
-journal_minus_segment1({?PUB, _Del, ack} = Obj,    Obj) ->
-    {undefined, 0};
-
-%% Just publish in journal
-journal_minus_segment1({?PUB, no_del, no_ack},     undefined) ->
-    {keep, 0};
-
-%% Publish and deliver in journal
-journal_minus_segment1({?PUB, del, no_ack},        undefined) ->
-    {keep, 0};
-journal_minus_segment1({?PUB = Pub, del, no_ack},  {Pub, no_del, no_ack}) ->
-    {{no_pub, del, no_ack}, 1};
-
-%% Publish, deliver and ack in journal
-journal_minus_segment1({?PUB, del, ack},           undefined) ->
-    {keep, 0};
-journal_minus_segment1({?PUB = Pub, del, ack},     {Pub, no_del, no_ack}) ->
-    {{no_pub, del, ack}, 1};
-journal_minus_segment1({?PUB = Pub, del, ack},     {Pub, del, no_ack}) ->
-    {{no_pub, no_del, ack}, 1};
-
-%% Just deliver in journal
-journal_minus_segment1({no_pub, del, no_ack},      {?PUB, no_del, no_ack}) ->
-    {keep, 0};
-journal_minus_segment1({no_pub, del, no_ack},      {?PUB, del, no_ack}) ->
-    {undefined, 0};
-
-%% Just ack in journal
-journal_minus_segment1({no_pub, no_del, ack},      {?PUB, del, no_ack}) ->
-    {keep, 0};
-journal_minus_segment1({no_pub, no_del, ack},      {?PUB, del, ack}) ->
-    {undefined, -1};
-
-%% Just ack in journal, missing del
-%% Since 3.10 message delivery is tracked per-queue, not per-message,
-%% but to keep queue index v1 format messages are always marked as
-%% delivered on publish. But for a message that was published before
-%% 3.10 this is not the case and the delivery marker can be missing.
-%% As a workaround we add the del marker because if a message is acked
-%% it must have been delivered as well.
-journal_minus_segment1({no_pub, no_del, ack},      {?PUB, no_del, no_ack}) ->
-    {{no_pub, del, ack}, 0};
-
-%% Deliver and ack in journal
-journal_minus_segment1({no_pub, del, ack},         {?PUB, no_del, no_ack}) ->
-    {keep, 0};
-journal_minus_segment1({no_pub, del, ack},         {?PUB, del, no_ack}) ->
-    {{no_pub, no_del, ack}, 0};
-journal_minus_segment1({no_pub, del, ack},         {?PUB, del, ack}) ->
-    {undefined, -1};
-
-%% Missing segment. If flush_journal/1 is interrupted after deleting
-%% the segment but before truncating the journal we can get these
-%% cases: a delivery and an acknowledgement in the journal, or just an
-%% acknowledgement in the journal, but with no segment. In both cases
-%% we have really forgotten the message; so ignore what's in the
-%% journal.
-journal_minus_segment1({no_pub, no_del, ack},      undefined) ->
-    {undefined, 0};
-journal_minus_segment1({no_pub, del, ack},         undefined) ->
-    {undefined, 0}.
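Spot-checks of the dedup rules in journal_minus_segment1/2 above (hypothetical test; ?PUB again matches any publish record): an entry fully present in the segment is dropped from the journal, while journal-only bits are kept.

journal_minus_segment1_examples() ->
    Pub = {true, <<>>, <<>>},
    %% identical unacked entries on both sides: drop from journal
    {undefined, 1} =
        journal_minus_segment1({Pub, del, no_ack}, {Pub, del, no_ack}),
    %% journal-only publish: keep it
    {keep, 0} = journal_minus_segment1({Pub, no_del, no_ack}, undefined),
    %% publish already in segment, deliver only in journal: keep just the del
    {{no_pub, del, no_ack}, 1} =
        journal_minus_segment1({Pub, del, no_ack}, {Pub, no_del, no_ack}),
    ok.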
-
-%%----------------------------------------------------------------------------
-%% Migration functions
-%%----------------------------------------------------------------------------
-
-ensure_queue_name_stub_file(Dir, #resource{virtual_host = VHost, name = QName}) ->
-    QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE),
-    file:write_file(QueueNameFile, <<"VHOST: ", VHost/binary, "\n",
-                                     "QUEUE: ", QName/binary, "\n">>).
-
-%% This function is only used when upgrading to the v2 index.
-%% We delete the segment file without updating the state.
-%% We will drop the state later on so we don't care much
-%% about how accurate it is as long as we can read from
-%% subsequent segment files.
-delete_segment_file_for_seq_id(SeqId, #qistate { segments = Segments }) ->
-    {Seg, _} = seq_id_to_seg_and_rel_seq_id(SeqId),
-    case segment_find(Seg, Segments) of
-        {ok, #segment { path = Path }} ->
-            case rabbit_file:delete(Path) of
-                ok -> ok;
-                %% The file may not exist on disk yet.
-                {error, enoent} -> ok
-            end;
-        error ->
-            ok
-    end.
-
-delete_journal(#qistate { dir = Dir, journal_handle = JournalHdl }) ->
-    %% Close the journal handle if any.
-    ok = case JournalHdl of
-             undefined -> ok;
-             _ -> file_handle_cache:close(JournalHdl)
-         end,
-    %% Delete the journal file.
-    _ = rabbit_file:delete(filename:join(Dir, "journal.jif")),
-    ok.
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl
index 0c2ec75767a..16d76e27ebc 100644
--- a/deps/rabbit/src/rabbit_variable_queue.erl
+++ b/deps/rabbit/src/rabbit_variable_queue.erl
@@ -12,7 +12,7 @@
          publish/5, publish_delivered/4,
          discard/3, drain_confirmed/1,
          dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
-         ackfold/4, fold/3, len/1, is_empty/1, depth/1,
+         ackfold/4, len/1, is_empty/1, depth/1,
          update_rates/1, needs_timeout/1, timeout/1,
          handle_pre_hibernate/1, resume/1, msg_rates/1,
          info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
@@ -21,12 +21,8 @@
 
 -export([start/2, stop/1]).
 
-%% This function is used by rabbit_classic_queue_index_v2
-%% to convert v1 queues to v2 after an upgrade to 4.0.
--export([convert_from_v1_to_v2_loop/8]).
-
 %% exported for testing only
--export([start_msg_store/3, stop_msg_store/1, init/5]).
+-export([start_msg_store/3, stop_msg_store/1, init/4]).
 
 -include("mc.hrl").
 -include_lib("stdlib/include/qlc.hrl").
@@ -187,7 +183,7 @@
           persistent_bytes,    %% w   unacked
           delta_transient_bytes,        %%
 
-          target_ram_count,
+          target_ram_count, %% Unused.
           ram_msg_count,       %% w/o unacked
           ram_msg_count_prev,
           ram_ack_count_prev,
@@ -196,7 +192,7 @@
           in_counter,
           rates,
           %% There are two confirms paths: either store/index produce confirms
-          %% separately (v1 and v2 with per-vhost message store) or the confirms
+          %% separately (v2 with per-vhost message store) or the confirms
           %% are produced all at once while syncing/flushing (v2 with per-queue
           %% message store). The latter is more efficient as it avoids many
           %% sets operations.
@@ -233,7 +229,6 @@
 
 -type msg_location() :: memory
                       | rabbit_msg_store
-                      | rabbit_queue_index
                       | rabbit_classic_queue_store_v2:msg_location().
 -export_type([msg_location/0]).
 
@@ -243,7 +238,7 @@
           msg,
           is_persistent,
           is_delivered,
-          msg_location, %% ?IN_SHARED_STORE | ?IN_QUEUE_STORE | ?IN_QUEUE_INDEX | ?IN_MEMORY
+          msg_location, %% ?IN_SHARED_STORE | ?IN_QUEUE_STORE | ?IN_MEMORY
           index_on_disk,
           persist_to,
           msg_props
@@ -256,7 +251,6 @@
           end_seq_id    %% end_seq_id is exclusive
         }).
 
--define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
 -define(PERSISTENT_MSG_STORE, msg_store_persistent).
 -define(TRANSIENT_MSG_STORE,  msg_store_transient).
 
@@ -264,7 +258,6 @@
 
 -define(IN_SHARED_STORE,  rabbit_msg_store).
 -define(IN_QUEUE_STORE,   {rabbit_classic_queue_store_v2, _, _}).
--define(IN_QUEUE_INDEX,   rabbit_queue_index).
 -define(IN_MEMORY,        memory).
 
 -include_lib("rabbit_common/include/rabbit.hrl").
@@ -319,7 +312,7 @@
              persistent_count     :: non_neg_integer(),
              persistent_bytes     :: non_neg_integer(),
 
-             target_ram_count     :: non_neg_integer() | 'infinity',
+             target_ram_count     :: 'infinity',
             ram_msg_count        :: non_neg_integer(),
             ram_msg_count_prev   :: non_neg_integer(),
             ram_ack_count_prev   :: non_neg_integer(),
@@ -367,7 +360,6 @@
 %%----------------------------------------------------------------------------
 
 start(VHost, DurableQueues) ->
-    %% The v2 index walker function covers both v1 and v2 index files.
     {AllTerms, StartFunState} = rabbit_classic_queue_index_v2:start(VHost, DurableQueues),
     %% Group recovery terms by vhost.
     ClientRefs = [Ref || Terms <- AllTerms,
@@ -417,14 +409,12 @@ init(Queue, Recover, Callback) ->
            fun (MsgIds, ActionTaken) ->
                    msgs_written_to_disk(Callback, MsgIds, ActionTaken)
            end,
-           fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end,
-           fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end).
+           fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end).
 
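The two confirms paths described in the comment above are easiest to see side by side. A minimal sketch of the batched path (names hypothetical, map-based state for brevity): confirms accumulate in a set per publish and are handed over in one union at sync time, instead of one callback per message.

publish_sketch(MsgId, #{unconfirmed := UC} = State) ->
    State#{unconfirmed := sets:add_element(MsgId, UC)}.

sync_sketch(#{unconfirmed := UC, confirmed := C} = State) ->
    %% one sets:union/2 for the whole batch
    State#{unconfirmed := sets:new([{version, 2}]),
           confirmed   := sets:union(C, UC)}.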
-init(Q, new, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) ->
+init(Q, new, MsgOnDiskFun, MsgIdxOnDiskFun) when ?is_amqqueue(Q) ->
     QueueName = amqqueue:get_name(Q),
     IsDurable = amqqueue:is_durable(Q),
-    IndexState = rabbit_classic_queue_index_v2:init(QueueName,
-                                                    MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
+    IndexState = rabbit_classic_queue_index_v2:init(QueueName, MsgIdxOnDiskFun),
     StoreState = rabbit_classic_queue_store_v2:init(QueueName),
     VHost = QueueName#resource.virtual_host,
     init(IsDurable, IndexState, StoreState, 0, 0, [],
@@ -437,7 +427,7 @@ init(Q, new, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueu
                          VHost),
          VHost);
 
 %% We can be recovering a transient queue if it crashed
-init(Q, Terms, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) ->
+init(Q, Terms, MsgOnDiskFun, MsgIdxOnDiskFun) when ?is_amqqueue(Q) ->
     QueueName = amqqueue:get_name(Q),
     IsDurable = amqqueue:is_durable(Q),
     {PRef, RecoveryTerms} = process_recovery_terms(Terms),
@@ -461,8 +451,7 @@ init(Q, Terms, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqu
                    rabbit_vhost_msg_store:successfully_recovered_state(
                      VHost, ?PERSISTENT_MSG_STORE),
-                   ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun,
-                   main),
+                   ContainsCheckFun, MsgIdxOnDiskFun),
     StoreState = rabbit_classic_queue_store_v2:init(QueueName),
     init(IsDurable, IndexState, StoreState,
          DeltaCount, DeltaBytes, RecoveryTerms,
@@ -637,17 +626,19 @@ requeue(AckTags, #vqstate { delta      = Delta,
                             len        = Len } = State) ->
     %% @todo This can be heavily simplified: if the message falls into delta,
     %%       add it there. Otherwise just add it to q3 in the correct position.
+    %% @todo I think if the message falls within Q3 we must add it back there,
+    %%       otherwise there's nothing to do? Except update stats.
     {SeqIds, Q3a, MsgIds, State1} = requeue_merge(lists:sort(AckTags), Q3, [],
                                                   delta_limit(Delta), State),
     {Delta1, MsgIds1, State2}    = delta_merge(SeqIds, Delta, MsgIds,
                                                State1),
     MsgCount = length(MsgIds1),
     {MsgIds1, a(
-                maybe_update_rates(ui(
+                maybe_update_rates(
                   State2 #vqstate { delta      = Delta1,
                                     q3         = Q3a,
                                     in_counter = InCounter + MsgCount,
-                                    len        = Len + MsgCount })))}.
+                                    len        = Len + MsgCount }))}.
 
 ackfold(MsgFun, Acc, State, AckTags) ->
     {AccN, StateN} =
@@ -658,13 +649,6 @@ ackfold(MsgFun, Acc, State, AckTags) ->
         end, {Acc, State}, AckTags),
     {AccN, a(StateN)}.
 
-fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
-    {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
-                                     [msg_iterator(State),
-                                      disk_ack_iterator(State),
-                                      ram_ack_iterator(State)]),
-    ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
-
 len(#vqstate { len = Len }) -> Len.
 
 is_empty(State) -> 0 == len(State).
@@ -719,28 +703,21 @@ needs_timeout(#vqstate { index_state = IndexState,
         {false, true}  -> false
     end.
 
-timeout(State = #vqstate { index_state = IndexState0,
-                           store_state = StoreState0,
-                           unconfirmed_simple = UCS,
-                           confirmed = C }) ->
-    IndexState = rabbit_classic_queue_index_v2:sync(IndexState0),
-    StoreState = rabbit_classic_queue_store_v2:sync(StoreState0),
-    State #vqstate { index_state = IndexState,
-                     store_state = StoreState,
-                     unconfirmed_simple = sets:new([{version,2}]),
-                     confirmed = sets:union(C, UCS) }.
+timeout(State) ->
+    sync(State).
 
-handle_pre_hibernate(State = #vqstate { index_state = IndexState0,
-                                        store_state = StoreState0,
-                                        msg_store_clients = MSCState0,
-                                        unconfirmed_simple = UCS,
-                                        confirmed = C }) ->
+handle_pre_hibernate(State = #vqstate{ msg_store_clients = MSCState0 }) ->
     MSCState = msg_store_pre_hibernate(MSCState0),
-    IndexState = rabbit_classic_queue_index_v2:flush(IndexState0),
+    sync(State#vqstate{ msg_store_clients = MSCState }).
+
+sync(State = #vqstate { index_state = IndexState0,
+                        store_state = StoreState0,
+                        unconfirmed_simple = UCS,
+                        confirmed = C }) ->
+    IndexState = rabbit_classic_queue_index_v2:sync(IndexState0),
     StoreState = rabbit_classic_queue_store_v2:sync(StoreState0),
     State #vqstate { index_state = IndexState,
                      store_state = StoreState,
-                     msg_store_clients = MSCState,
                      unconfirmed_simple = sets:new([{version,2}]),
                      confirmed = sets:union(C, UCS) }.
 
@@ -789,7 +766,6 @@ info(backing_queue_status, #vqstate {
           delta = Delta, q3 = Q3,
           mode             = Mode,
           len              = Len,
-          target_ram_count = TargetRamCount,
           next_seq_id      = NextSeqId,
           next_deliver_seq_id = NextDeliverSeqId,
           ram_pending_ack  = RPA,
@@ -810,7 +786,7 @@
      {q3                  , ?QUEUE:len(Q3)},
      {q4                  , 0},
      {len                 , Len},
-     {target_ram_count    , TargetRamCount},
+     {target_ram_count    , infinity},
      {next_seq_id         , NextSeqId},
      {next_deliver_seq_id , NextDeliverSeqId},
      {num_pending_acks    , map_size(RPA) + map_size(DPA)},
@@ -843,60 +819,6 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) ->
 
 set_queue_version(_, State) ->
     State.
 
-%% This function is used by rabbit_classic_queue_index_v2
-%% to convert v1 queues to v2 after an upgrade to 4.0.
-convert_from_v1_to_v2_loop(_, _, V2Index, V2Store, _, HiSeqId, HiSeqId, _) ->
-    {V2Index, V2Store};
-convert_from_v1_to_v2_loop(QueueName, V1Index0, V2Index0, V2Store0,
-                           Counters = {CountersRef, CountIx, BytesIx},
-                           LoSeqId, HiSeqId, SkipFun) ->
-    UpSeqId = lists:min([rabbit_queue_index:next_segment_boundary(LoSeqId),
-                         HiSeqId]),
-    {Messages, V1Index} = rabbit_queue_index:read(LoSeqId, UpSeqId, V1Index0),
-    %% We do a garbage collect immediately after the old index read
-    %% because that may have created a lot of garbage.
-    garbage_collect(),
-    {V2Index3, V2Store3} = lists:foldl(fun
-        %% Move embedded messages to the per-queue store.
-        ({Msg, SeqId, rabbit_queue_index, Props, IsPersistent},
-         {V2Index1, V2Store1}) ->
-            MsgId = mc:get_annotation(id, Msg),
-            {MsgLocation, V2Store2} = rabbit_classic_queue_store_v2:write(SeqId, Msg, Props, V2Store1),
-            V2Index2 = case SkipFun(SeqId, V2Index1) of
-                {skip, V2Index1a} ->
-                    V2Index1a;
-                {write, V2Index1a} ->
-                    counters:add(CountersRef, CountIx, 1),
-                    counters:add(CountersRef, BytesIx, Props#message_properties.size),
-                    rabbit_classic_queue_index_v2:publish(MsgId, SeqId, MsgLocation, Props, IsPersistent, infinity, V2Index1a)
-            end,
-            {V2Index2, V2Store2};
-        %% Keep messages in the per-vhost store where they are.
-        ({MsgId, SeqId, rabbit_msg_store, Props, IsPersistent},
-         {V2Index1, V2Store1}) ->
-            V2Index2 = case SkipFun(SeqId, V2Index1) of
-                {skip, V2Index1a} ->
-                    V2Index1a;
-                {write, V2Index1a} ->
-                    counters:add(CountersRef, CountIx, 1),
-                    counters:add(CountersRef, BytesIx, Props#message_properties.size),
-                    rabbit_classic_queue_index_v2:publish(MsgId, SeqId, rabbit_msg_store, Props, IsPersistent, infinity, V2Index1a)
-            end,
-            {V2Index2, V2Store1}
-    end, {V2Index0, V2Store0}, Messages),
-    %% Flush to disk to avoid keeping too much in memory between segments.
-    V2Index = rabbit_classic_queue_index_v2:flush(V2Index3),
-    V2Store = rabbit_classic_queue_store_v2:sync(V2Store3),
-    %% We have written everything to disk. We can delete the old segment file
-    %% to free up much needed space, to avoid doubling disk usage during the upgrade.
-    rabbit_queue_index:delete_segment_file_for_seq_id(LoSeqId, V1Index),
-    %% Log some progress to keep the user aware of what's going on, as moving
-    %% embedded messages can take quite some time.
-    #resource{virtual_host = VHost, name = Name} = QueueName,
-    ?LOG_INFO("Queue ~ts in vhost ~ts converted ~b messages from v1 to v2",
-              [Name, VHost, length(Messages)]),
-    convert_from_v1_to_v2_loop(QueueName, V1Index, V2Index, V2Store, Counters, UpSeqId, HiSeqId, SkipFun).
-
 %% Get the Timestamp property of the first msg, if present. This is
 %% the one with the oldest timestamp among the heads of the pending
 %% acks and unread queues. We can't check disk_pending_acks as these
@@ -1016,7 +938,7 @@ msg_status(IsPersistent, IsDelivered, SeqId,
               is_delivered  = IsDelivered,
               msg_location  = memory,
               index_on_disk = false,
-              persist_to    = determine_persist_to(Msg, MsgProps, IndexMaxSize),
+              persist_to    = determine_persist_to(Msg, IndexMaxSize),
               msg_props     = MsgProps}.
 
 beta_msg_status({MsgId, SeqId, MsgLocation, MsgProps, IsPersistent})
@@ -1036,14 +958,10 @@ beta_msg_status({Msg, SeqId, MsgLocation, MsgProps, IsPersistent}) ->
     MS0#msg_status{msg_id       = MsgId,
                    msg          = Msg,
                    persist_to   = case MsgLocation of
-                                      rabbit_queue_index -> queue_index;
                                       {rabbit_classic_queue_store_v2, _, _} -> queue_store;
                                       rabbit_msg_store -> msg_store
                                  end,
-                   msg_location = case MsgLocation of
-                                      rabbit_queue_index -> memory;
-                                      _ -> MsgLocation
-                                  end}.
+                   msg_location = MsgLocation}.
 
 beta_msg_status0(SeqId, MsgProps, IsPersistent) ->
   #msg_status{seq_id        = SeqId,
@@ -1382,15 +1300,12 @@ stats_requeued_memory(MS, St) ->
     St#vqstate{?UP(len, ram_msg_count, +1),
                ?UP(bytes, +msg_size(MS)), ?UP(unacked_bytes, -msg_size(MS))}.
 
+%% TODO!!!
 %% @todo For v2 since we don't remove from disk until we ack, we don't need
 %%       to write to disk again on requeue. If the message falls within delta
 %%       we can just drop the MsgStatus. Otherwise we just put it in q3 and
 %%       we don't do any disk writes.
 %%
-%% For v1 I'm not sure? I don't think we need to write to the index
-%% at least, but maybe we need to write the message if not embedded?
-%% I don't think we need to...
-%%
 %% So we don't need to change anything except how we count stats as
 %% well as delta stats if the message falls within delta.
 stats_requeued_disk(MS = #msg_status{is_persistent = true}, St) ->
@@ -1460,7 +1375,6 @@ remove_from_disk(#msg_status {
                     {StoreState0, record_confirms(sets:add_element(MsgId, sets:new([{version,2}])), State)}
             end;
         ?IN_QUEUE_STORE -> {rabbit_classic_queue_store_v2:remove(SeqId, StoreState0), State};
-        ?IN_QUEUE_INDEX -> {StoreState0, State};
         ?IN_MEMORY      -> {StoreState0, State}
     end,
     StoreState = rabbit_classic_queue_store_v2:delete_segments(DeletedSegments, StoreState1),
@@ -1471,7 +1385,7 @@
 
 %% This function exists as a way to improve dropwhile/2
 %% performance. The idea of having this function is to optimise calls
-%% to rabbit_queue_index by batching delivers and acks, instead of
+%% to the queue index by batching delivers and acks, instead of
 %% sending them one by one.
 %%
 %% Instead of removing every message as their are popped from the
@@ -1513,7 +1427,7 @@ remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) ->
 
 %% This function exists as a way to improve fetchwhile/4
 %% performance. The idea of having this function is to optimise calls
-%% to rabbit_queue_index by batching delivers, instead of sending them
+%% to the queue index by batching delivers, instead of sending them
 %% one by one.
 %%
 %% Fun is the function passed to fetchwhile/4 that's
@@ -1536,7 +1450,7 @@ fetch_by_predicate(Pred, Fun, FetchAcc,
 
 %% We try to do here the same as what remove(true, State) does but
 %% processing several messages at the same time. The idea is to
-%% optimize rabbit_queue_index:deliver/2 calls by sending a list of
+%% optimize IndexMod:deliver/2 calls by sending a list of
 %% SeqIds instead of one by one, thus process_queue_entries1 will
 %% accumulate the required deliveries, will record_pending_ack for
 %% each message, and will update stats, like remove/2 does.
@@ -1780,77 +1694,28 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
             queue_store -> {MsgLocation, StoreState} = rabbit_classic_queue_store_v2:write(SeqId, prepare_to_store(Msg), Props, StoreState0),
                            {MsgStatus#msg_status{ msg_location = MsgLocation },
                             State#vqstate{ store_state = StoreState,
-                                           disk_write_count = Count + 1}};
-            queue_index -> {MsgStatus, State}
+                                           disk_write_count = Count + 1}}
         end;
 maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
     {MsgStatus, State}.
 
-%% Due to certain optimisations made inside
-%% rabbit_queue_index:pre_publish/7 we need to have two separate
-%% functions for index persistence. This one is only used when paging
-%% during memory pressure. We didn't want to modify
-%% maybe_write_index_to_disk/3 because that function is used in other
-%% places.
-maybe_batch_write_index_to_disk(_Force,
-                                MsgStatus = #msg_status {
-                                  index_on_disk = true }, State) ->
-    {MsgStatus, State};
-maybe_batch_write_index_to_disk(Force,
-                                MsgStatus = #msg_status {
-                                  msg           = Msg,
-                                  msg_id        = MsgId,
-                                  seq_id        = SeqId,
-                                  is_persistent = IsPersistent,
-                                  msg_location  = MsgLocation,
-                                  msg_props     = MsgProps},
-                                State = #vqstate {
-                                  target_ram_count = TargetRamCount,
-                                  disk_write_count = DiskWriteCount,
-                                  index_state      = IndexState})
-  when Force orelse IsPersistent ->
-    {MsgOrId, DiskWriteCount1} =
-        case persist_to(MsgStatus) of
-            msg_store   -> {MsgId, DiskWriteCount};
-            queue_store -> {MsgId, DiskWriteCount};
-            queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
-        end,
-    IndexState1 = rabbit_classic_queue_index_v2:pre_publish(
-                    MsgOrId, SeqId, MsgLocation, MsgProps,
-                    IsPersistent, TargetRamCount, IndexState),
-    {MsgStatus#msg_status{index_on_disk = true},
-     State#vqstate{index_state      = IndexState1,
-                   disk_write_count = DiskWriteCount1}};
-maybe_batch_write_index_to_disk(_Force, MsgStatus, State) ->
-    {MsgStatus, State}.
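The batching strategy described in the comments above (accumulate seq ids, then talk to the index once) is visible in the v2 API: ack/2 already takes a whole list, as the test suite below also shows. A hedged sketch of a caller:

%% Sketch: one index round-trip for N messages instead of N round-trips.
batched_ack([], IndexState) ->
    IndexState;
batched_ack(SeqIds, IndexState0) ->
    {_DeletedSegments, IndexState} =
        rabbit_classic_queue_index_v2:ack(SeqIds, IndexState0),
    IndexState.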
-
 maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
                                     index_on_disk = true }, State) ->
     {MsgStatus, State};
 maybe_write_index_to_disk(Force,
                           MsgStatus = #msg_status {
-                            msg           = Msg,
                             msg_id        = MsgId,
                             seq_id        = SeqId,
                             is_persistent = IsPersistent,
                             msg_location  = MsgLocation,
                             msg_props     = MsgProps},
-                          State = #vqstate{target_ram_count = TargetRamCount,
-                                           disk_write_count = DiskWriteCount,
-                                           index_state      = IndexState})
+                          State = #vqstate{index_state = IndexState})
   when Force orelse IsPersistent ->
-    {MsgOrId, DiskWriteCount1} =
-        case persist_to(MsgStatus) of
-            msg_store   -> {MsgId, DiskWriteCount};
-            queue_store -> {MsgId, DiskWriteCount};
-            queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
-        end,
     IndexState2 = rabbit_classic_queue_index_v2:publish(
-                    MsgOrId, SeqId, MsgLocation, MsgProps, IsPersistent,
-                    persist_to(MsgStatus) =:= msg_store, TargetRamCount,
+                    MsgId, SeqId, MsgLocation, MsgProps, IsPersistent,
+                    persist_to(MsgStatus) =:= msg_store,
                     IndexState),
     {MsgStatus#msg_status{index_on_disk = true},
-     State#vqstate{index_state      = IndexState2,
-                   disk_write_count = DiskWriteCount1}};
+     State#vqstate{index_state = IndexState2}};
 maybe_write_index_to_disk(_Force, MsgStatus, State) ->
     {MsgStatus, State}.
 
@@ -1859,45 +1724,13 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
     {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
     maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).
 
-maybe_prepare_write_to_disk(ForceMsg, ForceIndex0, MsgStatus, State) ->
-    {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
-    %% We want messages written to the v2 per-queue store to also
-    %% be written to the index for proper accounting. The situation
-    %% where a message can be in the store but not in the index can
-    %% only occur when going through this function (not via maybe_write_to_disk).
-    ForceIndex = case persist_to(MsgStatus) of
-                     queue_store -> true;
-                     _ -> ForceIndex0
-                 end,
-    maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1).
-
-determine_persist_to(Msg,
-                     #message_properties{size = BodySize},
-                     IndexMaxSize) ->
+determine_persist_to(Msg, IndexMaxSize) ->
     %% The >= is so that you can set the env to 0 and never persist
     %% to the index.
-    %%
-    %% We want this to be fast, so we avoid size(term_to_binary())
-    %% here, or using the term size estimation from truncate.erl, both
-    %% of which are too slow. So instead, if the message body size
-    %% goes over the limit then we avoid any other checks.
-    %%
-    %% If it doesn't we need to decide if the properties will push
-    %% it past the limit. If we have the encoded properties (usual
-    %% case) we can just check their size. If we don't (message came
-    %% via the direct client), we make a guess based on the number of
-    %% headers.
-
-    %% @todo We can probably simplify this.
-    {MetaSize, _BodySize} = mc:size(Msg),
-    case BodySize >= IndexMaxSize of
+    {MetaSize, BodySize} = mc:size(Msg),
+    case MetaSize + BodySize >= IndexMaxSize of
         true  -> msg_store;
-        false ->
-            Est = MetaSize + BodySize,
-            case Est >= IndexMaxSize of
-                true  -> msg_store;
-                false -> queue_store
-            end
+        false -> queue_store
     end.
 
 persist_to(#msg_status{persist_to = To}) -> To.
 
@@ -2022,7 +1855,6 @@ accumulate_ack(#msg_status { seq_id        = SeqId,
                  end,
      case MsgLocation of
          ?IN_QUEUE_STORE -> [SeqId|SeqIdsInStore];
-         ?IN_QUEUE_INDEX -> [SeqId|SeqIdsInStore];
          _               -> SeqIdsInStore
      end,
      [MsgId | AllMsgIds]}.
@@ -2071,6 +1903,7 @@ msgs_written_to_disk(Callback, MsgIdSet, written) ->
                                        sets:union(MOD, Confirmed) })
      end).
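determine_persist_to/2 above now compares metadata plus body size against the queue_index_embed_msgs_below limit (4096 bytes by default). A worked sketch of the decision, with hypothetical sizes:

placement_example() ->
    IndexMaxSize = 4096,   %% assumed queue_index_embed_msgs_below value
    Decide = fun (MetaSize, BodySize) ->
                     case MetaSize + BodySize >= IndexMaxSize of
                         true  -> msg_store;   %% shared per-vhost store
                         false -> queue_store  %% per-queue v2 store
                     end
             end,
    msg_store   = Decide(100, 5000),   %% large message
    queue_store = Decide(100, 1000),   %% small message
    ok.

Setting the limit to 0 sends every message to the shared store, which is exactly what the '>=' mentioned in the comment above enables.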
 
+%% @todo Having to call run_backing_queue is probably reducing performance...
 msg_indices_written_to_disk(Callback, MsgIdSet) ->
     Callback(?MODULE,
              fun (?MODULE, State = #vqstate { msgs_on_disk        = MOD,
@@ -2083,11 +1916,6 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
                                        sets:union(MIOD, Confirmed) })
      end).
 
-%% @todo Having to call run_backing_queue is probably reducing performance...
-msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
-    Callback(?MODULE,
-             fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
-
 %%----------------------------------------------------------------------------
 %% Internal plumbing for requeue
 %%----------------------------------------------------------------------------
@@ -2130,10 +1958,8 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
                           Acc;
                       {#msg_status { msg_id = MsgId,
                                      is_persistent = IsPersistent } = MsgStatus, State1} ->
-                          {_MsgStatus, State2} =
-                              maybe_prepare_write_to_disk(true, true, MsgStatus, State1),
                           {expand_delta(SeqId, Delta0, IsPersistent), [MsgId | MsgIds0],
-                           stats_requeued_disk(MsgStatus, State2)}
+                           stats_requeued_disk(MsgStatus, State1)}
                   end
           end, {Delta, MsgIds, State}, SeqIds).
 
@@ -2151,89 +1977,6 @@ msg_from_pending_ack(SeqId, State) ->
 delta_limit(?BLANK_DELTA_PATTERN(_)) -> undefined;
 delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
 
-%%----------------------------------------------------------------------------
-%% Iterator
-%%----------------------------------------------------------------------------
-
-ram_ack_iterator(State) ->
-    {ack, maps:iterator(State#vqstate.ram_pending_ack)}.
-
-disk_ack_iterator(State) ->
-    {ack, maps:iterator(State#vqstate.disk_pending_ack)}.
-
-msg_iterator(State) -> istate(start, State).
-
-istate(start, State) -> {q3,    State#vqstate.q3,    State};
-istate(q3,    State) -> {delta, State#vqstate.delta, State};
-istate(delta, _State) -> done.
-
-next({ack, It}, IndexState) ->
-    case maps:next(It) of
-        none -> {empty, IndexState};
-        {_SeqId, MsgStatus, It1} -> Next = {ack, It1},
-                                    {value, MsgStatus, true, Next, IndexState}
-    end;
-next(done, IndexState) -> {empty, IndexState};
-next({delta, #delta{start_seq_id = SeqId,
-                    end_seq_id   = SeqId}, State}, IndexState) ->
-    next(istate(delta, State), IndexState);
-next({delta, #delta{start_seq_id = SeqId,
-                    end_seq_id   = SeqIdEnd} = Delta, State}, IndexState) ->
-    SeqIdB = rabbit_classic_queue_index_v2:next_segment_boundary(SeqId),
-    %% It may make sense to limit this based on rate. But this
-    %% is not called outside of CMQs so I will leave it alone
-    %% for the time being.
-    SeqId1 = lists:min([SeqIdB,
-                        %% We must limit the number of messages read at once
-                        %% otherwise the queue will attempt to read up to segment_entry_count()
-                        %% messages from the index each time. The value
-                        %% chosen here is arbitrary.
-                        SeqId + 2048,
-                        SeqIdEnd]),
-    {List, IndexState1} = rabbit_classic_queue_index_v2:read(SeqId, SeqId1, IndexState),
-    next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
-next({delta, Delta, [], State}, IndexState) ->
-    next({delta, Delta, State}, IndexState);
-next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
-    case is_msg_in_pending_acks(SeqId, State) of
-        false -> Next = {delta, Delta, Rest, State},
-                 {value, beta_msg_status(M), false, Next, IndexState};
-        true  -> next({delta, Delta, Rest, State}, IndexState)
-    end;
-next({Key, Q, State}, IndexState) ->
-    case ?QUEUE:out(Q) of
-        {empty, _Q}              -> next(istate(Key, State), IndexState);
-        {{value, MsgStatus}, QN} -> Next = {Key, QN, State},
-                                    {value, MsgStatus, false, Next, IndexState}
-    end.
-
-inext(It, {Its, IndexState}) ->
-    case next(It, IndexState) of
-        {empty, IndexState1} ->
-            {Its, IndexState1};
-        {value, MsgStatus1, Unacked, It1, IndexState1} ->
-            {[{MsgStatus1, Unacked, It1} | Its], IndexState1}
-    end.
-
-ifold(_Fun, Acc, [], State0) ->
-    {Acc, State0};
-ifold(Fun, Acc, Its0, State0) ->
-    [{MsgStatus, Unacked, It} | Rest] =
-        lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _},
-                        {#msg_status{seq_id = SeqId2}, _, _}) ->
-                           SeqId1 =< SeqId2
-                   end, Its0),
-    {Msg, State1} = read_msg(MsgStatus, State0),
-    case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
-        {stop, Acc1} ->
-            {Acc1, State1};
-        {cont, Acc1} ->
-            IndexState0 = State1#vqstate.index_state,
-            {Its1, IndexState1} = inext(It, {Rest, IndexState0}),
-            State2 = State1#vqstate{index_state = IndexState1},
-            ifold(Fun, Acc1, Its1, State2)
-    end.
-
 %%----------------------------------------------------------------------------
 %% Phase changes
 %%----------------------------------------------------------------------------
@@ -2296,9 +2039,7 @@ maybe_deltas_to_betas(DelsAndAcksFun,
     {List, StoreState3, MCStateP3, MCStateT3} = case WhatToRead of
         messages ->
             %% We try to read messages from disk all at once instead of
-            %% 1 by 1 at fetch time. When v1 is used and messages are
-            %% embedded, then the message content is already read from
-            %% disk at this point. For v2 embedded we must do a separate
+            %% 1 by 1 at fetch time. For v2 embedded we must do a separate
             %% call to obtain the contents and then merge the contents
             %% back into the #msg_status records.
             %%
@@ -2419,13 +2160,6 @@ merge_sh_read_msgs([M = {MsgId, _, _, _, _}|MTail], Reads) ->
 
 merge_sh_read_msgs(MTail, _Reads) ->
     MTail.
 
-%% Flushes queue index batch caches and updates queue index state.
-ui(#vqstate{index_state      = IndexState,
-            target_ram_count = TargetRamCount} = State) ->
-    IndexState1 = rabbit_classic_queue_index_v2:flush_pre_publish_cache(
-                    TargetRamCount, IndexState),
-    State#vqstate{index_state = IndexState1}.
-
 maybe_client_terminate(MSCStateP) ->
     %% Queue might have been asked to stop by the supervisor, it needs a clean
     %% shutdown in order for the supervising strategy to work - if it reaches max
diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl
index 7b08e3fec70..2c14a35ad71 100644
--- a/deps/rabbit/src/rabbit_vhost.erl
+++ b/deps/rabbit/src/rabbit_vhost.erl
@@ -110,7 +110,7 @@ ensure_config_file(VHost) ->
         %% The config file does not exist.
         %% Check if there are queues in this vhost.
        false ->
-            QueueDirs = rabbit_queue_index:all_queue_directory_names(VHost),
+            QueueDirs = rabbit_classic_queue_index_v2:all_queue_directory_names(VHost),
             SegmentEntryCount = case QueueDirs of
                 %% There are no queues. Write the configured value for
                 %% the segment entry count, or the new RabbitMQ default
diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl
index 01ff9f5aa25..bda171c2ce7 100644
--- a/deps/rabbit/test/backing_queue_SUITE.erl
+++ b/deps/rabbit/test/backing_queue_SUITE.erl
@@ -33,8 +33,7 @@
           variable_queue_ack_limiting,
           variable_queue_purge,
           variable_queue_requeue,
-          variable_queue_requeue_ram_beta,
-          variable_queue_fold
+          variable_queue_requeue_ram_beta
         ]).
 
 -define(BACKING_QUEUE_TESTCASES, [
@@ -162,15 +161,9 @@ orelse Group =:= backing_queue_embed_limit_1024 ->
 end_per_group1(_, Config) ->
     Config.
 
-init_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue;
-                                         Testcase == variable_queue_fold ->
-    rabbit_ct_helpers:testcase_started(Config, Testcase);
 init_per_testcase(Testcase, Config) ->
     rabbit_ct_helpers:testcase_started(Config, Testcase).
 
-end_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue;
-                                        Testcase == variable_queue_fold ->
-    rabbit_ct_helpers:testcase_finished(Config, Testcase);
 end_per_testcase(Testcase, Config) ->
     rabbit_ct_helpers:testcase_finished(Config, Testcase).
 
@@ -806,7 +799,6 @@
 index_mod() -> rabbit_classic_queue_index_v2.
 
 bq_queue_index1(_Config) ->
-    init_queue_index(),
     IndexMod = index_mod(),
     SegmentSize = IndexMod:next_segment_boundary(0),
     TwoSegs = SegmentSize + SegmentSize,
@@ -852,7 +844,7 @@
                   Qi13
           end,
     {_DeletedSegments, Qi16} = IndexMod:ack(SeqIdsB, Qi15),
-    Qi17 = IndexMod:flush(Qi16),
+    Qi17 = IndexMod:sync(Qi16),
     %% Everything will have gone now because #pubs == #acks
     {NextSeqIdB, NextSeqIdB, Qi18} = IndexMod:bounds(Qi17, NextSeqIdB),
     %% should get length back as 0 because all persistent
@@ -873,7 +865,7 @@
                            _ -> Qi1
                   end,
           {_DeletedSegments, Qi3} = IndexMod:ack(SeqIdsC, Qi2),
-          Qi4 = IndexMod:flush(Qi3),
+          Qi4 = IndexMod:sync(Qi3),
           {Qi5, _SeqIdsMsgIdsC1} = queue_index_publish([SegmentSize],
                                                        false, Qi4),
           Qi5
@@ -891,7 +883,7 @@
           {Qi3, _SeqIdsMsgIdsC3} = queue_index_publish([SegmentSize],
                                                        false, Qi2),
          {_DeletedSegments, Qi4} = IndexMod:ack(SeqIdsC, Qi3),
-          IndexMod:flush(Qi4)
+          IndexMod:sync(Qi4)
       end),
 
     %% c) just fill up several segments of all pubs, then +acks
@@ -904,7 +896,7 @@
                   _ -> Qi1
          end,
          {_DeletedSegments, Qi3} = IndexMod:ack(SeqIdsD, Qi2),
-          IndexMod:flush(Qi3)
+          IndexMod:sync(Qi3)
      end),
 
     %% d) get messages in all states to a segment, then flush, then do
@@ -918,7 +910,7 @@
                  _ -> Qi1
          end,
          {_DeletedSegments3, Qi3} = IndexMod:ack([0], Qi2),
-          Qi4 = IndexMod:flush(Qi3),
+          Qi4 = IndexMod:sync(Qi3),
          {Qi5, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi4),
          Qi6 = case IndexMod of
                    rabbit_queue_index -> IndexMod:deliver([2,3,5,6], Qi5);
@@ -984,7 +976,7 @@ bq_queue_index_props1(_Config) ->
     MsgId = rabbit_guid:gen(),
     Props = #message_properties{expiry=12345, size = 10},
     Qi1 = IndexMod:publish(
-            MsgId, 0, memory, Props, true, infinity, Qi0),
+            MsgId, 0, memory, Props, true, true, Qi0),
     {[{MsgId, 0, _, Props, _}], Qi2} = IndexMod:read(0, 1, Qi1),
 
     Qi2
@@ -1115,7 +1107,6 @@ bq_queue_recover(Config) ->
       ?MODULE, bq_queue_recover1, [Config]).
 
 bq_queue_recover1(Config) ->
-    init_queue_index(),
     IndexMod = index_mod(),
     Count = 2 * IndexMod:next_segment_boundary(0),
     QName0 = queue_name(Config, <<"bq_queue_recover-q">>),
@@ -1578,39 +1569,6 @@ variable_queue_requeue_ram_beta2(VQ0, _Config) ->
     {_, VQ8} = rabbit_variable_queue:ack(AcksAll, VQ7),
     VQ8.
 
-variable_queue_fold(Config) ->
-    passed = rabbit_ct_broker_helpers:rpc(Config, 0,
-      ?MODULE, variable_queue_fold1, [Config]).
-
-variable_queue_fold1(Config) ->
-    with_fresh_variable_queue(
-      fun variable_queue_fold2/2,
-      ?config(variable_queue_type, Config)).
-
-variable_queue_fold2(VQ0, _Config) ->
-    {PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
-        variable_queue_with_holes(VQ0),
-    Count = rabbit_variable_queue:depth(VQ1),
-    Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs),
-    lists:foldl(fun (Cut, VQ2) ->
-                        test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2)
-                end, VQ1, [0, 1, 2, Count div 2,
-                           Count - 1, Count, Count + 1, Count * 2]).
-
-test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) ->
-    {Acc, VQ1} = rabbit_variable_queue:fold(
-                   fun (M, _, Pending, A) ->
-                           MInt = msg2int(M),
-                           Pending = lists:member(MInt, PendingMsgs), %% assert
-                           case MInt =< Cut of
-                               true  -> {cont, [MInt | A]};
-                               false -> {stop, A}
-                           end
-                   end, [], VQ0),
-    Expected = lists:takewhile(fun (I) -> I =< Cut end, Msgs),
-    Expected = lists:reverse(Acc), %% assertion
-    VQ1.
-
 %% same as test_variable_queue_requeue_ram_beta but randomly changing
 %% the queue mode after every step.
 variable_queue_mode_change(Config) ->
@@ -1684,8 +1642,7 @@ init_test_queue(QName) ->
                 fun (MsgId) ->
                         rabbit_msg_store:contains(MsgId, PersistentClient)
                 end,
-                fun nop/1, fun nop/1,
-                main),
+                fun nop/1),
     ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient),
     Res.
 
@@ -1717,13 +1674,6 @@ with_empty_test_queue(Fun) ->
     IndexMod = index_mod(),
     IndexMod:delete_and_terminate(Fun(Qi, QName)).
 
-init_queue_index() ->
-    %% We must set the segment entry count in the process dictionary
-    %% for tests that call the v1 queue index directly to have a correct
-    %% value.
-    put(segment_entry_count, 2048),
-    ok.
-
 restart_app() ->
     rabbit:stop(),
     rabbit:start().
 
@@ -1743,7 +1693,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
                   QiM = IndexMod:publish(
                           MsgId, SeqId, rabbit_msg_store,
                           #message_properties{size = 10},
-                          Persistent, infinity, QiN),
+                          Persistent, true, QiN),
                   ok = rabbit_msg_store:write(SeqId, MsgId, MsgId, MSCState),
                   {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]}
           end, {Qi, []}, SeqIds),
 
@@ -1764,7 +1714,7 @@ variable_queue_init(Q, Recover) ->
            true      -> non_clean_shutdown;
            false     -> new;
            Terms     -> Terms
-      end, fun nop/2, fun nop/1, fun nop/1).
+      end, fun nop/2, fun nop/1).
 
 variable_queue_read_terms(QName) ->
     #resource { kind = queue,