Skip to content

Commit 11b9952

Browse files
authored
Merge pull request #1040 from parea-ai/improve-lc-errors
Improve lc errors
2 parents 466c5bc + a75fcb6 commit 11b9952

File tree

4 files changed

+235
-16
lines changed

4 files changed

+235
-16
lines changed

parea/parea_logger.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from parea.helpers import serialize_metadata_values
1313
from parea.schemas.log import TraceIntegrations
1414
from parea.schemas.models import CreateGetProjectResponseSchema, TraceLog, UpdateLog
15+
from parea.utils.trace_integrations.langchain_utils import _dumps_json
1516
from parea.utils.universal_encoder import json_dumps
1617

1718
logger = logging.getLogger()
@@ -90,7 +91,7 @@ def record_vendor_log(self, data: Dict[str, Any], vendor: TraceIntegrations) ->
9091
self._client.request(
9192
"POST",
9293
VENDOR_LOG_ENDPOINT.format(vendor=vendor.value),
93-
data=json.loads(json_dumps(data)), # uuid is not serializable
94+
data=json.loads(_dumps_json(data)), # uuid is not serializable
9495
)
9596

9697
async def arecord_vendor_log(self, data: Dict[str, Any], vendor: TraceIntegrations) -> None:
@@ -101,7 +102,7 @@ async def arecord_vendor_log(self, data: Dict[str, Any], vendor: TraceIntegratio
101102
await self._client.request_async(
102103
"POST",
103104
VENDOR_LOG_ENDPOINT.format(vendor=vendor.value),
104-
data=json.loads(json_dumps(data)), # uuid is not serializable
105+
data=json.loads(_dumps_json(data)), # uuid is not serializable
105106
)
106107

107108

parea/utils/trace_integrations/langchain.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,22 @@ def __init__(
4646
def _persist_run(self, run: Union[Run, LLMRun, ChainRun, ToolRun]) -> None:
    """Serialize a finished langchain run and forward it to Parea.

    Attaches Parea-specific trace context to the run's dict form and sends it
    via ``parea_logger.record_vendor_log``. Any exception raised while logging
    is caught and logged so tracing failures cannot break the user's chain.
    """
    if is_logging_disabled():
        return
    try:
        self.parent_trace_id = run.id
        # using .dict() since langchain Run class currently set to Pydantic v1
        data = run.dict()
        # Attach Parea trace context so the backend can stitch this run into
        # the surrounding trace/session.
        data["_parea_root_trace_id"] = self._parea_root_trace_id or None
        data["_session_id"] = self._session_id
        data["_tags"] = self._tags
        data["_metadata"] = self._metadata
        data["_end_user_identifier"] = self._end_user_identifier
        data["_deployment_id"] = self._deployment_id
        # check if run has an attribute execution order
        # (execution_order == 1 or a missing parent_run_id marks a root run)
        if (hasattr(run, "execution_order") and run.execution_order == 1) or run.parent_run_id is None:
            data["_parea_parent_trace_id"] = self._parea_parent_trace_id or None
        parea_logger.record_vendor_log(data, TraceIntegrations.LANGCHAIN)
    except Exception as e:
        # NOTE(review): assumes a module-level `logger` exists in this module —
        # confirm, otherwise this handler itself raises NameError.
        logger.exception(f"Error occurred while logging langchain run: {e}", stack_info=True)
6265

6366
def get_parent_trace_id(self) -> UUID:
    """Return the id of the run most recently persisted (set in ``_persist_run``)."""
    return self.parent_trace_id
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
from __future__ import annotations
2+
3+
from typing import Any, Callable, Dict, List, Optional, TypedDict, TypeVar, Union
4+
5+
import copy
6+
import datetime
7+
import functools
8+
import json
9+
import logging
10+
import re
11+
import uuid
12+
from enum import Enum
13+
14+
from orjson import orjson
15+
16+
logger = logging.getLogger()  # NOTE(review): root logger; `getLogger(__name__)` is the usual convention


# Identifiers may arrive as uuid.UUID instances or their string form.
ID_TYPE = Union[uuid.UUID, str]
# Recursion ceiling for the fallback serializers below.
_MAX_DEPTH = 2
21+
22+
23+
class RunTypeEnum(str, Enum):
    """(Deprecated) Enum for run types. Use string directly."""

    # str-valued members so they compare and serialize as plain strings.
    tool = "tool"
    chain = "chain"
    llm = "llm"
    retriever = "retriever"
    embedding = "embedding"
    prompt = "prompt"
    parser = "parser"
33+
34+
35+
class RunLikeDict(TypedDict, total=False):
    """Run-like dictionary, for type-hinting.

    Mirrors the fields of a langchain/langsmith ``Run``; every key is
    optional (``total=False``).
    """

    name: str
    run_type: RunTypeEnum
    # Fixed: these were annotated with the `datetime` *module* instead of the
    # `datetime.datetime` class.
    start_time: datetime.datetime
    inputs: Optional[dict]
    outputs: Optional[dict]
    end_time: Optional[datetime.datetime]
    extra: Optional[dict]
    error: Optional[str]
    serialized: Optional[dict]
    parent_run_id: Optional[uuid.UUID]
    manifest_id: Optional[uuid.UUID]
    events: Optional[List[dict]]
    tags: Optional[List[str]]
    inputs_s3_urls: Optional[dict]
    outputs_s3_urls: Optional[dict]
    id: Optional[uuid.UUID]
    session_id: Optional[uuid.UUID]
    session_name: Optional[str]
    reference_example_id: Optional[uuid.UUID]
    input_attachments: Optional[dict]
    output_attachments: Optional[dict]
    trace_id: uuid.UUID
    dotted_order: str
61+
62+
63+
def _as_uuid(value: ID_TYPE, var: Optional[str] = None) -> uuid.UUID:
    """Coerce *value* to a ``uuid.UUID``, raising a descriptive error if invalid."""
    if isinstance(value, uuid.UUID):
        return value
    try:
        return uuid.UUID(value)
    except ValueError as e:
        raise Exception(f"{var or 'value'} must be a valid UUID or UUID string. Got {value}") from e
69+
70+
71+
def _simple_default(obj: Any) -> Any:
    """Best-effort shallow serializer used as a JSON ``default`` hook.

    Known scalar types are converted directly; anything else is round-tripped
    through the stdlib encoder. On any failure, ``repr(obj)`` is returned.
    """
    try:
        if isinstance(obj, uuid.UUID):
            return str(obj)
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        # Don't traverse into nested objects
        return json.loads(json.dumps(obj))
    except BaseException as e:
        logger.debug(f"Failed to serialize {type(obj)} to JSON: {e}")
        return repr(obj)
82+
83+
84+
def _serialize_json(obj: Any, depth: int = 0, serialize_py: bool = True) -> Any:
    """Best-effort conversion of an arbitrary object to JSON-compatible data.

    Tries, in order: a single-level dump once the depth cut-off is reached,
    bytes decoding, set/tuple -> list, well-known serialization methods
    (Pydantic v1/v2, dataclass_json), and finally the object's own attributes.
    Falls back to ``repr(obj)`` whenever serialization fails.
    """
    try:
        # Past the cut-off, do one shallow dump instead of recursing further.
        if depth >= _MAX_DEPTH:
            try:
                return orjson.loads(_dumps_json_single(obj))
            except BaseException:
                return repr(obj)
        if isinstance(obj, bytes):
            return obj.decode("utf-8")
        # orjson cannot dump sets/tuples directly; go through a list.
        if isinstance(obj, (set, tuple)):
            return orjson.loads(_dumps_json_single(list(obj)))

        # (method name, whether it accepts exclude_none=...) — tried in order.
        serialization_methods = [
            ("model_dump_json", True),  # Pydantic V2
            ("json", True),  # Pydantic V1
            ("to_json", False),  # dataclass_json
            ("model_dump", True),  # Pydantic V2 with non-serializable fields
            ("dict", False),  # Pydantic V1 with non-serializable fields
        ]
        for attr, exclude_none in serialization_methods:
            if hasattr(obj, attr) and callable(getattr(obj, attr)):
                try:
                    method = getattr(obj, attr)
                    json_str = method(exclude_none=exclude_none) if exclude_none else method()
                    if isinstance(json_str, str):
                        return json.loads(json_str)
                    # Method returned structured data (e.g. model_dump/dict):
                    # recurse with an incremented depth.
                    return orjson.loads(_dumps_json(json_str, depth=depth + 1, serialize_py=serialize_py))
                except Exception as e:
                    logger.debug(f"Failed to serialize {type(obj)} to JSON: {e}")
                    pass
        # Last resort: serialize the object's attributes (__slots__/__dict__).
        if serialize_py:
            all_attrs = {}
            if hasattr(obj, "__slots__"):
                all_attrs.update({slot: getattr(obj, slot, None) for slot in obj.__slots__})
            if hasattr(obj, "__dict__"):
                all_attrs.update(vars(obj))
            if all_attrs:
                # Guard against direct self-reference to avoid infinite recursion.
                filtered = {k: v if v is not obj else repr(v) for k, v in all_attrs.items()}
                return orjson.loads(_dumps_json(filtered, depth=depth + 1, serialize_py=serialize_py))
        return repr(obj)
    except BaseException as e:
        logger.debug(f"Failed to serialize {type(obj)} to JSON: {e}")
        return repr(obj)
127+
128+
129+
def _elide_surrogates(s: bytes) -> bytes:
    """Remove escaped UTF-16 surrogate sequences (``\\udXXX``) from JSON bytes."""
    surrogate_re = re.compile(rb"\\ud[89a-f][0-9a-f]{2}", re.IGNORECASE)
    return surrogate_re.sub(b"", s)
133+
134+
135+
def _dumps_json_single(obj: Any, default: Optional[Callable[[Any], Any]] = None) -> bytes:
    """Dump *obj* to JSON bytes with orjson, falling back to stdlib json.

    The fallback handles inputs orjson rejects (typically UTF surrogates):
    it ASCII-encodes with ``json.dumps`` and, if the result still cannot be
    round-tripped through orjson, strips the escaped surrogate sequences.
    """
    try:
        return orjson.dumps(
            obj,
            default=default,
            option=orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_SERIALIZE_DATACLASS | orjson.OPT_SERIALIZE_UUID | orjson.OPT_NON_STR_KEYS,
        )
    except TypeError as e:
        # Usually caused by UTF surrogate characters
        logger.debug(f"Orjson serialization failed: {repr(e)}. Falling back to json.")
        result = json.dumps(
            obj,
            default=_simple_default,
            ensure_ascii=True,
        ).encode("utf-8")
        try:
            # Re-encode with orjson for a canonical result when possible.
            result = orjson.dumps(orjson.loads(result.decode("utf-8", errors="surrogateescape")))
        except orjson.JSONDecodeError:
            result = _elide_surrogates(result)
        return result
155+
156+
157+
def _dumps_json(obj: Any, depth: int = 0, serialize_py: bool = True) -> bytes:
    """Serialize an object to JSON-formatted bytes.

    Parameters
    ----------
    obj : Any
        The object to serialize.
    depth : int, default=0
        Current recursion depth; fallback serialization stops descending
        at ``_MAX_DEPTH``.
    serialize_py : bool, default=True
        Whether to fall back to serializing plain Python objects from their
        ``__dict__``/``__slots__`` attributes.

    Returns
    -------
    bytes
        The JSON-formatted bytestring.
    """
    return _dumps_json_single(obj, functools.partial(_serialize_json, depth=depth, serialize_py=serialize_py))
171+
172+
173+
T = TypeVar("T")  # generic value type for deepish_copy/_middle_copy
174+
175+
176+
def _middle_copy(val: T, memo: Dict[int, Any], max_depth: int = 4, _depth: int = 0) -> T:
    """Copy *val* up to *max_depth* levels deep, tolerating uncopyable members.

    Containers (dict/list/tuple/set) are rebuilt with recursively copied
    children; anything else is returned as-is. Objects providing their own
    ``__deepcopy__`` are copied through it when possible.
    """
    # Look the hook up on the *instance* so we get a bound method and
    # `copier(memo)` passes `memo` as the memo argument. (The previous
    # class-level lookup yielded an unbound function, so `copier(memo)`
    # passed `memo` as `self` and the call always failed or misbehaved.)
    copier = getattr(val, "__deepcopy__", None)
    if copier is not None:
        try:
            return copier(memo)
        except BaseException:
            pass  # fall through to the structural copy below
    if _depth >= max_depth:
        return val
    if isinstance(val, dict):
        return {_middle_copy(k, memo, max_depth, _depth + 1): _middle_copy(v, memo, max_depth, _depth + 1) for k, v in val.items()}  # type: ignore[return-value]
    if isinstance(val, list):
        return [_middle_copy(item, memo, max_depth, _depth + 1) for item in val]  # type: ignore[return-value]
    if isinstance(val, tuple):
        return tuple(_middle_copy(item, memo, max_depth, _depth + 1) for item in val)  # type: ignore[return-value]
    if isinstance(val, set):
        return {_middle_copy(item, memo, max_depth, _depth + 1) for item in val}  # type: ignore[return-value]

    return val
197+
198+
199+
def deepish_copy(val: T) -> T:
    """Deep copy a value with a compromise for uncopyable objects.

    Args:
        val: The value to be deep copied.
    Returns:
        The deep copied value.
    """
    seen: Dict[int, Any] = {}
    try:
        return copy.deepcopy(val, seen)
    except BaseException as exc:
        # Generators, locks, etc. cannot be copied
        # and raise a TypeError (mentioning pickling, since the dunder methods)
        # are re-used for copying. We'll try to do a compromise and copy
        # what we can
        logger.debug("Failed to deepcopy input: %s", repr(exc))
        return _middle_copy(val, seen)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"
66
[tool.poetry]
77
name = "parea-ai"
88
packages = [{ include = "parea" }]
9-
version = "0.2.194"
9+
version = "0.2.195"
1010
description = "Parea python sdk"
1111
readme = "README.md"
1212
authors = ["joel-parea-ai <[email protected]>"]

0 commit comments

Comments
 (0)