Skip to content

Commit cf9f2d3

Browse files
author
Tristan Sloughter
authored
Merge pull request #560 from tsloughter/delta-observables
metrics: support delta/cumulative conversions in sum aggregator
2 parents 1f02419 + f6944c3 commit cf9f2d3

File tree

11 files changed

+152
-64
lines changed

11 files changed

+152
-64
lines changed

apps/opentelemetry_api_experimental/include/otel_metrics.hrl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@
44
description :: otel_instrument:description(),
55
kind :: otel_instrument:kind(),
66
unit :: otel_instrument:unit() | undefined,
7+
temporality :: otel_instrument:temporality(),
78
callback :: otel_instrument:callback() | undefined,
89
callback_args :: term() | undefined}).
910

11+
-define(TEMPORALITY_DELTA, temporality_delta).
12+
-define(TEMPORALITY_CUMULATIVE, temporality_cumulative).
13+
-define(TEMPORALITY_UNSPECIFIED, temporality_unspecified).
14+
1015
-define(KIND_COUNTER, counter).
1116
-define(KIND_OBSERVABLE_COUNTER, observable_counter).
1217
-define(KIND_HISTOGRAM, histogram).

apps/opentelemetry_api_experimental/src/otel_instrument.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,18 @@
3434
-type callback() :: fun((callback_args()) -> observation() |
3535
[named_observation()]).
3636

37+
-type temporality() :: ?TEMPORALITY_UNSPECIFIED |
38+
?TEMPORALITY_DELTA |
39+
?TEMPORALITY_CUMULATIVE.
40+
3741
-type t() :: #instrument{}.
3842

3943
-export_type([t/0,
4044
name/0,
4145
description/0,
4246
kind/0,
4347
unit/0,
48+
temporality/0,
4449
callback/0,
4550
callback_args/0]).
4651

@@ -50,6 +55,7 @@ new(Module, Meter, Kind, Name, Description, Unit) ->
5055
meter = Meter,
5156
name = Name,
5257
description = Description,
58+
temporality = ?TEMPORALITY_DELTA,
5359
kind = Kind,
5460
unit = Unit}.
5561

@@ -61,6 +67,7 @@ new(Module, Meter, Kind, Name, Description, Unit, Callback, CallbackArgs) ->
6167
description = Description,
6268
kind = Kind,
6369
unit = Unit,
70+
temporality = ?TEMPORALITY_CUMULATIVE,
6471
callback = Callback,
6572
callback_args = CallbackArgs}.
6673

apps/opentelemetry_experimental/include/otel_metrics.hrl

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
-define(DEFAULT_METER_PROVIDER, otel_meter_provider_default).
22

3-
-define(AGGREGATION_TEMPORALITY_DELTA, aggregation_temporality_delta).
4-
-define(AGGREGATION_TEMPORALITY_CUMULATIVE, aggregation_temporality_cumulative).
5-
-define(AGGREGATION_TEMPORALITY_UNSPECIFIED, aggregation_temporality_unspecified).
6-
73
-record(meter,
84
{
95
module :: module() | '_',
@@ -30,6 +26,7 @@
3026
start_time_unix_nano :: integer() | '_' | '$1' | {const, integer()},
3127
last_start_time_unix_nano :: integer() | undefined | '$5',
3228
checkpoint :: number() | undefined | '_' | '$2' | '$3',
29+
previous_checkpoint :: number() | undefined | '_' | '$5',
3330
int_value :: number() | undefined | '$3' | {'+', '$3', {const, number()}},
3431
float_value :: number() | undefined | '$4' | {'+', '$4', {const, number()}}
3532
}).
@@ -83,7 +80,7 @@
8380
-record(sum,
8481
{
8582
datapoints :: [#datapoint{}],
86-
aggregation_temporality :: otel_aggregation:temporality(),
83+
aggregation_temporality :: otel_instrument:temporality(),
8784
is_monotonic :: boolean()
8885
}).
8986

@@ -110,7 +107,7 @@
110107
-record(histogram,
111108
{
112109
datapoints :: [#histogram_datapoint{}],
113-
aggregation_temporality :: otel_aggregation:temporality()
110+
aggregation_temporality :: otel_instrument:temporality()
114111
}).
115112

116113
-record(metric,

apps/opentelemetry_experimental/src/otel_aggregation.erl

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,6 @@
99
-include("otel_metrics.hrl").
1010
-include("otel_view.hrl").
1111

12-
-type temporality() :: ?AGGREGATION_TEMPORALITY_UNSPECIFIED |
13-
?AGGREGATION_TEMPORALITY_DELTA |
14-
?AGGREGATION_TEMPORALITY_CUMULATIVE.
15-
1612
%% -type t() :: drop | sum | last_value | histogram.
1713
-type t() :: otel_aggregation_drop:t() | otel_aggregation_sum:t() |
1814
otel_aggregation_last_value:t() | otel_aggregation_histogram_explicit:t().
@@ -23,8 +19,7 @@
2319

2420
-export_type([t/0,
2521
key/0,
26-
options/0,
27-
temporality/0]).
22+
options/0]).
2823

2924
%% Returns the aggregation's record as it is seen and updated by
3025
%% the aggregation module in the metrics table.
@@ -79,22 +74,22 @@ default_mapping() ->
7974
?KIND_OBSERVABLE_UPDOWNCOUNTER => otel_aggregation_sum}.
8075

