Skip to content

Commit db53b56

Browse files
committed
ra_log_reader -> ra_log_segments
1 parent 2363616 commit db53b56

File tree

3 files changed

+30
-30
lines changed

3 files changed

+30
-30
lines changed

src/ra_log.erl

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@
111111
current_snapshot :: option(ra_idxterm()),
112112
last_resend_time :: option({integer(), WalPid :: pid() | undefined}),
113113
last_wal_write :: {pid(), Ms :: integer()},
114-
reader :: ra_log_reader:state(),
114+
reader :: ra_log_segments:state(),
115115
mem_table :: ra_mt:state(),
116116
tx = false :: boolean(),
117117
pending = [] :: ra_seq:state(),
@@ -120,7 +120,7 @@
120120

121121
-record(read_plan, {dir :: file:filename_all(),
122122
read :: #{ra_index() := log_entry()},
123-
plan :: ra_log_reader:read_plan()}).
123+
plan :: ra_log_segments:read_plan()}).
124124

125125
-opaque read_plan() :: #read_plan{}.
126126
-opaque state() :: #?MODULE{}.
@@ -228,9 +228,9 @@ init(#{uid := UId,
228228
% segments it is currently processed have been finished
229229
MtRange = ra_mt:range(Mt0),
230230
SegRefs = my_segrefs(UId, SegWriter),
231-
Reader = ra_log_reader:init(UId, Dir, MaxOpen, AccessPattern, SegRefs,
231+
Reader = ra_log_segments:init(UId, Dir, MaxOpen, AccessPattern, SegRefs,
232232
Names, Counter),
233-
SegmentRange = ra_log_reader:range(Reader),
233+
SegmentRange = ra_log_segments:range(Reader),
234234
%% TODO: check ra_range:add/2 actually performas the correct logic we expect
235235
Range = ra_range:add(MtRange, SegmentRange),
236236

@@ -240,7 +240,7 @@ init(#{uid := UId,
240240
[LogId, SR]),
241241
catch prim_file:delete(filename:join(Dir, F))
242242
end
243-
|| {_, F} = SR <- SegRefs -- ra_log_reader:segment_refs(Reader)],
243+
|| {_, F} = SR <- SegRefs -- ra_log_segments:segment_refs(Reader)],
244244

245245
%% assert there is no gap between the snapshot
246246
%% and the first index in the log
@@ -291,7 +291,7 @@ init(#{uid := UId,
291291
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
292292
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LastIdx),
293293
put_counter(Cfg, ?C_RA_SVR_METRIC_NUM_SEGMENTS,
294-
ra_log_reader:segment_ref_count(Reader)),
294+
ra_log_segments:segment_ref_count(Reader)),
295295
case ra_snapshot:latest_checkpoint(SnapshotState) of
296296
undefined ->
297297
ok;
@@ -351,7 +351,7 @@ close(#?MODULE{cfg = #cfg{uid = _UId},
351351
reader = Reader}) ->
352352
% deliberately ignoring return value
353353
% close all open segments
354-
_ = ra_log_reader:close(Reader),
354+
_ = ra_log_segments:close(Reader),
355355
ok.
356356

