-
Notifications
You must be signed in to change notification settings - Fork 591
Expand file tree
/
Copy pathmonitoring.py
More file actions
147 lines (121 loc) · 5.58 KB
/
monitoring.py
File metadata and controls
147 lines (121 loc) · 5.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import inspect
import sys
from functools import wraps
from sentry_sdk.consts import SPANDATA
import sentry_sdk.utils
from sentry_sdk import start_span
from sentry_sdk.tracing import Span
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.utils import ContextVar, reraise, capture_internal_exceptions
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Optional, Callable, Awaitable, Any, Union, TypeVar
F = TypeVar("F", bound=Union[Callable[..., Any], Callable[..., Awaitable[Any]]])
# Context-local holder for the name of the AI pipeline currently in flight.
# Set by the outermost @ai_track-decorated call and read by nested runs and
# record_token_usage(); None when no pipeline span is active.
_ai_pipeline_name = ContextVar("ai_pipeline_name", default=None)
def set_ai_pipeline_name(name: "Optional[str]") -> None:
    """Record *name* as the active AI pipeline for the current context.

    Pass ``None`` to clear the active pipeline.
    """
    _ai_pipeline_name.set(name)
def get_ai_pipeline_name() -> "Optional[str]":
    """Return the active AI pipeline name for this context, or ``None``."""
    return _ai_pipeline_name.get()
def ai_track(description: str, **span_kwargs: "Any") -> "Callable[[F], F]":
    """Decorator that wraps an AI function in a Sentry span.

    The outermost decorated call (no pipeline active in the current context)
    becomes an ``ai.pipeline`` span and publishes *description* as the
    current pipeline name; nested decorated calls become ``ai.run`` spans
    annotated with that pipeline name. Works for both sync and async
    functions.

    :param description: Span name; also the pipeline name for outermost calls.
    :param span_kwargs: Extra keyword arguments forwarded to ``start_span``.
        An ``op`` key overrides the default op.

    Callers may additionally pass ``sentry_tags`` / ``sentry_data`` dicts at
    call time; they are consumed (popped) and applied to the span rather than
    forwarded to the wrapped function.
    """

    def decorator(f: "F") -> "F":
        def _span_setup(kwargs: "Any") -> "Any":
            # Build the per-call start_span kwargs. NOTE: we must copy
            # span_kwargs before popping "op" — the dict is shared by every
            # invocation of the decorated function, and mutating it would
            # silently drop a user-supplied "op" after the first call.
            curr_pipeline = _ai_pipeline_name.get()
            local_kwargs = dict(span_kwargs)
            op = local_kwargs.pop(
                "op", "ai.run" if curr_pipeline else "ai.pipeline"
            )
            return curr_pipeline, op, local_kwargs

        def _apply_call_annotations(span: "Any", kwargs: "Any") -> None:
            # Consume call-time sentry_tags/sentry_data so they are not
            # forwarded to the wrapped function.
            for k, v in kwargs.pop("sentry_tags", {}).items():
                span.set_tag(k, v)
            for k, v in kwargs.pop("sentry_data", {}).items():
                span.set_data(k, v)

        def _capture(e: "Exception") -> None:
            # Report the failure as an unhandled ai_monitoring event.
            with capture_internal_exceptions():
                event, hint = sentry_sdk.utils.event_from_exception(
                    e,
                    client_options=sentry_sdk.get_client().options,
                    mechanism={"type": "ai_monitoring", "handled": False},
                )
                sentry_sdk.capture_event(event, hint=hint)

        def sync_wrapped(*args: "Any", **kwargs: "Any") -> "Any":
            curr_pipeline, op, local_kwargs = _span_setup(kwargs)
            with start_span(name=description, op=op, **local_kwargs) as span:
                _apply_call_annotations(span, kwargs)
                if curr_pipeline:
                    # Nested run: tag with the enclosing pipeline's name.
                    span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, curr_pipeline)
                    return f(*args, **kwargs)
                else:
                    # Outermost call: this span IS the pipeline.
                    _ai_pipeline_name.set(description)
                    try:
                        res = f(*args, **kwargs)
                    except Exception as e:
                        exc_info = sys.exc_info()
                        _capture(e)
                        reraise(*exc_info)
                    finally:
                        # Always clear, even on failure, so the next
                        # outermost call starts a fresh pipeline.
                        _ai_pipeline_name.set(None)
                    return res

        async def async_wrapped(*args: "Any", **kwargs: "Any") -> "Any":
            curr_pipeline, op, local_kwargs = _span_setup(kwargs)
            with start_span(name=description, op=op, **local_kwargs) as span:
                _apply_call_annotations(span, kwargs)
                if curr_pipeline:
                    # Nested run: tag with the enclosing pipeline's name.
                    span.set_data(SPANDATA.GEN_AI_PIPELINE_NAME, curr_pipeline)
                    return await f(*args, **kwargs)
                else:
                    # Outermost call: this span IS the pipeline.
                    _ai_pipeline_name.set(description)
                    try:
                        res = await f(*args, **kwargs)
                    except Exception as e:
                        exc_info = sys.exc_info()
                        _capture(e)
                        reraise(*exc_info)
                    finally:
                        # Always clear, even on failure, so the next
                        # outermost call starts a fresh pipeline.
                        _ai_pipeline_name.set(None)
                    return res

        if inspect.iscoroutinefunction(f):
            return wraps(f)(async_wrapped)  # type: ignore
        else:
            return wraps(f)(sync_wrapped)  # type: ignore

    return decorator
def record_token_usage(
    span: "Union[Span, StreamedSpan]",
    input_tokens: "Optional[int]" = None,
    input_tokens_cached: "Optional[int]" = None,
    input_tokens_cache_write: "Optional[int]" = None,
    output_tokens: "Optional[int]" = None,
    output_tokens_reasoning: "Optional[int]" = None,
    total_tokens: "Optional[int]" = None,
) -> None:
    """Attach gen-AI token-usage attributes to *span*.

    Only the counts that are provided (not ``None``) are recorded. When
    *total_tokens* is omitted but both *input_tokens* and *output_tokens*
    are given, the total is derived as their sum. If an AI pipeline is
    active in the current context, its name is recorded as well.
    """
    # StreamedSpan exposes set_attribute; Span exposes set_data.
    set_on_span = (
        span.set_attribute if isinstance(span, StreamedSpan) else span.set_data
    )

    # TODO: move pipeline name elsewhere
    pipeline = get_ai_pipeline_name()
    if pipeline:
        set_on_span(SPANDATA.GEN_AI_PIPELINE_NAME, pipeline)

    # Derive the total when it wasn't supplied explicitly.
    if total_tokens is None and input_tokens is not None and output_tokens is not None:
        total_tokens = input_tokens + output_tokens

    # Record every count that was actually provided, in a fixed order.
    for attribute, count in (
        (SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens),
        (SPANDATA.GEN_AI_USAGE_INPUT_TOKENS_CACHED, input_tokens_cached),
        (SPANDATA.GEN_AI_USAGE_INPUT_TOKENS_CACHE_WRITE, input_tokens_cache_write),
        (SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens),
        (SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS_REASONING, output_tokens_reasoning),
        (SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens),
    ):
        if count is not None:
            set_on_span(attribute, count)