Skip to content

Commit 7406ed2

Browse files
authored
Merge pull request #488 from rabbitmq/sparse-read-fd
Fix segment read bug when segment has been appended to
2 parents 9fbb2e0 + ed27d3c commit 7406ed2

File tree

6 files changed

+125
-39
lines changed

6 files changed

+125
-39
lines changed

src/ra_log_reader.erl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,16 @@ exec_read_plan(Dir, Plan, Open0, TransformFun, Acc0)
224224
end,
225225
lists:foldl(
226226
fun ({Idxs, BaseName}, {Acc1, Open1}) ->
227-
{Seg, Open} = get_segment_ext(Dir, Open1, BaseName),
228-
{_, Acc} = ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1),
229-
{Acc, Open}
227+
{Seg, Open2} = get_segment_ext(Dir, Open1, BaseName),
228+
case ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1) of
229+
{ok, _, Acc} ->
230+
{Acc, Open2};
231+
{error, modified} ->
232+
{_, Open3} = ra_flru:evict(BaseName, Open2),
233+
{SegNew, Open} = get_segment_ext(Dir, Open3, BaseName),
234+
{ok, _, Acc} = ra_log_segment:read_sparse(SegNew, Idxs, Fun, Acc1),
235+
{Acc, Open}
236+
end
230237
end, {Acc0, Open0}, Plan).
231238

232239
-spec fetch_term(ra_index(), state()) -> {option(ra_index()), state()}.
@@ -335,7 +342,7 @@ segment_sparse_read(#?STATE{segment_refs = SegRefs,
335342
lists:foldl(
336343
fun ({Idxs, Fn}, {Open0, C, En0}) ->
337344
{Seg, Open} = get_segment(Cfg, Open0, Fn),
338-
{ReadSparseCount, Entries} =
345+
{ok, ReadSparseCount, Entries} =
339346
ra_log_segment:read_sparse(Seg, Idxs,
340347
fun (I, T, B, Acc) ->
341348
[{I, T, binary_to_term(B)} | Acc]

src/ra_log_segment.erl

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
append/4,
1313
sync/1,
1414
fold/6,
15+
is_modified/1,
1516
read_sparse/4,
1617
term_query/2,
1718
close/1,
@@ -27,6 +28,8 @@
2728

2829
-include("ra.hrl").
2930

31+
-include_lib("kernel/include/file.hrl").
32+
3033
-define(VERSION, 2).
3134
-define(MAGIC, "RASG").
3235
-define(HEADER_SIZE, 4 + (16 div 8) + (16 div 8)).
@@ -112,6 +115,7 @@ open(Filename, Options) ->
112115
end.
113116

114117
process_file(true, Mode, Filename, Fd, Options) ->
118+
AccessPattern = maps:get(access_pattern, Options, random),
115119
case read_header(Fd) of
116120
{ok, Version, MaxCount} ->
117121
MaxPending = maps:get(max_pending, Options, ?SEGMENT_MAX_PENDING),
@@ -120,7 +124,6 @@ process_file(true, Mode, Filename, Fd, Options) ->
120124
{NumIndexRecords, DataOffset, Range, Index} =
121125
recover_index(Fd, Version, MaxCount),
122126
IndexOffset = ?HEADER_SIZE + NumIndexRecords * IndexRecordSize,
123-
AccessPattern = maps:get(access_pattern, Options, random),
124127
Mode = maps:get(mode, Options, append),
125128
ComputeChecksums = maps:get(compute_checksums, Options, true),
126129
{ok, #state{cfg = #cfg{version = Version,
@@ -184,16 +187,15 @@ append(#state{cfg = #cfg{max_pending = PendingCount},
184187
append(#state{cfg = #cfg{version = Version,
185188
mode = append} = Cfg,
186189
index_offset = IndexOffset,
187-
data_start = DataStart,
188190
data_offset = DataOffset,
189191
range = Range0,
190192
pending_count = PendCnt,
191193
pending_index = IdxPend0,
192194
pending_data = DataPend0} = State,
193195
Index, Term, {Length, Data}) ->
194-
% check if file is full
195-
case IndexOffset < DataStart of
196-
true ->
196+
197+
case is_full(State) of
198+
false ->
197199
% TODO: check length is less than #FFFFFFFF ??
198200
Checksum = compute_checksum(Cfg, Data),
199201
OSize = offset_size(Version),
@@ -209,7 +211,7 @@ append(#state{cfg = #cfg{version = Version,
209211
pending_data = [DataPend0, Data],
210212
pending_count = PendCnt + 1}
211213
};
212-
false ->
214+
true ->
213215
{error, full}
214216
end;
215217
append(State, Index, Term, Data)
@@ -271,38 +273,58 @@ fold(#state{cfg = #cfg{mode = read} = Cfg,
271273
FromIdx, ToIdx, Fun, AccFun, Acc) ->
272274
fold0(Cfg, Cache, FromIdx, ToIdx, Index, Fun, AccFun, Acc).
273275

276+
-spec is_modified(state()) -> boolean().
277+
is_modified(#state{cfg = #cfg{fd = Fd},
278+
data_offset = DataOffset} = State) ->
279+
case is_full(State) of
280+
true ->
281+
%% a full segment cannot be appended to.
282+
false;
283+
false ->
284+
%% get info and compare to data_offset
285+
{ok, #file_info{size = Size}} = prim_file:read_handle_info(Fd),
286+
Size > DataOffset
287+
end.
288+
274289
-spec read_sparse(state(), [ra_index()],
275290
fun((ra:index(), ra_term(), binary(), Acc) -> Acc),
276291
Acc) ->
277-
{NumRead :: non_neg_integer(), Acc}
292+
{ok, NumRead :: non_neg_integer(), Acc} | {error, modified}
278293
when Acc :: term().
279294
read_sparse(#state{index = Index,
280-
cfg = Cfg}, Indexes, AccFun, Acc) ->
281-
Cache0 = prepare_cache(Cfg, Indexes, Index),
282-
read_sparse0(Cfg, Indexes, Index, Cache0, Acc, AccFun, 0).
295+
cfg = #cfg{fd = Fd}} = State,
296+
Indexes, AccFun, Acc) ->
297+
case is_modified(State) of
298+
true ->
299+
{error, modified};
300+
false ->
301+
Cache0 = prepare_cache(Fd, Indexes, Index),
302+
read_sparse0(Fd, Indexes, Index, Cache0, Acc, AccFun, 0)
303+
end.
283304