357357
-spec begin_tx(state()) -> state().
@@ -545,11 +545,11 @@ fold(From0, To0, Fun, Acc0,
545545
MtOverlap = ra_mt:range_overlap({From, To}, Mt),
546546
case MtOverlap of
547547
{undefined, {RemStart, RemEnd}} ->
548-
{Reader, Acc} = ra_log_reader:fold(RemStart, RemEnd, Fun,
548+
{Reader, Acc} = ra_log_segments:fold(RemStart, RemEnd, Fun,
549549
Acc0, Reader0),
550550
{Acc, State#?MODULE{reader = Reader}};
551551
{{MtStart, MtEnd}, {RemStart, RemEnd}} ->
552-
{Reader, Acc1} = ra_log_reader:fold(RemStart, RemEnd, Fun,
552+
{Reader, Acc1} = ra_log_segments:fold(RemStart, RemEnd, Fun,
553553
Acc0, Reader0),
554554
Acc = ra_mt:fold(MtStart, MtEnd, Fun, Acc1, Mt),
555555
NumRead = MtEnd - MtStart + 1,
@@ -595,7 +595,7 @@ sparse_read(Indexes0, #?MODULE{cfg = Cfg,
595595
end, Indexes1),
596596
{Entries0, MemTblNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt),
597597
ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, MemTblNumRead),
598-
{Entries1, Reader} = ra_log_reader:sparse_read(Reader0, Indexes, Entries0),
598+
{Entries1, Reader} = ra_log_segments:sparse_read(Reader0, Indexes, Entries0),
599599
%% here we recover the original order of indexes
600600
Entries = case Sort of
601601
descending ->
@@ -658,21 +658,21 @@ partial_read(Indexes0, #?MODULE{cfg = Cfg,
658658
maps:put(I, TransformFun(I, T, Cmd), Acc)
659659
end, #{}, Entries0),
660660

661-
Plan = ra_log_reader:read_plan(Reader0, Indexes),
661+
Plan = ra_log_segments:read_plan(Reader0, Indexes),
662662
#read_plan{dir = Cfg#cfg.directory,
663663
read = Read,
664664
plan = Plan}.
665665

666666

667667
-spec execute_read_plan(read_plan(), undefined | ra_flru:state(),
668668
TransformFun :: transform_fun(),
669-
ra_log_reader:read_plan_options()) ->
669+
ra_log_segments:read_plan_options()) ->
670670
{#{ra_index() => Command :: term()}, ra_flru:state()}.
671671
execute_read_plan(#read_plan{dir = Dir,
672672
read = Read,
673673
plan = Plan}, Flru0, TransformFun,
674674
Options) ->
675-
ra_log_reader:exec_read_plan(Dir, Plan, Flru0, TransformFun,
675+
ra_log_segments:exec_read_plan(Dir, Plan, Flru0, TransformFun,
676676
Options, Read).
677677

678678
-spec read_plan_info(read_plan()) -> map().
@@ -781,10 +781,10 @@ handle_event({segments, TidRanges, NewSegs},
781781
reader = Reader0,
782782
pending = Pend0,
783783
mem_table = Mt0} = State0) ->
784-
{Reader, OverwrittenSegRefs} = ra_log_reader:update_segments(NewSegs, Reader0),
784+
{Reader, OverwrittenSegRefs} = ra_log_segments:update_segments(NewSegs, Reader0),
785785

786786
put_counter(Cfg, ?C_RA_SVR_METRIC_NUM_SEGMENTS,
787-
ra_log_reader:segment_ref_count(Reader)),
787+
ra_log_segments:segment_ref_count(Reader)),
788788
%% the tid ranges arrive in the reverse order they were written
789789
%% (new -> old) so we need to foldr here to process the oldest first
790790
Mt = lists:foldr(
@@ -824,10 +824,10 @@ handle_event({segments_to_be_deleted, SegRefs},
824824
counter = Counter,
825825
names = Names},
826826
reader = Reader} = State) ->
827-
ActiveSegs = ra_log_reader:segment_refs(Reader) -- SegRefs,
828-
#{max_size := MaxOpenSegments} = ra_log_reader:info(Reader),
827+
ActiveSegs = ra_log_segments:segment_refs(Reader) -- SegRefs,
828+
#{max_size := MaxOpenSegments} = ra_log_segments:info(Reader),
829829
% close all open segments
830-
ok = ra_log_reader:close(Reader),
830+
ok = ra_log_segments:close(Reader),
831831
?DEBUG("~ts: ~b obsolete segments - remaining: ~b",
832832
[LogId, length(SegRefs), length(ActiveSegs)]),
833833
%% open a new segment with the new max open segment value
@@ -836,7 +836,7 @@ handle_event({segments_to_be_deleted, SegRefs},
836836
|| {_, F} <- SegRefs],
837837
ok
838838
end,
839-
{State#?MODULE{reader = ra_log_reader:init(UId, Dir, MaxOpenSegments,
839+
{State#?MODULE{reader = ra_log_segments:init(UId, Dir, MaxOpenSegments,
840840
random,
841841
ActiveSegs, Names, Counter)},
842842

