Skip to content

Commit 861df59

Browse files
authored
Merge pull request #203 from rabbitmq/last-committed-offset
Add last committed offset to stream stats
2 parents 6f54928 + 7a1796f commit 861df59

File tree

9 files changed

+366
-103
lines changed

9 files changed

+366
-103
lines changed

src/osiris.erl

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
reference => term(),
4141
event_formatter => {module(), atom(), list()},
4242
retention => [osiris:retention_spec()],
43+
features => features(),
4344
atom() => term()}.
4445

4546
-type mfarg() :: {module(), atom(), list()}.
@@ -77,6 +78,7 @@
7778
-type data() :: iodata() |
7879
batch() |
7980
{filter_value(), iodata() | batch()}.
81+
-type features() :: #{committed_offset_calculate => boolean()}.
8082

8183
%% returned when reading
8284
-type entry() :: binary() | batch().
@@ -307,13 +309,16 @@ configure_logger(Module) ->
307309
persistent_term:put('$osiris_logger', Module).
308310

309311
-spec get_stats(pid()) -> #{committed_chunk_id => integer(),
310-
first_chunk_id => integer()}.
312+
first_chunk_id => integer(),
313+
last_chunk_id => integer(),
314+
committed_offset => integer()}.
311315
get_stats(Pid)
312316
when node(Pid) =:= node() ->
313317
#{shared := Shared} = osiris_util:get_reader_context(Pid),
314318
#{committed_chunk_id => osiris_log_shared:committed_chunk_id(Shared),
315319
first_chunk_id => osiris_log_shared:first_chunk_id(Shared),
316-
last_chunk_id => osiris_log_shared:last_chunk_id(Shared)};
320+
last_chunk_id => osiris_log_shared:last_chunk_id(Shared),
321+
committed_offset => osiris_log_shared:committed_offset(Shared)};
317322
get_stats(Pid) when is_pid(Pid) ->
318323
erpc:call(node(Pid), ?MODULE, ?FUNCTION_NAME, [Pid]).
319324

src/osiris.hrl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@
3030
domain => [osiris]}),
3131
ok).
3232

33+
%% tail info pattern matching
34+
-define(TAIL_INFO(ChunkId, Ts), {_, {_, ChunkId, Ts}}).
35+
-define(TAIL_INFO(ChunkId), ?TAIL_INFO(ChunkId, _)).
36+
-define(TAIL_INFO_NEXT(NextOffset, ChunkId), {NextOffset, {_, ChunkId, _}}).
37+
3338
-define(IS_STRING(S), is_list(S) orelse is_binary(S)).
3439

3540
-define(C_NUM_LOG_FIELDS, 5).

src/osiris_counters.erl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
fetch/1,
1313
overview/0,
1414
overview/1,
15+
counters/2,
1516
delete/1
1617
]).
1718

@@ -45,3 +46,7 @@ overview() ->
4546
-spec overview(name()) -> #{atom() => non_neg_integer()} | undefined.
4647
overview(Name) ->
4748
seshat:counters(osiris, Name).
49+
50+
-spec counters(name(), [atom()]) -> #{atom() => non_neg_integer()} | undefined.
51+
counters(Name, Fields) ->
52+
seshat:counters(osiris, Name, Fields).

src/osiris_log.erl

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@
3636
read_chunk/1,
3737
read_chunk_parsed/1,
3838
read_chunk_parsed/2,
39-
committed_offset/1,
4039
committed_chunk_id/1,
40+
committed_offset/1,
4141
set_committed_chunk_id/2,
42+
set_committed_offset/2,
4243
last_chunk_id/1,
4344
get_current_epoch/1,
4445
get_directory/1,
@@ -1353,10 +1354,6 @@ last_user_chunk_id_in_index(NextPos, IdxFd) ->
13531354
Error
13541355
end.
13551356

