1616import time
1717from concurrent .futures import ThreadPoolExecutor
1818from logging import WARNING
19+ from typing import Any , Optional , Sequence
1920from unittest import TestCase
2021from unittest .mock import Mock , patch
2122
2728)
2829from grpc import Compression , server
2930
30- from opentelemetry .exporter .otlp .proto .grpc .exporter import (
31+ from opentelemetry .exporter .otlp .proto .common .trace_encoder import (
32+ encode_spans ,
33+ )
34+ from opentelemetry .exporter .otlp .proto .grpc .exporter import ( # noqa: F401
3135 InvalidCompressionValueException ,
36+ OTLPExporterMixin ,
3237 StatusCode ,
3338 environ_to_compression ,
3439)
35- from opentelemetry .exporter .otlp .proto .grpc .trace_exporter import (
36- OTLPSpanExporter ,
37- )
3840from opentelemetry .exporter .otlp .proto .grpc .version import __version__
3941from opentelemetry .proto .collector .trace .v1 .trace_service_pb2 import (
42+ ExportTraceServiceRequest ,
4043 ExportTraceServiceResponse ,
4144)
4245from opentelemetry .proto .collector .trace .v1 .trace_service_pb2_grpc import (
4346 TraceServiceServicer ,
47+ TraceServiceStub ,
4448 add_TraceServiceServicer_to_server ,
4549)
4650from opentelemetry .sdk .environment_variables import (
4751 OTEL_EXPORTER_OTLP_COMPRESSION ,
4852)
49- from opentelemetry .sdk .trace import _Span
53+ from opentelemetry .sdk .trace import ReadableSpan , _Span
5054from opentelemetry .sdk .trace .export import (
55+ SpanExporter ,
5156 SpanExportResult ,
5257)
5358
5459
60+ # The below tests use this test SpanExporter and Spans, but are testing the
61+ # underlying behavior in the mixin. A MetricExporter or LogExporter could
62+ # just as easily be used.
63+ # pylint: disable=no-member
64+ class OTLPSpanExporterForTesting (
65+ SpanExporter ,
66+ OTLPExporterMixin [
67+ ReadableSpan , ExportTraceServiceRequest , SpanExportResult
68+ ],
69+ ):
70+ # pylint: disable=unsubscriptable-object
71+ """OTLP span exporter
72+
73+ Args:
74+ endpoint: OpenTelemetry Collector receiver endpoint
75+ insecure: Connection type
76+ credentials: Credentials object for server authentication
77+ headers: Headers to send when exporting
78+ timeout: Backend request timeout in seconds
79+ compression: gRPC compression method to use
80+ """
81+
82+ _result = SpanExportResult
83+ _stub = TraceServiceStub
84+
85+ def __init__ (self , insecure = None , endpoint = None ):
86+ super ().__init__ (
87+ ** {
88+ "insecure" : insecure ,
89+ "endpoint" : endpoint ,
90+ }
91+ )
92+
93+ def _translate_data (
94+ self , data : Sequence [ReadableSpan ]
95+ ) -> ExportTraceServiceRequest :
96+ return encode_spans (data )
97+
98+ def export (self , spans : Sequence [ReadableSpan ]) -> SpanExportResult :
99+ return self ._export (spans )
100+
101+ def shutdown (self , timeout_millis : float = 30_000 ) -> None :
102+ OTLPExporterMixin .shutdown (self , timeout_millis )
103+
104+ @property
105+ def _exporting (self ):
106+ return "traces"
107+
108+
55109class TraceServiceServicerWithExportParams (TraceServiceServicer ):
56110 def __init__ (
57111 self ,
@@ -96,22 +150,23 @@ def Export(self, request, context):
96150class ThreadWithReturnValue (threading .Thread ):
97151 def __init__ (
98152 self ,
99- group = None ,
100153 target = None ,
101- name = None ,
102154 args = (),
103- kwargs = None ,
104155 ):
105- threading . Thread . __init__ (self , group , target , name , args , kwargs )
156+ super (). __init__ (None , target , None , args , None )
106157 self ._return = None
107158
108159 def run (self ):
109- if self ._target is not None : # type: ignore
110- self ._return = self ._target (* self ._args , ** self ._kwargs ) # type: ignore
160+ try :
161+ if self ._target is not None : # type: ignore
162+ self ._target (* self ._args , ** self ._kwargs ) # type: ignore
163+ finally :
164+ # Avoid a refcycle if the thread is running a function with
165+ # an argument that has a member that points to the thread.
166+ del self ._target , self ._args , self ._kwargs # type: ignore
111167
112- def join (self , * args ): # type: ignore
113- threading .Thread .join (self , * args )
114- return self ._return
168+ def join (self , timeout : Optional [float ] = None ) -> Any :
169+ super ().join (timeout = timeout )
115170
116171
117172class TestOTLPExporterMixin (TestCase ):
@@ -121,10 +176,7 @@ def setUp(self):
121176 self .server .add_insecure_port ("127.0.0.1:4317" )
122177
123178 self .server .start ()
124- # The below tests use the SpanExporter and Spans, but are testing the
125- # underlying behavior in the mixin. A MetricExporter or LogExporter could
126- # just as easily be used.
127- self .exporter = OTLPSpanExporter (insecure = True )
179+ self .exporter = OTLPSpanExporterForTesting (insecure = True )
128180 self .span = _Span (
129181 "a" ,
130182 context = Mock (
@@ -139,8 +191,7 @@ def setUp(self):
139191 def tearDown (self ):
140192 self .server .stop (None )
141193
142-
143- # pylint: disable=no-self-use
194+ # pylint: disable=no-self-use
144195 @patch ("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel" )
145196 @patch ("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel" )
146197 def test_otlp_exporter_endpoint (self , mock_secure , mock_insecure ):
@@ -193,7 +244,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
193244 ),
194245 ]
195246 for endpoint , insecure , mock_method in endpoints :
196- OTLPSpanExporter (endpoint = endpoint , insecure = insecure )
247+ OTLPSpanExporterForTesting (endpoint = endpoint , insecure = insecure )
197248 self .assertEqual (
198249 1 ,
199250 mock_method .call_count ,
@@ -206,7 +257,6 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
206257 )
207258 mock_method .reset_mock ()
208259
209-
210260 def test_environ_to_compression (self ):
211261 with patch .dict (
212262 "os.environ" ,
@@ -238,7 +288,7 @@ def test_otlp_exporter_otlp_compression_unspecified(
238288 self , mock_insecure_channel
239289 ):
240290 """No env or kwarg should be NoCompression"""
241- OTLPSpanExporter (insecure = True )
291+ OTLPSpanExporterForTesting (insecure = True )
242292 mock_insecure_channel .assert_called_once_with (
243293 "localhost:4317" , compression = Compression .NoCompression
244294 )
@@ -250,7 +300,7 @@ def test_otlp_exporter_otlp_compression_envvar(
250300 self , mock_insecure_channel
251301 ):
252302 """Just OTEL_EXPORTER_OTLP_COMPRESSION should work"""
253- OTLPSpanExporter (insecure = True )
303+ OTLPSpanExporterForTesting (insecure = True )
254304 mock_insecure_channel .assert_called_once_with (
255305 "localhost:4317" , compression = Compression .Gzip
256306 )
@@ -293,7 +343,7 @@ def test_shutdown_wait_last_export(self):
293343 # pylint: disable=protected-access
294344 self .assertTrue (self .exporter ._shutdown )
295345 export_result = export_thread .join ()
296- self .assertEqual (export_result , SpanExportResult . SUCCESS )
346+ self .assertEqual (export_result , None )
297347
298348 def test_shutdown_doesnot_wait_last_export (self ):
299349 add_TraceServiceServicer_to_server (
@@ -314,7 +364,7 @@ def test_shutdown_doesnot_wait_last_export(self):
314364 # pylint: disable=protected-access
315365 self .assertTrue (self .exporter ._shutdown )
316366 export_result = export_thread .join ()
317- self .assertEqual (export_result , SpanExportResult . FAILURE )
367+ self .assertEqual (export_result , None )
318368
319369 def test_export_over_closed_grpc_channel (self ):
320370 # pylint: disable=protected-access
0 commit comments