Skip to content

Commit fd711b7

Browse files
committed
Fix issue with sparse reading a segment that is being modified.
When a sparse read detects a modified segment and re-initialises it is possible that the segment will again be modified before the second read attempt. It is not necessary in this case to check if the segment has been modified as the read plan would have been generated before the segment was re-initialised so this commit introduces a new ra_log_segment:read_sparse_no_checks function that skips the modified check to avoid a crash.
1 parent cc54084 commit fd711b7

File tree

4 files changed

+33
-10
lines changed

4 files changed

+33
-10
lines changed

src/ra_log_reader.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,9 +235,18 @@ exec_read_plan(Dir, Plan, Open0, TransformFun, Options, Acc0)
235235
{ok, _, Acc} ->
236236
{Acc, Open2};
237237
{error, modified} ->
238+
%% if the segment has been modified since it was opened
239+
%% it is not safe to attempt the read as the read plan
240+
%% may refer to indexes that weren't in the segment at
241+
%% that time. In this case we evict all segments and
242+
%% re-open what we need.
238243
{_, Open3} = ra_flru:evict(BaseName, Open2),
239244
{SegNew, Open} = get_segment_ext(Dir, Open3, BaseName, Options),
240-
{ok, _, Acc} = ra_log_segment:read_sparse(SegNew, Idxs, Fun, Acc1),
245+
%% at this point we can read without checking for modification
246+
%% as the read plan would have been created before we
247+
%% read the index from the segment
248+
{ok, _, Acc} = ra_log_segment:read_sparse_no_checks(
249+
SegNew, Idxs, Fun, Acc1),
241250
{Acc, Open}
242251
end
243252
end, {Acc0, Open0}, Plan).

src/ra_log_segment.erl

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
fold/6,
1515
is_modified/1,
1616
read_sparse/4,
17+
read_sparse_no_checks/4,
1718
term_query/2,
1819
close/1,
1920
range/1,
@@ -299,7 +300,8 @@ is_modified(#state{cfg = #cfg{fd = Fd},
299300
false;
300301
false ->
301302
%% get info and compare to data_offset
302-
{ok, #file_info{size = Size}} = prim_file:read_handle_info(Fd),
303+
{ok, #file_info{size = Size}} =
304+
prim_file:read_handle_info(Fd, [posix]),
303305
Size > DataOffset
304306
end.
305307

@@ -308,17 +310,25 @@ is_modified(#state{cfg = #cfg{fd = Fd},
308310
Acc) ->
309311
{ok, NumRead :: non_neg_integer(), Acc} | {error, modified}
310312
when Acc :: term().
311-
read_sparse(#state{index = Index,
312-
cfg = #cfg{fd = Fd}} = State,
313-
Indexes, AccFun, Acc) ->
313+
read_sparse(#state{} = State, Indexes, AccFun, Acc) ->
314314
case is_modified(State) of
315315
true ->
316316
{error, modified};
317317
false ->
318-
Cache0 = prepare_cache(Fd, Indexes, Index),
319-
read_sparse0(Fd, Indexes, Index, Cache0, Acc, AccFun, 0)
318+
read_sparse_no_checks(State, Indexes, AccFun, Acc)
320319
end.
321320

321+
-spec read_sparse_no_checks(state(), [ra_index()],
322+
fun((ra:index(), ra_term(), binary(), Acc) -> Acc),
323+
Acc) ->
324+
{ok, NumRead :: non_neg_integer(), Acc}
325+
when Acc :: term().
326+
read_sparse_no_checks(#state{index = Index,
327+
cfg = #cfg{fd = Fd}},
328+
Indexes, AccFun, Acc) ->
329+
Cache0 = prepare_cache(Fd, Indexes, Index),
330+
read_sparse0(Fd, Indexes, Index, Cache0, Acc, AccFun, 0).
331+
322332
read_sparse0(_Fd, [], _Index, _Cache, Acc, _AccFun, Num) ->
323333
{ok, Num, Acc};
324334
read_sparse0(Fd, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num)

test/ra_log_2_SUITE.erl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -489,11 +489,13 @@ read_plan_modified(Config) ->
489489
Plan = ra_log:partial_read([1], Log2, fun (_, _, Cmd) -> Cmd end),
490490
{#{1 := _}, Flru} = ra_log_read_plan:execute(Plan, undefined),
491491

492-
Log = deliver_all_log_events(write_and_roll(2, 3, 1, Log2, 50), 100),
493-
Plan2 = ra_log:partial_read([1,2], Log, fun (_, _, Cmd) -> Cmd end),
492+
Log3 = deliver_all_log_events(write_and_roll(2, 3, 1, Log2, 50), 100),
493+
Plan2 = ra_log:partial_read([1,2], Log3, fun (_, _, Cmd) -> Cmd end),
494494
%% assert we can read the newly appended item with the cached
495495
%% segment
496-
{#{1 := _, 2 := _}, _} = ra_log_read_plan:execute(Plan2, Flru),
496+
{#{1 := _, 2 := _}, Flru2} = ra_log_read_plan:execute(Plan2, Flru),
497+
Log = deliver_all_log_events(write_and_roll(3, 4, 1, Log3, 50), 100),
498+
{#{1 := _, 2 := _}, _} = ra_log_read_plan:execute(Plan2, Flru2),
497499
ra_log:close(Log),
498500
ok.
499501

test/ra_log_segment_SUITE.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,8 @@ full_file(Config) ->
220220
{ok, Seg} = ra_log_segment:append(Seg1, 2, 2, Data),
221221
{error, full} = ra_log_segment:append(Seg, 3, 2, Data),
222222
?assertNot(ra_log_segment:is_modified(Seg)),
223+
{ok, R} = ra_log_segment:open(Fn, #{mode => read}),
224+
?assertNot(ra_log_segment:is_modified(R)),
223225
{1,2} = ra_log_segment:range(Seg),
224226
ok = ra_log_segment:close(Seg),
225227
ok.

0 commit comments

Comments
 (0)