Skip to content

Commit f940175

Browse files
committed
Move all OTLP exporter tests that are testing the
underlying behavior in the mixin to the mixin unit tests, instead of having them specified multiple times in the Metric/Log/Trace exporters. Fix the shutdown tests which were flaky, so that they just test whether a pending export call completes or not. Update shutdown so it doesn't release the lock -- in cases where an export call is pending, export then also releases the lock causing a Runtime Error: https://docs.python.org/3/library/threading.html#threading.Lock.release.
1 parent 829fcc5 commit f940175

File tree

6 files changed

+328
-936
lines changed

6 files changed

+328
-936
lines changed

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -289,11 +289,11 @@ def _export(
289289
# expo returns a generator that yields delay values which grow
290290
# exponentially. Once delay is greater than max_value, the yielded
291291
# value will remain constant.
292-
for delay in _create_exp_backoff_generator(max_value=max_value):
293-
if delay == max_value or self._shutdown:
294-
return self._result.FAILURE
292+
with self._export_lock:
293+
for delay in _create_exp_backoff_generator(max_value=max_value):
294+
if delay == max_value or self._shutdown:
295+
return self._result.FAILURE
295296

296-
with self._export_lock:
297297
try:
298298
self._client.Export(
299299
request=self._translate_data(data),
@@ -352,15 +352,14 @@ def _export(
352352

353353
return self._result.FAILURE
354354

355-
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
355+
def shutdown(self, timeout_millis: float = 30_000) -> None:
356356
if self._shutdown:
357357
logger.warning("Exporter already shutdown, ignoring call")
358358
return
359359
# wait for the last export if any
360360
self._export_lock.acquire(timeout=timeout_millis / 1e3)
361361
self._shutdown = True
362362
self._channel.close()
363-
self._export_lock.release()
364363

365364
@property
366365
@abstractmethod

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,8 @@ def _translate_data(
142142
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
143143
return self._export(spans)
144144

145-
def shutdown(self) -> None:
146-
OTLPExporterMixin.shutdown(self)
145+
def shutdown(self, timeout_millis: float = 30_000) -> None:
146+
OTLPExporterMixin.shutdown(self, timeout_millis)
147147

148148
def force_flush(self, timeout_millis: int = 30000) -> bool:
149149
"""Nothing is buffered in this exporter, so this method does nothing."""

exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py

Lines changed: 1 addition & 210 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,20 @@
1515
# pylint: disable=too-many-lines
1616

1717
import time
18-
from concurrent.futures import ThreadPoolExecutor
1918
from os.path import dirname
2019
from unittest import TestCase
2120
from unittest.mock import patch
2221

23-
from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module
24-
Duration,
25-
)
2622
from google.protobuf.json_format import MessageToDict
27-
from google.rpc.error_details_pb2 import ( # pylint: disable=no-name-in-module
28-
RetryInfo,
29-
)
30-
from grpc import ChannelCredentials, Compression, StatusCode, server
23+
from grpc import ChannelCredentials, Compression
3124

3225
from opentelemetry._logs import SeverityNumber
3326
from opentelemetry.exporter.otlp.proto.common._internal import _encode_value
3427
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
3528
OTLPLogExporter,
3629
)
37-
from opentelemetry.exporter.otlp.proto.grpc.version import __version__
3830
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
3931
ExportLogsServiceRequest,
40-
ExportLogsServiceResponse,
41-
)
42-
from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import (
43-
LogsServiceServicer,
44-
add_LogsServiceServicer_to_server,
4532
)
4633
from opentelemetry.proto.common.v1.common_pb2 import AnyValue, KeyValue
4734
from opentelemetry.proto.common.v1.common_pb2 import (
@@ -53,7 +40,6 @@
5340
Resource as OTLPResource,
5441
)
5542
from opentelemetry.sdk._logs import LogData, LogRecord
56-
from opentelemetry.sdk._logs.export import LogExportResult
5743
from opentelemetry.sdk.environment_variables import (
5844
OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE,
5945
OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE,
@@ -70,62 +56,9 @@
7056
THIS_DIR = dirname(__file__)
7157

7258

73-
class LogsServiceServicerUNAVAILABLEDelay(LogsServiceServicer):
74-
# pylint: disable=invalid-name,unused-argument,no-self-use
75-
def Export(self, request, context):
76-
context.set_code(StatusCode.UNAVAILABLE)
77-
78-
context.send_initial_metadata(
79-
(("google.rpc.retryinfo-bin", RetryInfo().SerializeToString()),)
80-
)
81-
context.set_trailing_metadata(
82-
(
83-
(
84-
"google.rpc.retryinfo-bin",
85-
RetryInfo(
86-
retry_delay=Duration(nanos=int(1e7))
87-
).SerializeToString(),
88-
),
89-
)
90-
)
91-
92-
return ExportLogsServiceResponse()
93-
94-
95-
class LogsServiceServicerUNAVAILABLE(LogsServiceServicer):
96-
# pylint: disable=invalid-name,unused-argument,no-self-use
97-
def Export(self, request, context):
98-
context.set_code(StatusCode.UNAVAILABLE)
99-
100-
return ExportLogsServiceResponse()
101-
102-
103-
class LogsServiceServicerSUCCESS(LogsServiceServicer):
104-
# pylint: disable=invalid-name,unused-argument,no-self-use
105-
def Export(self, request, context):
106-
context.set_code(StatusCode.OK)
107-
108-
return ExportLogsServiceResponse()
109-
110-
111-
class LogsServiceServicerALREADY_EXISTS(LogsServiceServicer):
112-
# pylint: disable=invalid-name,unused-argument,no-self-use
113-
def Export(self, request, context):
114-
context.set_code(StatusCode.ALREADY_EXISTS)
115-
116-
return ExportLogsServiceResponse()
117-
118-
11959
class TestOTLPLogExporter(TestCase):
12060
def setUp(self):
12161
self.exporter = OTLPLogExporter()
122-
123-
self.server = server(ThreadPoolExecutor(max_workers=10))
124-
125-
self.server.add_insecure_port("127.0.0.1:4317")
126-
127-
self.server.start()
128-
12962
self.log_data_1 = LogData(
13063
log_record=LogRecord(
13164
timestamp=int(time.time() * 1e9),
@@ -204,9 +137,6 @@ def setUp(self):
204137
),
205138
)
206139

207-
def tearDown(self):
208-
self.server.stop(None)
209-
210140
def test_exporting(self):
211141
# pylint: disable=protected-access
212142
self.assertEqual(self.exporter._exporting, "logs")
@@ -296,145 +226,6 @@ def test_env_variables_with_only_certificate(
296226

297227
mock_logger_error.assert_not_called()
298228

299-
@patch(
300-
"opentelemetry.exporter.otlp.proto.grpc.exporter.ssl_channel_credentials"
301-
)
302-
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel")
303-
@patch(
304-
"opentelemetry.exporter.otlp.proto.grpc._log_exporter.OTLPLogExporter._stub"
305-
)
306-
# pylint: disable=unused-argument
307-
def test_no_credentials_error(
308-
self, mock_ssl_channel, mock_secure, mock_stub
309-
):
310-
OTLPLogExporter(insecure=False)
311-
self.assertTrue(mock_ssl_channel.called)
312-
313-
# pylint: disable=no-self-use
314-
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel")
315-
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel")
316-
def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
317-
expected_endpoint = "localhost:4317"
318-
endpoints = [
319-
(
320-
"http://localhost:4317",
321-
None,
322-
mock_insecure,
323-
),
324-
(
325-
"localhost:4317",
326-
None,
327-
mock_secure,
328-
),
329-
(
330-
"http://localhost:4317",
331-
True,
332-
mock_insecure,
333-
),
334-
(
335-
"localhost:4317",
336-
True,
337-
mock_insecure,
338-
),
339-
(
340-
"http://localhost:4317",
341-
False,
342-
mock_secure,
343-
),
344-
(
345-
"localhost:4317",
346-
False,
347-
mock_secure,
348-
),
349-
(
350-
"https://localhost:4317",
351-
False,
352-
mock_secure,
353-
),
354-
(
355-
"https://localhost:4317",
356-
None,
357-
mock_secure,
358-
),
359-
(
360-
"https://localhost:4317",
361-
True,
362-
mock_secure,
363-
),
364-
]
365-
366-
# pylint: disable=C0209
367-
for endpoint, insecure, mock_method in endpoints:
368-
OTLPLogExporter(endpoint=endpoint, insecure=insecure)
369-
self.assertEqual(
370-
1,
371-
mock_method.call_count,
372-
"expected {} to be called for {} {}".format(
373-
mock_method, endpoint, insecure
374-
),
375-
)
376-
self.assertEqual(
377-
expected_endpoint,
378-
mock_method.call_args[0][0],
379-
"expected {} got {} {}".format(
380-
expected_endpoint, mock_method.call_args[0][0], endpoint
381-
),
382-
)
383-
mock_method.reset_mock()
384-
385-
def test_otlp_headers_from_env(self):
386-
# pylint: disable=protected-access
387-
self.assertEqual(
388-
self.exporter._headers,
389-
(("user-agent", "OTel-OTLP-Exporter-Python/" + __version__),),
390-
)
391-
392-
@patch(
393-
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
394-
)
395-
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
396-
def test_unavailable(self, mock_sleep, mock_expo):
397-
mock_expo.configure_mock(**{"return_value": [0.01]})
398-
399-
add_LogsServiceServicer_to_server(
400-
LogsServiceServicerUNAVAILABLE(), self.server
401-
)
402-
self.assertEqual(
403-
self.exporter.export([self.log_data_1]), LogExportResult.FAILURE
404-
)
405-
mock_sleep.assert_called_with(0.01)
406-
407-
@patch(
408-
"opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator"
409-
)
410-
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
411-
def test_unavailable_delay(self, mock_sleep, mock_expo):
412-
mock_expo.configure_mock(**{"return_value": [1]})
413-
414-
add_LogsServiceServicer_to_server(
415-
LogsServiceServicerUNAVAILABLEDelay(), self.server
416-
)
417-
self.assertEqual(
418-
self.exporter.export([self.log_data_1]), LogExportResult.FAILURE
419-
)
420-
mock_sleep.assert_called_with(0.01)
421-
422-
def test_success(self):
423-
add_LogsServiceServicer_to_server(
424-
LogsServiceServicerSUCCESS(), self.server
425-
)
426-
self.assertEqual(
427-
self.exporter.export([self.log_data_1]), LogExportResult.SUCCESS
428-
)
429-
430-
def test_failure(self):
431-
add_LogsServiceServicer_to_server(
432-
LogsServiceServicerALREADY_EXISTS(), self.server
433-
)
434-
self.assertEqual(
435-
self.exporter.export([self.log_data_1]), LogExportResult.FAILURE
436-
)
437-
438229
def export_log_and_deserialize(self, log_data):
439230
# pylint: disable=protected-access
440231
translated_data = self.exporter._translate_data([log_data])

0 commit comments

Comments
 (0)