Skip to content

Commit 179f687

Browse files
committed
CQ: Simplify handling of confirms from index writes
We avoid an extra unnecessary message to the queue.
1 parent f8fd4bf commit 179f687

File tree

3 files changed

+54
-74
lines changed

3 files changed

+54
-74
lines changed

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 17 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
-module(rabbit_classic_queue_index_v2).
99

10-
-export([erase/1, init/2, reset_state/1, recover/5,
10+
-export([erase/1, init/1, reset_state/1, recover/4,
1111
terminate/3, delete_and_terminate/1,
1212
info/1, publish/7, ack/2, read/3]).
1313

@@ -144,17 +144,11 @@
144144

145145
%% File descriptors. We will keep up to 4 FDs
146146
%% at a time. See comments in reduce_fd_usage/2.
147-
fds = #{} :: #{non_neg_integer() => file:fd()},
148-
149-
%% This fun must be called when messages that expect
150-
%% confirms have either an ack or their entry
151-
%% written to disk and file:sync/1 has been called.
152-
on_sync :: on_sync_fun()
147+
fds = #{} :: #{non_neg_integer() => file:fd()}
153148
}).
154149

155150
-type state() :: #qi{}.
156151

157-
-type on_sync_fun() :: fun ((sets:set()) -> ok).
158152
-type contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean()).
159153
-type shutdown_terms() :: list() | 'non_clean_shutdown'.
160154

@@ -168,24 +162,21 @@ erase(#resource{ virtual_host = VHost } = Name) ->
168162
Dir = queue_dir(VHostDir, Name),
169163
erase_index_dir(Dir).
170164

171-
-spec init(rabbit_amqqueue:name(), on_sync_fun()) -> state().
172-
173-
%% We do not embed messages and as a result never need the OnSyncMsgFun.
165+
-spec init(rabbit_amqqueue:name()) -> state().
174166

175-
init(#resource{ virtual_host = VHost } = Name, OnSyncFun) ->
176-
?DEBUG("~0p ~0p ~0p", [Name, OnSyncFun]),
167+
init(#resource{ virtual_host = VHost } = Name) ->
168+
?DEBUG("~0p", [Name]),
177169
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
178170
Dir = queue_dir(VHostDir, Name),
179171
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
180-
init1(Name, Dir, OnSyncFun).
172+
init1(Name, Dir).
181173

182-
init1(Name, Dir, OnSyncFun) ->
174+
init1(Name, Dir) ->
183175
ensure_queue_name_stub_file(Name, Dir),
184176
DirBin = rabbit_file:filename_to_binary(Dir),
185177
#qi{
186178
queue_name = Name,
187-
dir = << DirBin/binary, "/" >>,
188-
on_sync = OnSyncFun
179+
dir = << DirBin/binary, "/" >>
189180
}.
190181

191182
ensure_queue_name_stub_file(#resource{virtual_host = VHost, name = QName}, Dir) ->
@@ -197,15 +188,13 @@ ensure_queue_name_stub_file(#resource{virtual_host = VHost, name = QName}, Dir)
197188
-spec reset_state(State) -> State when State::state().
198189

199190
reset_state(State = #qi{ queue_name = Name,
200-
dir = Dir,
201-
on_sync = OnSyncFun }) ->
191+
dir = Dir }) ->
202192
?DEBUG("~0p", [State]),
203193
_ = delete_and_terminate(State),
204-
init1(Name, rabbit_file:binary_to_filename(Dir), OnSyncFun).
194+
init1(Name, rabbit_file:binary_to_filename(Dir)).
205195

206196
-spec recover(rabbit_amqqueue:name(), shutdown_terms(), boolean(),
207-
contains_predicate(),
208-
on_sync_fun()) ->
197+
contains_predicate()) ->
209198
{'undefined' | non_neg_integer(),
210199
'undefined' | non_neg_integer(), state()}.
211200

