Skip to content

Commit fa72b28

Browse files
authored
Monitor the process which creates the memo value (#20)
* Monitor the process which creates the memo value * Do builds for otp 25, 26 and 27 * Add proper as extra plt app to dialyzer config * Supress dialyzer warnings about use of proper:setup/2 * Let the depcache process itself do the monitoring, instead of an extra monitor process * Cleanup, let the depcache monitor the writers
1 parent 82e17e5 commit fa72b28

File tree

5 files changed

+117
-43
lines changed

5 files changed

+117
-43
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ jobs:
1818

1919
strategy:
2020
matrix:
21-
otp_version: [20.3,21,22,23,24,25]
21+
otp_version: [25,26,27]
2222
os: [ubuntu-latest]
2323

2424
container:

rebar.config

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
]},
2020
{test, [
2121
{dialyzer, [
22+
{plt_extra_apps, [proper, eunit]},
2223
{warnings, [
2324
no_return
2425
]}

src/depcache.erl

Lines changed: 82 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%% @author Arjan Scherpenisse
2-
%% @copyright 2009-2020 Marc Worrell, Arjan Scherpenisse
2+
%% @copyright 2009-2025 Marc Worrell, Arjan Scherpenisse
33
%% @doc Depcache API
44
%%
55
%% == depcache API ==
@@ -14,7 +14,7 @@
1414
%% {@link cleanup/1}, {@link cleanup/5}
1515
%%
1616
%% @end
17-
%% Copyright 2009-2020 Marc Worrell, Arjan Scherpenisse
17+
%% Copyright 2009-2025 Marc Worrell, Arjan Scherpenisse
1818
%%
1919
%% Licensed under the Apache License, Version 2.0 (the "License");
2020
%% you may not use this file except in compliance with the License.
@@ -79,7 +79,7 @@
7979
data_table :: ets:tab()
8080
}).
8181

82-
-record(state, {now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map()}).
82+
-record(state, {now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map(), writers :: map() }).
8383
-record(meta, {key :: key(), expire :: sec(), serial :: non_neg_integer(), depend :: dependencies()}).
8484
-record(depend, {key :: key(), serial :: non_neg_integer()}).
8585

@@ -92,7 +92,7 @@
9292
}).
9393

9494
-type tables() :: #tables{meta_table :: ets:tab(), deps_table :: ets:tab(), data_table :: ets:tab()}.
95-
-type state() :: #state{now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map()}.
95+
-type state() :: #state{now :: sec(), serial :: non_neg_integer(), tables :: tables(), wait_pids :: map(), writers :: map()}.
9696
-type depend() :: #depend{key :: key(), serial :: non_neg_integer()}.
9797
-type cleanup_state() :: #cleanup_state{pid :: pid(), tables :: tables(), name :: atom(), memory_max :: non_neg_integer(), callback :: callback() | undefined}.
9898
-type meta() :: #meta{key :: key(), expire :: sec(), serial :: non_neg_integer(), depend :: dependencies()}.
@@ -297,16 +297,18 @@ memo(Fun, Key, MaxAge, Server) ->
297297
Result :: any().
298298
memo(Fun, Key, MaxAge, Dep, Server) ->
299299
Key1 = case Key of
300-
undefined -> memo_key(Fun);
301-
_ -> Key
302-
end,
300+
undefined -> memo_key(Fun);
301+
_ -> Key
302+
end,
303303
case ?MODULE:get_wait(Key1, Server) of
304304
{ok, Value} ->
305305
Value;
306306
{throw, R} ->
307307
throw(R);
308308
undefined ->
309-
memo_key(Fun, Key, MaxAge, Dep, Server)
309+
memo_key(Fun, Key, MaxAge, Dep, Server);
310+
{error, premature_exit} ->
311+
?MODULE:memo(Fun, Key, MaxAge, Dep, Server)
310312
end.
311313

