Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
85648ee
feat: add option to disable internal send and receive spans
omgitsaheadcrab Aug 16, 2024
5b7759a
fix: broken custom headers
omgitsaheadcrab Aug 16, 2024
b6faf30
test(TestAsgiApplication): add test for internal span exclusion
omgitsaheadcrab Aug 16, 2024
15b4d60
chore: update CHANGELOG
omgitsaheadcrab Aug 16, 2024
90943e9
Merge branch 'main' into optional-send-receive-spans
omgitsaheadcrab Aug 16, 2024
2073d2a
refactor: remove environment variables for internal span toggle
omgitsaheadcrab Aug 19, 2024
906721b
chore: remove unused import
omgitsaheadcrab Aug 19, 2024
0741bc6
refactor(FastAPIInstrumentor): add ability to exclude internal send a…
omgitsaheadcrab Aug 19, 2024
839dbf9
refactor: remove spurious addition
omgitsaheadcrab Aug 19, 2024
5b6ea8e
Merge branch 'optional-send-receive-spans-fastapi' into optional-send…
omgitsaheadcrab Aug 19, 2024
52bd660
docs: update CHANGELOG
omgitsaheadcrab Aug 19, 2024
b1e445e
fix: add missing instrumentation
omgitsaheadcrab Aug 19, 2024
1cc0495
Merge branch 'main' into optional-send-receive-spans
lzchen Aug 19, 2024
a5bfd43
chore: fix too-many-branches ignore
omgitsaheadcrab Aug 20, 2024
9daafa7
docs: appease sphinx autodoc
omgitsaheadcrab Aug 20, 2024
9c32d80
fix: server span logic should always be executed if recording
omgitsaheadcrab Aug 21, 2024
f312d4a
refactor: combine exclusions into one optional list
omgitsaheadcrab Aug 21, 2024
c074c62
docs: update wording
omgitsaheadcrab Aug 21, 2024
ddfb5e6
style: format code
omgitsaheadcrab Aug 21, 2024
3fc263e
refactor: extract set_send_span and set_server_span logic to helpers
omgitsaheadcrab Aug 21, 2024
3f36b70
Merge branch 'main' into optional-send-receive-spans
lzchen Aug 22, 2024
09a2ef4
test: add assertion to ensure list always has at least the server spans
omgitsaheadcrab Aug 23, 2024
be1ebbf
refactor: apply suggestions from code review
omgitsaheadcrab Aug 23, 2024
f7b2a51
Merge branch 'main' into optional-send-receive-spans
lzchen Aug 23, 2024
6b2490a
Merge branch 'main' into optional-send-receive-spans
lzchen Aug 23, 2024
59ef359
Merge branch 'main' into optional-send-receive-spans
lzchen Aug 26, 2024
7ec823d
Merge branch 'main' into optional-send-receive-spans
omgitsaheadcrab Sep 5, 2024
6909756
test: ensure memory exporter cleared between cases
omgitsaheadcrab Sep 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-kafka-python` Instrument temporary fork, kafka-python-ng
inside kafka-python's instrumentation
([#2537](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2537))
- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-fastapi` Add ability to disable internal HTTP send and receive spans
([#2802](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2802))

### Breaking changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ def get_default_span_details(scope: dict) -> Tuple[str, dict]:


def _collect_target_attribute(
scope: typing.Dict[str, typing.Any]
scope: typing.Dict[str, typing.Any],
) -> typing.Optional[str]:
"""
Returns the target path as defined by the Semantic Conventions.
Expand Down Expand Up @@ -529,6 +529,7 @@ class OpenTelemetryMiddleware:
the current globally configured one is used.
meter_provider: The optional meter provider to use. If omitted
the current globally configured one is used.
exclude_spans: Optionally exclude HTTP `send` and/or `receive` spans from the trace.
"""

# pylint: disable=too-many-branches
Expand All @@ -547,6 +548,7 @@ def __init__(
http_capture_headers_server_request: list[str] | None = None,
http_capture_headers_server_response: list[str] | None = None,
http_capture_headers_sanitize_fields: list[str] | None = None,
exclude_spans: list[typing.Literal["receive", "send"]] | None = None,
):
# initialize semantic conventions opt-in if needed
_OpenTelemetrySemanticConventionStability._initialize()
Expand Down Expand Up @@ -653,6 +655,12 @@ def __init__(
)
or []
)
self.exclude_receive_span = (
"receive" in exclude_spans if exclude_spans else False
)
self.exclude_send_span = (
"send" in exclude_spans if exclude_spans else False
)

# pylint: disable=too-many-statements
async def __call__(
Expand Down Expand Up @@ -796,8 +804,10 @@ async def __call__(
span.end()

# pylint: enable=too-many-branches

def _get_otel_receive(self, server_span_name, scope, receive):
if self.exclude_receive_span:
return receive

@wraps(receive)
async def otel_receive():
with self.tracer.start_as_current_span(
Expand All @@ -821,6 +831,66 @@ async def otel_receive():

return otel_receive

def _set_send_span(
self,
server_span_name,
scope,
send,
message,
status_code,
expecting_trailers,
):
"""Set send span attributes and status code."""
with self.tracer.start_as_current_span(
" ".join((server_span_name, scope["type"], "send"))
) as send_span:
if callable(self.client_response_hook):
self.client_response_hook(send_span, scope, message)

if send_span.is_recording():
if message["type"] == "http.response.start":
expecting_trailers = message.get("trailers", False)
send_span.set_attribute("asgi.event.type", message["type"])

if status_code:
set_status_code(
send_span,
status_code,
None,
self._sem_conv_opt_in_mode,
)
return expecting_trailers

def _set_server_span(
self, server_span, message, status_code, duration_attrs
):
"""Set server span attributes and status code."""
if (
server_span.is_recording()
and server_span.kind == trace.SpanKind.SERVER
and "headers" in message
):
custom_response_attributes = (
collect_custom_headers_attributes(
message,
self.http_capture_headers_sanitize_fields,
self.http_capture_headers_server_response,
normalise_response_header_name,
)
if self.http_capture_headers_server_response
else {}
)
if len(custom_response_attributes) > 0:
server_span.set_attributes(custom_response_attributes)

if status_code:
set_status_code(
server_span,
status_code,
duration_attrs,
self._sem_conv_opt_in_mode,
)

def _get_otel_send(
self,
server_span,
Expand All @@ -834,74 +904,46 @@ def _get_otel_send(
@wraps(send)
async def otel_send(message: dict[str, Any]):
nonlocal expecting_trailers
with self.tracer.start_as_current_span(
" ".join((server_span_name, scope["type"], "send"))
) as send_span:
if callable(self.client_response_hook):
self.client_response_hook(send_span, scope, message)

status_code = None
if message["type"] == "http.response.start":
status_code = message["status"]
elif message["type"] == "websocket.send":
status_code = 200

if send_span.is_recording():
if message["type"] == "http.response.start":
expecting_trailers = message.get("trailers", False)
send_span.set_attribute("asgi.event.type", message["type"])
if (
server_span.is_recording()
and server_span.kind == trace.SpanKind.SERVER
and "headers" in message
):
custom_response_attributes = (
collect_custom_headers_attributes(
message,
self.http_capture_headers_sanitize_fields,
self.http_capture_headers_server_response,
normalise_response_header_name,
)
if self.http_capture_headers_server_response
else {}
)
if len(custom_response_attributes) > 0:
server_span.set_attributes(
custom_response_attributes
)
if status_code:
# We record metrics only once
set_status_code(
server_span,
status_code,
duration_attrs,
self._sem_conv_opt_in_mode,
)
set_status_code(
send_span,
status_code,
None,
self._sem_conv_opt_in_mode,
)
status_code = None
if message["type"] == "http.response.start":
status_code = message["status"]
elif message["type"] == "websocket.send":
status_code = 200

propagator = get_global_response_propagator()
if propagator:
propagator.inject(
message,
context=set_span_in_context(
server_span, trace.context_api.Context()
),
setter=asgi_setter,
)
if not self.exclude_send_span:
expecting_trailers = self._set_send_span(
server_span_name,
scope,
send,
message,
status_code,
expecting_trailers,
)

content_length = asgi_getter.get(message, "content-length")
if content_length:
try:
self.content_length_header = int(content_length[0])
except ValueError:
pass
self._set_server_span(
server_span, message, status_code, duration_attrs
)

propagator = get_global_response_propagator()
if propagator:
propagator.inject(
message,
context=set_span_in_context(
server_span, trace.context_api.Context()
),
setter=asgi_setter,
)

content_length = asgi_getter.get(message, "content-length")
if content_length:
try:
self.content_length_header = int(content_length[0])
except ValueError:
pass

await send(message)

await send(message)
# pylint: disable=too-many-boolean-expressions
if (
not expecting_trailers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,30 @@ async def test_background_execution(self):
_SIMULATED_BACKGROUND_TASK_EXECUTION_TIME_S * 10**9,
)

async def test_exclude_internal_spans(self):
"""Test that internal spans are excluded from the emitted spans when
the `exclude_receive_span` or `exclude_send_span` attributes are set.
"""
cases = [
(["receive", "send"], ["GET / http receive", "GET / http send"]),
(["send"], ["GET / http send"]),
(["receive"], ["GET / http receive"]),
([], []),
]
for exclude_spans, excluded_spans in cases:
self.memory_exporter.clear()
app = otel_asgi.OpenTelemetryMiddleware(
simple_asgi, exclude_spans=exclude_spans
)
self.seed_app(app)
await self.send_default_request()
await self.get_all_output()
span_list = self.memory_exporter.get_finished_spans()
self.assertTrue(span_list)
for span in span_list:
for excluded_span in excluded_spans:
self.assertNotEqual(span.name, excluded_span)

async def test_trailers(self):
"""Test that trailers are emitted as expected and that the server span is ended
BEFORE the background task is finished."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def client_response_hook(span: Span, scope: dict[str, Any], message: dict[str, A
from __future__ import annotations

import logging
from typing import Collection
from typing import Collection, Literal

import fastapi
from starlette.routing import Match
Expand Down Expand Up @@ -222,7 +222,7 @@ class FastAPIInstrumentor(BaseInstrumentor):

@staticmethod
def instrument_app(
app: fastapi.FastAPI,
app,
server_request_hook: ServerRequestHook = None,
client_request_hook: ClientRequestHook = None,
client_response_hook: ClientResponseHook = None,
Expand All @@ -232,8 +232,28 @@ def instrument_app(
http_capture_headers_server_request: list[str] | None = None,
http_capture_headers_server_response: list[str] | None = None,
http_capture_headers_sanitize_fields: list[str] | None = None,
exclude_spans: list[Literal["receive", "send"]] | None = None,
):
"""Instrument an uninstrumented FastAPI application."""
"""Instrument an uninstrumented FastAPI application.

Args:
app: The fastapi ASGI application callable to forward requests to.
server_request_hook: Optional callback which is called with the server span and ASGI
scope object for every incoming request.
client_request_hook: Optional callback which is called with the internal span, and ASGI
scope and event which are sent as dictionaries for when the method receive is called.
client_response_hook: Optional callback which is called with the internal span, and ASGI
scope and event which are sent as dictionaries for when the method send is called.
tracer_provider: The optional tracer provider to use. If omitted
the current globally configured one is used.
meter_provider: The optional meter provider to use. If omitted
the current globally configured one is used.
excluded_urls: Optional comma delimited string of regexes to match URLs that should not be traced.
http_capture_headers_server_request: Optional list of HTTP headers to capture from the request.
http_capture_headers_server_response: Optional list of HTTP headers to capture from the response.
http_capture_headers_sanitize_fields: Optional list of HTTP headers to sanitize.
exclude_spans: Optionally exclude HTTP `send` and/or `receive` spans from the trace.
"""
if not hasattr(app, "_is_instrumented_by_opentelemetry"):
app._is_instrumented_by_opentelemetry = False

Expand Down Expand Up @@ -273,6 +293,7 @@ def instrument_app(
http_capture_headers_server_request=http_capture_headers_server_request,
http_capture_headers_server_response=http_capture_headers_server_response,
http_capture_headers_sanitize_fields=http_capture_headers_sanitize_fields,
exclude_spans=exclude_spans,
)
app._is_instrumented_by_opentelemetry = True
if app not in _InstrumentedFastAPI._instrumented_fastapi_apps:
Expand Down Expand Up @@ -323,6 +344,7 @@ def _instrument(self, **kwargs):
else parse_excluded_urls(_excluded_urls)
)
_InstrumentedFastAPI._meter_provider = kwargs.get("meter_provider")
_InstrumentedFastAPI._exclude_spans = kwargs.get("exclude_spans")
fastapi.FastAPI = _InstrumentedFastAPI

def _uninstrument(self, **kwargs):
Expand Down Expand Up @@ -373,6 +395,7 @@ def __init__(self, *args, **kwargs):
http_capture_headers_server_request=_InstrumentedFastAPI._http_capture_headers_server_request,
http_capture_headers_server_response=_InstrumentedFastAPI._http_capture_headers_server_response,
http_capture_headers_sanitize_fields=_InstrumentedFastAPI._http_capture_headers_sanitize_fields,
exclude_spans=_InstrumentedFastAPI._exclude_spans,
)
self._is_instrumented_by_opentelemetry = True
_InstrumentedFastAPI._instrumented_fastapi_apps.add(self)
Expand Down