8176
temporality_mapping() ->
82-
#{?KIND_COUNTER =>?AGGREGATION_TEMPORALITY_DELTA,
83-
?KIND_OBSERVABLE_COUNTER => ?AGGREGATION_TEMPORALITY_CUMULATIVE,
84-
?KIND_UPDOWN_COUNTER => ?AGGREGATION_TEMPORALITY_DELTA,
85-
?KIND_OBSERVABLE_UPDOWNCOUNTER => ?AGGREGATION_TEMPORALITY_CUMULATIVE,
86-
?KIND_HISTOGRAM => ?AGGREGATION_TEMPORALITY_UNSPECIFIED,
87-
?KIND_OBSERVABLE_GAUGE => ?AGGREGATION_TEMPORALITY_UNSPECIFIED}.
77+
#{?KIND_COUNTER =>?TEMPORALITY_DELTA,
78+
?KIND_OBSERVABLE_COUNTER => ?TEMPORALITY_CUMULATIVE,
79+
?KIND_UPDOWN_COUNTER => ?TEMPORALITY_DELTA,
80+
?KIND_OBSERVABLE_UPDOWNCOUNTER => ?TEMPORALITY_CUMULATIVE,
81+
?KIND_HISTOGRAM => ?TEMPORALITY_UNSPECIFIED,
82+
?KIND_OBSERVABLE_GAUGE => ?TEMPORALITY_UNSPECIFIED}.
8883