312314
%% @private
@@ -323,31 +325,36 @@ memo(Fun, Key, MaxAge, Dep, Server) ->
323325
Server :: depcache_server(),
324326
Result :: any().
325327
memo_key(Fun, Key, MaxAge, Dep, Server) ->
326-
try
327-
Value =
328-
case Fun of
329-
{M,F,A} -> erlang:apply(M,F,A);
330-
{M,F} -> M:F();
331-
_ when is_function(Fun) -> Fun()
332-
end,
333-
{Value1, MaxAge1, Dep1} =
334-
case Value of
335-
#memo{value=V, max_age=MA, deps=D} ->
336-
MA1 = case is_integer(MA) of true -> MA; false -> MaxAge end,
337-
{V, MA1, Dep++D};
338-
_ ->
339-
{Value, MaxAge, Dep}
340-
end,
341-
case MaxAge of
342-
0 -> memo_send_replies(Key, Value1, Server);
343-
_ -> set(Key, Value1, MaxAge1, Dep1, Server)
344-
end,
345-
Value1
346-
catch
347-
?WITH_STACKTRACE(Class, R, S)
348-
memo_send_errors(Key, {throw, R}, Server),
349-
erlang:raise(Class, R, S)
350-
end.
328+
try
329+
{Value1, MaxAge1, Dep1} = case apply_fun(Fun) of
330+
#memo{value=V, max_age=MA, deps=D} ->
331+
MA1 = case is_integer(MA) of
332+
true -> MA;
333+
false -> MaxAge
334+
end,
335+
{V, MA1, Dep++D};
336+
Value ->
337+
{Value, MaxAge, Dep}
338+
end,
339+
case MaxAge of
340+
0 -> memo_send_replies(Key, Value1, Server);
341+
_ -> set(Key, Value1, MaxAge1, Dep1, Server)
342+
end,
343+
344+
Value1
345+
catch
346+
?WITH_STACKTRACE(Class, R, S)
347+
memo_send_errors(Key, {throw, R}, Server),
348+
erlang:raise(Class, R, S)
349+
end.
350+
351+
352+
%% @private
353+
%% @doc Execute the memo function
354+
%% Returns the result value
355+
apply_fun({M,F,A}) -> erlang:apply(M,F,A);
356+
apply_fun({M,F}) -> M:F();
357+
apply_fun(Fun) when is_function(Fun) -> Fun().
351358

352359

