Skip to content

Commit 6f54928

Browse files
authored
Merge pull request #205 from cloudamqp/retention_backlog
Deduplicate retention evaluation requests
2 parents d776bce + 27ffec9 commit 6f54928

File tree

1 file changed

+46
-52
lines changed

1 file changed

+46
-52
lines changed

src/osiris_retention.erl

Lines changed: 46 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,16 @@
77

88
-module(osiris_retention).
99

10-
-behaviour(gen_server).
10+
-behaviour(gen_batch_server).
1111

1212
-include("osiris.hrl").
1313
%% API functions
1414
-export([start_link/0,
1515
eval/4]).
16-
%% gen_server callbacks
16+
%% gen_batch_server callbacks
1717
-export([init/1,
18-
handle_call/3,
19-
handle_cast/2,
20-
handle_info/2,
21-
terminate/2,
22-
code_change/3]).
18+
handle_batch/2,
19+
terminate/2]).
2320

2421
-define(DEFAULT_SCHEDULED_EVAL_TIME, 1000 * 60 * 60). %% 1HR
2522

@@ -29,44 +26,59 @@
2926
%%% API functions
3027
%%%===================================================================
3128

32-
%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
29+
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
3330
start_link() ->
34-
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
31+
gen_batch_server:start_link({local, ?MODULE}, ?MODULE, [], [{reversed_batch, true}]).
3532

3633
-spec eval(osiris:name(), file:name_all(), [osiris:retention_spec()],
3734
fun((osiris_log:range()) -> ok)) ->
3835
ok.
3936
eval(_Name, _Dir, [], _Fun) ->
4037
ok;
4138
eval(Name, Dir, Specs, Fun) ->
42-
gen_server:cast(?MODULE, {eval, self(), Name, Dir, Specs, Fun}).
39+
gen_batch_server:cast(?MODULE, {eval, self(), Name, Dir, Specs, Fun}).
4340

4441
%%%===================================================================
45-
%%% gen_server callbacks
42+
%%% gen_batch_server callbacks
4643
%%%===================================================================
4744

48-
% @spec init(Args) -> {ok, State} |
49-
%% {ok, State, Timeout} |
50-
%% ignore |
51-
%% {stop, Reason}
45+
-spec init([]) -> {ok, #state{}}.
5246
init([]) ->
5347
{ok, #state{}}.
5448

55-
%% @spec handle_call(Request, From, State) ->
56-
%% {reply, Reply, State} |
57-
%% {reply, Reply, State, Timeout} |
58-
%% {noreply, State} |
59-
%% {noreply, State, Timeout} |
60-
%% {stop, Reason, Reply, State} |
61-
%% {stop, Reason, State}
62-
handle_call(_Request, _From, State) ->
63-
Reply = ok,
64-
{reply, Reply, State}.
65-
66-
%% @spec handle_cast(Msg, State) -> {noreply, State} |
67-
%% {noreply, State, Timeout} |
68-
%% {stop, Reason, State}
69-
handle_cast({eval, Pid, Name, Dir, Specs, Fun} = Eval, State) ->
49+
-spec handle_batch([gen_batch_server:op()], #state{}) -> {ok, #state{}}.
50+
handle_batch(Ops, State0) ->
51+
%% Ops are in reverse order of arrival. Process newest first.
52+
{State1, _Seen} = lists:foldl(fun process_op/2, {State0, sets:new()}, Ops),
53+
{ok, State1}.
54+
55+
-spec terminate(term(), #state{}) -> ok.
56+
terminate(_Reason, _State) ->
57+
ok.
58+
59+
%%%===================================================================
60+
%%% Internal functions
61+
%%%===================================================================
62+
63+
%% Multiple requests from the same stream might have arrived while
64+
%% processing the previous batch.
65+
%% Only process newest request for each stream (with the newest
66+
%% retention config), as the config could have changed.
67+
process_op({cast, {eval, _Pid, Name, _Dir, _Specs, _Fun} = Eval}, {StateAcc, Seen}) ->
68+
case sets:is_element(Name, Seen) of
69+
true ->
70+
%% Retention for this stream already evaluated
71+
{StateAcc, Seen};
72+
false ->
73+
{evaluate_retention(Eval, StateAcc), sets:add_element(Name, Seen)}
74+
end;
75+
process_op({call, From, _}, {StateAcc, Seen}) ->
76+
gen:reply(From, ok),
77+
{StateAcc, Seen};
78+
process_op(_, {StateAcc, Seen}) ->
79+
{StateAcc, Seen}.
80+
81+
evaluate_retention({eval, Pid, Name, Dir, Specs, Fun} = Eval, State) ->
7082
%% only do retention evaluation for stream processes that are
7183
%% alive as the callback Fun passed in would update a shared atomic
7284
%% value and this atomic is new per process incarnation
@@ -75,32 +87,15 @@ handle_cast({eval, Pid, Name, Dir, Specs, Fun} = Eval, State) ->
7587
try osiris_log:evaluate_retention(Dir, Specs) of
7688
Result ->
7789
_ = Fun(Result),
78-
{noreply, schedule(Eval, Result, State)}
90+
schedule(Eval, Result, State)
7991
catch _:Err ->
80-
?DEBUG_(Name, "retention evaluation failed with ~w", [Err]),
81-
{noreply, State}
92+
?DEBUG_(Name, "retention evaluation failed with ~w", [Err]),
93+
State
8294
end;
8395
false ->
84-
{noreply, State}
96+
State
8597
end.
8698

87-
%% @spec handle_info(Info, State) -> {noreply, State} |
88-
%% {noreply, State, Timeout} |
89-
%% {stop, Reason, State}
90-
handle_info(_Info, State) ->
91-
{noreply, State}.
92-
93-
%% @spec terminate(Reason, State) -> void()
94-
terminate(_Reason, _State) ->
95-
ok.
96-
97-
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
98-
code_change(_OldVsn, State, _Extra) ->
99-
{ok, State}.
100-
101-
%%%===================================================================
102-
%%% Internal functions
103-
%%%===================================================================
10499
schedule({eval, _Pid, Name, _Dir, Specs, _Fun} = Eval,
105100
{_, _, NumSegmentRemaining},
106101
#state{scheduled = Scheduled0} = State) ->
@@ -123,4 +118,3 @@ schedule({eval, _Pid, Name, _Dir, Specs, _Fun} = Eval,
123118
false ->
124119
State#state{scheduled = Scheduled}
125120
end.
126-

0 commit comments

Comments
 (0)