Skip to content

Commit 14bd6f9

Browse files
authored
refactor(sdk-metrics-base): meter shared states (#2821)
1 parent 0213d82 commit 14bd6f9

File tree

9 files changed

+253
-89
lines changed

9 files changed

+253
-89
lines changed

experimental/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ All notable changes to experimental packages in this project will be documented
5353
### :house: (Internal)
5454

5555
* chore: move trace exporters back to experimental #2835 @dyladan
56+
* refactor(sdk-metrics-base): meter shared states #2821 @legendecas
5657

5758
## v0.27.0
5859

experimental/packages/opentelemetry-sdk-metrics-base/src/Meter.ts

Lines changed: 11 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -16,35 +16,27 @@
1616

1717
import * as metrics from '@opentelemetry/api-metrics';
1818
import { InstrumentationLibrary } from '@opentelemetry/core';
19-
import { createInstrumentDescriptor, InstrumentDescriptor, InstrumentType } from './InstrumentDescriptor';
19+
import { createInstrumentDescriptor, InstrumentType } from './InstrumentDescriptor';
2020
import { CounterInstrument, HistogramInstrument, UpDownCounterInstrument } from './Instruments';
2121
import { MeterProviderSharedState } from './state/MeterProviderSharedState';
22-
import { MultiMetricStorage } from './state/MultiWritableMetricStorage';
23-
import { SyncMetricStorage } from './state/SyncMetricStorage';
24-
import { InstrumentationLibraryMetrics } from './export/MetricData';
25-
import { isNotNullish } from './utils';
26-
import { MetricCollectorHandle } from './state/MetricCollector';
27-
import { HrTime } from '@opentelemetry/api';
28-
import { AsyncMetricStorage } from './state/AsyncMetricStorage';
29-
import { WritableMetricStorage } from './state/WritableMetricStorage';
30-
import { MetricStorageRegistry } from './state/MetricStorageRegistry';
22+
import { MeterSharedState } from './state/MeterSharedState';
3123

3224
/**
3325
* This class implements the {@link metrics.Meter} interface.
3426
*/
3527
export class Meter implements metrics.Meter {
36-
private _metricStorageRegistry = new MetricStorageRegistry();
28+
private _meterSharedState: MeterSharedState;
3729

38-
constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) {
39-
this._meterProviderSharedState.meters.push(this);
30+
constructor(meterProviderSharedState: MeterProviderSharedState, instrumentationLibrary: InstrumentationLibrary) {
31+
this._meterSharedState = meterProviderSharedState.getMeterSharedState(instrumentationLibrary);
4032
}
4133

4234
/**
4335
* Create a {@link metrics.Histogram} instrument.
4436
*/
4537
createHistogram(name: string, options?: metrics.HistogramOptions): metrics.Histogram {
4638
const descriptor = createInstrumentDescriptor(name, InstrumentType.HISTOGRAM, options);
47-
const storage = this._registerMetricStorage(descriptor);
39+
const storage = this._meterSharedState.registerMetricStorage(descriptor);
4840
return new HistogramInstrument(storage, descriptor);
4941
}
5042

@@ -53,7 +45,7 @@ export class Meter implements metrics.Meter {
5345
*/
5446
createCounter(name: string, options?: metrics.CounterOptions): metrics.Counter {
5547
const descriptor = createInstrumentDescriptor(name, InstrumentType.COUNTER, options);
56-
const storage = this._registerMetricStorage(descriptor);
48+
const storage = this._meterSharedState.registerMetricStorage(descriptor);
5749
return new CounterInstrument(storage, descriptor);
5850
}
5951

@@ -62,7 +54,7 @@ export class Meter implements metrics.Meter {
6254
*/
6355
createUpDownCounter(name: string, options?: metrics.UpDownCounterOptions): metrics.UpDownCounter {
6456
const descriptor = createInstrumentDescriptor(name, InstrumentType.UP_DOWN_COUNTER, options);
65-
const storage = this._registerMetricStorage(descriptor);
57+
const storage = this._meterSharedState.registerMetricStorage(descriptor);
6658
return new UpDownCounterInstrument(storage, descriptor);
6759
}
6860

@@ -75,7 +67,7 @@ export class Meter implements metrics.Meter {
7567
options?: metrics.ObservableGaugeOptions,
7668
): void {
7769
const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_GAUGE, options);
78-
this._registerAsyncMetricStorage(descriptor, callback);
70+
this._meterSharedState.registerAsyncMetricStorage(descriptor, callback);
7971
}
8072

8173
/**
@@ -87,7 +79,7 @@ export class Meter implements metrics.Meter {
8779
options?: metrics.ObservableCounterOptions,
8880
): void {
8981
const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_COUNTER, options);
90-
this._registerAsyncMetricStorage(descriptor, callback);
82+
this._meterSharedState.registerAsyncMetricStorage(descriptor, callback);
9183
}
9284

9385
/**
@@ -99,47 +91,6 @@ export class Meter implements metrics.Meter {
9991
options?: metrics.ObservableUpDownCounterOptions,
10092
): void {
10193
const descriptor = createInstrumentDescriptor(name, InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, options);
102-
this._registerAsyncMetricStorage(descriptor, callback);
103-
}
104-
105-
private _registerMetricStorage(descriptor: InstrumentDescriptor): WritableMetricStorage {
106-
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
107-
const storages = views.map(view => this._metricStorageRegistry.register(SyncMetricStorage.create(view, descriptor)))
108-
.filter(isNotNullish);
109-
110-
if (storages.length === 1) {
111-
return storages[0];
112-
}
113-
114-
// This will be a no-op WritableMetricStorage when length is null.
115-
return new MultiMetricStorage(storages);
116-
}
117-
118-
private _registerAsyncMetricStorage(descriptor: InstrumentDescriptor, callback: metrics.ObservableCallback) {
119-
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
120-
views.forEach(view => {
121-
this._metricStorageRegistry.register(AsyncMetricStorage.create(view, descriptor, callback));
122-
});
123-
}
124-
125-
/**
126-
* @internal
127-
* @param collector opaque handle of {@link MetricCollector} which initiated the collection.
128-
* @param collectionTime the HrTime at which the collection was initiated.
129-
* @returns the list of {@link MetricData} collected.
130-
*/
131-
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<InstrumentationLibraryMetrics> {
132-
const metricData = await Promise.all(this._metricStorageRegistry.getStorages().map(metricStorage => {
133-
return metricStorage.collect(
134-
collector,
135-
this._meterProviderSharedState.metricCollectors,
136-
this._meterProviderSharedState.sdkStartTime,
137-
collectionTime);
138-
}));
139-
140-
return {
141-
instrumentationLibrary: this._instrumentationLibrary,
142-
metrics: metricData.filter(isNotNullish),
143-
};
94+
this._meterSharedState.registerAsyncMetricStorage(descriptor, callback);
14495
}
14596
}

experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
*/
1616

1717
import { HrTime } from '@opentelemetry/api';
18-
import { hrTime } from '@opentelemetry/core';
18+
import { hrTime, InstrumentationLibrary } from '@opentelemetry/core';
1919
import { Resource } from '@opentelemetry/resources';
20-
import { Meter } from '../Meter';
2120
import { ViewRegistry } from '../view/ViewRegistry';
21+
import { MeterSharedState } from './MeterSharedState';
2222
import { MetricCollector } from './MetricCollector';
2323

2424
/**
@@ -30,7 +30,15 @@ export class MeterProviderSharedState {
3030

3131
metricCollectors: MetricCollector[] = [];
3232

33-
meters: Meter[] = [];
33+
meterSharedStates: MeterSharedState[] = [];
3434

3535
constructor(public resource: Resource) {}
36+
37+
getMeterSharedState(instrumentationLibrary: InstrumentationLibrary) {
38+
// TODO: meter identity
39+
// https://github.com/open-telemetry/opentelemetry-js/issues/2593
40+
const meterSharedState = new MeterSharedState(this, instrumentationLibrary);
41+
this.meterSharedStates.push(meterSharedState);
42+
return meterSharedState;
43+
}
3644
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { HrTime } from '@opentelemetry/api';
18+
import * as metrics from '@opentelemetry/api-metrics';
19+
import { InstrumentationLibrary } from '@opentelemetry/core';
20+
import { InstrumentationLibraryMetrics } from '../export/MetricData';
21+
import { createInstrumentDescriptorWithView, InstrumentDescriptor } from '../InstrumentDescriptor';
22+
import { isNotNullish } from '../utils';
23+
import { AsyncMetricStorage } from './AsyncMetricStorage';
24+
import { MeterProviderSharedState } from './MeterProviderSharedState';
25+
import { MetricCollectorHandle } from './MetricCollector';
26+
import { MetricStorageRegistry } from './MetricStorageRegistry';
27+
import { MultiMetricStorage } from './MultiWritableMetricStorage';
28+
import { SyncMetricStorage } from './SyncMetricStorage';
29+
30+
/**
31+
* An internal record for shared meter provider states.
32+
*/
33+
export class MeterSharedState {
34+
private _metricStorageRegistry = new MetricStorageRegistry();
35+
36+
constructor(private _meterProviderSharedState: MeterProviderSharedState, private _instrumentationLibrary: InstrumentationLibrary) {}
37+
38+
registerMetricStorage(descriptor: InstrumentDescriptor) {
39+
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
40+
const storages = views
41+
.map(view => {
42+
const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor);
43+
const aggregator = view.aggregation.createAggregator(viewDescriptor);
44+
const storage = new SyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor);
45+
return this._metricStorageRegistry.register(storage);
46+
})
47+
.filter(isNotNullish);
48+
if (storages.length === 1) {
49+
return storages[0];
50+
}
51+
return new MultiMetricStorage(storages);
52+
}
53+
54+
registerAsyncMetricStorage(descriptor: InstrumentDescriptor, callback: metrics.ObservableCallback) {
55+
const views = this._meterProviderSharedState.viewRegistry.findViews(descriptor, this._instrumentationLibrary);
56+
views.forEach(view => {
57+
const viewDescriptor = createInstrumentDescriptorWithView(view, descriptor);
58+
const aggregator = view.aggregation.createAggregator(viewDescriptor);
59+
const viewStorage = new AsyncMetricStorage(viewDescriptor, aggregator, view.attributesProcessor, callback);
60+
this._metricStorageRegistry.register(viewStorage);
61+
});
62+
}
63+
64+
/**
65+
* @param collector opaque handle of {@link MetricCollector} which initiated the collection.
66+
* @param collectionTime the HrTime at which the collection was initiated.
67+
* @returns the list of {@link MetricData} collected.
68+
*/
69+
async collect(collector: MetricCollectorHandle, collectionTime: HrTime): Promise<InstrumentationLibraryMetrics> {
70+
/**
71+
* 1. Call all observable callbacks first.
72+
* 2. Collect metric result for the collector.
73+
*/
74+
const metricDataList = await Promise.all(Array.from(this._metricStorageRegistry.getStorages())
75+
.map(metricStorage => {
76+
return metricStorage.collect(
77+
collector,
78+
this._meterProviderSharedState.metricCollectors,
79+
this._meterProviderSharedState.sdkStartTime,
80+
collectionTime);
81+
})
82+
.filter(isNotNullish));
83+
84+
return {
85+
instrumentationLibrary: this._instrumentationLibrary,
86+
metrics: metricDataList.filter(isNotNullish),
87+
};
88+
}
89+
}

experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricCollector.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ export class MetricCollector implements MetricProducer {
3434

3535
async collect(): Promise<ResourceMetrics> {
3636
const collectionTime = hrTime();
37-
const instrumentationLibraryMetrics = (await Promise.all(this._sharedState.meters
38-
.map(meter => meter.collect(this, collectionTime))));
37+
const instrumentationLibraryMetrics = (await Promise.all(this._sharedState.meterSharedStates
38+
.map(meterSharedState => meterSharedState.collect(this, collectionTime))));
3939

4040
return {
4141
resource: this._sharedState.resource,
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { ExportResult, ExportResultCode } from '@opentelemetry/core';
18+
import { AggregationTemporality, PushMetricExporter, ResourceMetrics } from '../../src';
19+
20+
export class TestMetricExporter implements PushMetricExporter {
21+
resourceMetricsList: ResourceMetrics[] = [];
22+
export(resourceMetrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void {
23+
this.resourceMetricsList.push(resourceMetrics);
24+
process.nextTick(() => resultCallback({ code: ExportResultCode.SUCCESS }));
25+
}
26+
27+
async forceFlush(): Promise<void> {}
28+
async shutdown(): Promise<void> {}
29+
30+
getPreferredAggregationTemporality(): AggregationTemporality {
31+
return AggregationTemporality.CUMULATIVE;
32+
}
33+
}
34+
35+
export class TestDeltaMetricExporter extends TestMetricExporter {
36+
override getPreferredAggregationTemporality(): AggregationTemporality {
37+
return AggregationTemporality.DELTA;
38+
}
39+
}

0 commit comments

Comments
 (0)