from __future__ import annotations

+import json
from collections.abc import AsyncIterator, Iterator
from contextlib import asynccontextmanager, contextmanager
from dataclasses import dataclass, field

import logfire_api
from opentelemetry._events import Event, EventLogger, EventLoggerProvider, get_event_logger_provider
from opentelemetry.trace import Tracer, TracerProvider, get_tracer_provider
+from opentelemetry.util.types import AttributeValue

from ..messages import (
    ModelMessage,
    'frequency_penalty',
)

-NOT_GIVEN = object()
-

@dataclass
class InstrumentedModel(WrapperModel):
-    """Model which is instrumented with logfire."""
+    """Model which is instrumented with OpenTelemetry."""

    tracer: Tracer = field(repr=False)
    event_logger: EventLogger = field(repr=False)
+    event_mode: Literal['attributes', 'logs'] = 'attributes'

    def __init__(
        self,
        wrapped: Model | KnownModelName,
        tracer_provider: TracerProvider | None = None,
        event_logger_provider: EventLoggerProvider | None = None,
+        event_mode: Literal['attributes', 'logs'] = 'attributes',
    ):
        super().__init__(wrapped)
        tracer_provider = tracer_provider or get_tracer_provider()
        event_logger_provider = event_logger_provider or get_event_logger_provider()
        self.tracer = tracer_provider.get_tracer('pydantic-ai')
        self.event_logger = event_logger_provider.get_event_logger('pydantic-ai')
+        self.event_mode = event_mode

    @classmethod
    def from_logfire(
        cls,
        wrapped: Model | KnownModelName,
        logfire_instance: logfire_api.Logfire = logfire_api.DEFAULT_LOGFIRE_INSTANCE,
+        event_mode: Literal['attributes', 'logs'] = 'attributes',
    ) -> InstrumentedModel:
        if hasattr(logfire_instance.config, 'get_event_logger_provider'):
            event_provider = logfire_instance.config.get_event_logger_provider()
        else:
            event_provider = None
        tracer_provider = logfire_instance.config.get_tracer_provider()
-        return cls(wrapped, tracer_provider, event_provider)
+        return cls(wrapped, tracer_provider, event_provider, event_mode)

    async def request(
        self,
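A minimal usage sketch of the new event_mode parameter, assuming 'openai:gpt-4o' is an available KnownModelName (the model name is illustrative and not part of this diff):

# Default: message events are bundled into a JSON 'events' attribute on the span.
model = InstrumentedModel('openai:gpt-4o', event_mode='attributes')
# Alternative: each message event is emitted as a separate OpenTelemetry event/log.
model = InstrumentedModel.from_logfire('openai:gpt-4o', event_mode='logs')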
@@ -111,7 +115,7 @@ async def request_stream(
                    finish(response_stream.get(), response_stream.usage())

    @contextmanager
-    def _instrument(
+    def _instrument(  # noqa: C901
        self,
        messages: list[ModelMessage],
        model_settings: ModelSettings | None,
@@ -126,18 +130,19 @@ def _instrument(
        # - server.port: to parse from the base_url
        # - error.type: unclear if we should do something here or just always rely on span exceptions
        # - gen_ai.request.stop_sequences/top_k: model_settings doesn't include these
-        attributes: dict[str, Any] = {
+        attributes: dict[str, AttributeValue] = {
            'gen_ai.operation.name': operation,
            'gen_ai.system': system,
            'gen_ai.request.model': model_name,
        }

        if model_settings:
            for key in MODEL_SETTING_ATTRIBUTES:
-                if (value := model_settings.get(key, NOT_GIVEN)) is not NOT_GIVEN:
+                if isinstance(value := model_settings.get(key), (float, int)):
                    attributes[f'gen_ai.request.{key}'] = value

-        emit_event = partial(self._emit_event, system)
+        events_list = []
+        emit_event = partial(self._emit_event, system, events_list)

        with self.tracer.start_as_current_span(span_name, attributes=attributes) as span:
            if span.is_recording():
@@ -167,22 +172,36 @@ def finish(response: ModelResponse, usage: Usage):
                )
                span.set_attributes(
                    {
-                        k: v
-                        for k, v in {
-                            # TODO finish_reason (https://github.com/open-telemetry/semantic-conventions/issues/1277), id
-                            # https://github.com/pydantic/pydantic-ai/issues/886
-                            'gen_ai.response.model': response.model_name or model_name,
-                            'gen_ai.usage.input_tokens': usage.request_tokens,
-                            'gen_ai.usage.output_tokens': usage.response_tokens,
-                        }.items()
-                        if v is not None
+                        # TODO finish_reason (https://github.com/open-telemetry/semantic-conventions/issues/1277), id
+                        # https://github.com/pydantic/pydantic-ai/issues/886
+                        'gen_ai.response.model': response.model_name or model_name,
+                        **usage.opentelemetry_attributes(),
                    }
                )
+                if events_list:
+                    attr_name = 'events'
+                    span.set_attributes(
+                        {
+                            attr_name: json.dumps(events_list),
+                            'logfire.json_schema': json.dumps(
+                                {
+                                    'type': 'object',
+                                    'properties': {attr_name: {'type': 'array'}},
+                                }
+                            ),
+                        }
+                    )

            yield finish

-    def _emit_event(self, system: str, event_name: str, body: dict[str, Any]) -> None:
-        self.event_logger.emit(Event(event_name, body=body, attributes={'gen_ai.system': system}))
+    def _emit_event(
+        self, system: str, events_list: list[dict[str, Any]], event_name: str, body: dict[str, Any]
+    ) -> None:
+        attributes = {'gen_ai.system': system}
+        if self.event_mode == 'logs':
+            self.event_logger.emit(Event(event_name, body=body, attributes=attributes))
+        else:
+            events_list.append({'event.name': event_name, **body, **attributes})


def _request_part_body(part: ModelRequestPart) -> tuple[str, dict[str, Any]]:
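To make the two _emit_event paths above concrete, here is a small self-contained sketch of what the 'attributes' mode serializes onto the span; the event names and bodies are illustrative assumptions, not values taken from this diff:

import json

events_list = [
    {'event.name': 'gen_ai.user.message', 'content': 'hello', 'gen_ai.system': 'openai'},
    {'event.name': 'gen_ai.choice', 'index': 0, 'message': {'content': 'hi there'}, 'gen_ai.system': 'openai'},
]
span_attributes = {
    'events': json.dumps(events_list),
    'logfire.json_schema': json.dumps({'type': 'object', 'properties': {'events': {'type': 'array'}}}),
}
# With event_mode='logs', each entry would instead be emitted via
# EventLogger.emit(Event(name, body=..., attributes={'gen_ai.system': ...})).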