Skip to content

Commit c3db261

Browse files
authored
Merge pull request #697 from tsloughter/observe-exemplars
Add a fresh context to each observable callback and test observe exemplars
2 parents 63d338a + d83598c commit c3db261

File tree

8 files changed

+199
-43
lines changed

8 files changed

+199
-43
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-define(current_ctx, otel_ctx:get_current()).
2+
3+
-define(with_ctx(Ctx, Fun),
4+
otel_ctx:with_ctx(Ctx, Fun)).

apps/opentelemetry_api/src/otel_ctx.erl

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
attach/1,
3939
detach/1,
40+
with_ctx/2,
4041
get_current/0,
4142

4243
text_map_extractor/2,
@@ -159,12 +160,27 @@ attach(Ctx) ->
159160
erlang:put(?CURRENT_CTX, Ctx).
160161

161162
%% @doc Detaches the given context from the current process.
162-
-spec detach(token()) -> ok.
163+
-spec detach(token()) -> t() | undefined.
163164
detach(Token) ->
164165
%% at this time `Token' is a context
165166
update_logger_process_metadata(Token),
166167
erlang:put(?CURRENT_CTX, Token).
167168

169+
%% @doc Attaches a context and runs a function, detaching the context at the end.
170+
%%
171+
%% Returns the detached context.
172+
-spec with_ctx(t(), fun(() -> term())) -> {term(), t()}.
173+
with_ctx(Ctx, Fun) ->
174+
Token = otel_ctx:attach(Ctx),
175+
try
176+
Result = Fun(),
177+
{Result, otel_ctx:detach(Token)}
178+
catch
179+
C:T:S ->
180+
otel_ctx:detach(Token),
181+
erlang:raise(C, T, S)
182+
end.
183+
168184

169185
%% Extractor and Injector setup functions
170186

apps/opentelemetry_experimental/src/otel_aggregation.erl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ default_mapping() ->
7171

7272
split(Keys, Map) ->
7373
lists:foldl(fun(Key, {KeptAcc, DroppedAcc}) ->
74-
{Value, DroppedAcc1} = maps:take(Key, DroppedAcc),
75-
{KeptAcc#{Key => Value}, DroppedAcc1}
74+
case maps:take(Key, DroppedAcc) of
75+
{Value, DroppedAcc1} ->
76+
{KeptAcc#{Key => Value}, DroppedAcc1};
77+
error ->
78+
{KeptAcc, DroppedAcc}
79+
end
7680
end, {#{}, Map}, Keys).

apps/opentelemetry_experimental/src/otel_meter_server.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
-export([start_link/4,
3838
add_metric_reader/4,
3939
add_metric_reader/5,
40+
get_readers/0,
41+
get_readers/1,
4042
add_instrument/1,
4143
add_instrument/2,
4244
register_callback/3,
@@ -129,6 +131,12 @@ add_metric_reader(ReaderId, ReaderPid, DefaultAggregationMapping, Temporality) -
129131
add_metric_reader(Provider, ReaderId, ReaderPid, DefaultAggregationMapping, Temporality) ->
130132
gen_server:call(Provider, {add_metric_reader, ReaderId, ReaderPid, DefaultAggregationMapping, Temporality}).
131133

134+
get_readers() ->
135+
get_readers(?GLOBAL_METER_PROVIDER_REG_NAME).
136+
137+
get_readers(Provider) ->
138+
gen_server:call(Provider, get_readers).
139+
132140
-spec register_callback([otel_instrument:t()], otel_instrument:callback(), otel_instrument:callback_args()) -> boolean().
133141
register_callback(Instruments, Callback, CallbackArgs) ->
134142
register_callback(?GLOBAL_METER_PROVIDER_REG_NAME, Instruments, Callback, CallbackArgs).
@@ -208,6 +216,9 @@ init_producers(ProducerConfigs) ->
208216
end
209217
end, ProducerConfigs).
210218

219+
handle_call(get_readers, _From, State=#state{readers=Readers}) ->
220+
221+
{reply, Readers, State};
211222
handle_call({add_metric_reader, ReaderId, ReaderPid, DefaultAggregationMapping, Temporality},
212223
_From, State=#state{readers=Readers,
213224
views=Views,

apps/opentelemetry_experimental/src/otel_metric_exemplar_reservoir_aligned_histogram.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,12 @@ offer(Ctx, ExemplarsTab, Key, Value, FilteredAttributes, #state{bucket_boundarie
5858
%% @doc Return all exemplars for a `Key' and then delete them.
5959
-spec collect(ets:table(), term(), #state{}) -> [otel_metric_exemplar:exemplar()].
6060
collect(ExemplarsTab, Key, _State) ->
61-
Exemplars = [E || [E] <- ets:match(ExemplarsTab, {{Key, '_'}, '$1'})],
61+
Exemplars = ets:select(ExemplarsTab, [{{{'$1', '_'}, '$2'},
62+
[{'==', '$1', {const, Key}}],
63+
['$2']}]),
6264

63-
_ = ets:match_delete(ExemplarsTab, {{Key, '_'}, '_'}),
65+
_ = ets:select_delete(ExemplarsTab, [{{'$1', '_'}, [{'==', '$1', {const, Key}}], [true]},
66+
{{{'$1', '_'}, '_'}, [{'==', '$1', {const, Key}}], [true]}]),
6467

6568
Exemplars.
6669

apps/opentelemetry_experimental/src/otel_metric_exemplar_reservoir_simple.erl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,12 @@ offer(Ctx, ExemplarsTab, Key, Value, FilteredAttributes, #state{num_buckets=NumB
6767
%% @doc Return all exemplars for a `Key' and then delete them.
6868
-spec collect(ets:table(), term(), #state{}) -> [otel_metric_exemplar:exemplar()].
6969
collect(ExemplarsTab, Key, _State) ->
70-
Exemplars = [E || [E] <- ets:match(ExemplarsTab, {{Key, '_'}, '$1'})],
70+
Exemplars = ets:select(ExemplarsTab, [{{{'$1', '_'}, '$2'},
71+
[{'==', '$1', {const, Key}}],
72+
['$2']}]),
7173

72-
_ = ets:select_delete(ExemplarsTab, [{{Key, '_'}, [], [true]},
73-
{{{Key, '_'}, '_'}, [], [true]}]),
74+
_ = ets:select_delete(ExemplarsTab, [{{'$1', '_'}, [{'==', '$1', {const, Key}}], [true]},
75+
{{{'$1', '_'}, '_'}, [{'==', '$1', {const, Key}}], [true]}]),
7476

7577
Exemplars.
7678

apps/opentelemetry_experimental/src/otel_observables.erl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
-include_lib("kernel/include/logger.hrl").
2323
-include_lib("opentelemetry_api_experimental/include/otel_metrics.hrl").
2424
-include("otel_view.hrl").
25+
-include_lib("opentelemetry_api/include/otel_ctx.hrl").
2526

2627
-type callbacks() :: [{otel_instrument:callback(), otel_instrument:callback_args(), otel_instrument:t()}].
2728

@@ -30,8 +31,11 @@
3031
run_callbacks(Callbacks, ReaderId, StreamTab, MetricsTab, ExemplarsTab) ->
3132
lists:foreach(fun({Callback, CallbackArgs, Instruments})
3233
when is_list(Instruments) ->
33-
Ctx = otel_ctx:new(),
34-
Results = Callback(CallbackArgs),
34+
Ctx0 = otel_ctx:new(),
35+
{Results, Ctx} = ?with_ctx(Ctx0, fun() ->
36+
Results0 = Callback(CallbackArgs),
37+
Results0
38+
end),
3539
handle_instruments_observations(Ctx,
3640
Results,
3741
Instruments,
@@ -40,8 +44,11 @@ run_callbacks(Callbacks, ReaderId, StreamTab, MetricsTab, ExemplarsTab) ->
4044
ExemplarsTab,
4145
ReaderId);
4246
({Callback, CallbackArgs, Instrument}) ->
43-
Ctx = otel_ctx:new(),
44-
Results = Callback(CallbackArgs),
47+
Ctx0 = otel_ctx:new(),
48+
{Results, Ctx} = ?with_ctx(Ctx0, fun() ->
49+
Results0 = Callback(CallbackArgs),
50+
Results0
51+
end),
4552
%% when not a list of instruments it isn't expecting named observation
4653
%% results so we use handle_instrument instead of handle_instruments
4754
%% but we can't type that correctly so have to use a `fixme'

0 commit comments

Comments
 (0)