Skip to content

Commit e2c4532

Browse files
committed
Run pre-commits
1 parent 2dc5c38 commit e2c4532

File tree

2 files changed

+25
-23
lines changed

2 files changed

+25
-23
lines changed

src/workflows/services/common_service.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ def start_transport(self):
204204
self.log.debug("No transport layer defined for service. Skipping.")
205205

206206
from workflows.transport.middleware.tracer import TracerMiddleware
207+
207208
instrument = TracerMiddleware(self.__class__.__name__)
208209
self._transport.add_middleware(instrument)
209210

src/workflows/transport/middleware/tracer.py

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,23 @@
11
from __future__ import annotations
22

33
import functools
4-
from typing import Callable
54
import logging
6-
7-
from . import BaseTransportMiddleware
5+
from typing import Callable
86

97
from opentelemetry import trace
10-
from opentelemetry.sdk.trace import TracerProvider
11-
from opentelemetry.trace import Status, StatusCode
12-
from opentelemetry.sdk.trace.export import(
13-
BatchSpanProcessor,
14-
ConsoleSpanExporter,
15-
)
8+
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
169
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
17-
from opentelemetry.trace.propagation.tracecontext \
18-
import TraceContextTextMapPropagator
19-
from opentelemetry.exporter.otlp.proto.http.trace_exporter \
20-
import OTLPSpanExporter
10+
from opentelemetry.sdk.trace import TracerProvider
11+
from opentelemetry.sdk.trace.export import (
12+
BatchSpanProcessor,
13+
)
14+
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
15+
16+
from . import BaseTransportMiddleware
2117

2218
logger = logging.getLogger(__name__)
2319

20+
2421
class TracerMiddleware(BaseTransportMiddleware):
2522
def __init__(self, service_name: str):
2623
self.service_name = service_name
@@ -29,12 +26,11 @@ def __init__(self, service_name: str):
2926
def _initiate_tracers(self, service_name):
3027
"""Initiates everything needed for tracing."""
3128
# Label this resource as its service:
32-
resource = Resource(attributes={
33-
SERVICE_NAME: service_name
34-
})
29+
resource = Resource(attributes={SERVICE_NAME: service_name})
3530
# Export to OpenTelemetry Collector:
36-
processor = BatchSpanProcessor(OTLPSpanExporter( \
37-
endpoint="http://localhost:4318/v1/traces"))
31+
processor = BatchSpanProcessor(
32+
OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")
33+
)
3834
# A provider provides tracers:
3935
provider = TracerProvider(resource=resource)
4036
provider.add_span_processor(processor)
@@ -45,10 +41,10 @@ def _initiate_tracers(self, service_name):
4541

4642
def _extract_trace_context(self, message):
4743
"""Retrieves Context object from message."""
48-
carrier = message.get('trace_context')
44+
carrier = message.get("trace_context")
4945
if carrier:
5046
# Deserialise serialised context into a Context object:
51-
ctx = TraceContextTextMapPropagator().extract(carrier=carrier)
47+
ctx = TraceContextTextMapPropagator().extract(carrier=carrier)
5248
logger.info(f"extracted trace context from {self.service_name}")
5349
return ctx
5450
# If no context, leave empty:
@@ -58,24 +54,29 @@ def _extract_trace_context(self, message):
5854
def _inject_trace_context(self, message):
5955
"""Inserts serialized trace context into message."""
6056
if type(message) == str:
61-
logger.warning(f"received string message in {self.service_name}, could not extract trace context")
57+
logger.warning(
58+
f"received string message in {self.service_name}, could not extract trace context"
59+
)
6260
return
6361
carrier = {}
6462
# If called outside of a span context, just leave carrier empty
6563
# (very safe!)
6664
TraceContextTextMapPropagator().inject(carrier)
67-
message['trace_context'] = carrier
65+
message["trace_context"] = carrier
6866
logger.info(f"injected trace context into {self.service_name}")
6967

7068
def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int:
7169
"""The callback includes 'everything' that happens in a service that
7270
we care about, so we wrap it in a span context.
7371
To link the current span context with others from the same request
7472
we inject/extract the serialized trace context in the recipe message."""
73+
7574
@functools.wraps(callback)
7675
def wrapped_callback(header, message):
7776
ctx = self._extract_trace_context(message)
78-
with self.tracer.start_as_current_span(self.service_name, context=ctx) as span:
77+
with self.tracer.start_as_current_span(
78+
self.service_name, context=ctx
79+
) as span:
7980
if ctx == {}:
8081
self._inject_trace_context(message)
8182

0 commit comments

Comments
 (0)