Skip to content

Commit 56d9665

Browse files
Add function that returns several landmark offsets
1 parent 2c92e61 commit 56d9665

File tree

2 files changed

+311
-2
lines changed

2 files changed

+311
-2
lines changed

src/osiris_log.erl

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
directory/1,
5858
delete_directory/1,
5959
counter_fields/0,
60+
stream_offset_landmarks/1,
61+
last_offset_and_timestamp/1,
6062
make_counter/1,
6163
generate_log/4]).
6264

@@ -3504,6 +3506,210 @@ write_in_chunks(ToWrite, MsgsPerChunk, Msg, W0) when ToWrite > 0 ->
35043506
write_in_chunks(_, _, _, W) ->
35053507
W.
35063508

3509+
%% Scans all index files for the log at Dir and returns the first chunk
3510+
%% (offset + timestamp), last chunk (offset + timestamp), and the chunk
3511+
%% closest to 25%, 50% and 75% of the offset range (with offset and
3512+
%% timestamp). Percent positions may not fall on a chunk boundary, so
3513+
%% the chunk with the closest offset is chosen.
3514+
-spec stream_offset_landmarks(file:filename_all() | config()) ->
3515+
{ok, #{first => {offset(), osiris:timestamp()},
3516+
last => {offset(), osiris:timestamp()},
3517+
p25 => {offset(), osiris:timestamp()},
3518+
p50 => {offset(), osiris:timestamp()},
3519+
p75 => {offset(), osiris:timestamp()}}} |
3520+
{error, empty}.
3521+
stream_offset_landmarks(#{dir := Dir}) ->
3522+
stream_offset_landmarks(Dir);
3523+
stream_offset_landmarks(Dir) when ?IS_STRING(Dir) ->
3524+
IdxFiles = sorted_index_files(Dir),
3525+
case scan_index_chunks_files(IdxFiles, []) of
3526+
{ok, []} ->
3527+
{error, empty};
3528+
{ok, [One]} ->
3529+
{LastOff, LastTs} =
3530+
case last_offset_and_timestamp_from_files(IdxFiles) of
3531+
{ok, L} -> L;
3532+
_ -> One
3533+
end,
3534+
{ok, #{first => One,
3535+
last => {LastOff, LastTs},
3536+
p25 => One,
3537+
p50 => One,
3538+
p75 => One}};
3539+
{ok, Chunks} ->
3540+
First = hd(Chunks),
3541+
LastChunk = lists:last(Chunks),
3542+
{FirstOffset, _FirstTs} = First,
3543+
{LastChunkId, _LastChunkTs} = LastChunk,
3544+
Last = case last_offset_and_timestamp_from_files(IdxFiles) of
3545+
{ok, L} -> L;
3546+
_ -> LastChunk
3547+
end,
3548+
Range = LastChunkId - FirstOffset,
3549+
Targets = case Range of
3550+
0 ->
3551+
[FirstOffset, FirstOffset, FirstOffset];
3552+
_ ->
3553+
[FirstOffset + (Range * 25) div 100,
3554+
FirstOffset + (Range * 50) div 100,
3555+
FirstOffset + (Range * 75) div 100]
3556+
end,
3557+
[P25, P50, P75] = closest_chunks_to_targets(Chunks, Targets),
3558+
{ok, #{first => First,
3559+
last => Last,
3560+
p25 => P25,
3561+
p50 => P50,
3562+
p75 => P75}}
3563+
end.
3564+
3565+
%% Returns {ok, {LastOffset, Timestamp}} where LastOffset is the very last
3566+
%% offset in the log (last offset in the last chunk), not the last chunk's
3567+
%% first offset. Timestamp is the last chunk's timestamp.
3568+
-spec last_offset_and_timestamp(file:filename_all()) ->
3569+
{ok, {offset(), osiris:timestamp()}} | {error, empty}.
3570+
last_offset_and_timestamp(Dir) ->
3571+
last_offset_and_timestamp_from_files(sorted_index_files(Dir)).
3572+
3573+
last_offset_and_timestamp_from_files(IdxFiles) ->
3574+
case non_empty_index_files(IdxFiles) of
3575+
[] ->
3576+
{error, empty};
3577+
NonEmpty ->
3578+
LastIdxFile = lists:last(NonEmpty),
3579+
last_offset_and_timestamp_from_file(LastIdxFile)
3580+
end.
3581+
3582+
last_offset_and_timestamp_from_file(LastIdxFile) ->
3583+
case file:open(LastIdxFile, [read, raw, binary]) of
3584+
{ok, IdxFd} ->
3585+
try
3586+
case position_at_idx_record_boundary(IdxFd, eof) of
3587+
{ok, Pos} when Pos >= ?IDX_HEADER_SIZE + ?INDEX_RECORD_SIZE_B ->
3588+
ReadPos = Pos - ?INDEX_RECORD_SIZE_B,
3589+
case file:pread(IdxFd, ReadPos, ?INDEX_RECORD_SIZE_B) of
3590+
{ok, <<ChunkId:64/unsigned,
3591+
IdxTs:64/signed,
3592+
_Epoch:64/unsigned,
3593+
FilePos:32/unsigned,
3594+
_ChType:8/unsigned>>}
3595+
when ChunkId =/= 0 orelse IdxTs =/= 0 ->
3596+
SegFile = segment_from_index_file(LastIdxFile),
3597+
case file:open(SegFile, [read, raw, binary]) of
3598+
{ok, SegFd} ->
3599+
try
3600+
case file:pread(SegFd, FilePos, ?HEADER_SIZE_B) of
3601+
{ok, <<_:32,
3602+
NumRecords:32/unsigned,
3603+
SegTs:64/signed,
3604+
_/binary>>} ->
3605+
LastOffset = ChunkId + NumRecords - 1,
3606+
Ts = if IdxTs < 1000000000000 -> SegTs;
3607+
true -> IdxTs
3608+
end,
3609+
{ok, {LastOffset, Ts}};
3610+
_ ->
3611+
{ok, {ChunkId, IdxTs}}
3612+
end
3613+
after
3614+
file:close(SegFd)
3615+
end;
3616+
_ ->
3617+
{ok, {ChunkId, IdxTs}}
3618+
end;
3619+
_ ->
3620+
{error, empty}
3621+
end;
3622+
_ ->
3623+
{error, empty}
3624+
end
3625+
after
3626+
file:close(IdxFd)
3627+
end;
3628+
_ ->
3629+
{error, empty}
3630+
end.
3631+
3632+
scan_index_chunks_files([], Acc) ->
3633+
{ok, lists:reverse(Acc)};
3634+
scan_index_chunks_files([IdxFile | Rest], Acc) ->
3635+
case scan_one_index_file(IdxFile) of
3636+
{ok, Chunks} ->
3637+
scan_index_chunks_files(Rest, lists:reverse(Chunks) ++ Acc);
3638+
{error, _} = Err ->
3639+
Err
3640+
end.
3641+
3642+
scan_one_index_file(IdxFile) ->
3643+
case file:open(IdxFile, [read, raw, binary]) of
3644+
{ok, Fd} ->
3645+
try
3646+
{ok, _} = file:position(Fd, ?IDX_HEADER_SIZE),
3647+
scan_index_records(Fd, [])
3648+
after
3649+
_ = file:close(Fd)
3650+
end;
3651+
Err ->
3652+
Err
3653+
end.
3654+
3655+
scan_index_records(Fd, Acc) ->
3656+
case file:read(Fd, ?INDEX_RECORD_SIZE_B) of
3657+
{ok, <<ChunkId:64/unsigned,
3658+
Timestamp:64/signed,
3659+
_Epoch:64/unsigned,
3660+
_FilePos:32/unsigned,
3661+
_ChType:8/unsigned>>} when ChunkId =/= 0 orelse Timestamp =/= 0 ->
3662+
scan_index_records(Fd, [{ChunkId, Timestamp} | Acc]);
3663+
{ok, ?ZERO_IDX_MATCH(_)} ->
3664+
scan_index_records(Fd, Acc);
3665+
{ok, _} ->
3666+
scan_index_records(Fd, Acc);
3667+
eof ->
3668+
{ok, lists:reverse(Acc)}
3669+
end.
3670+
3671+
%% Returns [chunk closest to T25, to T50, to T75]. Chunks are ordered by offset.
3672+
%% Uses binary search per target for O(log n) lookups after O(n) list-to-tuple.
3673+
closest_chunks_to_targets(Chunks, [T25, T50, T75]) ->
3674+
Tuple = list_to_tuple(Chunks),
3675+
[closest_to_target(Tuple, T25),
3676+
closest_to_target(Tuple, T50),
3677+
closest_to_target(Tuple, T75)].
3678+
3679+
%% First 1-based index i such that element(i, Tuple) has offset >= Target,
3680+
%% or tuple_size(Tuple) + 1 if all offsets are < Target.
3681+
find_first_ge(Tuple, Target, Low, High) when Low < High ->
3682+
Mid = (Low + High) div 2,
3683+
{O, _} = element(Mid, Tuple),
3684+
if O >= Target -> find_first_ge(Tuple, Target, Low, Mid);
3685+
true -> find_first_ge(Tuple, Target, Mid + 1, High)
3686+
end;
3687+
find_first_ge(Tuple, Target, Low, _High) ->
3688+
{O, _} = element(Low, Tuple),
3689+
if O >= Target -> Low; true -> Low + 1 end.
3690+
3691+
find_first_ge(Tuple, Target) ->
3692+
Size = tuple_size(Tuple),
3693+
find_first_ge(Tuple, Target, 1, Size).
3694+
3695+
%% Chunk in Tuple whose offset is closest to Target (Chunks ordered by offset).
3696+
closest_to_target(Tuple, Target) ->
3697+
Size = tuple_size(Tuple),
3698+
Idx = find_first_ge(Tuple, Target),
3699+
if Idx =< 1 ->
3700+
element(1, Tuple);
3701+
Idx > Size ->
3702+
element(Size, Tuple);
3703+
true ->
3704+
C1 = element(Idx, Tuple),
3705+
C2 = element(Idx - 1, Tuple),
3706+
{O1, _} = C1,
3707+
{O2, _} = C2,
3708+
if abs(O1 - Target) =< abs(O2 - Target) -> C1;
3709+
true -> C2
3710+
end
3711+
end.
3712+
35073713
-ifdef(TEST).
35083714
-include_lib("eunit/include/eunit.hrl").
35093715

