Skip to content

Commit 2bc9c47

Browse files
committed
Skip periodic flush if disk written to recently
This is meant to both reduce the number of flushes when they are not necessary (fast writes scenarios) but also to prevent issues on Windows when recently deleted files are not immediately available for reopening and writing (they are in "DELETE PENDING").
1 parent 4bca14a commit 2bc9c47

File tree

1 file changed

+21
-5
lines changed

1 file changed

+21
-5
lines changed

deps/rabbit/src/rabbit_classic_queue_store_v2.erl

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,15 @@
9999
%% we are using file:pread/3 which leaves the file
100100
%% position undetermined.
101101
read_segment = undefined :: undefined | non_neg_integer(),
102-
read_fd = undefined :: undefined | file:fd()
102+
read_fd = undefined :: undefined | file:fd(),
103+
104+
%% Time at which the most recent buffer flush was done,
105+
%% or the most recent segment delete. This is used to skip
106+
%% the periodic flush when a flush was done recently, or
107+
%% when the current write file was recently deleted (to
108+
%% avoid {error,eacces} issues on Windows for files in
109+
%% "DELETE PENDING" state).
110+
last_disk_write = erlang:monotonic_time(millisecond)
103111
}).
104112

105113
-type state() :: #qs{}.
@@ -184,9 +192,15 @@ get_write_offset(Segment, Size, State = #qs{ write_segment = undefined }) ->
184192

185193
-spec sync(State) -> State when State::state().
186194

187-
sync(State) ->
195+
sync(State=#qs{ last_disk_write = LastDiskWrite }) ->
188196
?DEBUG("~0p", [State]),
189-
flush_buffer(State, fun(_) -> ok end).
197+
case erlang:monotonic_time(millisecond) of
198+
%% Half the sync interval in rabbitmq_amqqueue_process.
199+
Now when Now > LastDiskWrite + 100 ->
200+
flush_buffer(State, fun(_) -> ok end);
201+
_ ->
202+
State
203+
end.
190204

191205
maybe_flush_buffer(State = #qs{ write_buffer_size = WriteBufferSize }) ->
192206
case WriteBufferSize >= max_cache_size() of
@@ -227,7 +241,8 @@ flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }, FsyncFun) ->
227241
%% Finally we move the write_buffer to the cache.
228242
State#qs{ write_buffer = #{},
229243
write_buffer_size = 0,
230-
cache = WriteBuffer }.
244+
cache = WriteBuffer,
245+
last_disk_write = erlang:monotonic_time(millisecond) }.
231246

232247
flush_buffer_build(WriteBuffer = [{FirstSeqId, {Offset, _, _}}|_],
233248
CheckCRC32, SegmentEntryCount) ->
@@ -548,7 +563,8 @@ delete_segments(Segments, State0 = #qs{ write_buffer = WriteBuffer0,
548563
end
549564
end, {#{}, WriteBufferSize0}, WriteBuffer0),
550565
State#qs{ write_buffer = WriteBuffer,
551-
write_buffer_size = WriteBufferSize }.
566+
write_buffer_size = WriteBufferSize,
567+
last_disk_write = erlang:monotonic_time(millisecond) }.
552568

553569
%% ----
554570
%%

0 commit comments

Comments
 (0)