284-
read_sparse0(_Cfg, [], _Index, _Cache, Acc, _AccFun, Num) ->
285-
{Num, Acc};
286-
read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num)
305+
read_sparse0(_Fd, [], _Index, _Cache, Acc, _AccFun, Num) ->
306+
{ok, Num, Acc};
307+
read_sparse0(Fd, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num)
287308
when is_map_key(NextIdx, Index) ->
288-
{Term, Offset, Length, _} = map_get(NextIdx, Index),
289-
case cache_read(Cache0, Offset, Length) of
309+
{Term, Pos, Length, _} = map_get(NextIdx, Index),
310+
case cache_read(Cache0, Pos, Length) of
290311
false ->
291-
case prepare_cache(Cfg, Indexes, Index) of
312+
case prepare_cache(Fd, Indexes, Index) of
292313
undefined ->
293-
{ok, Data, _} = pread(Cfg, undefined, Offset, Length),
294-
read_sparse0(Cfg, Rem, Index, undefined,
314+
%% TODO: check for partial data?
315+
{ok, Data} = file:pread(Fd, Pos, Length),
316+
read_sparse0(Fd, Rem, Index, undefined,
295317
AccFun(NextIdx, Term, Data, Acc),
296318
AccFun, Num+1);
297319
Cache ->
298-
read_sparse0(Cfg, Indexes, Index, Cache,
299-
Acc, AccFun, Num+1)
320+
read_sparse0(Fd, Indexes, Index, Cache,
321+
Acc, AccFun, Num)
300322
end;
301323
Data ->
302-
read_sparse0(Cfg, Rem, Index, Cache0,
324+
read_sparse0(Fd, Rem, Index, Cache0,
303325
AccFun(NextIdx, Term, Data, Acc), AccFun, Num+1)
304326
end;
305-
read_sparse0(_Cfg, [NextIdx | _], _Index, _Cache, _Acc, _AccFun, _Num) ->
327+
read_sparse0(_Fd, [NextIdx | _], _Index, _Cache, _Acc, _AccFun, _Num) ->
306328
exit({missing_key, NextIdx}).
307329

308330
cache_read({CPos, CLen, Bin}, Pos, Length)
@@ -313,9 +335,9 @@ cache_read({CPos, CLen, Bin}, Pos, Length)
313335
cache_read(_, _, _) ->
314336
false.
315337

316-
prepare_cache(#cfg{} = _Cfg, [_], _SegIndex) ->
338+
prepare_cache(_Fd, [_], _SegIndex) ->
317339
undefined;
318-
prepare_cache(#cfg{fd = Fd} = _Cfg, [FirstIdx | Rem], SegIndex) ->
340+
prepare_cache(Fd, [FirstIdx | Rem], SegIndex) ->
319341
case consec_run(FirstIdx, FirstIdx, Rem) of
320342
{Idx, Idx} ->
321343
%% no run, no cache;
@@ -622,6 +644,10 @@ validate_checksum(0, _) ->
622644
validate_checksum(Crc, Data) ->
623645
Crc == erlang:crc32(Data).
624646

647+
is_full(#state{index_offset = IndexOffset,
648+
data_start = DataStart}) ->
649+
IndexOffset >= DataStart.
650+
625651
-ifdef(TEST).
626652
-include_lib("eunit/include/eunit.hrl").
627653

src/ra_server.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1728,7 +1728,7 @@ machine_query(QueryFun, #{cfg := #cfg{effective_machine_module = MacMod},
17281728
become(leader, OldRaftState, #{cluster := Cluster,
17291729
cluster_change_permitted := CCP0,
17301730
log := Log0} = State) ->
1731-
Log = ra_log:release_resources(maps:size(Cluster) + 2, random, Log0),
1731+
Log = ra_log:release_resources(maps:size(Cluster), sequential, Log0),
17321732
CCP = case OldRaftState of
17331733
await_condition ->
17341734
CCP0;

test/ra_log_2_SUITE.erl

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ all_tests() ->
5656
transient_writer_is_handled,
5757
read_opt,
5858
sparse_read,
59+
read_plan_modified,
5960
read_plan,
6061
sparse_read_out_of_range,
6162
sparse_read_out_of_range_2,
@@ -481,6 +482,21 @@ sparse_read(Config) ->
481482
{99, _, _}], _LogO3} = ra_log:sparse_read([1000,5,99], LogO2),
482483
ok.
483484

485+
read_plan_modified(Config) ->
486+
Log0 = ra_log_init(Config),
487+
Log1 = write_and_roll(1, 2, 1, Log0, 50),
488+
Log2 = deliver_all_log_events(Log1, 100),
489+
Plan = ra_log:partial_read([1], Log2, fun (_, _, Cmd) -> Cmd end),
490+
{#{1 := _}, Flru} = ra_log_read_plan:execute(Plan, undefined),
491+
492+
Log = deliver_all_log_events(write_and_roll(2, 3, 1, Log2, 50), 100),
493+
Plan2 = ra_log:partial_read([1,2], Log, fun (_, _, Cmd) -> Cmd end),
494+
%% assert we can read the newly appended item with the cached
495+
%% segment
496+
{#{1 := _, 2 := _}, _} = ra_log_read_plan:execute(Plan2, Flru),
497+
ra_log:close(Log),
498+
ok.
499+
484500
read_plan(Config) ->
485501
Num = 256 * 2,
486502
Div = 2,

test/ra_log_segment_SUITE.erl

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ all_tests() ->
3232
overwrite,
3333
term_query,
3434
write_many,
35+
read_sparse_append_read,
3536
open_invalid,
3637
corrupted_segment,
3738
large_segment,
@@ -81,6 +82,7 @@ corrupted_segment(Config) ->
8182
%% ct:pal("DUMP PRE ~p", [ra_log_segment:dump_index(Fn)]),
8283
%% check that the current state throws a missing key
8384
{ok, SegR0} = ra_log_segment:open(Fn, #{mode => read}),
85+
?assertNot(ra_log_segment:is_modified(SegR0)),
8486
?assertExit({missing_key, 2},
8587
read_sparse(SegR0, [1, 2])),
8688

@@ -210,11 +212,13 @@ segref(Config) ->
210212
full_file(Config) ->
211213
Dir = ?config(data_dir, Config),
212214
Fn = filename:join(Dir, "seg1.seg"),
213-
Data = make_data(1024),
214-
{ok, Seg0} = ra_log_segment:open(Fn, #{max_count => 2}),
215+
Data = make_data(10),
216+
{ok, Seg0} = ra_log_segment:open(Fn, #{max_count => 2,
217+
max_pending => 1}),
215218
{ok, Seg1} = ra_log_segment:append(Seg0, 1, 2, Data),
216219
{ok, Seg} = ra_log_segment:append(Seg1, 2, 2, Data),
217220
{error, full} = ra_log_segment:append(Seg, 3, 2, Data),
221+
?assertNot(ra_log_segment:is_modified(Seg)),
218222
{1,2} = ra_log_segment:range(Seg),
219223
ok = ra_log_segment:close(Seg),
220224
ok.
@@ -396,6 +400,39 @@ write_many(Config) ->
396400
ct:pal("~p", [Result]),
397401
ok.
398402

403+
404+
read_sparse_append_read(Config) ->
405+
Dir = ?config(data_dir, Config),
406+
Fn = filename:join(Dir, <<"0000000.segment">>),
407+
{ok, W0} = ra_log_segment:open(Fn, #{}),
408+
Data = <<"banana">>,
409+
Term = 1,
410+
%% write two entries in term 1
411+
{ok, W1} = ra_log_segment:append(W0, 1, Term, Data),
412+
{ok, W2} = ra_log_segment:append(W1, 2, Term, Data),
413+
{ok, W3} = ra_log_segment:flush(W2),
414+
415+
416+
{ok, R0} = ra_log_segment:open(Fn, #{mode => read}),
417+
{ok, 2, [_, _]} = ra_log_segment:read_sparse(R0, [1, 2],
418+
fun (I, _, _, Acc) ->
419+
[I | Acc]
420+
end, []),
421+
422+
?assertNot(ra_log_segment:is_modified(R0)),
423+
%% overwrite in term 2
424+
{ok, W4} = ra_log_segment:append(W3, 2, 2, <<"apple">>),
425+
{ok, W5} = ra_log_segment:append(W4, 3, 2, <<"apple">>),
426+
{ok, W} = ra_log_segment:flush(W5),
427+
?assert(ra_log_segment:is_modified(R0)),
428+
{error, modified} = ra_log_segment:read_sparse(R0, [2],
429+
fun (_I, _, B, Acc) ->
430+
[B | Acc]
431+
end, []),
432+
ra_log_segment:close(W),
433+
ra_log_segment:close(R0),
434+
ok.
435+
399436
write_until_full(Idx, Term, Data, Seg0) ->
400437
case ra_log_segment:append(Seg0, Idx, Term, Data) of
401438
{ok, Seg} ->
@@ -410,8 +447,8 @@ make_data(Size) ->
410447
term_to_binary(crypto:strong_rand_bytes(Size)).
411448

412449
read_sparse(R, Idxs) ->
413-
{_, Entries} = ra_log_segment:read_sparse(R, Idxs,
414-
fun (I, T, B, Acc) ->
415-
[{I, T, B} | Acc]
416-
end, []),
450+
{ok, _, Entries} = ra_log_segment:read_sparse(R, Idxs,
451+
fun (I, T, B, Acc) ->
452+
[{I, T, B} | Acc]
453+
end, []),
417454
lists:reverse(Entries).

test/ra_log_segment_writer_SUITE.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -800,10 +800,10 @@ segments_for(UId, DataDir) ->
800800
SegFiles.
801801

802802
read_sparse(R, Idxs) ->
803-
{_, Entries} = ra_log_segment:read_sparse(R, Idxs,
804-
fun(I, T, B, Acc) ->
805-
[{I, T, B} | Acc]
806-
end, []),
803+
{ok, _, Entries} = ra_log_segment:read_sparse(R, Idxs,
804+
fun(I, T, B, Acc) ->
805+
[{I, T, B} | Acc]
806+
end, []),
807807
lists:reverse(Entries).
808808

809809
get_names(System) when is_atom(System) ->

0 commit comments

Comments
 (0)