16
16
17
17
import base64
18
18
import inspect
19
+ import random
19
20
import subprocess
20
21
import os
21
22
import tempfile
22
23
import unittest
24
+ from http .server import BaseHTTPRequestHandler , HTTPServer
23
25
from pathlib import Path
24
26
from typing import Callable , Mapping , Optional
25
27
33
35
ROOT_DIR = os .path .abspath (os .path .dirname (os .path .dirname (os .path .dirname (__file__ ))))
34
36
35
37
36
- class ElasticIntegrationTestCase (unittest .TestCase ):
38
+ class ElasticIntegrationGRPCTestCase (unittest .TestCase ):
37
39
"""This is an experimental reimplementation of OtelTest using unittest
38
40
39
41
The idea is to do integration testing by creating a separate virtualenv for each TestCase inheriting
@@ -42,7 +44,7 @@ class ElasticIntegrationTestCase(unittest.TestCase):
42
44
43
45
A basic TestCase would look like:
44
46
45
- class MyTestCase(ElasticIntegrationTestCase ):
47
+ class MyTestCase(ElasticIntegrationGRPCTestCase ):
46
48
@classmethod
47
49
def requirements(cls):
48
50
requirements = super().requirements()
@@ -274,3 +276,101 @@ def run_script(
274
276
ex .stderr .decode () if ex .stderr else None ,
275
277
proc .returncode ,
276
278
)
279
+
280
+
281
+ class HttpSink (ot .HttpSink ):
282
+ """Backport of teardown fixes, drop if we ever rebase on top of oteltest 0.37.0+"""
283
+
284
+ # This code is copyright Pablo Collins under Apache-2.0
285
+
286
+ def __init__ (self , listener , port = 4318 , daemon = True ):
287
+ self .httpd = None
288
+ super ().__init__ (listener , port , daemon )
289
+
290
+ def run_server (self ):
291
+ class Handler (BaseHTTPRequestHandler ):
292
+ def do_POST (this ):
293
+ # /v1/traces
294
+ content_length = int (this .headers ["Content-Length" ])
295
+ post_data = this .rfile .read (content_length )
296
+
297
+ otlp_handler_func = self .handlers .get (this .path )
298
+ if otlp_handler_func :
299
+ otlp_handler_func (post_data , {k : v for k , v in this .headers .items ()})
300
+
301
+ this .send_response (200 )
302
+ this .send_header ("Content-type" , "text/html" )
303
+ this .end_headers ()
304
+
305
+ this .wfile .write ("OK" .encode ("utf-8" ))
306
+
307
+ self .httpd = HTTPServer (("" , self .port ), Handler )
308
+ self .httpd .serve_forever ()
309
+
310
+ def stop (self ):
311
+ if self .httpd :
312
+ self .httpd .shutdown ()
313
+ super ().stop ()
314
+
315
+
316
+ class ElasticIntegrationHTTPTestCase (ElasticIntegrationGRPCTestCase ):
317
+ """This is an experimental reimplementation of OtelTest using unittest
318
+
319
+ The idea is to do integration testing by creating a separate virtualenv for each TestCase inheriting
320
+ from ElasticIntegrationTestCase, run a script, collect OTLP http/protobuf calls and make the received data
321
+ available in order to add assertions.
322
+
323
+ A basic TestCase would look like:
324
+
325
+ class MyTestCase(ElasticIntegrationHTTPTestCase):
326
+ @classmethod
327
+ def requirements(cls):
328
+ requirements = super().requirements()
329
+ return requirements + [f"my-library"]
330
+
331
+ def script(self):
332
+ import sqlite3
333
+
334
+ connection = sqlite3.connect(":memory:")
335
+ cursor = connection.cursor()
336
+ cursor.execute("CREATE TABLE movie(title, year, score)")
337
+
338
+ def test_one_span_generated(self):
339
+ stdout, stderr, returncode = self.run_script(self.script, wrapper_script="opentelemetry-instrument")
340
+
341
+ telemetry.get_telemetry()
342
+ assert len(telemetry["traces"], 1)
343
+
344
+
345
+ Each TestCase costs around 10 seconds for settings up the virtualenv so split tests accordingly.
346
+ """
347
+
348
+ def set_http_port (self ):
349
+ self ._http_port = int (random .uniform (44318 , 45000 ))
350
+ return self ._http_port
351
+
352
+ def get_http_port (self ):
353
+ return self ._http_port
354
+
355
+ def setUp (self ):
356
+ self .handler = ot .AccumulatingHandler ()
357
+ # pick a new port each time because otherwise the next test will fail to bind to the same port, even with SO_REUSEPORT :(
358
+ port = self .set_http_port ()
359
+ self .sink = HttpSink (self .handler , port = port )
360
+ self .sink .start ()
361
+
362
+ def run_script (
363
+ self ,
364
+ script_func : Callable [[], None ],
365
+ environment_variables : Optional [Mapping [str , str ]] = None ,
366
+ wrapper_script : Optional [str ] = None ,
367
+ on_start : Optional [Callable [[], Optional [float ]]] = None ,
368
+ ):
369
+ # Add a sane default so that callers don't need to remember to set that
370
+ if environment_variables is None :
371
+ otlp_endpoint = f"http://localhost:{ self .get_http_port ()} "
372
+ environment_variables = {
373
+ "OTEL_EXPORTER_OTLP_PROTOCOL" : "http/protobuf" ,
374
+ "OTEL_EXPORTER_OTLP_ENDPOINT" : otlp_endpoint ,
375
+ }
376
+ return super ().run_script (script_func , environment_variables , wrapper_script , on_start )
0 commit comments