Skip to content

Commit 35dc709

Browse files
committed
Make file_advise configurable for read plan executors
1 parent 0cbd28b commit 35dc709

File tree

5 files changed

+56
-29
lines changed

5 files changed

+56
-29
lines changed

src/ra_log.erl

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
fold/5,
2222
sparse_read/2,
2323
partial_read/3,
24-
execute_read_plan/3,
24+
execute_read_plan/4,
2525
read_plan_info/1,
2626
last_index_term/1,
2727
set_last_index/2,
@@ -210,7 +210,7 @@ init(#{uid := UId,
210210
Curr -> Curr
211211
end,
212212

213-
AccessPattern = maps:get(initial_access_pattern, Conf, random),
213+
AccessPattern = maps:get(initial_access_pattern, Conf, sequential),
214214
{ok, Mt0} = ra_log_ets:mem_table_please(Names, UId),
215215
% recover current range and any references to segments
216216
% this queries the segment writer and thus blocks until any
@@ -558,12 +558,15 @@ partial_read(Indexes0, #?MODULE{cfg = Cfg,
558558

559559

560560
-spec execute_read_plan(read_plan(), undefined | ra_flru:state(),
561-
TransformFun :: transform_fun()) ->
561+
TransformFun :: transform_fun(),
562+
ra_log_reader:read_plan_options()) ->
562563
{#{ra_index() => Command :: term()}, ra_flru:state()}.
563564
execute_read_plan(#read_plan{dir = Dir,
564565
read = Read,
565-
plan = Plan}, Flru0, TransformFun) ->
566-
ra_log_reader:exec_read_plan(Dir, Plan, Flru0, TransformFun, Read).
566+
plan = Plan}, Flru0, TransformFun,
567+
Options) ->
568+
ra_log_reader:exec_read_plan(Dir, Plan, Flru0, TransformFun,
569+
Options, Read).
567570

568571
-spec read_plan_info(read_plan()) -> map().
569572
read_plan_info(#read_plan{read = Read,

src/ra_log_read_plan.erl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,22 @@
88

99

1010
-export([execute/2,
11+
execute/3,
1112
info/1]).
1213

1314
-spec execute(ra_log:read_plan(), undefined | ra_flru:state()) ->
1415
{#{ra:index() => Command :: term()}, ra_flru:state()}.
1516
execute(Plan, Flru) ->
16-
ra_log:execute_read_plan(Plan, Flru, fun ra_server:transform_for_partial_read/3).
17+
execute(Plan, Flru, #{access_pattern => random,
18+
file_advise => normal}).
19+
20+
-spec execute(ra_log:read_plan(), undefined | ra_flru:state(),
21+
ra_log_reader:read_plan_options()) ->
22+
{#{ra:index() => Command :: term()}, ra_flru:state()}.
23+
execute(Plan, Flru, Options) ->
24+
ra_log:execute_read_plan(Plan, Flru,
25+
fun ra_server:transform_for_partial_read/3,
26+
Options).
1727

1828
-spec info(ra_log:read_plan()) -> map().
1929
info(Plan) ->

src/ra_log_reader.erl

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
fold/5,
2323
sparse_read/3,
2424
read_plan/2,
25-
exec_read_plan/5,
25+
exec_read_plan/6,
2626
fetch_term/2
2727
]).
2828

@@ -47,11 +47,14 @@
4747

4848
-opaque state() :: #?STATE{}.
4949
-type read_plan() :: [{BaseName :: file:filename_all(), [ra:index()]}].
50+
-type read_plan_options() :: #{access_pattern => random | sequential,
51+
file_advise => ra_log_segment:posix_file_advise()}.
5052

5153

5254
-export_type([
5355
state/0,
54-
read_plan/0
56+
read_plan/0,
57+
read_plan_options/0
5558
]).
5659