@@ -218,12 +207,11 @@ reset_state(State = #qi{ queue_name = Name,
218207
-define(RECOVER_COUNTER_SIZE, 6).
219208

220209
recover(#resource{ virtual_host = VHost, name = QueueName } = Name, Terms,
221-
IsMsgStoreClean, ContainsCheckFun, OnSyncFun) ->
222-
?DEBUG("~0p ~0p ~0p ~0p ~0p", [Name, Terms, IsMsgStoreClean,
223-
ContainsCheckFun, OnSyncFun]),
210+
IsMsgStoreClean, ContainsCheckFun) ->
211+
?DEBUG("~0p ~0p ~0p ~0p", [Name, Terms, IsMsgStoreClean, ContainsCheckFun]),
224212
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
225213
Dir = queue_dir(VHostDir, Name),
226-
State0 = init1(Name, Dir, OnSyncFun),
214+
State0 = init1(Name, Dir),
227215
%% We go over all segments if either the index or the
228216
%% message store has/had to recover. Otherwise we just
229217
%% take our state from Terms.
@@ -922,22 +910,14 @@ parse_entries(<< Status:8,
922910

923911
%% ----
924912
%%
925-
%% Syncing and flushing to disk requested by the queue.
926-
%% Note: the v2 no longer calls fsync, it only flushes.
913+
%% Flushing to disk requested by the queue.
927914

928915
-spec sync(State) -> State when State::state().
929916

930-
sync(State0 = #qi{ confirms = Confirms,
931-
on_sync = OnSyncFun }) ->
917+
sync(State0 = #qi{ confirms = Confirms }) ->
932918
?DEBUG("~0p", [State0]),
933919
State = flush_buffer(State0, full, segment_entry_count()),
934-
_ = case sets:is_empty(Confirms) of
935-
true ->
936-
ok;
937-
false ->
938-
OnSyncFun(Confirms)
939-
end,
940-
State#qi{ confirms = sets:new([{version,2}]) }.
920+
{Confirms, State#qi{ confirms = sets:new([{version,2}]) }}.
941921

942922
-spec needs_sync(state()) -> 'false' | 'confirms'.
943923

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
-export([start/2, stop/1]).
2323

2424
%% exported for testing only
25-
-export([start_msg_store/3, stop_msg_store/1, init/4]).
25+
-export([start_msg_store/3, stop_msg_store/1, init/3]).
2626

2727
-include("mc.hrl").
2828
-include_lib("stdlib/include/qlc.hrl").
@@ -404,17 +404,16 @@ stop_msg_store(VHost) ->
404404
ok.
405405

406406
init(Queue, Recover, Callback) ->
407-
init(
407+
init1(
408408
Queue, Recover,
409409
fun (MsgIds, ActionTaken) ->
410410
msgs_written_to_disk(Callback, MsgIds, ActionTaken)
411-
end,
412-
fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end).
411+
end).
413412

414-
init(Q, new, MsgOnDiskFun, MsgIdxOnDiskFun) when ?is_amqqueue(Q) ->
413+
init1(Q, new, MsgOnDiskFun) when ?is_amqqueue(Q) ->
415414
QueueName = amqqueue:get_name(Q),
416415
IsDurable = amqqueue:is_durable(Q),
417-
IndexState = rabbit_classic_queue_index_v2:init(QueueName, MsgIdxOnDiskFun),
416+
IndexState = rabbit_classic_queue_index_v2:init(QueueName),
418417
StoreState = rabbit_classic_queue_store_v2:init(QueueName),
419418
VHost = QueueName#resource.virtual_host,
420419
init(IsDurable, IndexState, StoreState, 0, 0, [],
@@ -427,7 +426,7 @@ init(Q, new, MsgOnDiskFun, MsgIdxOnDiskFun) when ?is_amqqueue(Q) ->
427426
VHost), VHost);
428427

429428
%% We can be recovering a transient queue if it crashed
430-
init(Q, Terms, MsgOnDiskFun, MsgIdxOnDiskFun) when ?is_amqqueue(Q) ->
429+
init1(Q, Terms, MsgOnDiskFun) when ?is_amqqueue(Q) ->
431430
QueueName = amqqueue:get_name(Q),
432431
IsDurable = amqqueue:is_durable(Q),
433432
{PRef, RecoveryTerms} = process_recovery_terms(Terms),
@@ -451,7 +450,7 @@ init(Q, Terms, MsgOnDiskFun, MsgIdxOnDiskFun) when ?is_amqqueue(Q) ->
451450
rabbit_vhost_msg_store:successfully_recovered_state(
452451
VHost,
453452
?PERSISTENT_MSG_STORE),
454-
ContainsCheckFun, MsgIdxOnDiskFun),
453+
ContainsCheckFun),
455454
StoreState = rabbit_classic_queue_store_v2:init(QueueName),
456455
init(IsDurable, IndexState, StoreState,
457456
DeltaCount, DeltaBytes, RecoveryTerms,
@@ -714,12 +713,28 @@ sync(State = #vqstate { index_state = IndexState0,
714713
store_state = StoreState0,
715714
unconfirmed_simple = UCS,
716715
confirmed = C }) ->
717-
IndexState = rabbit_classic_queue_index_v2:sync(IndexState0),
716+
{MsgIdSet, IndexState} = rabbit_classic_queue_index_v2:sync(IndexState0),
718717
StoreState = rabbit_classic_queue_store_v2:sync(StoreState0),
719-
State #vqstate { index_state = IndexState,
720-
store_state = StoreState,
721-
unconfirmed_simple = sets:new([{version,2}]),
722-
confirmed = sets:union(C, UCS) }.
718+
State1 = State #vqstate { index_state = IndexState,
719+
store_state = StoreState,
720+
unconfirmed_simple = sets:new([{version,2}]),
721+
confirmed = sets:union(C, UCS) },
722+
index_synced(MsgIdSet, State1).
723+
724+
index_synced(MsgIdSet, State = #vqstate{
725+
msgs_on_disk = MOD,
726+
msg_indices_on_disk = MIOD,
727+
unconfirmed = UC }) ->
728+
case sets:is_empty(MsgIdSet) of
729+
true ->
730+
State;
731+
false ->
732+
Confirmed = sets:intersection(UC, MsgIdSet),
733+
record_confirms(sets:intersection(MsgIdSet, MOD),
734+
State #vqstate {
735+
msg_indices_on_disk =
736+
sets:union(MIOD, Confirmed) })
737+
end.
723738

