Skip to content

Commit 6eca6d4

Browse files
authored
feat(sdk-metrics-base): detect resets on async metrics (#2990)
1 parent 4378303 commit 6eca6d4

31 files changed

+680
-283
lines changed

experimental/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ All notable changes to experimental packages in this project will be documented
6262
* fix(sdk-metrics-base): misbehaving aggregation temporality selector tolerance #2958 @legendecas
6363
* feat(trace-otlp-grpc): configure security with env vars #2827 @svetlanabrennan
6464
* feat(sdk-metrics-base): async instruments callback timeout #2742 @legendecas
65+
* feat(sdk-metrics-base): detect resets on async metrics #2990 @legendecas
66+
* Added monotonicity support in SumAggregator.
67+
* Added reset and gaps detection for async metric instruments.
68+
* Fixed the start time and end time of an exported metric with regarding to resets and gaps.
6569

6670
### :bug: (Bug Fix)
6771

experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/CollectorMetricExporter.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,8 +306,8 @@ describe('OTLPMetricExporter - node with json over http', () => {
306306
assert.ok(typeof metric3 !== 'undefined', "histogram doesn't exist");
307307
ensureHistogramIsCorrect(
308308
metric3,
309-
core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[1].dataPoints[0].endTime),
310-
core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[1].dataPoints[0].startTime),
309+
core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[2].dataPoints[0].endTime),
310+
core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[2].dataPoints[0].startTime),
311311
[0, 100],
312312
[0, 2, 0]
313313
);

experimental/packages/opentelemetry-sdk-metrics-base/src/Instruments.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import * as api from '@opentelemetry/api';
1818
import * as metrics from '@opentelemetry/api-metrics';
1919
import { ObservableCallback } from '@opentelemetry/api-metrics';
20+
import { hrTime } from '@opentelemetry/core';
2021
import { InstrumentDescriptor } from './InstrumentDescriptor';
2122
import { ObservableRegistry } from './state/ObservableRegistry';
2223
import { AsyncWritableMetricStorage, WritableMetricStorage } from './state/WritableMetricStorage';
@@ -31,7 +32,7 @@ export class SyncInstrument {
3132
);
3233
value = Math.trunc(value);
3334
}
34-
this._writableMetricStorage.record(value, attributes, context);
35+
this._writableMetricStorage.record(value, attributes, context, hrTime());
3536
}
3637
}
3738

experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Drop.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ export class DropAggregator implements Aggregator<undefined> {
4545
_descriptor: InstrumentDescriptor,
4646
_aggregationTemporality: AggregationTemporality,
4747
_accumulationByAttributes: AccumulationRecord<undefined>[],
48-
_startTime: HrTime,
4948
_endTime: HrTime): Maybe<MetricData> {
5049
return undefined;
5150
}

experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Histogram.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ function createNewEmptyCheckpoint(boundaries: number[]): Histogram {
4545

4646
export class HistogramAccumulation implements Accumulation {
4747
constructor(
48+
public startTime: HrTime,
4849
private readonly _boundaries: number[],
4950
private _recordMinMax = true,
5051
private _current: Histogram = createNewEmptyCheckpoint(_boundaries)
@@ -70,6 +71,10 @@ export class HistogramAccumulation implements Accumulation {
7071
this._current.buckets.counts[this._boundaries.length] += 1;
7172
}
7273

74+
setStartTime(startTime: HrTime): void {
75+
this.startTime = startTime;
76+
}
77+
7378
toPointValue(): Histogram {
7479
return this._current;
7580
}
@@ -88,8 +93,8 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
8893
*/
8994
constructor(private readonly _boundaries: number[], private readonly _recordMinMax: boolean) {}
9095

91-
createAccumulation() {
92-
return new HistogramAccumulation(this._boundaries, this._recordMinMax);
96+
createAccumulation(startTime: HrTime) {
97+
return new HistogramAccumulation(startTime, this._boundaries, this._recordMinMax);
9398
}
9499

95100
/**
@@ -125,7 +130,7 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
125130
}
126131
}
127132

128-
return new HistogramAccumulation(previousValue.buckets.boundaries, this._recordMinMax, {
133+
return new HistogramAccumulation(previous.startTime, previousValue.buckets.boundaries, this._recordMinMax, {
129134
buckets: {
130135
boundaries: previousValue.buckets.boundaries,
131136
counts: mergedCounts,
@@ -153,7 +158,7 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
153158
diffedCounts[idx] = currentCounts[idx] - previousCounts[idx];
154159
}
155160

156-
return new HistogramAccumulation(previousValue.buckets.boundaries, this._recordMinMax, {
161+
return new HistogramAccumulation(current.startTime, previousValue.buckets.boundaries, this._recordMinMax, {
157162
buckets: {
158163
boundaries: previousValue.buckets.boundaries,
159164
counts: diffedCounts,
@@ -170,7 +175,6 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
170175
descriptor: InstrumentDescriptor,
171176
aggregationTemporality: AggregationTemporality,
172177
accumulationByAttributes: AccumulationRecord<HistogramAccumulation>[],
173-
startTime: HrTime,
174178
endTime: HrTime): Maybe<HistogramMetricData> {
175179
return {
176180
descriptor,
@@ -179,7 +183,7 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
179183
dataPoints: accumulationByAttributes.map(([attributes, accumulation]) => {
180184
return {
181185
attributes,
182-
startTime,
186+
startTime: accumulation.startTime,
183187
endTime,
184188
value: accumulation.toPointValue(),
185189
};

experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/LastValue.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,17 @@ import { Maybe } from '../utils';
2323
import { AggregationTemporality } from '../export/AggregationTemporality';
2424

2525
export class LastValueAccumulation implements Accumulation {
26-
constructor(private _current: number = 0, public sampleTime: HrTime = [0, 0]) {}
26+
constructor(public startTime: HrTime, private _current: number = 0, public sampleTime: HrTime = [0, 0]) {}
2727

2828
record(value: number): void {
2929
this._current = value;
3030
this.sampleTime = hrTime();
3131
}
3232

33+
setStartTime(startTime: HrTime): void {
34+
this.startTime = startTime;
35+
}
36+
3337
toPointValue(): LastValue {
3438
return this._current;
3539
}
@@ -39,8 +43,8 @@ export class LastValueAccumulation implements Accumulation {
3943
export class LastValueAggregator implements Aggregator<LastValueAccumulation> {
4044
public kind: AggregatorKind.LAST_VALUE = AggregatorKind.LAST_VALUE;
4145

42-
createAccumulation() {
43-
return new LastValueAccumulation();
46+
createAccumulation(startTime: HrTime) {
47+
return new LastValueAccumulation(startTime);
4448
}
4549

4650
/**
@@ -51,7 +55,7 @@ export class LastValueAggregator implements Aggregator<LastValueAccumulation> {
5155
merge(previous: LastValueAccumulation, delta: LastValueAccumulation): LastValueAccumulation {
5256
// nanoseconds may lose precisions.
5357
const latestAccumulation = hrTimeToMicroseconds(delta.sampleTime) >= hrTimeToMicroseconds(previous.sampleTime) ? delta : previous;
54-
return new LastValueAccumulation(latestAccumulation.toPointValue(), latestAccumulation.sampleTime);
58+
return new LastValueAccumulation(previous.startTime, latestAccumulation.toPointValue(), latestAccumulation.sampleTime);
5559
}
5660

5761
/**
@@ -63,14 +67,13 @@ export class LastValueAggregator implements Aggregator<LastValueAccumulation> {
6367
diff(previous: LastValueAccumulation, current: LastValueAccumulation): LastValueAccumulation {
6468
// nanoseconds may lose precisions.
6569
const latestAccumulation = hrTimeToMicroseconds(current.sampleTime) >= hrTimeToMicroseconds(previous.sampleTime) ? current : previous;
66-
return new LastValueAccumulation(latestAccumulation.toPointValue(), latestAccumulation.sampleTime);
70+
return new LastValueAccumulation(current.startTime, latestAccumulation.toPointValue(), latestAccumulation.sampleTime);
6771
}
6872

6973
toMetricData(
7074
descriptor: InstrumentDescriptor,
7175
aggregationTemporality: AggregationTemporality,
7276
accumulationByAttributes: AccumulationRecord<LastValueAccumulation>[],
73-
startTime: HrTime,
7477
endTime: HrTime): Maybe<SingularMetricData> {
7578
return {
7679
descriptor,
@@ -79,7 +82,7 @@ export class LastValueAggregator implements Aggregator<LastValueAccumulation> {
7982
dataPoints: accumulationByAttributes.map(([attributes, accumulation]) => {
8083
return {
8184
attributes,
82-
startTime,
85+
startTime: accumulation.startTime,
8386
endTime,
8487
value: accumulation.toPointValue(),
8588
};

experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Sum.ts

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,19 @@ import { Maybe } from '../utils';
2222
import { AggregationTemporality } from '../export/AggregationTemporality';
2323

2424
export class SumAccumulation implements Accumulation {
25-
constructor(private _current: number = 0) {}
25+
constructor(public startTime: HrTime, public monotonic: boolean, private _current: number = 0, public reset = false) {}
2626

2727
record(value: number): void {
28+
if (this.monotonic && value < 0) {
29+
return;
30+
}
2831
this._current += value;
2932
}
3033

34+
setStartTime(startTime: HrTime): void {
35+
this.startTime = startTime;
36+
}
37+
3138
toPointValue(): Sum {
3239
return this._current;
3340
}
@@ -37,29 +44,45 @@ export class SumAccumulation implements Accumulation {
3744
export class SumAggregator implements Aggregator<SumAccumulation> {
3845
public kind: AggregatorKind.SUM = AggregatorKind.SUM;
3946

40-
createAccumulation() {
41-
return new SumAccumulation();
47+
constructor (public monotonic: boolean) {}
48+
49+
createAccumulation(startTime: HrTime) {
50+
return new SumAccumulation(startTime, this.monotonic);
4251
}
4352

4453
/**
4554
* Returns the result of the merge of the given accumulations.
4655
*/
4756
merge(previous: SumAccumulation, delta: SumAccumulation): SumAccumulation {
48-
return new SumAccumulation(previous.toPointValue() + delta.toPointValue());
57+
const prevPv = previous.toPointValue();
58+
const deltaPv = delta.toPointValue();
59+
if (delta.reset) {
60+
return new SumAccumulation(delta.startTime, this.monotonic, deltaPv, delta.reset);
61+
}
62+
return new SumAccumulation(previous.startTime, this.monotonic, prevPv + deltaPv);
4963
}
5064

5165
/**
5266
* Returns a new DELTA aggregation by comparing two cumulative measurements.
5367
*/
5468
diff(previous: SumAccumulation, current: SumAccumulation): SumAccumulation {
55-
return new SumAccumulation(current.toPointValue() - previous.toPointValue());
69+
const prevPv = previous.toPointValue();
70+
const currPv = current.toPointValue();
71+
/**
72+
* If the SumAggregator is a monotonic one and the previous point value is
73+
* greater than the current one, a reset is deemed to be happened.
74+
* Return the current point value to prevent the value from been reset.
75+
*/
76+
if (this.monotonic && (prevPv > currPv)) {
77+
return new SumAccumulation(current.startTime, this.monotonic, currPv, true);
78+
}
79+
return new SumAccumulation(current.startTime, this.monotonic, currPv - prevPv);
5680
}
5781

5882
toMetricData(
5983
descriptor: InstrumentDescriptor,
6084
aggregationTemporality: AggregationTemporality,
6185
accumulationByAttributes: AccumulationRecord<SumAccumulation>[],
62-
startTime: HrTime,
6386
endTime: HrTime): Maybe<SingularMetricData> {
6487
return {
6588
descriptor,
@@ -68,7 +91,7 @@ export class SumAggregator implements Aggregator<SumAccumulation> {
6891
dataPoints: accumulationByAttributes.map(([attributes, accumulation]) => {
6992
return {
7093
attributes,
71-
startTime,
94+
startTime: accumulation.startTime,
7295
endTime,
7396
value: accumulation.toPointValue(),
7497
};

experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/types.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ export interface Histogram {
6868
* An Aggregator accumulation state.
6969
*/
7070
export interface Accumulation {
71+
setStartTime(startTime: HrTime): void;
7172
record(value: number): void;
7273
}
7374

@@ -84,7 +85,7 @@ export interface Aggregator<T> {
8485
/**
8586
* Create a clean state of accumulation.
8687
*/
87-
createAccumulation(): T;
88+
createAccumulation(startTime: HrTime): T;
8889

8990
/**
9091
* Returns the result of the merge of the given accumulations.
@@ -112,13 +113,11 @@ export interface Aggregator<T> {
112113
*
113114
* @param descriptor the metric instrument descriptor.
114115
* @param accumulationByAttributes the array of attributes and accumulation pairs.
115-
* @param startTime the start time of the metric data.
116116
* @param endTime the end time of the metric data.
117117
* @return the {@link MetricData} that this {@link Aggregator} will produce.
118118
*/
119119
toMetricData(descriptor: InstrumentDescriptor,
120120
aggregationTemporality: AggregationTemporality,
121121
accumulationByAttributes: AccumulationRecord<T>[],
122-
startTime: HrTime,
123122
endTime: HrTime): Maybe<MetricData>;
124123
}

experimental/packages/opentelemetry-sdk-metrics-base/src/state/AsyncMetricStorage.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
4646
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
4747
}
4848

49-
record(measurements: AttributeHashMap<number>) {
49+
record(measurements: AttributeHashMap<number>, observationTime: HrTime) {
5050
const processed = new AttributeHashMap<number>();
5151
Array.from(measurements.entries()).forEach(([attributes, value]) => {
5252
processed.set(this._attributesProcessor.process(attributes), value);
5353
});
54-
this._deltaMetricStorage.batchCumulate(processed);
54+
this._deltaMetricStorage.batchCumulate(processed, observationTime);
5555
}
5656

5757
/**
@@ -64,7 +64,6 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
6464
collect(
6565
collector: MetricCollectorHandle,
6666
collectors: MetricCollectorHandle[],
67-
sdkStartTime: HrTime,
6867
collectionTime: HrTime,
6968
): Maybe<MetricData> {
7069
const accumulations = this._deltaMetricStorage.collect();
@@ -74,7 +73,6 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
7473
collectors,
7574
this._instrumentDescriptor,
7675
accumulations,
77-
sdkStartTime,
7876
collectionTime
7977
);
8078
}

experimental/packages/opentelemetry-sdk-metrics-base/src/state/DeltaMetricProcessor.ts

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
import { Context } from '@opentelemetry/api';
17+
import { Context, HrTime } from '@opentelemetry/api';
1818
import { MetricAttributes } from '@opentelemetry/api-metrics';
1919
import { Maybe } from '../utils';
2020
import { Accumulation, Aggregator } from '../aggregator/types';
@@ -35,31 +35,36 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {
3535

3636
constructor(private _aggregator: Aggregator<T>) {}
3737

38-
/** Bind an efficient storage handle for a set of attributes. */
39-
private bind(attributes: MetricAttributes) {
40-
return this._activeCollectionStorage.getOrDefault(attributes, () => this._aggregator.createAccumulation());
41-
}
42-
43-
record(value: number, attributes: MetricAttributes, _context: Context) {
44-
const accumulation = this.bind(attributes);
38+
record(value: number, attributes: MetricAttributes, _context: Context, collectionTime: HrTime) {
39+
const accumulation = this._activeCollectionStorage.getOrDefault(
40+
attributes,
41+
() => this._aggregator.createAccumulation(collectionTime)
42+
);
4543
accumulation?.record(value);
4644
}
4745

48-
batchCumulate(measurements: AttributeHashMap<number>) {
46+
batchCumulate(measurements: AttributeHashMap<number>, collectionTime: HrTime) {
4947
Array.from(measurements.entries()).forEach(([attributes, value, hashCode]) => {
50-
let accumulation = this._aggregator.createAccumulation();
48+
const accumulation = this._aggregator.createAccumulation(collectionTime);
5149
accumulation?.record(value);
50+
let delta = accumulation;
5251
if (this._cumulativeMemoStorage.has(attributes, hashCode)) {
52+
// has() returned true, previous is present.
5353
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
5454
const previous = this._cumulativeMemoStorage.get(attributes, hashCode)!;
55-
accumulation = this._aggregator.diff(previous, accumulation);
55+
delta = this._aggregator.diff(previous, accumulation);
5656
}
5757

58+
// Save the current record and the delta record.
5859
this._cumulativeMemoStorage.set(attributes, accumulation, hashCode);
59-
this._activeCollectionStorage.set(attributes, accumulation, hashCode);
60+
this._activeCollectionStorage.set(attributes, delta, hashCode);
6061
});
6162
}
6263

64+
/**
65+
* Returns a collection of delta metrics. Start time is the when first
66+
* time event collected.
67+
*/
6368
collect() {
6469
const unreportedDelta = this._activeCollectionStorage;
6570
this._activeCollectionStorage = new AttributeHashMap();

0 commit comments

Comments
 (0)