Skip to content

Commit a902583

Browse files
committed
Implemented tracer middleware class
1 parent 782ed69 commit a902583

File tree

1 file changed

+70
-0
lines changed

1 file changed

+70
-0
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
from __future__ import annotations
2+
3+
import functools
4+
from typing import Callable
5+
6+
from . import BaseTransportMiddleware
7+
8+
from opentelemetry import trace
9+
from opentelemetry.sdk.trace import TracerProvider
10+
from opentelemetry.trace import Status, StatusCode
11+
from opentelemetry.sdk.trace.export import(
12+
BatchSpanProcessor,
13+
ConsoleSpanExporter,
14+
)
15+
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
16+
from opentelemetry.trace.propagation.tracecontext \
17+
import TraceContextTextMapPropagator
18+
from opentelemetry.exporter.otlp.proto.http.trace_exporter \
19+
import OTLPSpanExporter
20+
21+
resource = Resource(attributes={
22+
SERVICE_NAME: "Common Service"
23+
})
24+
25+
processor = BatchSpanProcessor(OTLPSpanExporter( \
26+
endpoint="http://localhost:4318/v1/traces"))
27+
provider = TracerProvider(resource = resource)
28+
# A provider provides tracers:
29+
provider = TracerProvider(resource=resource)
30+
provider.add_span_processor(processor)
31+
# In python trace is global:
32+
trace.set_tracer_provider(provider)
33+
tracer = trace.get_tracer(__name__)
34+
35+
class TracerMiddleware(BaseTransportMiddleware):
36+
def __init__(self, service_name: str):
37+
self.service_name = service_name
38+
39+
def _get_trace_context(self, message):
40+
"""If a trace context exists in the recipe wrapper's environment, get it"""
41+
try:
42+
#carrier = message['environment']['trace_context']
43+
carrier = message['trace_context']
44+
ctx = TraceContextTextMapPropagator().extract(carrier=carrier)
45+
return ctx
46+
except KeyError:
47+
return {}
48+
49+
def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int:
50+
print(f"call_next: {call_next}, channel: {channel}, callback: {callback}, kwargs: {kwargs}")
51+
52+
@functools.wraps(callback)
53+
def wrapped_callback(header, message):
54+
print(f"wrapped_callback header: {header}, message: {message}")
55+
56+
ctx = self._get_trace_context(message)
57+
with tracer.start_as_current_span(self.service_name, context=ctx) as span:
58+
if ctx == {}:
59+
print("inserting trace_context into message")
60+
message['trace_context'] = "foo"
61+
return callback(header, message)
62+
63+
return call_next(channel, wrapped_callback, **kwargs)
64+
65+
def send(self, call_next: Callable, destination, message, **kwargs):
66+
carrier = {}
67+
TraceContextTextMapPropagator().inject(carrier)
68+
message['trace_context'] = carrier
69+
70+
call_next(destination, message, **kwargs)

0 commit comments

Comments
 (0)