1356-
-spec committed_offset(state()) -> integer().
1357-
committed_offset(State) ->
1358-
committed_chunk_id(State).
1359-
13601357
-spec committed_chunk_id(state()) -> integer().
13611358
committed_chunk_id(#?MODULE{cfg = #cfg{shared = Ref}}) ->
13621359
osiris_log_shared:committed_chunk_id(Ref).
@@ -1367,6 +1364,16 @@ set_committed_chunk_id(#?MODULE{mode = #write{},
13671364
when is_integer(ChunkId) ->
13681365
osiris_log_shared:set_committed_chunk_id(Ref, ChunkId).
13691366

1367+
-spec set_committed_offset(state(), offset()) -> ok.
1368+
set_committed_offset(#?MODULE{mode = #write{},
1369+
cfg = #cfg{shared = Ref}}, Offset)
1370+
when is_integer(Offset) ->
1371+
osiris_log_shared:set_committed_offset(Ref, Offset).
1372+
1373+
-spec committed_offset(state()) -> integer().
1374+
committed_offset(#?MODULE{cfg = #cfg{shared = Ref}}) ->
1375+
osiris_log_shared:committed_offset(Ref).
1376+
13701377
-spec last_chunk_id(state()) -> integer().
13711378
last_chunk_id(#?MODULE{cfg = #cfg{shared = Ref}}) ->
13721379
osiris_log_shared:last_chunk_id(Ref).

src/osiris_log_shared.erl

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,39 @@
11
-module(osiris_log_shared).
22

3-
-define(COMMITTED_IDX, 1).
3+
-define(COMMITTED_CHUNK_ID, 1).
44
-define(FIRST_IDX, 2).
55
-define(LAST_IDX, 3).
6+
-define(COMMITTED_OFFSET, 4).
67

78
-export([
89
new/0,
910
committed_chunk_id/1,
1011
first_chunk_id/1,
1112
last_chunk_id/1,
13+
committed_offset/1,
1214
set_committed_chunk_id/2,
1315
set_first_chunk_id/2,
14-
set_last_chunk_id/2
16+
set_last_chunk_id/2,
17+
set_committed_offset/2
1518
]).
1619

1720
-type chunk_id() :: -1 | non_neg_integer().
21+
-type offset() :: -1 | non_neg_integer().
1822

1923
-spec new() -> atomics:atomics_ref().
2024
new() ->
2125
%% Oh why, oh why did we think the first chunk id in
2226
%% a stream should have offset 0?
23-
Ref = atomics:new(3, [{signed, true}]),
24-
atomics:put(Ref, ?COMMITTED_IDX, -1),
27+
Ref = atomics:new(4, [{signed, true}]),
28+
atomics:put(Ref, ?COMMITTED_CHUNK_ID, -1),
2529
atomics:put(Ref, ?FIRST_IDX, -1),
2630
atomics:put(Ref, ?LAST_IDX, -1),
31+
atomics:put(Ref, ?COMMITTED_OFFSET, -1),
2732
Ref.
2833

2934
-spec committed_chunk_id(atomics:atomics_ref()) -> chunk_id().
3035
committed_chunk_id(Ref) ->
31-
atomics:get(Ref, ?COMMITTED_IDX).
36+
atomics:get(Ref, ?COMMITTED_CHUNK_ID).
3237

3338
-spec first_chunk_id(atomics:atomics_ref()) -> chunk_id().
3439
first_chunk_id(Ref) ->
@@ -38,9 +43,13 @@ first_chunk_id(Ref) ->
3843
last_chunk_id(Ref) ->
3944
atomics:get(Ref, ?LAST_IDX).
4045

46+
-spec committed_offset(atomics:atomics_ref()) -> chunk_id().
47+
committed_offset(Ref) ->
48+
atomics:get(Ref, ?COMMITTED_OFFSET).
49+
4150
-spec set_committed_chunk_id(atomics:atomics_ref(), chunk_id()) -> ok.
4251
set_committed_chunk_id(Ref, Value) when is_integer(Value) ->
43-
atomics:put(Ref, ?COMMITTED_IDX, Value).
52+
atomics:put(Ref, ?COMMITTED_CHUNK_ID, Value).
4453

4554
-spec set_first_chunk_id(atomics:atomics_ref(), chunk_id()) -> ok.
4655
set_first_chunk_id(Ref, Value) when is_integer(Value) ->
@@ -50,6 +59,9 @@ set_first_chunk_id(Ref, Value) when is_integer(Value) ->
5059
set_last_chunk_id(Ref, Value) when is_integer(Value) ->
5160
atomics:put(Ref, ?LAST_IDX, Value).
5261

62+
-spec set_committed_offset(atomics:atomics_ref(), offset()) -> ok.
63+
set_committed_offset(Ref, Value) when is_integer(Value) ->
64+
atomics:put(Ref, ?COMMITTED_OFFSET, Value).
5365

5466

5567
-ifdef(TEST).
@@ -60,12 +72,15 @@ basics_test() ->
6072
?assertEqual(-1, committed_chunk_id(R)),
6173
?assertEqual(-1, first_chunk_id(R)),
6274
?assertEqual(-1, last_chunk_id(R)),
75+
?assertEqual(-1, committed_offset(R)),
6376
ok = set_committed_chunk_id(R, 2),
77+
ok = set_committed_offset(R, 3),
6478
ok = set_first_chunk_id(R, 1),
65-
ok = set_last_chunk_id(R, 3),
79+
ok = set_last_chunk_id(R, 4),
6680
?assertEqual(2, committed_chunk_id(R)),
81+
?assertEqual(3, committed_offset(R)),
6782
?assertEqual(1, first_chunk_id(R)),
68-
?assertEqual(3, last_chunk_id(R)),
83+
?assertEqual(4, last_chunk_id(R)),
6984

7085
ok.
7186

src/osiris_replica.erl

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@
5050
reference :: term(),
5151
event_formatter :: undefined | mfa(),
5252
counter :: counters:counters_ref(),
53-
token :: undefined | binary()}).
53+
token :: undefined | binary(),
54+
committed_offset_calculate :: boolean()}).
5455

5556
-type parse_state() ::
5657
undefined |
@@ -75,12 +76,15 @@
7576
-define(C_PACKETS, ?C_NUM_LOG_FIELDS + 3).
7677
-define(C_READERS, ?C_NUM_LOG_FIELDS + 4).
7778
-define(C_EPOCH, ?C_NUM_LOG_FIELDS + 5).
79+
-define(C_COMMITTED_CHUNK_ID, ?C_NUM_LOG_FIELDS + 6).
7880
-define(ADD_COUNTER_FIELDS,
7981
[{committed_offset, ?C_COMMITTED_OFFSET, counter, "Last committed offset"},
8082
{forced_gcs, ?C_FORCED_GCS, counter, "Number of garbage collection runs"},
8183
{packets, ?C_PACKETS, counter, "Number of packets"},
8284
{readers, ?C_READERS, counter, "Number of readers"},
83-
{epoch, ?C_EPOCH, counter, "Current epoch"}]).
85+
{epoch, ?C_EPOCH, counter, "Current epoch"},
86+
{committed_chunk_id, ?C_COMMITTED_CHUNK_ID, counter, "Last committed chunk ID"}
87+
]).
8488
-define(FIELDSPEC_KEY, osiris_replica_seshat_fields_spec).
8589