test/osiris_log_SUITE.erl

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,12 @@ all_tests() ->
100100
read_ahead_send_file_on_off,
101101
resolve_offset_spec_empty,
102102
resolve_offset_spec_empty_directory,
103-
resolve_offset_spec
103+
resolve_offset_spec,
104+
stream_offset_landmarks_empty,
105+
stream_offset_landmarks_single_chunk,
106+
stream_offset_landmarks_multiple_chunks,
107+
stream_offset_landmarks_percentiles,
108+
stream_offset_landmarks_config_map
104109
].
105110

106111
groups() ->
@@ -2049,6 +2054,104 @@ overview_with_missing_index_at_start(Config) ->
20492054
filename:join(?config(dir, Config), "*.index")))),
20502055
ok.
20512056

2057+
stream_offset_landmarks_empty(Config) ->
2058+
%% Empty log (init but no writes) and non-existent directory return {error, empty}.
2059+
LDir = ?config(leader_dir, Config),
2060+
Log0 = seed_log(LDir, [], Config),
2061+
osiris_log:close(Log0),
2062+
?assertEqual({error, empty}, osiris_log:stream_offset_landmarks(LDir)),
2063+
NonExistent = filename:join(?config(priv_dir, Config), "stream_offset_landmarks_empty_nonexistent"),
2064+
?assertEqual({error, empty}, osiris_log:stream_offset_landmarks(NonExistent)),
2065+
ok.
2066+
2067+
stream_offset_landmarks_single_chunk(Config) ->
2068+
%% Single chunk: first, last, p25, p50, p75 all equal. last is the last
2069+
%% message offset (same as first when the only chunk has one record).
2070+
Now = now_ms(),
2071+
FirstTs = Now - 10000,
2072+
EpochChunks = [{2, FirstTs, [<<"one">>, <<"two">>]}],
2073+
LDir = ?config(leader_dir, Config),
2074+
Log0 = seed_log(LDir, EpochChunks, Config),
2075+
osiris_log:close(Log0),
2076+
{ok, Landmarks} = osiris_log:stream_offset_landmarks(LDir),
2077+
?assertMatch(#{first := {0, FirstTs},
2078+
last := {1, FirstTs},
2079+
p25 := {0, FirstTs},
2080+
p50 := {0, FirstTs},
2081+
p75 := {0, FirstTs}}, Landmarks),
2082+
ok.
2083+
2084+
stream_offset_landmarks_multiple_chunks(Config) ->
2085+
%% Multiple chunks: first < p25 <= p50 <= p75 < last (by offset). last is
2086+
%% the very last message offset in the log (last offset in the last chunk),
2087+
%% not the last chunk's first offset. Last chunk here has 2 records -> 5.
2088+
Now = now_ms(),
2089+
FirstTs = Now - 10000,
2090+
LastTs = Now - 3000,
2091+
EpochChunks =
2092+
[{1, FirstTs, [<<"one">>]},
2093+
{1, Now - 8000, [<<"two">>]},
2094+
{2, Now - 5000, [<<"three">>, <<"four">>]},
2095+
{2, LastTs, [<<"five">>, <<"six">>]}],
2096+
2097+
LDir = ?config(leader_dir, Config),
2098+
Log0 = seed_log(LDir, EpochChunks, Config),
2099+
osiris_log:close(Log0),
2100+
{ok, Landmarks} = osiris_log:stream_offset_landmarks(LDir),
2101+
#{first := First, last := Last, p25 := P25, p50 := P50, p75 := P75} = Landmarks,
2102+
{FirstOff, FirstTs} = First,
2103+
{LastOff, LastTs} = Last,
2104+
{P25Off, _} = P25,
2105+
{P50Off, _} = P50,
2106+
{P75Off, _} = P75,
2107+
?assert(FirstOff =< P25Off),
2108+
?assert(P25Off =< P50Off),
2109+
?assert(P50Off =< P75Off),
2110+
?assert(P75Off =< LastOff),
2111+
?assertEqual(FirstOff, 0),
2112+
?assertEqual(LastOff, 5),
2113+
ok.
2114+
2115+
stream_offset_landmarks_percentiles(Config) ->
2116+
%% Minimum layout for non-overlapping percentiles: chunk starts at 0,1,2,3,4
2117+
%% so Range=4, T25=1, T50=2, T75=3 each land on a distinct chunk.
2118+
Now = now_ms(),
2119+
Ts0 = Now - 10000,
2120+
Ts1 = Now - 8000,
2121+
Ts2 = Now - 5000,
2122+
Ts3 = Now - 3000,
2123+
Ts4 = Now - 1000,
2124+
EpochChunks =
2125+
[{1, Ts0, [<<"a">>]},
2126+
{1, Ts1, [<<"b">>]},
2127+
{1, Ts2, [<<"c">>]},
2128+
{1, Ts3, [<<"d">>]},
2129+
{1, Ts4, [<<"e">>]}],
2130+
2131+
LDir = ?config(leader_dir, Config),
2132+
Log0 = seed_log(LDir, EpochChunks, Config),
2133+
osiris_log:close(Log0),
2134+
{ok, Landmarks} = osiris_log:stream_offset_landmarks(LDir),
2135+
#{first := First, last := Last, p25 := P25, p50 := P50, p75 := P75} = Landmarks,
2136+
{0, Ts0} = First,
2137+
{4, Ts4} = Last,
2138+
{1, Ts1} = P25,
2139+
{2, Ts2} = P50,
2140+
{3, Ts3} = P75.
2141+
2142+
stream_offset_landmarks_config_map(Config) ->
2143+
%% Calling with config map #{dir => Dir} works like path.
2144+
EpochChunks = [{1, [<<"a">>]}, {1, [<<"b">>]}],
2145+
LDir = ?config(leader_dir, Config),
2146+
Log0 = seed_log(LDir, EpochChunks, Config),
2147+
osiris_log:close(Log0),
2148+
{ok, ByPath} = osiris_log:stream_offset_landmarks(LDir),
2149+
Conf = ?config(osiris_conf, Config),
2150+
RConf = Conf#{dir => LDir},
2151+
{ok, ByConf} = osiris_log:stream_offset_landmarks(RConf),
2152+
?assertEqual(ByPath, ByConf),
2153+
ok.
2154+
20522155
read_ahead_send_file(Config) ->
20532156
RAL = 4096, %% read ahead limit
20542157
HS = ?HEADER_SIZE_B,
@@ -2587,7 +2690,7 @@ write_chunk(Conf, Epoch, Now, Records, Trk0, Log0) ->
25872690
%% need to re-init as new epoch
25882691
osiris_log:close(Log1),
25892692
Log = osiris_log:init(Conf#{epoch => Epoch}),
2590-
{Trk1, osiris_log:write(lists:reverse(Records), Log)}
2693+
{Trk1, osiris_log:write(lists:reverse(Records), Now, Log)}
25912694
end.
25922695

25932696
now_ms() ->

0 commit comments

Comments
 (0)