Skip to content

Commit af73c06

Browse files
authored
Add up counter/histogram metrics in spans (#1099)
1 parent 8a5b4b7 commit af73c06

File tree

14 files changed

+556
-44
lines changed

14 files changed

+556
-44
lines changed

logfire/_internal/config.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,14 +235,14 @@ class PydanticPlugin:
235235

236236
@dataclass
237237
class MetricsOptions:
238-
"""Configuration of metrics.
239-
240-
This only has one option for now, but it's a place to add more related options in the future.
241-
"""
238+
"""Configuration of metrics."""
242239

243240
additional_readers: Sequence[MetricReader] = ()
244241
"""Sequence of metric readers to be used in addition to the default which exports metrics to Logfire's API."""
245242

243+
collect_in_spans: bool = False
244+
"""Experimental setting to add up the values of counter and histogram metrics in active spans."""
245+
246246

247247
@dataclass
248248
class CodeSource:
@@ -1097,7 +1097,7 @@ def exit_open_spans(): # pragma: no cover
10971097
# The reason that spans may be lingering open is that they're in suspended generator frames.
10981098
# Apart from here, they will be ended when the generator is garbage collected
10991099
# as the interpreter shuts down, but that's too late.
1100-
for span in list(OPEN_SPANS):
1100+
for span in list(OPEN_SPANS.values()):
11011101
# TODO maybe we should be recording something about what happened here?
11021102
span.end()
11031103
# Interpreter shutdown may trigger another call to .end(),

logfire/_internal/metrics.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@
2121
UpDownCounter,
2222
)
2323
from opentelemetry.sdk.metrics import MeterProvider as SDKMeterProvider
24+
from opentelemetry.trace import get_current_span
2425
from opentelemetry.util.types import Attributes
2526

27+
from .tracer import _LogfireWrappedSpan # type: ignore
28+
from .utils import handle_internal_errors
29+
2630
try:
2731
# This only exists in opentelemetry-sdk>=1.23.0
2832
from opentelemetry.metrics import _Gauge
@@ -247,6 +251,12 @@ def on_meter_set(self, meter: Meter) -> None:
247251
def _create_real_instrument(self, meter: Meter) -> InstrumentT:
248252
"""Create an instance of the real instrument. Implement this."""
249253

254+
@handle_internal_errors
255+
def _increment_span_metric(self, amount: float, attributes: Attributes | None = None):
256+
span = get_current_span()
257+
if isinstance(span, _LogfireWrappedSpan):
258+
span.increment_metric(self._name, attributes or {}, amount)
259+
250260