8690
-define(DEFAULT_ONE_TIME_TOKEN_TIMEOUT, 30000).
@@ -194,10 +198,10 @@ handle_continue(#{name := Name0,
194198
case LastChunk of
195199
empty ->
196200
ok;
197-
{_, LastChId, LastTs} ->
201+
_ ->
198202
%% need to ack last chunk back to leader so that it can
199203
%% re-discover the committed offset
200-
osiris_writer:ack(LeaderPid, {LastChId, LastTs})
204+
osiris_writer:ack(LeaderPid, ack_msg(Config, TailInfo))
201205
end,
202206
?INFO_(Name, "osiris replica starting in epoch ~b, next offset ~b, tail info ~w",
203207
[Epoch, NextOffset, TailInfo]),
@@ -236,6 +240,10 @@ handle_continue(#{name := Name0,
236240
Acceptor = spawn_link(fun() -> accept(Name, Transport, LSock, Self) end),
237241
?DEBUG_(Name, "starting replica reader on node '~w'", [Node]),
238242

243+
CmttedOfstCalculate = committed_offset_calculate(Config),
244+
Features0 = maps:get(features, Config, #{}),
245+
Features1 = Features0#{committed_offset_calculate =>
246+
committed_offset_calculate(Config)},
239247
ReplicaReaderConf = #{hosts => IpsHosts,
240248
port => Port,
241249
transport => Transport,
@@ -244,7 +252,8 @@ handle_continue(#{name := Name0,
244252
leader_pid => LeaderPid,
245253
start_offset => TailInfo,
246254
reference => ExtRef,
247-
connection_token => Token},
255+
connection_token => Token,
256+
features => Features1},
248257
case osiris_replica_reader:start(Node, ReplicaReaderConf) of
249258
{ok, RRPid} ->
250259
true = link(RRPid),
@@ -260,6 +269,7 @@ handle_continue(#{name := Name0,
260269
false ->
261270
infinity
262271
end,
272+
counters:put(CntRef, ?C_COMMITTED_CHUNK_ID, -1),
263273
counters:put(CntRef, ?C_COMMITTED_OFFSET, -1),
264274
counters:put(CntRef, ?C_EPOCH, Epoch),
265275
Shared = osiris_log:get_shared(Log),
@@ -281,7 +291,8 @@ handle_continue(#{name := Name0,
281291
event_formatter = EvtFmt,
282292
counter = CntRef,
283293
token = Token,
284-
transport = Transport},
294+
transport = Transport,
295+
committed_offset_calculate = CmttedOfstCalculate},
285296
log = Log,
286297
parse_state = undefined}};
287298
{error, {connection_refused = R, _}} ->
@@ -424,22 +435,26 @@ handle_call(Unknown, _From,
424435
%% {stop, Reason, State}
425436
%% @end
426437
%%--------------------------------------------------------------------
427-
handle_cast({committed_offset, CommittedChId},
438+
handle_cast({committed_offset, {CommittedChId, LastOffset}},
428439
#?MODULE{cfg = #cfg{counter = Cnt},
429440
log = Log,
430441
committed_chunk_id = LastCommittedChId} =
431442
State) ->
432443
case CommittedChId > LastCommittedChId of
433444
true ->
434445
%% notify offset listeners
435-
counters:put(Cnt, ?C_COMMITTED_OFFSET, CommittedChId),
446+
counters:put(Cnt, ?C_COMMITTED_CHUNK_ID, CommittedChId),
447+
counters:put(Cnt, ?C_COMMITTED_OFFSET, LastOffset),
436448
ok = osiris_log:set_committed_chunk_id(Log, CommittedChId),
449+
ok = osiris_log:set_committed_offset(Log, LastOffset),
437450
{noreply,
438451
notify_offset_listeners(
439452
State#?MODULE{committed_chunk_id = CommittedChId})};
440453
false ->
441454
State
442455
end;
456+
handle_cast({committed_offset, CommittedChId}, State) ->
457+
handle_cast({committed_offset, {CommittedChId, -1}}, State);
443458
handle_cast({register_offset_listener, Pid, EvtFormatter, Offset},
444459
#?MODULE{cfg = #cfg{reference = Ref,
445460
event_formatter = DefaultFmt},
@@ -572,7 +587,7 @@ handle_incoming_data(Socket, Bin,
572587
#cfg{socket = Socket,
573588
leader_pid = LeaderPid,
574589
transport = Transport,
575-
counter = Cnt},
590+
counter = Cnt} = Cfg,
576591
parse_state = ParseState0,
577592
log = Log0} =
578593
State0) ->
@@ -594,7 +609,8 @@ handle_incoming_data(Socket, Bin,
594609
undefined ->
595610
{noreply, State1};
596611
_ ->
597-
ok = osiris_writer:ack(LeaderPid, OffsetTimestamp),
612+
TailInfo = osiris_log:tail_info(Log),
613+
ok = osiris_writer:ack(LeaderPid, ack_msg(Cfg, TailInfo)),
598614
State = notify_offset_listeners(State1),
599615
{noreply, State}
600616
end.
@@ -738,7 +754,7 @@ notify_offset_listeners(#?MODULE{cfg = #cfg{reference = Ref,
738754
State#?MODULE{offset_listeners = L}.
739755

740756
max_readable_chunk_id(Log) ->
741-
min(osiris_log:committed_offset(Log), osiris_log:last_chunk_id(Log)).
757+
min(osiris_log:committed_chunk_id(Log), osiris_log:last_chunk_id(Log)).
742758

743759
%% INTERNAL
744760

@@ -796,3 +812,20 @@ listen(ssl, Port, Options) ->
796812
init_fields_spec() ->
797813
persistent_term:put(?FIELDSPEC_KEY,
798814
?ADD_COUNTER_FIELDS ++ osiris_log:counter_fields()).
815+
816+
ack_msg(Cfg, TailInfo) ->
817+
case committed_offset_calculate(Cfg) of
818+
true ->
819+
TailInfo;
820+
false ->
821+
?TAIL_INFO(TailChkId, TailTs) = TailInfo,
822+
{TailChkId, TailTs}
823+
end.
824+
825+
committed_offset_calculate(#cfg{committed_offset_calculate = On}) ->
826+
On;
827+
committed_offset_calculate(#{features := #{committed_offset_calculate := On}})
828+
when is_boolean(On) ->
829+
On;
830+
committed_offset_calculate(_) ->
831+
false.

0 commit comments

Comments
 (0)