Skip to content

Commit 1cae35f

Browse files
authored
Use force_flush (#2853)
* Use force_flush Fixes #2816 * Refactor time out passing * Fix test cases * Add documentation * Add test case * Fix docs * Remove final decorator * Check for timeout in async calls * Remove timeout checking in metric reader * Add force_flush to PeriodicExportingMetricReader * Use TimeoutError * Fix lint * Address timeout comments * Add MetricsTimeoutError * Fix test case
1 parent 22bfe82 commit 1cae35f

File tree

9 files changed

+154
-8
lines changed

9 files changed

+154
-8
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
Meter,
1919
MeterProvider,
2020
)
21+
from opentelemetry.sdk.metrics._internal.exceptions import ( # noqa: F401
22+
MetricsTimeoutError,
23+
)
2124
from opentelemetry.sdk.metrics._internal.instrument import ( # noqa: F401
2225
Counter,
2326
Histogram,

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
ObservableUpDownCounter as APIObservableUpDownCounter,
3131
)
3232
from opentelemetry.metrics import UpDownCounter as APIUpDownCounter
33+
from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError
3334
from opentelemetry.sdk.metrics._internal.instrument import (
3435
_Counter,
3536
_Histogram,
@@ -384,8 +385,10 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool:
384385
current_ts = _time_ns()
385386
try:
386387
if current_ts >= deadline_ns:
387-
raise Exception("Timed out while flushing metric readers")
388-
metric_reader.collect(
388+
raise MetricsTimeoutError(
389+
"Timed out while flushing metric readers"
390+
)
391+
metric_reader.force_flush(
389392
timeout_millis=(deadline_ns - current_ts) / 10**6
390393
)
391394

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
class MetricsTimeoutError(Exception):
17+
"""Raised when a metrics function times out"""

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,14 +294,23 @@ def __init__(
294294
def collect(self, timeout_millis: float = 10_000) -> None:
295295
"""Collects the metrics from the internal SDK state and
296296
invokes the `_receive_metrics` with the collection.
297+
298+
Args:
299+
timeout_millis: Amount of time in milliseconds before this function
300+
raises a timeout error.
301+
302+
If any of the underlying ``collect`` methods called by this method
303+
fails by any reason (including timeout) an exception will be raised
304+
detailing the individual errors that caused this function to fail.
297305
"""
298306
if self._collect is None:
299307
_logger.warning(
300308
"Cannot call collect on a MetricReader until it is registered on a MeterProvider"
301309
)
302310
return
311+
303312
self._receive_metrics(
304-
self._collect(self),
313+
self._collect(self, timeout_millis=timeout_millis),
305314
timeout_millis=timeout_millis,
306315
)
307316

@@ -328,6 +337,10 @@ def _receive_metrics(
328337
) -> None:
329338
"""Called by `MetricReader.collect` when it receives a batch of metrics"""
330339

340+
def force_flush(self, timeout_millis: float = 10_000) -> bool:
341+
self.collect(timeout_millis=timeout_millis)
342+
return True
343+
331344
@abstractmethod
332345
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
333346
"""Shuts down the MetricReader. This method provides a way
@@ -485,3 +498,8 @@ def _shutdown():
485498
self._shutdown_event.set()
486499
self._daemon_thread.join(timeout=(deadline_ns - _time_ns()) / 10**9)
487500
self._exporter.shutdown(timeout=(deadline_ns - _time_ns()) / 10**6)
501+
502+
def force_flush(self, timeout_millis: float = 10_000) -> bool:
503+
super().force_flush(timeout_millis=timeout_millis)
504+
self._exporter.force_flush(timeout_millis=timeout_millis)
505+
return True

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@
2323
import opentelemetry.sdk.metrics._internal.instrument
2424
import opentelemetry.sdk.metrics._internal.sdk_configuration
2525
from opentelemetry.metrics._internal.instrument import CallbackOptions
26+
from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError
2627
from opentelemetry.sdk.metrics._internal.measurement import Measurement
2728
from opentelemetry.sdk.metrics._internal.metric_reader_storage import (
2829
MetricReaderStorage,
2930
)
3031
from opentelemetry.sdk.metrics._internal.point import Metric
32+
from opentelemetry.util._time import _time_ns
3133

3234

3335
class MeasurementConsumer(ABC):
@@ -48,6 +50,7 @@ def register_asynchronous_instrument(
4850
def collect(
4951
self,
5052
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
53+
timeout_millis: float = 10_000,
5154
) -> Iterable[Metric]:
5255
pass
5356

@@ -90,12 +93,34 @@ def register_asynchronous_instrument(
9093
def collect(
9194
self,
9295
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
96+
timeout_millis: float = 10_000,
9397
) -> Iterable[Metric]:
98+
9499
with self._lock:
95100
metric_reader_storage = self._reader_storages[metric_reader]
96101
# for now, just use the defaults
97102
callback_options = CallbackOptions()
103+
deadline_ns = _time_ns() + timeout_millis * 10**6
104+
105+
default_timeout_millis = 10000 * 10**6
106+
98107
for async_instrument in self._async_instruments:
99-
for measurement in async_instrument.callback(callback_options):
108+
109+
remaining_time = deadline_ns - _time_ns()
110+
111+
if remaining_time < default_timeout_millis:
112+
113+
callback_options = CallbackOptions(
114+
timeout_millis=remaining_time
115+
)
116+
117+
measurements = async_instrument.callback(callback_options)
118+
if _time_ns() >= deadline_ns:
119+
raise MetricsTimeoutError(
120+
"Timed out while executing callback"
121+
)
122+
123+
for measurement in measurements:
100124
metric_reader_storage.consume_measurement(measurement)
125+
101126
return self._reader_storages[metric_reader].collect()

opentelemetry-sdk/tests/metrics/test_measurement_consumer.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from sys import version_info
16+
from time import sleep
1517
from unittest import TestCase
1618
from unittest.mock import MagicMock, Mock, patch
1719

@@ -115,3 +117,74 @@ def test_collect_calls_async_instruments(self, MockMetricReaderStorage):
115117
self.assertEqual(
116118
len(reader_storage_mock.consume_measurement.mock_calls), 5
117119
)
120+
121+
def test_collect_timeout(self, MockMetricReaderStorage):
122+
reader_mock = Mock()
123+
reader_storage_mock = Mock()
124+
MockMetricReaderStorage.return_value = reader_storage_mock
125+
consumer = SynchronousMeasurementConsumer(
126+
SdkConfiguration(
127+
resource=Mock(),
128+
metric_readers=[reader_mock],
129+
views=Mock(),
130+
)
131+
)
132+
133+
def sleep_1(*args, **kwargs):
134+
sleep(1)
135+
136+
consumer.register_asynchronous_instrument(
137+
Mock(**{"callback.side_effect": sleep_1})
138+
)
139+
140+
with self.assertRaises(Exception) as error:
141+
consumer.collect(reader_mock, timeout_millis=10)
142+
143+
self.assertIn(
144+
"Timed out while executing callback", error.exception.args[0]
145+
)
146+
147+
@patch(
148+
"opentelemetry.sdk.metrics._internal."
149+
"measurement_consumer.CallbackOptions"
150+
)
151+
def test_collect_deadline(
152+
self, mock_callback_options, MockMetricReaderStorage
153+
):
154+
reader_mock = Mock()
155+
reader_storage_mock = Mock()
156+
MockMetricReaderStorage.return_value = reader_storage_mock
157+
consumer = SynchronousMeasurementConsumer(
158+
SdkConfiguration(
159+
resource=Mock(),
160+
metric_readers=[reader_mock],
161+
views=Mock(),
162+
)
163+
)
164+
165+
def sleep_1(*args, **kwargs):
166+
sleep(1)
167+
return []
168+
169+
consumer.register_asynchronous_instrument(
170+
Mock(**{"callback.side_effect": sleep_1})
171+
)
172+
consumer.register_asynchronous_instrument(
173+
Mock(**{"callback.side_effect": sleep_1})
174+
)
175+
176+
consumer.collect(reader_mock)
177+
178+
if version_info < (3, 8):
179+
callback_options_time_call = mock_callback_options.mock_calls[-1][
180+
2
181+
]["timeout_millis"]
182+
else:
183+
callback_options_time_call = mock_callback_options.mock_calls[
184+
-1
185+
].kwargs["timeout_millis"]
186+
187+
self.assertLess(
188+
callback_options_time_call,
189+
10000 * 10**6,
190+
)

opentelemetry-sdk/tests/metrics/test_metric_reader.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from typing import Dict, Iterable
1616
from unittest import TestCase
17+
from unittest.mock import patch
1718

1819
from opentelemetry.sdk.metrics import Counter, Histogram, ObservableGauge
1920
from opentelemetry.sdk.metrics._internal.instrument import (
@@ -135,3 +136,9 @@ def test_configure_aggregation(self):
135136
dummy_metric_reader._instrument_class_aggregation[_Counter],
136137
LastValueAggregation,
137138
)
139+
140+
def test_force_flush(self):
141+
142+
with patch.object(DummyMetricReader, "collect") as mock_collect:
143+
DummyMetricReader().force_flush(timeout_millis=10)
144+
mock_collect.assert_called_with(timeout_millis=10)

opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -821,7 +821,7 @@ def test_view_instrument_match_conflict_8(self):
821821
# and also the temporality and monotonicity of the up down counter and
822822
# the histogram are the same.
823823

824-
observable_counter = _UpDownCounter(
824+
up_down_counter = _UpDownCounter(
825825
"up_down_counter",
826826
Mock(),
827827
[Mock()],
@@ -859,7 +859,7 @@ def test_view_instrument_match_conflict_8(self):
859859
with self.assertRaises(AssertionError):
860860
with self.assertLogs(level=WARNING):
861861
metric_reader_storage.consume_measurement(
862-
Measurement(1, observable_counter)
862+
Measurement(1, up_down_counter)
863863
)
864864

865865
with self.assertLogs(level=WARNING) as log:

opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,9 @@ def _create_periodic_reader(
118118
exporter, export_interval_millis=interval
119119
)
120120

121-
def _collect(reader):
121+
def _collect(reader, timeout_millis):
122122
time.sleep(collect_wait)
123-
pmr._receive_metrics(metrics)
123+
pmr._receive_metrics(metrics, timeout_millis)
124124

125125
pmr._set_collect_callback(_collect)
126126
return pmr

0 commit comments

Comments
 (0)