@@ -970,7 +970,7 @@ fetch_term(Idx, #?MODULE{mem_table = Mt,
970970
when ?IS_IN_RANGE(Idx, Range) ->
971971
case ra_mt:lookup_term(Idx, Mt) of
972972
undefined ->
973-
{Term, Reader} = ra_log_reader:fetch_term(Idx, Reader0),
973+
{Term, Reader} = ra_log_segments:fetch_term(Idx, Reader0),
974974
{Term, State0#?MODULE{reader = Reader}};
975975
Term when is_integer(Term) ->
976976
{Term, State0}
@@ -1172,7 +1172,7 @@ should_snapshot(snapshot, Idx,
11721172
% We should take a snapshot if the new snapshot index would allow us
11731173
% to discard any segments or if the we've handled enough commands
11741174
% since the last snapshot.
1175-
CanFreeSegments = case ra_log_reader:range(Reader) of
1175+
CanFreeSegments = case ra_log_segments:range(Reader) of
11761176
undefined ->
11771177
false;
11781178
{Start, _End} ->
@@ -1239,9 +1239,9 @@ overview(#?MODULE{range = Range,
12391239
last_index => LastIndex,
12401240
last_term => LastTerm,
12411241
last_written_index_term => LWIT,
1242-
num_segments => ra_log_reader:segment_ref_count(Reader),
1243-
segments_range => ra_log_reader:range(Reader),
1244-
open_segments => ra_log_reader:num_open_segments(Reader),
1242+
num_segments => ra_log_segments:segment_ref_count(Reader),
1243+
segments_range => ra_log_segments:range(Reader),
1244+
open_segments => ra_log_segments:num_open_segments(Reader),
12451245
snapshot_index => case CurrSnap of
12461246
undefined -> undefined;
12471247
{I, _} -> I
@@ -1313,12 +1313,12 @@ release_resources(MaxOpenSegments,
13131313
counter = Counter,
13141314
names = Names},
13151315
reader = Reader} = State) ->
1316-
ActiveSegs = ra_log_reader:segment_refs(Reader),
1316+
ActiveSegs = ra_log_segments:segment_refs(Reader),
13171317
% close all open segments
13181318
% deliberately ignoring return value
1319-
_ = ra_log_reader:close(Reader),
1319+
_ = ra_log_segments:close(Reader),
13201320
%% open a new segment with the new max open segment value
1321-
State#?MODULE{reader = ra_log_reader:init(UId, Dir, MaxOpenSegments,
1321+
State#?MODULE{reader = ra_log_segments:init(UId, Dir, MaxOpenSegments,
13221322
AccessPattern,
13231323
ActiveSegs, Names, Counter)}.
13241324

@@ -1330,7 +1330,7 @@ schedule_compaction(SnapIdx, #?MODULE{cfg = #cfg{uid = _UId,
13301330
segment_writer = _SegWriter},
13311331
live_indexes = LiveIndexes,
13321332
reader = Reader0}) ->
1333-
case ra_log_reader:segment_refs(Reader0) of
1333+
case ra_log_segments:segment_refs(Reader0) of
13341334
[] ->
13351335
[];
13361336
[_ | Compactable] ->

src/ra_log_read_plan.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ execute(Plan, Flru) ->
1818
file_advise => normal}).
1919

2020
-spec execute(ra_log:read_plan(), undefined | ra_flru:state(),
21-
ra_log_reader:read_plan_options()) ->
21+
ra_log_segments:read_plan_options()) ->
2222
{#{ra:index() => Command :: term()}, ra_flru:state()}.
2323
execute(Plan, Flru, Options) ->
2424
ra_log:execute_read_plan(Plan, Flru,

src/ra_log_reader.erl renamed to src/ra_log_segments.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
%%
55
%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
66
%%
7-
-module(ra_log_reader).
7+
-module(ra_log_segments).
88

99
-compile(inline_list_funcs).
1010

0 commit comments

Comments
 (0)