Skip to content

Commit fb60538

Browse files
authored
Implement SynchronousMeasurementConsumer (#2388)
1 parent 7d076cc commit fb60538

File tree

6 files changed

+212
-81
lines changed

6 files changed

+212
-81
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
SynchronousMeasurementConsumer,
4646
)
4747
from opentelemetry.sdk._metrics.metric_reader import MetricReader
48+
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
4849
from opentelemetry.sdk.resources import Resource
4950
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
5051

@@ -156,19 +157,22 @@ def __init__(
156157
self._lock = Lock()
157158
self._meter_lock = Lock()
158159
self._atexit_handler = None
159-
160-
self._measurement_consumer = SynchronousMeasurementConsumer()
160+
self._sdk_config = SdkConfiguration(
161+
resource=resource, metric_readers=metric_readers
162+
)
163+
self._measurement_consumer = SynchronousMeasurementConsumer(
164+
sdk_config=self._sdk_config
165+
)
161166

162167
if shutdown_on_exit:
163168
self._atexit_handler = register(self.shutdown)
164169

165170
self._meters = {}
166171
self._metric_readers = metric_readers
167172

168-
for metric_reader in self._metric_readers:
173+
for metric_reader in self._sdk_config.metric_readers:
169174
metric_reader._register_measurement_consumer(self)
170175

171-
self._resource = resource
172176
self._shutdown = False
173177

174178
def force_flush(self) -> bool:
@@ -177,7 +181,7 @@ def force_flush(self) -> bool:
177181

178182
metric_reader_result = True
179183

180-
for metric_reader in self._metric_readers:
184+
for metric_reader in self._sdk_config.metric_readers:
181185
metric_reader_result = (
182186
metric_reader_result and metric_reader.force_flush()
183187
)
@@ -196,7 +200,7 @@ def shutdown(self):
196200

197201
result = True
198202

199-
for metric_reader in self._metric_readers:
203+
for metric_reader in self._sdk_config.metric_readers:
200204
result = result and metric_reader.shutdown()
201205

202206
if not result:

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,17 @@
1313
# limitations under the License.
1414

1515
from abc import ABC, abstractmethod
16-
from typing import TYPE_CHECKING, Iterable
16+
from threading import Lock
17+
from typing import TYPE_CHECKING, Iterable, List, Mapping
1718

1819
from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
1920
from opentelemetry.sdk._metrics.measurement import Measurement
2021
from opentelemetry.sdk._metrics.metric_reader import MetricReader
22+
from opentelemetry.sdk._metrics.metric_reader_storage import (
23+
MetricReaderStorage,
24+
)
2125
from opentelemetry.sdk._metrics.point import Metric
26+
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
2227

2328
if TYPE_CHECKING:
2429
from opentelemetry.sdk._metrics.instrument import _Asynchronous
@@ -41,15 +46,31 @@ def collect(
4146

4247

4348
class SynchronousMeasurementConsumer(MeasurementConsumer):
49+
def __init__(self, sdk_config: SdkConfiguration) -> None:
50+
self._lock = Lock()
51+
self._sdk_config = sdk_config
52+
# should never be mutated
53+
self._reader_storages: Mapping[MetricReader, MetricReaderStorage] = {
54+
reader: MetricReaderStorage(sdk_config)
55+
for reader in sdk_config.metric_readers
56+
}
57+
self._async_instruments: List["_Asynchronous"] = []
58+
4459
def consume_measurement(self, measurement: Measurement) -> None:
45-
pass
60+
for reader_storage in self._reader_storages.values():
61+
reader_storage.consume_measurement(measurement)
4662

4763
def register_asynchronous_instrument(
4864
self, instrument: "_Asynchronous"
4965
) -> None:
50-
pass
66+
with self._lock:
67+
self._async_instruments.append(instrument)
5168

5269
def collect(
5370
self, metric_reader: MetricReader, temporality: AggregationTemporality
5471
) -> Iterable[Metric]:
55-
pass
72+
with self._lock:
73+
for async_instrument in self._async_instruments:
74+
for measurement in async_instrument.callback():
75+
self.consume_measurement(measurement)
76+
return self._reader_storages[metric_reader].collect(temporality)
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import Iterable
16+
17+
from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
18+
from opentelemetry.sdk._metrics.measurement import Measurement
19+
from opentelemetry.sdk._metrics.point import Metric
20+
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
21+
22+
23+
# TODO: #2378
24+
class MetricReaderStorage:
25+
"""The SDK's storage for a given reader"""
26+
27+
def __init__(self, sdk_config: SdkConfiguration) -> None:
28+
pass
29+
30+
def consume_measurement(self, measurement: Measurement) -> None:
31+
pass
32+
33+
def collect(self, temporality: AggregationTemporality) -> Iterable[Metric]:
34+
pass
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from dataclasses import dataclass
2+
from typing import Sequence
3+
4+
from opentelemetry.sdk._metrics.metric_reader import MetricReader
5+
from opentelemetry.sdk.resources import Resource
6+
7+
8+
@dataclass
9+
class SdkConfiguration:
10+
resource: Resource
11+
# TODO: once views are added
12+
# views: Sequence[View]
13+
metric_readers: Sequence[MetricReader]

opentelemetry-sdk/tests/metrics/test_measurement_consumer.py

Lines changed: 52 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,87 +13,78 @@
1313
# limitations under the License.
1414

1515
from unittest import TestCase
16-
from unittest.mock import Mock, patch
16+
from unittest.mock import MagicMock, Mock, patch
1717

18-
from opentelemetry.sdk._metrics import MeterProvider
1918
from opentelemetry.sdk._metrics.measurement_consumer import (
2019
MeasurementConsumer,
2120
SynchronousMeasurementConsumer,
2221
)
22+
from opentelemetry.sdk._metrics.point import AggregationTemporality
23+
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
2324

2425

26+
@patch("opentelemetry.sdk._metrics.measurement_consumer.MetricReaderStorage")
2527
class TestSynchronousMeasurementConsumer(TestCase):
26-
def test_parent(self):
28+
def test_parent(self, _):
2729

2830
self.assertIsInstance(
29-
SynchronousMeasurementConsumer(), MeasurementConsumer
31+
SynchronousMeasurementConsumer(MagicMock()), MeasurementConsumer
3032
)
3133

32-
@patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer")
33-
def test_measurement_consumer_class(
34-
self, mock_serial_measurement_consumer
35-
):
36-
MeterProvider()
37-
38-
mock_serial_measurement_consumer.assert_called()
39-
40-
@patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer")
41-
def test_register_asynchronous_instrument(
42-
self, mock_serial_measurement_consumer
43-
):
44-
45-
meter_provider = MeterProvider()
46-
47-
meter_provider._measurement_consumer.register_asynchronous_instrument.assert_called_with(
48-
meter_provider.get_meter("name").create_observable_counter(
49-
"name", Mock()
50-
)
51-
)
52-
meter_provider._measurement_consumer.register_asynchronous_instrument.assert_called_with(
53-
meter_provider.get_meter("name").create_observable_up_down_counter(
54-
"name", Mock()
55-
)
56-
)
57-
meter_provider._measurement_consumer.register_asynchronous_instrument.assert_called_with(
58-
meter_provider.get_meter("name").create_observable_gauge(
59-
"name", Mock()
60-
)
34+
def test_creates_metric_reader_storages(self, MockMetricReaderStorage):
35+
"""It should create one MetricReaderStorage per metric reader passed in the SdkConfiguration"""
36+
reader_mocks = [Mock() for _ in range(5)]
37+
SynchronousMeasurementConsumer(
38+
SdkConfiguration(resource=Mock(), metric_readers=reader_mocks)
6139
)
40+
self.assertEqual(len(MockMetricReaderStorage.mock_calls), 5)
6241

63-
@patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer")
64-
def test_consume_measurement_counter(
65-
self, mock_serial_measurement_consumer
42+
def test_measurements_passed_to_each_reader_storage(
43+
self, MockMetricReaderStorage
6644
):
45+
reader_mocks = [Mock() for _ in range(5)]
46+
reader_storage_mocks = [Mock() for _ in range(5)]
47+
MockMetricReaderStorage.side_effect = reader_storage_mocks
6748

68-
meter_provider = MeterProvider()
69-
counter = meter_provider.get_meter("name").create_counter("name")
70-
71-
counter.add(1)
72-
73-
meter_provider._measurement_consumer.consume_measurement.assert_called()
74-
75-
@patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer")
76-
def test_consume_measurement_up_down_counter(
77-
self, mock_serial_measurement_consumer
78-
):
79-
80-
meter_provider = MeterProvider()
81-
counter = meter_provider.get_meter("name").create_up_down_counter(
82-
"name"
49+
consumer = SynchronousMeasurementConsumer(
50+
SdkConfiguration(resource=Mock(), metric_readers=reader_mocks)
8351
)
52+
measurement_mock = Mock()
53+
consumer.consume_measurement(measurement_mock)
8454

85-
counter.add(1)
55+
for rs_mock in reader_storage_mocks:
56+
rs_mock.consume_measurement.assert_called_once_with(
57+
measurement_mock
58+
)
8659

87-
meter_provider._measurement_consumer.consume_measurement.assert_called()
60+
def test_collect_passed_to_reader_stage(self, MockMetricReaderStorage):
61+
"""Its collect() method should defer to the underlying MetricReaderStorage"""
62+
reader_mocks = [Mock() for _ in range(5)]
63+
reader_storage_mocks = [Mock() for _ in range(5)]
64+
MockMetricReaderStorage.side_effect = reader_storage_mocks
8865

89-
@patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer")
90-
def test_consume_measurement_histogram(
91-
self, mock_serial_measurement_consumer
92-
):
66+
consumer = SynchronousMeasurementConsumer(
67+
SdkConfiguration(resource=Mock(), metric_readers=reader_mocks)
68+
)
69+
for r_mock, rs_mock in zip(reader_mocks, reader_storage_mocks):
70+
rs_mock.collect.assert_not_called()
71+
consumer.collect(r_mock, AggregationTemporality.CUMULATIVE)
72+
rs_mock.collect.assert_called_once_with(
73+
AggregationTemporality.CUMULATIVE
74+
)
9375

94-
meter_provider = MeterProvider()
95-
counter = meter_provider.get_meter("name").create_histogram("name")
76+
def test_collect_calls_async_instruments(self, _):
77+
"""Its collect() method should invoke async instruments"""
78+
reader_mock = Mock()
79+
consumer = SynchronousMeasurementConsumer(
80+
SdkConfiguration(resource=Mock(), metric_readers=[reader_mock])
81+
)
82+
async_instrument_mocks = [MagicMock() for _ in range(5)]
83+
for i_mock in async_instrument_mocks:
84+
consumer.register_asynchronous_instrument(i_mock)
9685

97-
counter.record(1)
86+
consumer.collect(reader_mock, AggregationTemporality.CUMULATIVE)
9887

99-
meter_provider._measurement_consumer.consume_measurement.assert_called()
88+
# it should call async instruments
89+
for i_mock in async_instrument_mocks:
90+
i_mock.callback.assert_called_once()

0 commit comments

Comments
 (0)