Skip to content

Commit 416ac0c

Browse files
committed
Customise telemetry event name
1 parent 731d366 commit 416ac0c

File tree

3 files changed

+88
-48
lines changed

3 files changed

+88
-48
lines changed

src/segmented_cache.erl

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ For more information, see the README, and the function documentation.
4747
-type merger_fun(Value) :: fun((Value, Value) -> Value).
4848
?DOC("Configuration values for the cache.").
4949
-type opts() :: #{
50+
prefix => telemetry:event_name(),
5051
scope => scope(),
5152
strategy => strategy(),
5253
entries_limit => entries_limit(),
@@ -94,6 +95,8 @@ Start and link a cache entity in the local node.
9495
and an entry in persistent_term will be created and the worker will join a pg group of
9596
the same name.
9697
`Opts` is a map containing the configuration.
98+
- `prefix` is a `telemetry` event name to prefix events raised by this library.
99+
Defaults to `[segmented_cache, Name]`.
97100
- `scope` is a `pg` scope. Defaults to `pg`.
98101
- `strategy` can be fifo or lru. Default is `fifo`.
99102
- `segment_num` is the number of segments for the cache. Default is `3`
@@ -110,27 +113,25 @@ start_link(Name, Opts) when is_atom(Name), is_map(Opts) ->
110113
Check if Key is cached.
111114
112115
Raises a telemetry span:
113-
- name: `[segmented_cache, Name, request, _]`
116+
- name: `Prefix ++ [request]`
114117
- start metadata: `#{name => atom()}`
115118
- stop metadata: `t:hit/0`
116119
""").
117120
-spec is_member(name(), key()) -> boolean().
118121
is_member(Name, Key) when is_atom(Name) ->
119-
Span = segmented_cache_helpers:is_member_span(Name, Key),
120-
telemetry:span([segmented_cache, Name, request], #{name => Name, type => is_member}, Span).
122+
segmented_cache_helpers:is_member(Name, Key).
121123

122124
?DOC("""
123125
Get the entry for Key in cache.
124126
125127
Raises telemetry span:
126-
- name: `[segmented_cache, Name, request, _]`
128+
- name: `Prefix ++ [request]`
127129
- start metadata: `#{name => atom()}`
128130
- stop metadata: `t:hit/0`
129131
""").
130132
-spec get_entry(name(), key()) -> value() | not_found.
131133
get_entry(Name, Key) when is_atom(Name) ->
132-
Span = segmented_cache_helpers:get_entry_span(Name, Key),
133-
telemetry:span([segmented_cache, Name, request], #{name => Name, type => get_entry}, Span).
134+
segmented_cache_helpers:get_entry(Name, Key).
134135

135136
?DOC("""
136137
Add an entry to the first table in the segments.
@@ -182,7 +183,7 @@ merge_entry(Name, Key, Value) when is_atom(Name) ->
182183
Delete an entry in all ets segments.
183184
184185
Might raise a telemetry error if the request fails:
185-
- name: `[segmented_cache, Name, delete_error]`
186+
- name: `Prefix ++ [request, delete_error]`
186187
- measurements: `#{}`
187188
- metadata: `t:delete_error/1`
188189
""").

src/segmented_cache_helpers.erl

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,22 @@
1010
-compile({inline, [next/2]}).
1111

1212
-export([init_cache_config/2, get_cache_scope/1, erase_cache_config/1]).
13-
-export([is_member_span/2, get_entry_span/2, put_entry_front/3, merge_entry/3]).
13+
-export([is_member/2, get_entry/2]).
14+
-export([put_entry_front/3, merge_entry/3]).
1415
-export([delete_entry/2, delete_pattern/2]).
1516
-export([purge_last_segment_and_rotate/1]).
1617

1718
-record(segmented_cache, {
1819
scope :: segmented_cache:scope(),
1920
name :: segmented_cache:name(),
21+
telemetry_name :: [segmented_cache:name()],
2022
strategy = fifo :: segmented_cache:strategy(),
2123
entries_limit = infinity :: segmented_cache:entries_limit(),
2224
index :: atomics:atomics_ref(),
2325
segments :: tuple(),
2426
merger_fun :: merger_fun(dynamic())
2527
}).
2628

27-
-type span() :: fun(() -> {dynamic(), span_metadata()}).
28-
-type span_metadata() :: #{hit := boolean()}.
2929
-type merger_fun(Value) :: fun((Value, Value) -> Value).
3030
-type iterative_fun(Key, Value) :: fun((ets:tid(), Key) -> {continue | stop, Value}).
3131
-type config() :: #segmented_cache{}.
@@ -43,15 +43,18 @@ init_cache_config(Name, Opts0) ->
4343
entries_limit := EntriesLimit,
4444
segment_num := N,
4545
ttl := TTL,
46-
merger_fun := MergerFun
47-
} = Opts = assert_parameters(Opts0),
46+
merger_fun := MergerFun,
47+
prefix := TelemetryPrefix
48+
} = Opts = assert_parameters(Name, Opts0),
4849
SegmentOpts = ets_settings(Opts),
4950
SegmentsList = lists:map(fun(_) -> ets:new(undefined, SegmentOpts) end, lists:seq(1, N)),
5051
Segments = list_to_tuple(SegmentsList),
5152
Index = atomics:new(1, [{signed, false}]),
5253
atomics:put(Index, 1, 1),
54+
TelemetryEventName = TelemetryPrefix ++ [request],
5355
Config = #segmented_cache{
5456
scope = Scope,
57+
telemetry_name = TelemetryEventName,
5558
name = Name,
5659
strategy = Strategy,
5760
index = Index,
@@ -83,19 +86,28 @@ persist_cache_config(Name, Config) ->
8386
%% ETS checks
8487
%%====================================================================
8588

86-
-spec is_member_span(segmented_cache:name(), segmented_cache:key()) -> span().
87-
is_member_span(Name, Key) when is_atom(Name) ->
88-
fun() ->
89-
Value = iterate_fun_in_tables(Name, Key, fun segmented_cache_callbacks:is_member_ets_fun/2),
90-
{Value, #{hit => Value =:= true}}
91-
end.
92-
93-
-spec get_entry_span(segmented_cache:name(), segmented_cache:key()) -> span().
94-
get_entry_span(Name, Key) when is_atom(Name) ->
95-
fun() ->
96-
Value = iterate_fun_in_tables(Name, Key, fun segmented_cache_callbacks:get_entry_ets_fun/2),
97-
{Value, #{hit => Value =/= not_found}}
98-
end.
89+
-spec is_member(segmented_cache:name(), segmented_cache:key()) -> boolean().
90+
is_member(Name, Key) when is_atom(Name) ->
91+
#segmented_cache{telemetry_name = Prefix} = SegmentRecord = get_cache_config(Name),
92+
Span = fun() ->
93+
Value = iterate_fun_in_tables(
94+
SegmentRecord, Key, fun segmented_cache_callbacks:is_member_ets_fun/2
95+
),
96+
{Value, #{name => Name, type => is_member, hit => Value =:= true}}
97+
end,
98+
telemetry:span(Prefix, #{name => Name, type => is_member}, Span).
99+
100+
-spec get_entry(segmented_cache:name(), segmented_cache:key()) ->
101+
segmented_cache:value() | not_found.
102+
get_entry(Name, Key) when is_atom(Name) ->
103+
#segmented_cache{telemetry_name = Prefix} = SegmentRecord = get_cache_config(Name),
104+
Span = fun() ->
105+
Value = iterate_fun_in_tables(
106+
SegmentRecord, Key, fun segmented_cache_callbacks:get_entry_ets_fun/2
107+
),
108+
{Value, #{name => Name, type => get_entry, hit => Value =/= not_found}}
109+
end,
110+
telemetry:span(Prefix, #{name => Name, type => get_entry}, Span).
99111

100112
%% Atomically compare_and_swap an entry, attempt three times, post-check front insert
101113
-spec put_entry_front(segmented_cache:name(), segmented_cache:key(), segmented_cache:value()) ->
@@ -115,7 +127,7 @@ merge_entry(Name, Key, Value) when is_atom(Name) ->
115127
false -> {continue, false}
116128
end
117129
end,
118-
case iterate_fun_in_tables(Name, Key, F) of
130+
case iterate_fun_in_tables(SegmentRecord, Key, F) of
119131
true -> true;
120132
false -> do_put_entry_front(SegmentRecord, Key, Value, 3)
121133
end.
@@ -133,24 +145,24 @@ delete_pattern(Name, Pattern) when is_atom(Name) ->
133145
Type :: entry | pattern,
134146
IterativeFun :: iterative_fun(Key, dynamic()).
135147
delete_request(Name, Value, Type, Fun) ->
148+
#segmented_cache{telemetry_name = Prefix} = SegmentRecord = get_cache_config(Name),
136149
try
137-
iterate_fun_in_tables(Name, Value, Fun)
150+
iterate_fun_in_tables(SegmentRecord, Value, Fun)
138151
catch
139152
Class:Reason ->
140153
Metadata = #{
141154
name => Name, delete_type => Type, value => Value, class => Class, reason => Reason
142155
},
143-
telemetry:execute([segmented_cache, Name, delete_error], #{}, Metadata)
156+
telemetry:execute(Prefix ++ [delete_error], #{}, Metadata)
144157
end.
145158

146159
%%====================================================================
147160
%% Internals
148161
%%====================================================================
149162

150-
-spec iterate_fun_in_tables(segmented_cache:name(), Key, IterativeFun) -> Value when
163+
-spec iterate_fun_in_tables(config(), Key, IterativeFun) -> Value when
151164
IterativeFun :: iterative_fun(Key, Value).
152-
iterate_fun_in_tables(Name, Key, IterativeFun) ->
153-
SegmentRecord = get_cache_config(Name),
165+
iterate_fun_in_tables(SegmentRecord, Key, IterativeFun) ->
154166
Segments = SegmentRecord#segmented_cache.segments,
155167
Size = tuple_size(Segments),
156168
CurrentIndex = atomics:get(SegmentRecord#segmented_cache.index, 1),
@@ -318,8 +330,8 @@ purge_last_segment_and_rotate(Name) ->
318330
atomics:put(SegmentRecord#segmented_cache.index, 1, NewIndex),
319331
NewIndex.
320332

321-
-spec assert_parameters(segmented_cache:opts()) -> segmented_cache:opts().
322-
assert_parameters(Opts0) when is_map(Opts0) ->
333+
-spec assert_parameters(segmented_cache:name(), segmented_cache:opts()) -> segmented_cache:opts().
334+
assert_parameters(Name, Opts0) when is_map(Opts0) ->
323335
#{
324336
scope := Scope,
325337
strategy := Strategy,
@@ -328,6 +340,8 @@ assert_parameters(Opts0) when is_map(Opts0) ->
328340
ttl := TTL0,
329341
merger_fun := MergerFun
330342
} = Opts = maps:merge(defaults(), Opts0),
343+
TelemetryEventName = maps:get(prefix, Opts0, [segmented_cache, Name]),
344+
true = is_list(TelemetryEventName),
331345
TTL =
332346
case TTL0 of
333347
infinity -> infinity;
@@ -343,7 +357,7 @@ assert_parameters(Opts0) when is_map(Opts0) ->
343357
true = (Strategy =:= fifo) orelse (Strategy =:= lru),
344358
true = is_function(MergerFun, 2),
345359
true = (undefined =/= whereis(Scope)),
346-
Opts#{ttl := TTL}.
360+
Opts#{ttl := TTL, prefix => TelemetryEventName}.
347361

348362
is_pos_int_or_infinity(Value) ->
349363
(Value =:= infinity) orelse (is_integer(Value) andalso 0 < Value).

test/segmented_cache_SUITE.erl

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
-include_lib("proper/include/proper.hrl").
2020

2121
-define(CACHE_NAME, test).
22+
-define(CACHE_NAME_B, [custom_test, lru]).
2223
-define(CMD_MODULE, segmented_cache_proper_commands).
2324

2425
all() ->
@@ -45,6 +46,7 @@ groups() ->
4546
put_entry_wait_and_check_false
4647
]},
4748
{lru, [sequence], [
49+
get_entry_with_custom_prefix,
4850
put_entry_and_verify_it_stays,
4951
put_entry_and_verify_it_stays_under_load,
5052
stateful_property
@@ -58,33 +60,39 @@ init_per_suite(Config) ->
5860
ct:pal("Online schedulers ~p~n", [erlang:system_info(schedulers_online)]),
5961
application:ensure_all_started(telemetry),
6062
cnt_pt_new(?CACHE_NAME),
61-
ok = telemetry:attach(
63+
cnt_pt_new(?CACHE_NAME_B),
64+
ok = telemetry:attach_many(
6265
<<"cache-request-handler">>,
63-
[segmented_cache, ?CACHE_NAME, request, stop],
66+
[
67+
[segmented_cache, ?CACHE_NAME, request, stop],
68+
?CACHE_NAME_B ++ [request, stop]
69+
],
6470
fun ?MODULE:handle_event/4,
6571
[]
6672
),
6773
pg:start(pg),
6874
Config.
6975

7076
end_per_suite(_Config) ->
71-
print_and_restart_counters(),
77+
print_and_restart_counters(?CACHE_NAME),
78+
print_and_restart_counters(?CACHE_NAME_B),
7279
ok.
7380

7481
%%%===================================================================
7582
%%% Group specific setup/teardown
7683
%%%===================================================================
7784
init_per_group(lru, Config) ->
78-
print_and_restart_counters(),
85+
print_and_restart_counters(?CACHE_NAME_B),
7986
Opts = #{
87+
prefix => ?CACHE_NAME_B,
8088
strategy => lru,
8189
segment_num => 2,
8290
ttl => {milliseconds, 100}
8391
},
8492
{ok, Cleaner} = segmented_cache:start(?CACHE_NAME, Opts),
8593
[{cleaner, Cleaner} | Config];
8694
init_per_group(short_fifo, Config) ->
87-
print_and_restart_counters(),
95+
print_and_restart_counters(?CACHE_NAME),
8896
Opts = #{
8997
strategy => fifo,
9098
segment_num => 2,
@@ -93,7 +101,7 @@ init_per_group(short_fifo, Config) ->
93101
{ok, Cleaner} = segmented_cache:start(?CACHE_NAME, Opts),
94102
[{cleaner, Cleaner} | Config];
95103
init_per_group(cache_limits, Config) ->
96-
print_and_restart_counters(),
104+
print_and_restart_counters(?CACHE_NAME),
97105
Opts = #{
98106
entries_limit => 1,
99107
strategy => fifo,
@@ -103,7 +111,7 @@ init_per_group(cache_limits, Config) ->
103111
{ok, Cleaner} = segmented_cache:start(?CACHE_NAME, Opts),
104112
[{cleaner, Cleaner} | Config];
105113
init_per_group(_Groupname, Config) ->
106-
print_and_restart_counters(),
114+
print_and_restart_counters(?CACHE_NAME),
107115
{ok, Cleaner} = segmented_cache:start(?CACHE_NAME),
108116
[{cleaner, Cleaner} | Config].
109117

@@ -118,7 +126,7 @@ init_per_testcase(TestCase, Config) when
118126
TestCase =:= put_entry_and_verify_it_stays;
119127
TestCase =:= put_entry_and_verify_it_stays_under_load
120128
->
121-
print_and_restart_counters(),
129+
print_and_restart_counters(?CACHE_NAME_B),
122130
Config;
123131
init_per_testcase(_TestCase, Config) ->
124132
Config.
@@ -226,6 +234,18 @@ put_entry_wait_and_check_false(_) ->
226234
),
227235
run_prop(?FUNCTION_NAME, Prop).
228236

237+
get_entry_with_custom_prefix(_) ->
238+
Prop = ?FORALL(
239+
{Key0, Value},
240+
{non_empty(binary()), union([char(), binary(), integer()])},
241+
begin
242+
Key = {Key0, make_ref()},
243+
segmented_cache:put_entry(?CACHE_NAME, Key, Value),
244+
Value =:= segmented_cache:get_entry(?CACHE_NAME, Key)
245+
end
246+
),
247+
run_prop(?FUNCTION_NAME, Prop).
248+
229249
put_entry_and_verify_it_stays(_) ->
230250
Prop = ?FORALL(
231251
{Key0, Value},
@@ -266,7 +286,7 @@ put_entry_and_verify_it_stays_under_load(_) ->
266286
end
267287
),
268288
run_prop(?FUNCTION_NAME, Prop, 10_000, 128),
269-
{Hits, Misses} = print_and_restart_counters(),
289+
{Hits, Misses} = print_and_restart_counters(?CACHE_NAME_B),
270290
ct:pal("Hits ~p; Misses ~p~n", [Hits, Misses]),
271291
Total = Hits + Misses,
272292
case Misses / Total < 0.001 of
@@ -341,6 +361,11 @@ handle_event([segmented_cache, CacheName, request, stop], _, #{hit := Hit}, _) -
341361
case Hit of
342362
true -> cnt_pt_incr_hits(CacheName);
343363
false -> cnt_pt_incr_misses(CacheName)
364+
end;
365+
handle_event([custom_test, lru, request, stop], _, #{hit := Hit}, _) ->
366+
case Hit of
367+
true -> cnt_pt_incr_hits(?CACHE_NAME_B);
368+
false -> cnt_pt_incr_misses(?CACHE_NAME_B)
344369
end.
345370

346371
cnt_pt_new(Counter) ->
@@ -358,9 +383,9 @@ cnt_pt_read_hits(Counter) ->
358383
cnt_pt_read_misses(Counter) ->
359384
counters:get(persistent_term:get({?MODULE, Counter}), 2).
360385

361-
print_and_restart_counters() ->
362-
Hits = cnt_pt_read_hits(?CACHE_NAME),
363-
Misses = cnt_pt_read_misses(?CACHE_NAME),
364-
counters:put(persistent_term:get({?MODULE, ?CACHE_NAME}), 1, 0),
365-
counters:put(persistent_term:get({?MODULE, ?CACHE_NAME}), 2, 0),
386+
print_and_restart_counters(CacheName) ->
387+
Hits = cnt_pt_read_hits(CacheName),
388+
Misses = cnt_pt_read_misses(CacheName),
389+
counters:put(persistent_term:get({?MODULE, CacheName}), 1, 0),
390+
counters:put(persistent_term:get({?MODULE, CacheName}), 2, 0),
366391
{Hits, Misses}.

0 commit comments

Comments
 (0)