8984
instrument_temporality(#instrument{kind=?KIND_COUNTER}) ->
90-
?AGGREGATION_TEMPORALITY_DELTA;
85+
?TEMPORALITY_DELTA;
9186
instrument_temporality(#instrument{kind=?KIND_OBSERVABLE_COUNTER}) ->
92-
?AGGREGATION_TEMPORALITY_CUMULATIVE;
87+
?TEMPORALITY_CUMULATIVE;
9388
instrument_temporality(#instrument{kind=?KIND_UPDOWN_COUNTER}) ->
94-
?AGGREGATION_TEMPORALITY_DELTA;
89+
?TEMPORALITY_DELTA;
9590
instrument_temporality(#instrument{kind=?KIND_OBSERVABLE_UPDOWNCOUNTER}) ->
96-
?AGGREGATION_TEMPORALITY_CUMULATIVE;
91+
?TEMPORALITY_CUMULATIVE;
9792
instrument_temporality(#instrument{kind=?KIND_HISTOGRAM}) ->
98-
?AGGREGATION_TEMPORALITY_UNSPECIFIED;
93+
?TEMPORALITY_UNSPECIFIED;
9994
instrument_temporality(#instrument{kind=?KIND_OBSERVABLE_GAUGE}) ->
100-
?AGGREGATION_TEMPORALITY_UNSPECIFIED.
95+
?TEMPORALITY_UNSPECIFIED.

apps/opentelemetry_experimental/src/otel_aggregation_histogram_explicit.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ aggregate(Table, #view_aggregation{name=Name,
133133
-dialyzer({nowarn_function, checkpoint/3}).
134134
checkpoint(Tab, #view_aggregation{name=Name,
135135
reader=ReaderId,
136-
temporality=?AGGREGATION_TEMPORALITY_DELTA}, CollectionStartNano) ->
136+
temporality=?TEMPORALITY_DELTA}, CollectionStartNano) ->
137137
MS = [{#explicit_histogram_aggregation{key='$1',
138138
start_time_unix_nano='$9',
139139
boundaries='$2',

apps/opentelemetry_experimental/src/otel_aggregation_last_value.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
checkpoint/3,
2525
collect/3]).
2626

27+
-include_lib("opentelemetry_api_experimental/include/otel_metrics.hrl").
2728
-include("otel_metrics.hrl").
2829

2930
-type t() :: #last_value_aggregation{}.
@@ -55,7 +56,7 @@ aggregate(Tab, ViewAggregation=#view_aggregation{name=Name,
5556
-dialyzer({nowarn_function, checkpoint/3}).
5657
checkpoint(Tab, #view_aggregation{name=Name,
5758
reader=ReaderId,
58-
temporality=?AGGREGATION_TEMPORALITY_DELTA}, CollectionStartNano) ->
59+
temporality=?TEMPORALITY_DELTA}, CollectionStartNano) ->
5960
MS = [{#last_value_aggregation{key='$1',
6061
start_time_unix_nano='$3',
6162
last_start_time_unix_nano='_',

apps/opentelemetry_experimental/src/otel_aggregation_sum.erl

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ init(#view_aggregation{name=Name,
3737
Key = {Name, Attributes, ReaderId},
3838
#sum_aggregation{key=Key,
3939
start_time_unix_nano=erlang:system_time(nanosecond),
40+
checkpoint=0, %% 0 value is never reported but gets copied to previous_checkpoint
41+
%% which is used to add/subtract for conversion of temporality
4042
int_value=0,
4143
float_value=0.0}.
4244

@@ -87,11 +89,12 @@ aggregate(_Tab, #view_aggregation{name=_Name,
8789
-dialyzer({nowarn_function, checkpoint/3}).
8890
checkpoint(Tab, #view_aggregation{name=Name,
8991
reader=ReaderPid,
90-
temporality=?AGGREGATION_TEMPORALITY_DELTA}, CollectionStartNano) ->
92+
temporality=?TEMPORALITY_DELTA}, CollectionStartNano) ->
9193
MS = [{#sum_aggregation{key='$1',
9294
start_time_unix_nano='$4',
9395
last_start_time_unix_nano='_',
94-
checkpoint='_',
96+
checkpoint='$5',
97+
previous_checkpoint='_',
9598
int_value='$2',
9699
float_value='$3'},
97100
[{'=:=', {element, 1, '$1'}, {const, Name}},
@@ -101,12 +104,14 @@ checkpoint(Tab, #view_aggregation{name=Name,
101104
start_time_unix_nano={const, CollectionStartNano},
102105
last_start_time_unix_nano='$4',
103106
checkpoint='$2',
107+
previous_checkpoint='$5',
104108
int_value=0,
105109
float_value=0.0}}]},
106110
{#sum_aggregation{key='$1',
107111
start_time_unix_nano='$4',
108112
last_start_time_unix_nano='_',
109-
checkpoint='_',
113+
checkpoint='$5',
114+
previous_checkpoint='_',
110115
int_value='$2',
111116
float_value='$3'},
112117
[{'=:=', {element, 1, '$1'}, {const, Name}},
@@ -115,17 +120,19 @@ checkpoint(Tab, #view_aggregation{name=Name,
115120
start_time_unix_nano={const, CollectionStartNano},
116121
last_start_time_unix_nano='$4',
117122
checkpoint={'+', '$2', '$3'},
123+
previous_checkpoint='$5',
118124
int_value=0,
119125
float_value=0.0}}]}],
120126
_ = ets:select_replace(Tab, MS),
121127
ok;
122128
checkpoint(Tab, #view_aggregation{name=Name,
123129
reader=ReaderPid,
124-
temporality=?AGGREGATION_TEMPORALITY_CUMULATIVE}, _CollectionStartNano) ->
130+
temporality=?TEMPORALITY_CUMULATIVE}, _CollectionStartNano) ->
125131
MS = [{#sum_aggregation{key='$1',
126132
start_time_unix_nano='$2',
127133
last_start_time_unix_nano='_',
128-
checkpoint='_',
134+
checkpoint='$5',
135+
previous_checkpoint='_',
129136
int_value='$3',
130137
float_value='$4'},
131138
[{'=:=', {element, 1, '$1'}, {const, Name}},
@@ -135,12 +142,14 @@ checkpoint(Tab, #view_aggregation{name=Name,
135142
start_time_unix_nano='$2',
136143
last_start_time_unix_nano='$2',
137144
checkpoint='$3',
138-
int_value='$3',
139-
float_value='$4'}}]},
145+
previous_checkpoint='$5',
146+
int_value=0,
147+
float_value=0.0}}]},
140148
{#sum_aggregation{key='$1',
141149
start_time_unix_nano='$2',
142150
last_start_time_unix_nano='_',
143-
checkpoint='_',
151+
checkpoint='$5',
152+
previous_checkpoint='_',
144153
int_value='$3',
145154
float_value='$4'},
146155
[{'=:=', {element, 1, '$1'}, {const, Name}},
@@ -149,13 +158,15 @@ checkpoint(Tab, #view_aggregation{name=Name,
149158
start_time_unix_nano='$2',
150159
last_start_time_unix_nano='$2',
151160
checkpoint={'+', '$3', '$4'},
152-
int_value='$3',
153-
float_value='$4'}}]}],
161+
previous_checkpoint='$5',
162+
int_value=0,
163+
float_value=0.0}}]}],
154164
_ = ets:select_replace(Tab, MS),
155165
ok.
156166