5760
%% PUBLIC
@@ -209,28 +212,31 @@ read_plan(#?STATE{segment_refs = SegRefs}, Indexes) ->
209212
%% TODO: add counter for number of read plans requested
210213
segment_read_plan(SegRefs, Indexes, []).
211214

212-
-spec exec_read_plan(file:filename_all(), read_plan(), undefined | ra_flru:state(),
213-
TransformFun :: fun(),
215+
-spec exec_read_plan(file:filename_all(),
216+
read_plan(),
217+
undefined | ra_flru:state(),
218+
TransformFun :: fun((ra_index(), ra_term(), binary()) -> term()),
219+
read_plan_options(),
214220
#{ra_index() => Command :: term()}) ->
215221
{#{ra_index() => Command :: term()}, ra_flru:state()}.
216-
exec_read_plan(Dir, Plan, undefined, TransformFun, Acc0) ->
222+
exec_read_plan(Dir, Plan, undefined, TransformFun, Options, Acc0) ->
217223
Open = ra_flru:new(1, fun({_, Seg}) -> ra_log_segment:close(Seg) end),
218-
exec_read_plan(Dir, Plan, Open, TransformFun, Acc0);
219-
exec_read_plan(Dir, Plan, Open0, TransformFun, Acc0)
224+
exec_read_plan(Dir, Plan, Open, TransformFun, Options, Acc0);
225+
exec_read_plan(Dir, Plan, Open0, TransformFun, Options, Acc0)
220226
when is_list(Plan) ->
221227
Fun = fun (I, T, B, Acc) ->
222228
E = TransformFun(I, T, binary_to_term(B)),
223229
Acc#{I => E}
224230
end,
225231
lists:foldl(
226232
fun ({Idxs, BaseName}, {Acc1, Open1}) ->
227-
{Seg, Open2} = get_segment_ext(Dir, Open1, BaseName),
233+
{Seg, Open2} = get_segment_ext(Dir, Open1, BaseName, Options),
228234
case ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1) of
229235
{ok, _, Acc} ->
230236
{Acc, Open2};
231237
{error, modified} ->
232238
{_, Open3} = ra_flru:evict(BaseName, Open2),
233-
{SegNew, Open} = get_segment_ext(Dir, Open3, BaseName),
239+
{SegNew, Open} = get_segment_ext(Dir, Open3, BaseName, Options),
234240
{ok, _, Acc} = ra_log_segment:read_sparse(SegNew, Idxs, Fun, Acc1),
235241
{Acc, Open}
236242
end
@@ -382,15 +388,14 @@ get_segment(#cfg{directory = Dir,
382388
end
383389
end.
384390

385-
get_segment_ext(Dir, Open0, Fn) ->
391+
get_segment_ext(Dir, Open0, Fn, Options) ->
386392
case ra_flru:fetch(Fn, Open0) of
387393
{ok, S, Open1} ->
388394
{S, Open1};
389395
error ->
390396
AbsFn = filename:join(Dir, Fn),
391397
case ra_log_segment:open(AbsFn,
392-
#{mode => read,
393-
access_pattern => random})
398+
Options#{mode => read})
394399
of
395400
{ok, S} ->
396401
{S, ra_flru:insert(Fn, S, Open0)};

src/ra_log_segment.erl

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
fd :: option(file:io_device()),
5353
index_size :: pos_integer(),
5454
access_pattern :: sequential | random,
55+
file_advise = normal :: posix_file_advise(),
5556
mode = append :: read | append,
5657
compute_checksums = true :: boolean()}).
5758

@@ -70,14 +71,19 @@
7071
cache :: undefined | {non_neg_integer(), non_neg_integer(), binary()}
7172
}).
7273

74+
-type posix_file_advise() :: 'normal' | 'sequential' | 'random'
75+
| 'no_reuse' | 'will_need' | 'dont_need'.
76+
7377
-type ra_log_segment_options() :: #{max_count => non_neg_integer(),
7478
max_pending => non_neg_integer(),
7579
compute_checksums => boolean(),
7680
mode => append | read,
77-
access_pattern => sequential | random}.
81+
access_pattern => sequential | random,
82+
file_advise => posix_file_advise()}.
7883
-opaque state() :: #state{}.
7984

8085
-export_type([state/0,
86+
posix_file_advise/0,
8187
ra_log_segment_options/0]).
8288

8389
-spec open(Filename :: file:filename_all()) ->
@@ -116,9 +122,9 @@ open(Filename, Options) ->
116122