251261
class _ProxyAsynchronousInstrument(_ProxyInstrument[InstrumentT], ABC):
252262
def __init__(
@@ -271,6 +281,7 @@ def add(
271281
*args: Any,
272282
**kwargs: Any,
273283
) -> None:
284+
self._increment_span_metric(amount, attributes)
274285
self._instrument.add(amount, attributes, *args, **kwargs)
275286

276287
def _create_real_instrument(self, meter: Meter) -> Counter:
@@ -285,6 +296,7 @@ def record(
285296
*args: Any,
286297
**kwargs: Any,
287298
) -> None:
299+
self._increment_span_metric(amount, attributes)
288300
self._instrument.record(amount, attributes, *args, **kwargs)
289301

290302
def _create_real_instrument(self, meter: Meter) -> Histogram:

logfire/_internal/tracer.py

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
from __future__ import annotations
22

3+
import json
34
import sys
45
import traceback
6+
from collections import defaultdict
57
from collections.abc import Mapping, Sequence
68
from dataclasses import dataclass, field
79
from threading import Lock
810
from typing import TYPE_CHECKING, Any, Callable, cast
9-
from weakref import WeakKeyDictionary, WeakSet
11+
from weakref import WeakKeyDictionary, WeakValueDictionary
1012

1113
import opentelemetry.trace as trace_api
1214
from opentelemetry import context as context_api
@@ -43,7 +45,7 @@
4345
ValidationError = None
4446

4547

46-
OPEN_SPANS: WeakSet[_LogfireWrappedSpan] = WeakSet()
48+
OPEN_SPANS: WeakValueDictionary[tuple[int, int], _LogfireWrappedSpan] = WeakValueDictionary()
4749

4850

4951
@dataclass
@@ -112,6 +114,23 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
112114
return True # pragma: no cover
113115

114116

117+
@dataclass
118+
class SpanMetric:
119+
details: dict[tuple[tuple[str, otel_types.AttributeValue], ...], float] = field(
120+
default_factory=lambda: defaultdict(int)
121+
)
122+
123+
def dump(self):
124+
return {
125+
'details': [{'attributes': dict(attributes), 'total': total} for attributes, total in self.details.items()],
126+
'total': sum(total for total in self.details.values()),
127+
}
128+
129+
def increment(self, attributes: Mapping[str, otel_types.AttributeValue], value: float):
130+
key = tuple(sorted(attributes.items()))
131+
self.details[key] += value
132+
133+
115134
@dataclass(eq=False)
116135
class _LogfireWrappedSpan(trace_api.Span, ReadableSpan):
117136
"""A span that wraps another span and overrides some behaviors in a logfire-specific way.
@@ -124,14 +143,24 @@ class _LogfireWrappedSpan(trace_api.Span, ReadableSpan):
124143

125144
span: Span
126145
ns_timestamp_generator: Callable[[], int]
146+
record_metrics: bool
147+
metrics: dict[str, SpanMetric] = field(default_factory=lambda: defaultdict(SpanMetric))
127148

128149
def __post_init__(self):
129-
OPEN_SPANS.add(self)
150+
OPEN_SPANS[self._open_spans_key()] = self
130151

131152
def end(self, end_time: int | None = None) -> None:
132-
OPEN_SPANS.discard(self)
153+
with handle_internal_errors:
154+
OPEN_SPANS.pop(self._open_spans_key(), None)
155+
if self.metrics:
156+
self.span.set_attribute(
157+
'logfire.metrics', json.dumps({name: metric.dump() for name, metric in self.metrics.items()})
158+
)
133159
self.span.end(end_time or self.ns_timestamp_generator())
134160

161+
def _open_spans_key(self):
162+
return _open_spans_key(self.span.get_span_context())
163+
135164
def get_span_context(self) -> SpanContext:
136165
return self.span.get_span_context()
137166

@@ -175,6 +204,14 @@ def record_exception(
175204
timestamp = timestamp or self.ns_timestamp_generator()
176205
record_exception(self.span, exception, attributes=attributes, timestamp=timestamp, escaped=escaped)
177206

207+
def increment_metric(self, name: str, attributes: Mapping[str, otel_types.AttributeValue], value: float) -> None:
208+
if not self.is_recording() or not self.record_metrics:
209+
return
210+
211+
self.metrics[name].increment(attributes, value)
212+
if self.parent and (parent := OPEN_SPANS.get(_open_spans_key(self.parent))):
213+
parent.increment_metric(name, attributes, value)
214+
178215
def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: Any) -> None:
179216
if self.is_recording():
180217
if isinstance(exc_value, BaseException):
@@ -187,6 +224,10 @@ def __getattr__(self, name: str) -> Any:
187224
return getattr(self.span, name)
188225

189226

227+
def _open_spans_key(ctx: SpanContext) -> tuple[int, int]:
228+
return ctx.trace_id, ctx.span_id
229+
230+
190231
@dataclass
191232
class _ProxyTracer(Tracer):
192233
"""A tracer that wraps another internal tracer allowing it to be re-assigned."""
@@ -216,7 +257,11 @@ def start_span(
216257
record_exception: bool = True,
217258
set_status_on_exception: bool = True,
218259
) -> Span:
219-
start_time = start_time or self.provider.config.advanced.ns_timestamp_generator()
260+
config = self.provider.config
261+
ns_timestamp_generator = config.advanced.ns_timestamp_generator
262+
record_metrics: bool = not isinstance(config.metrics, (bool, type(None))) and config.metrics.collect_in_spans
263+
264+
start_time = start_time or ns_timestamp_generator()
220265

221266
# Make a copy of the attributes since this method can be called by arbitrary external code,
222267
# e.g. third party instrumentation.
@@ -241,7 +286,8 @@ def start_span(
241286
)
242287
return _LogfireWrappedSpan(
243288
span,
244-
ns_timestamp_generator=self.provider.config.advanced.ns_timestamp_generator,
289+
ns_timestamp_generator=ns_timestamp_generator,
290+
record_metrics=record_metrics,
245291
)
246292

247293
# This means that `with start_as_current_span(...):`

logfire/integrations/pydantic.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,16 +127,16 @@ def wrapped_validator(input_data: Any, *args: Any, **kwargs: Any) -> Any:
127127
try:
128128
result = validator(input_data, *args, **kwargs)
129129
except ValidationError as error:
130-
self._count_validation(success=False)
131130
self._on_error_span(span, error)
131+
self._count_validation(success=False)
132132
raise
133133
except Exception as exception:
134-
self._count_validation(success=False)
135134
self._on_exception_span(span, exception)
135+
self._count_validation(success=False)
136136
raise
137137
else:
138-
self._count_validation(success=True)
139138
self._on_success(span, result)
139+
self._count_validation(success=True)
140140
return result
141141

142142
elif self._record == 'failure':

tests/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ def config(config_kwargs: dict[str, Any], metrics_reader: InMemoryMetricReader)
101101
**config_kwargs,
102102
metrics=logfire.MetricsOptions(
103103
additional_readers=[metrics_reader],
104+
collect_in_spans=True,
104105
),
105106
)
106107
# sanity check: there are no active spans

tests/otel_integrations/test_asgi.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import contextlib
44

5+
from dirty_equals import IsFloat, IsInt
56
from inline_snapshot import snapshot
67
from opentelemetry.propagate import inject
78
from starlette.applications import Starlette
@@ -34,7 +35,7 @@ def homepage(_: Request):
3435
assert response.status_code == 200
3536
assert response.text == 'middleware test'
3637

37-
assert exporter.exported_spans_as_dict() == snapshot(
38+
assert exporter.exported_spans_as_dict(parse_json_attributes=True) == snapshot(
3839
[
3940
{
4041
'name': 'inside request handler',
@@ -98,6 +99,70 @@ def homepage(_: Request):
9899
'logfire.msg_template': 'outside request handler',
99100
'logfire.msg': 'outside request handler',
100101
'logfire.span_type': 'span',
102+
'logfire.metrics': {
103+
'http.server.duration': {
104+
'details': [
105+
{
106+
'attributes': {
107+
'http.flavor': '1.1',
108+
'http.host': 'testserver',
109+
'http.method': 'GET',
110+
'http.scheme': 'http',
111+
'http.server_name': 'testserver',
112+
'http.status_code': 200,
113+
'net.host.port': 80,
114+
},
115+
'total': IsInt(),
116+
}
117+
],
118+
'total': IsInt(),
119+
},
120+
'http.server.request.duration': {
121+
'details': [
122+
{
123+
'attributes': {
124+
'http.request.method': 'GET',
125+
'http.response.status_code': 200,
126+
'network.protocol.version': '1.1',
127+
'url.scheme': 'http',
128+
},
129+
'total': IsFloat(),
130+
}
131+
],
132+
'total': IsFloat(),
133+
},
134+
'http.server.response.size': {
135+
'details': [
136+
{
137+
'attributes': {
138+
'http.flavor': '1.1',
139+
'http.host': 'testserver',
140+
'http.method': 'GET',
141+
'http.scheme': 'http',
142+
'http.server_name': 'testserver',
143+
'http.status_code': 200,
144+
'net.host.port': 80,
145+
},
146+
'total': 15,
147+
}
148+
],
149+
'total': 15,
150+
},
151+
'http.server.response.body.size': {
152+
'details': [
153+
{
154+
'attributes': {
155+
'http.request.method': 'GET',
156+
'http.response.status_code': 200,
157+
'network.protocol.version': '1.1',
158+
'url.scheme': 'http',
159+
},
160+
'total': 15,
161+
}
162+
],
163+
'total': 15,
164+
},
165+
},
101166
},
102167
},
103168
]

0 commit comments

Comments
 (0)