157167
collect(Tab, #view_aggregation{name=Name,
158168
reader=ReaderId,
169+
instrument=#instrument{temporality=InstrumentTemporality},
159170
temporality=Temporality,
160171
is_monotonic=IsMonotonic}, CollectionStartTime) ->
161172
Select = [{'$1',
@@ -165,16 +176,40 @@ collect(Tab, #view_aggregation{name=Name,
165176
AttributesAggregation = ets:select(Tab, Select),
166177
#sum{aggregation_temporality=Temporality,
167178
is_monotonic=IsMonotonic,
168-
datapoints=[datapoint(CollectionStartTime, SumAgg) || SumAgg <- AttributesAggregation]}.
179+
datapoints=[datapoint(CollectionStartTime, InstrumentTemporality, Temporality, SumAgg) || SumAgg <- AttributesAggregation]}.
169180

170-
datapoint(CollectionStartNano, #sum_aggregation{key={_, Attributes, _},
171-
last_start_time_unix_nano=StartTimeUnixNano,
172-
checkpoint=Value}) ->
181+
datapoint(CollectionStartNano, Temporality, Temporality, #sum_aggregation{key={_, Attributes, _},
182+
last_start_time_unix_nano=StartTimeUnixNano,
183+
checkpoint=Value}) ->
173184
#datapoint{
174185
attributes=Attributes,
175186
start_time_unix_nano=StartTimeUnixNano,
176187
time_unix_nano=CollectionStartNano,
177188
value=Value,
178189
exemplars=[],
179190
flags=0
191+
};
192+
datapoint(CollectionStartNano, _, ?TEMPORALITY_CUMULATIVE, #sum_aggregation{key={_, Attributes, _},
193+
last_start_time_unix_nano=StartTimeUnixNano,
194+
previous_checkpoint=PreviousCheckpoint,
195+
checkpoint=Value}) ->
196+
#datapoint{
197+
attributes=Attributes,
198+
start_time_unix_nano=StartTimeUnixNano,
199+
time_unix_nano=CollectionStartNano,
200+
value=Value + PreviousCheckpoint,
201+
exemplars=[],
202+
flags=0
203+
};
204+
datapoint(CollectionStartNano, _, ?TEMPORALITY_DELTA, #sum_aggregation{key={_, Attributes, _},
205+
last_start_time_unix_nano=StartTimeUnixNano,
206+
previous_checkpoint=PreviousCheckpoint,
207+
checkpoint=Value}) ->
208+
#datapoint{
209+
attributes=Attributes,
210+
start_time_unix_nano=StartTimeUnixNano,
211+
time_unix_nano=CollectionStartNano,
212+
value=Value - PreviousCheckpoint,
213+
exemplars=[],
214+
flags=0
180215
}.