724739
resume(State) -> a(timeout(State)).
725740

@@ -1903,19 +1918,6 @@ msgs_written_to_disk(Callback, MsgIdSet, written) ->
19031918
sets:union(MOD, Confirmed) })
19041919
end).
19051920

1906-
%% @todo Having to call run_backing_queue is probably reducing performance...
1907-
msg_indices_written_to_disk(Callback, MsgIdSet) ->
1908-
Callback(?MODULE,
1909-
fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
1910-
msg_indices_on_disk = MIOD,
1911-
unconfirmed = UC }) ->
1912-
Confirmed = sets:intersection(UC, MsgIdSet),
1913-
record_confirms(sets:intersection(MsgIdSet, MOD),
1914-
State #vqstate {
1915-
msg_indices_on_disk =
1916-
sets:union(MIOD, Confirmed) })
1917-
end).
1918-
19191921
%%----------------------------------------------------------------------------
19201922
%% Internal plumbing for requeue
19211923
%%----------------------------------------------------------------------------

deps/rabbit/test/backing_queue_SUITE.erl

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -844,7 +844,7 @@ bq_queue_index1(_Config) ->
844844
Qi13
845845
end,
846846
{_DeletedSegments, Qi16} = IndexMod:ack(SeqIdsB, Qi15),
847-
Qi17 = IndexMod:sync(Qi16),
847+
{_Confirms, Qi17} = IndexMod:sync(Qi16),
848848
%% Everything will have gone now because #pubs == #acks
849849
{NextSeqIdB, NextSeqIdB, Qi18} = IndexMod:bounds(Qi17, NextSeqIdB),
850850
%% should get length back as 0 because all persistent
@@ -865,7 +865,7 @@ bq_queue_index1(_Config) ->
865865
_ -> Qi1
866866
end,
867867
{_DeletedSegments, Qi3} = IndexMod:ack(SeqIdsC, Qi2),
868-
Qi4 = IndexMod:sync(Qi3),
868+
{_Confirms, Qi4} = IndexMod:sync(Qi3),
869869
{Qi5, _SeqIdsMsgIdsC1} = queue_index_publish([SegmentSize],
870870
false, Qi4),
871871
Qi5
@@ -883,7 +883,8 @@ bq_queue_index1(_Config) ->
883883
{Qi3, _SeqIdsMsgIdsC3} = queue_index_publish([SegmentSize],
884884
false, Qi2),
885885
{_DeletedSegments, Qi4} = IndexMod:ack(SeqIdsC, Qi3),
886-
IndexMod:sync(Qi4)
886+
{_Confirms, Qi5} = IndexMod:sync(Qi4),
887+
Qi5
887888
end),
888889

889890
%% c) just fill up several segments of all pubs, then +acks
@@ -896,7 +897,8 @@ bq_queue_index1(_Config) ->
896897
_ -> Qi1
897898
end,
898899
{_DeletedSegments, Qi3} = IndexMod:ack(SeqIdsD, Qi2),
899-
IndexMod:sync(Qi3)
900+
{_Confirms, Qi4} = IndexMod:sync(Qi3),
901+
Qi4
900902
end),
901903

902904
%% d) get messages in all states to a segment, then flush, then do
@@ -910,7 +912,7 @@ bq_queue_index1(_Config) ->
910912
_ -> Qi1
911913
end,
912914
{_DeletedSegments3, Qi3} = IndexMod:ack([0], Qi2),
913-
Qi4 = IndexMod:sync(Qi3),
915+
{_Confirms, Qi4} = IndexMod:sync(Qi3),
914916
{Qi5, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi4),
915917
Qi6 = case IndexMod of
916918
rabbit_queue_index -> IndexMod:deliver([2,3,5,6], Qi5);
@@ -1641,8 +1643,7 @@ init_test_queue(QName) ->
16411643
QName, [], false,
16421644
fun (MsgId) ->
16431645
rabbit_msg_store:contains(MsgId, PersistentClient)
1644-
end,
1645-
fun nop/1),
1646+
end),
16461647
ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient),
16471648
Res.
16481649

@@ -1702,9 +1703,6 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
17021703
ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
17031704
{A, B}.
17041705

1705-
nop(_) -> ok.
1706-
nop(_, _) -> ok.
1707-
17081706
msg_store_client_init(MsgStore, Ref) ->
17091707
rabbit_vhost_msg_store:client_init(?VHOST, MsgStore, Ref, undefined).
17101708

@@ -1714,7 +1712,7 @@ variable_queue_init(Q, Recover) ->
17141712
true -> non_clean_shutdown;
17151713
false -> new;
17161714
Terms -> Terms
1717-
end, fun nop/2, fun nop/1).
1715+
end, fun(_, _) -> ok end).
17181716

17191717
variable_queue_read_terms(QName) ->
17201718
#resource { kind = queue,

0 commit comments

Comments
 (0)