Skip to content

Commit 257ded2

Browse files
authored
Merge pull request #154 from rabbitmq/next-with-filter-fix
Calculate the correct chunk size when filters are used.
2 parents 9f8bfc9 + 9a2c6fa commit 257ded2

File tree

2 files changed

+40
-18
lines changed

2 files changed

+40
-18
lines changed

src/osiris_log.erl

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1922,7 +1922,8 @@ build_segment_info(SegFile, LastChunkPos, IdxFile) ->
19221922
FirstChId:64/unsigned,
19231923
_FirstCrc:32/integer,
19241924
FirstSize:32/unsigned,
1925-
FirstTSize:32/unsigned,
1925+
FirstFSize:8/unsigned,
1926+
FirstTSize:24/unsigned,
19261927
_/binary>>} ->
19271928
case file:pread(Fd, LastChunkPos, ?HEADER_SIZE_B) of
19281929
{ok,
@@ -1939,33 +1940,34 @@ build_segment_info(SegFile, LastChunkPos, IdxFile) ->
19391940
LastTSize:32/unsigned,
19401941
LastFSize:8/unsigned,
19411942
_Reserved:24>>} ->
1942-
Size = LastChunkPos + ?HEADER_SIZE_B + LastFSize + LastSize + LastTSize,
1943+
LastChunkSize = LastFSize + LastSize + LastTSize,
1944+
Size = LastChunkPos + ?HEADER_SIZE_B + LastChunkSize,
19431945
%% TODO: this file:position/2 all has no actual function and
19441946
%% is only used to emit a debug log. Remove?
19451947
{ok, Eof} = file:position(Fd, eof),
19461948
?DEBUG_IF("~s: segment ~ts has trailing data ~w ~w",
19471949
[?MODULE, filename:basename(SegFile),
19481950
Size, Eof], Size =/= Eof),
19491951
_ = file:close(Fd),
1952+
FstChInfo = #chunk_info{epoch = FirstEpoch,
1953+
timestamp = FirstTs,
1954+
id = FirstChId,
1955+
num = FirstNumRecords,
1956+
type = FirstChType,
1957+
size = FirstFSize + FirstSize + FirstTSize,
1958+
pos = ?LOG_HEADER_SIZE},
1959+
LastChInfo = #chunk_info{epoch = LastEpoch,
1960+
timestamp = LastTs,
1961+
id = LastChId,
1962+
num = LastNumRecords,
1963+
type = LastChType,
1964+
size = LastChunkSize,
1965+
pos = LastChunkPos},
19501966
{ok, #seg_info{file = SegFile,
19511967
index = IdxFile,
19521968
size = Size,
1953-
first =
1954-
#chunk_info{epoch = FirstEpoch,
1955-
timestamp = FirstTs,
1956-
id = FirstChId,
1957-
num = FirstNumRecords,
1958-
type = FirstChType,
1959-
size = FirstSize + FirstTSize,
1960-
pos = ?LOG_HEADER_SIZE},
1961-
last =
1962-
#chunk_info{epoch = LastEpoch,
1963-
timestamp = LastTs,
1964-
id = LastChId,
1965-
num = LastNumRecords,
1966-
type = LastChType,
1967-
size = LastSize + LastTSize,
1968-
pos = LastChunkPos}}};
1969+
first = FstChInfo,
1970+
last = LastChInfo}};
19691971
_ ->
19701972
% last chunk is corrupted - try the previous one
19711973
_ = file:close(Fd),

test/osiris_log_SUITE.erl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ all_tests() ->
3232
init_recover_with_writers,
3333
init_with_lower_epoch,
3434
write_batch,
35+
write_with_filter_attach_next,
3536
write_batch_with_filter,
3637
write_batch_with_filters_variable_size,
3738
subbatch,
@@ -207,6 +208,25 @@ write_batch(Config) ->
207208
?assertEqual(1, osiris_log:next_offset(S1)),
208209
ok.
209210

211+
write_with_filter_attach_next(Config) ->
212+
%% bug fix where the chunk_info size didn't include the filter size which
213+
%% cause the next attach strategy to point to an invalid location in the
214+
%% segment when filters were used.
215+
Conf0 = ?config(osiris_conf, Config),
216+
S0 = osiris_log:init(Conf0#{filter_size => 32}),
217+
?assertEqual(0, osiris_log:next_offset(S0)),
218+
%% write an entry with a filter value and an entry without a filter value
219+
{_, S1} = write_committed([<<"ho">>, {<<"banana">>, <<"hi">>}], S0),
220+
Shared = osiris_log:get_shared(S1),
221+
Conf = Conf0#{shared => Shared},
222+
{ok, R0} = osiris_log:init_offset_reader(next, Conf),
223+
{end_of_stream, R1} = osiris_log:read_chunk_parsed(R0),
224+
%% then write a chunk without any filtred entries at all
225+
{_, _S2} = write_committed([<<"hum">>], S1),
226+
?assertMatch({[{2, <<"hum">>}], _R1},
227+
osiris_log:read_chunk_parsed(R1)),
228+
ok.
229+
210230
write_batch_with_filter(Config) ->
211231
Conf0 = ?config(osiris_conf, Config),
212232
S0 = osiris_log:init(Conf0#{filter_size => 32}),

0 commit comments

Comments
 (0)