apps/opentelemetry_experimental/src/otel_meter_server.erl

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -220,10 +220,8 @@ handle_call({add_instrument, Instrument}, _From, State=#state{readers=Readers,
220220
_ = add_instrument_(InstrumentsTab, CallbacksTab, ViewAggregationsTab, Instrument, Views, Readers),
221221
{reply, ok, State};
222222
handle_call({register_callback, Instruments, Callback, CallbackArgs}, _From, State=#state{readers=Readers,
223-
views=Views,
224-
callbacks_tab=CallbacksTab,
225-
view_aggregations_tab=ViewAggregationsTab}) ->
226-
_ = register_callback_(CallbacksTab, ViewAggregationsTab, Instruments, Callback, CallbackArgs, Views, Readers),
223+
callbacks_tab=CallbacksTab}) ->
224+
_ = register_callback_(CallbacksTab, Instruments, Callback, CallbackArgs, Readers),
227225
{reply, ok, State};
228226
handle_call({get_meter, Name, Vsn, SchemaUrl}, _From, State=#state{shared_meter=Meter}) ->
229227
Scope = opentelemetry:instrumentation_scope(Name, Vsn, SchemaUrl),
@@ -344,15 +342,10 @@ update_view_aggregations_(Instrument=#instrument{meter=Meter,
344342
end, Readers).
345343

346344
%% Match the Instrument to views and then store a per-Reader aggregation for the View
347-
register_callback_(CallbacksTab, ViewAggregationsTab, Instruments, Callback, CallbackArgs, Views, Readers) ->
348-
lists:foreach(fun(Instrument) ->
349-
ViewMatches = otel_view:match_instrument_to_views(Instrument, Views),
350-
lists:map(fun(Reader=#reader{id=ReaderId}) ->
351-
Matches = per_reader_aggregations(Reader, Instrument, ViewMatches),
352-
[true = ets:insert(ViewAggregationsTab, {Instrument, M}) || M <- Matches],
353-
ets:insert(CallbacksTab, {ReaderId, {Callback, CallbackArgs, Instruments}})
354-
end, Readers)
355-
end, Instruments).
345+
register_callback_(CallbacksTab, Instruments, Callback, CallbackArgs, Readers) ->
346+
lists:map(fun(#reader{id=ReaderId}) ->
347+
ets:insert(CallbacksTab, {ReaderId, {Callback, CallbackArgs, Instruments}})
348+
end, Readers).
356349

357350
metric_reader(ReaderId, ReaderPid, DefaultAggregationMapping, Temporality) ->
358351
%% TODO: Uncomment when we can drop OTP-23 support
@@ -407,7 +400,7 @@ view_aggregation_for_reader(Instrument=#instrument{kind=Kind}, ViewAggregation,
407400
Reader=#reader{id=Id,
408401
default_temporality_mapping=ReaderTemporalityMapping}) ->
409402
AggregationModule = aggregation_module(Instrument, View, Reader),
410-
Temporality = maps:get(Kind, ReaderTemporalityMapping, ?AGGREGATION_TEMPORALITY_UNSPECIFIED),
403+
Temporality = maps:get(Kind, ReaderTemporalityMapping, ?TEMPORALITY_UNSPECIFIED),
411404

412405
ViewAggregation#view_aggregation{
413406
reader=Id,
@@ -419,7 +412,7 @@ view_aggregation_for_reader(Instrument=#instrument{kind=Kind}, ViewAggregation,
419412
Reader=#reader{id=Id,
420413
default_temporality_mapping=ReaderTemporalityMapping}) ->
421414
AggregationModule = aggregation_module(Instrument, View, Reader),
422-
Temporality = maps:get(Kind, ReaderTemporalityMapping, ?AGGREGATION_TEMPORALITY_UNSPECIFIED),
415+
Temporality = maps:get(Kind, ReaderTemporalityMapping, ?TEMPORALITY_UNSPECIFIED),
423416

424417
ViewAggregation#view_aggregation{
425418
reader=Id,

apps/opentelemetry_experimental/src/otel_view.hrl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
-record(view_aggregation,
1010
{%% name of the view or instrument if the view has no name
11-
name :: atom(),
11+
name :: atom(),
1212
scope :: opentelemetry:instrumentation_scope(),
1313
instrument :: otel_instrument:t(),
1414
reader :: reference() | undefined,
@@ -18,7 +18,7 @@
1818
aggregation_module :: module(),
1919
aggregation_options :: map(),
2020

21-
temporality :: otel_aggregation:temporality(),
21+
temporality :: otel_instrument:temporality(),
2222
is_monotonic :: boolean(),
2323

2424
%% description from the view or the instrument if the view has no name

0 commit comments

Comments
 (0)