353360
%% @private
@@ -458,7 +465,7 @@ set(Key, Data, MaxAge, Depend, Server) ->
458465
-spec get_wait( Key, Server ) -> Result when
459466
Key :: key(),
460467
Server :: depcache_server(),
461-
Result :: {ok, any()} | undefined | {throw, term()}.
468+
Result :: {ok, any()} | undefined | {throw, term()} | {error, premature_exit}.
462469
get_wait(Key, Server) ->
463470
case get_process_dict(Key, Server) of
464471
NoValue when NoValue =:= undefined orelse NoValue =:= depcache_disabled ->
@@ -776,7 +783,8 @@ init(Config) ->
776783
tables = Tables,
777784
now=now_sec(),
778785
serial=0,
779-
wait_pids=#{}
786+
wait_pids=#{},
787+
writers=#{}
780788
},
781789
timer:send_after(1000, tick),
782790
spawn_link(?MODULE,
@@ -911,6 +919,21 @@ handle_info(tick, State) ->
911919
erase_process_dict(),
912920
{noreply, State#state{now=now_sec()}};
913921

922+
handle_info({'DOWN', Ref, process, _Pid, _Reason}, #state{ writers = Writers }=State) ->
923+
case maps:take(Ref, Writers) of
924+
error ->
925+
{noreply, State};
926+
{Key, Writers1} ->
927+
WaitPids1 = case maps:take(Key, State#state.wait_pids) of
928+
error ->
929+
State#state.wait_pids;
930+
{{_MaxAge, List, _WriterRef}, WaitPids} ->
931+
_ = [ catch gen_server:reply(From, {error, premature_exit}) || From <- List ],
932+
WaitPids
933+
end,
934+
{noreply, State#state{ writers = Writers1, wait_pids = WaitPids1 }}
935+
end;
936+
914937
handle_info(_Msg, State) ->
915938
{noreply, State}.
916939

@@ -973,14 +996,29 @@ handle_call_get_wait(Key, From, #state{tables = Tables} = State) ->
973996
undefined -> State
974997
end,
975998
case State#state.wait_pids of
976-
#{Key := {MaxAge, List}} when State#state.now < MaxAge ->
999+
#{Key := {MaxAge, List, WriterRef}} when State#state.now < MaxAge ->
9771000
%% Another process is already calculating the value, let the caller wait.
978-
WaitPids = maps:update(Key, {MaxAge, [From|List]}, State#state.wait_pids),
1001+
WaitPids = maps:update(Key, {MaxAge, [From|List], WriterRef}, State#state.wait_pids),
9791002
{noreply, State#state{wait_pids=WaitPids}};
9801003
_ ->
1004+
%% de-monitor an old writer, if any.
1005+
Writers = case maps:find(Key, State#state.wait_pids) of
1006+
error ->
1007+
State#state.writers;
1008+
{ok, {_, _WaitPids, OldWriter}} ->
1009+
erlang:demonitor(OldWriter),
1010+
maps:without([OldWriter], State#state.writers)
1011+
end,
1012+
1013+
%% Monitor and register the writer
1014+
{Pid, _} = From,
1015+
Ref = erlang:monitor(process, Pid),
1016+
Writers1 = maps:put(Ref, Key, Writers),
1017+
9811018
%% Nobody waiting or we hit a timeout, let next requestors wait for this caller.
982-
WaitPids = maps:put(Key, {State#state.now+?MAX_GET_WAIT, []}, State#state.wait_pids),
983-
{reply, undefined, State#state{wait_pids=WaitPids}}
1019+
WaitPids = maps:put(Key, {State#state.now+?MAX_GET_WAIT, [], Ref}, State#state.wait_pids),
1020+
1021+
{reply, undefined, State#state{wait_pids=WaitPids, writers=Writers1}}
9841022
end;
9851023
{ok, _Value} = Found ->
9861024
{reply, Found, State}
@@ -999,7 +1037,7 @@ handle_call_get_wait(Key, From, #state{tables = Tables} = State) ->
9991037
Tag :: atom().
10001038
handle_call_get_waiting_pids(Key, State) ->
10011039
{State1, Pids} = case maps:take(Key, State#state.wait_pids) of
1002-
{{_MaxAge, List}, WaitPids} ->
1040+
{{_MaxAge, List, _WriterRef}, WaitPids} ->
10031041
{State#state{wait_pids=WaitPids}, List};
10041042
error ->
10051043
{State, []}
@@ -1082,9 +1120,11 @@ handle_call_set({Key, Data, MaxAge, Depend}, #state{tables = Tables} = State) ->
10821120

10831121
%% Check if other processes are waiting for this key, send them the data
10841122
case maps:take(Key, State1#state.wait_pids) of
1085-
{{_MaxAge, List}, WaitPids} ->
1123+
{{_MaxAge, List, WriterRef}, WaitPids} ->
10861124
_ = [ catch gen_server:reply(From, {ok, Data}) || From <- List ],
1087-
{reply, ok, State1#state{wait_pids=WaitPids}};
1125+
_ = erlang:demonitor(WriterRef),
1126+
Writers = maps:without([WriterRef], State#state.writers),
1127+
{reply, ok, State1#state{writers=Writers, wait_pids=WaitPids}};
10881128
error ->
10891129
{reply, ok, State1}
10901130
end.

test/depcache_tests.erl

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,3 +145,28 @@ memo_raise_test() ->
145145
?assertMatch({depcache_tests, raise_error, 0, _}, hd(S))
146146
end,
147147
ok.
148+
149+
memo_premature_kill_test() ->
150+
{ok, C} = depcache:start_link(#{}),
151+
152+
LongTask = fun() ->
153+
Fun = fun() ->
154+
timer:sleep(500),
155+
done
156+
end,
157+
depcache:memo(Fun, premature_kill_test, C)
158+
end,
159+
160+
Pid = spawn(LongTask),
161+
timer:kill_after(250, Pid),
162+
timer:sleep(50),
163+
?assertEqual({error, premature_exit}, depcache:get_wait(premature_kill_test, C)),
164+
165+
% Check if another process takes over processing in case of pre-mature exits
166+
Task = spawn(LongTask),
167+
timer:kill_after(250, Task),
168+
timer:sleep(50),
169+
?assertEqual(done, LongTask()),
170+
171+
ok.
172+

test/prop_depcache.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
-module(prop_depcache).
22
-include_lib("proper/include/proper.hrl").
33

4+
-dialyzer({nowarn_function, prop_set_get/0}).
5+
-dialyzer({nowarn_function, prop_flush_all/0}).
6+
-dialyzer({nowarn_function, prop_get_set_maxage/0}).
7+
-dialyzer({nowarn_function, prop_get_set_maxage_0/0}).
8+
-dialyzer({nowarn_function, prop_get_set_depend/0}).
9+
-dialyzer({nowarn_function, prop_get_set_depend_map/0}).
10+
-dialyzer({nowarn_function, prop_memo/0}).
11+
412
%%%%%%%%%%%%%%%%%%
513
%%% Properties %%%
614
%%%%%%%%%%%%%%%%%%

0 commit comments

Comments
 (0)