117123
process_file(true, Mode, Filename, Fd, Options) ->
118124
AccessPattern = maps:get(access_pattern, Options, random),
119-
if AccessPattern == random andalso
125+
FileAdvise = maps:get(file_advise, Options, normal),
126+
if FileAdvise == random andalso
120127
Mode == read ->
121-
%% this is a guess using the defaults
122128
Offs = maps:get(max_count, Options, ?SEGMENT_MAX_ENTRIES) * ?INDEX_RECORD_SIZE_V2,
123129
_ = file:advise(Fd, Offs, 0, random),
124130
ok;
@@ -133,7 +139,6 @@ process_file(true, Mode, Filename, Fd, Options) ->
133139
{NumIndexRecords, DataOffset, Range, Index} =
134140
recover_index(Fd, Version, MaxCount),
135141
IndexOffset = ?HEADER_SIZE + NumIndexRecords * IndexRecordSize,
136-
Mode = maps:get(mode, Options, append),
137142
ComputeChecksums = maps:get(compute_checksums, Options, true),
138143
{ok, #state{cfg = #cfg{version = Version,
139144
max_count = MaxCount,
@@ -142,6 +147,7 @@ process_file(true, Mode, Filename, Fd, Options) ->
142147
mode = Mode,
143148
index_size = IndexSize,
144149
access_pattern = AccessPattern,
150+
file_advise = FileAdvise,
145151
compute_checksums = ComputeChecksums,
146152
fd = Fd},
147153
data_start = ?HEADER_SIZE + IndexSize,
@@ -165,6 +171,7 @@ process_file(false, Mode, Filename, Fd, Options) ->
165171
ComputeChecksums = maps:get(compute_checksums, Options, true),
166172
IndexSize = MaxCount * ?INDEX_RECORD_SIZE_V2,
167173
ok = write_header(MaxCount, Fd),
174+
FileAdvise = maps:get(file_advise, Options, dont_need),
168175
{ok, #state{cfg = #cfg{version = ?VERSION,
169176
max_count = MaxCount,
170177
max_pending = MaxPending,
@@ -173,6 +180,7 @@ process_file(false, Mode, Filename, Fd, Options) ->
173180
index_size = IndexSize,
174181
fd = Fd,
175182
compute_checksums = ComputeChecksums,
183+
file_advise = FileAdvise,
176184
access_pattern = random},
177185
index_write_offset = ?HEADER_SIZE,
178186
index_offset = ?HEADER_SIZE,
@@ -431,13 +439,15 @@ is_same_as(#state{cfg = #cfg{filename = Fn0}}, Fn) ->
431439
is_same_filename_all(Fn0, Fn).
432440

433441
-spec close(state()) -> ok.
434-
close(#state{cfg = #cfg{fd = Fd, mode = append}} = State) ->
442+
close(#state{cfg = #cfg{fd = Fd,
443+
mode = append,
444+
file_advise = FileAdvise}} = State) ->
435445
% close needs to be defensive and idempotent so we ignore the return
436446
% values here
437447
_ = sync(State),
438448
case is_full(State) of
439449
true ->
440-
_ = file:advise(Fd, 0, 0, dont_need);
450+
_ = file:advise(Fd, 0, 0, FileAdvise);
441451
false ->
442452
ok
443453
end,
@@ -684,3 +694,4 @@ cache_length_test() ->
684694

685695
-endif.
686696

697+

src/ra_server.erl

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -409,12 +409,10 @@ recover(#{cfg := #cfg{log_id = LogId,
409409
FromScan = CommitIndex + 1,
410410
{ToScan, _} = ra_log:last_index_term(Log0),
411411
?DEBUG("~ts: scanning for cluster changes ~b:~b ", [LogId, FromScan, ToScan]),
412-
{State, Log1} = ra_log:fold(FromScan, ToScan,
413-
fun cluster_scan_fun/2,
414-
State1, Log0),
412+
{State, Log} = ra_log:fold(FromScan, ToScan,
413+
fun cluster_scan_fun/2,
414+
State1, Log0),
415415

416-
%% disable segment read cache by setting random access pattern
417-
Log = ra_log:release_resources(1, random, Log1),
418416
put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, 0),
419417
State#{log => Log,
420418
%% reset commit latency as recovery may calculate a very old value

0 commit comments

Comments
 (0)