Skip to content

Commit 197ab18

Browse files
committed
Increased error logging and added error handling to the LangChain (lc) handler
1 parent 466c5bc commit 197ab18

File tree

4 files changed

+236
-22
lines changed

4 files changed

+236
-22
lines changed

parea/parea_logger.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
1-
from typing import Any, Dict, Optional
2-
31
import json
42
import logging
53
import os
4+
from typing import Any, Dict, Optional
65

76
from attrs import asdict, define, field
87
from cattrs import structure
9-
108
from parea.api_client import HTTPClient
119
from parea.constants import PAREA_OS_ENV_EXPERIMENT_UUID
1210
from parea.helpers import serialize_metadata_values
1311
from parea.schemas.log import TraceIntegrations
1412
from parea.schemas.models import CreateGetProjectResponseSchema, TraceLog, UpdateLog
13+
from parea.utils.trace_integrations.langchain_utils import _dumps_json
1514
from parea.utils.universal_encoder import json_dumps
1615

1716
logger = logging.getLogger()
@@ -90,7 +89,7 @@ def record_vendor_log(self, data: Dict[str, Any], vendor: TraceIntegrations) ->
9089
self._client.request(
9190
"POST",
9291
VENDOR_LOG_ENDPOINT.format(vendor=vendor.value),
93-
data=json.loads(json_dumps(data)), # uuid is not serializable
92+
data=json.loads(_dumps_json(data)), # uuid is not serializable
9493
)
9594

9695
async def arecord_vendor_log(self, data: Dict[str, Any], vendor: TraceIntegrations) -> None:
@@ -101,7 +100,7 @@ async def arecord_vendor_log(self, data: Dict[str, Any], vendor: TraceIntegratio
101100
await self._client.request_async(
102101
"POST",
103102
VENDOR_LOG_ENDPOINT.format(vendor=vendor.value),
104-
data=json.loads(json_dumps(data)), # uuid is not serializable
103+
data=json.loads(_dumps_json(data)), # uuid is not serializable
105104
)
106105

107106

parea/utils/trace_integrations/langchain.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
1-
from typing import Any, Dict, List, Optional, Union
2-
31
import logging
2+
from typing import Any, Dict, List, Optional, Union
43
from uuid import UUID
54

65
from langchain_core.tracers import BaseTracer
76
from langchain_core.tracers.schemas import ChainRun, LLMRun, Run, ToolRun
8-
97
from parea.helpers import is_logging_disabled
108
from parea.parea_logger import parea_logger
119
from parea.schemas import UpdateTraceScenario
@@ -46,19 +44,22 @@ def __init__(
4644
def _persist_run(self, run: Union[Run, LLMRun, ChainRun, ToolRun]) -> None:
    """Forward a finished LangChain run to Parea, never raising to the caller."""
    if is_logging_disabled():
        return
    try:
        self.parent_trace_id = run.id
        # using .dict() since langchain Run class currently set to Pydantic v1
        payload = run.dict()
        payload["_parea_root_trace_id"] = self._parea_root_trace_id or None
        payload["_session_id"] = self._session_id
        payload["_tags"] = self._tags
        payload["_metadata"] = self._metadata
        payload["_end_user_identifier"] = self._end_user_identifier
        payload["_deployment_id"] = self._deployment_id
        # check if run has an attribute execution order; root runs (execution
        # order 1, or no parent run) also carry the parent trace id.
        is_root_run = (hasattr(run, "execution_order") and run.execution_order == 1) or run.parent_run_id is None
        if is_root_run:
            payload["_parea_parent_trace_id"] = self._parea_parent_trace_id or None
        parea_logger.record_vendor_log(payload, TraceIntegrations.LANGCHAIN)
    except Exception as e:
        logger.exception(f"Error occurred while logging langchain run: {e}", stack_info=True)
6263

6364
def get_parent_trace_id(self) -> UUID:
    """Return the run id captured by the most recent `_persist_run` call."""
    return self.parent_trace_id
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
from __future__ import annotations
2+
3+
import copy
4+
import datetime
5+
import functools
6+
import json
7+
import logging
8+
import re
9+
import uuid
10+
from enum import Enum
11+
from typing import Any, Callable, Dict, List, Optional, TypedDict, TypeVar, Union
12+
13+
from orjson import orjson
14+
15+
logger = logging.getLogger()
16+
17+
18+
ID_TYPE = Union[uuid.UUID, str]
19+
_MAX_DEPTH = 2
20+
21+
22+
class RunTypeEnum(str, Enum):
    """(Deprecated) Enum for run types. Use string directly."""

    # Each member's value equals its name; as a str subclass, members compare
    # equal to the plain string, e.g. RunTypeEnum.llm == "llm".
    tool = "tool"
    chain = "chain"
    llm = "llm"
    retriever = "retriever"
    embedding = "embedding"
    prompt = "prompt"
    parser = "parser"
32+
33+
34+
class RunLikeDict(TypedDict, total=False):
35+
"""Run-like dictionary, for type-hinting."""
36+
37+
name: str
38+
run_type: RunTypeEnum
39+
start_time: datetime
40+
inputs: Optional[dict]
41+
outputs: Optional[dict]
42+
end_time: Optional[datetime]
43+
extra: Optional[dict]
44+
error: Optional[str]
45+
serialized: Optional[dict]
46+
parent_run_id: Optional[uuid.UUID]
47+
manifest_id: Optional[uuid.UUID]
48+
events: Optional[List[dict]]
49+
tags: Optional[List[str]]
50+
inputs_s3_urls: Optional[dict]
51+
outputs_s3_urls: Optional[dict]
52+
id: Optional[uuid.UUID]
53+
session_id: Optional[uuid.UUID]
54+
session_name: Optional[str]
55+
reference_example_id: Optional[uuid.UUID]
56+
input_attachments: Optional[dict]
57+
output_attachments: Optional[dict]
58+
trace_id: uuid.UUID
59+
dotted_order: str
60+
61+
62+
def _as_uuid(value: ID_TYPE, var: Optional[str] = None) -> uuid.UUID:
63+
try:
64+
return uuid.UUID(value) if not isinstance(value, uuid.UUID) else value
65+
except ValueError as e:
66+
var = var or "value"
67+
raise Exception(f"{var} must be a valid UUID or UUID string. Got {value}") from e
68+
69+
70+
def _simple_default(obj: Any) -> Any:
71+
# Don't traverse into nested objects
72+
try:
73+
if isinstance(obj, datetime.datetime):
74+
return obj.isoformat()
75+
if isinstance(obj, uuid.UUID):
76+
return str(obj)
77+
return json.loads(json.dumps(obj))
78+
except BaseException as e:
79+
logger.debug(f"Failed to serialize {type(obj)} to JSON: {e}")
80+
return repr(obj)
81+
82+
83+
def _serialize_json(obj: Any, depth: int = 0, serialize_py: bool = True) -> Any:
    """Convert *obj* to JSON-compatible data, recursing at most _MAX_DEPTH levels."""
    try:
        if depth >= _MAX_DEPTH:
            # Depth limit reached: one last shallow attempt, else repr().
            try:
                return orjson.loads(_dumps_json_single(obj))
            except BaseException:
                return repr(obj)

        if isinstance(obj, bytes):
            return obj.decode("utf-8")
        if isinstance(obj, (set, tuple)):
            return orjson.loads(_dumps_json_single(list(obj)))

        # Known serialization hooks, tried in order of preference; the bool
        # marks whether the method accepts an exclude_none keyword.
        hooks = [
            ("model_dump_json", True),  # Pydantic V2
            ("json", True),  # Pydantic V1
            ("to_json", False),  # dataclass_json
            ("model_dump", True),  # Pydantic V2 with non-serializable fields
            ("dict", False),  # Pydantic V1 with non-serializable fields
        ]
        for hook_name, wants_exclude_none in hooks:
            candidate = getattr(obj, hook_name, None)
            if not callable(candidate):
                continue
            try:
                produced = candidate(exclude_none=True) if wants_exclude_none else candidate()
                if isinstance(produced, str):
                    return json.loads(produced)
                return orjson.loads(_dumps_json(produced, depth=depth + 1, serialize_py=serialize_py))
            except Exception as e:
                logger.debug(f"Failed to serialize {type(obj)} to JSON: {e}")

        if serialize_py:
            # Last resort: snapshot the object's attributes (__slots__ and/or
            # __dict__) and serialize those, guarding against self-references.
            all_attrs: dict = {}
            if hasattr(obj, "__slots__"):
                all_attrs.update({slot: getattr(obj, slot, None) for slot in obj.__slots__})
            if hasattr(obj, "__dict__"):
                all_attrs.update(vars(obj))
            if all_attrs:
                filtered = {k: v if v is not obj else repr(v) for k, v in all_attrs.items()}
                return orjson.loads(_dumps_json(filtered, depth=depth + 1, serialize_py=serialize_py))
        return repr(obj)
    except BaseException as e:
        logger.debug(f"Failed to serialize {type(obj)} to JSON: {e}")
        return repr(obj)
126+
127+
128+
def _elide_surrogates(s: bytes) -> bytes:
129+
pattern = re.compile(rb"\\ud[89a-f][0-9a-f]{2}", re.IGNORECASE)
130+
result = pattern.sub(b"", s)
131+
return result
132+
133+
134+
def _dumps_json_single(obj: Any, default: Optional[Callable[[Any], Any]] = None) -> bytes:
    """Dump *obj* to JSON bytes with orjson, falling back to stdlib json on TypeError."""
    options = orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_SERIALIZE_DATACLASS | orjson.OPT_SERIALIZE_UUID | orjson.OPT_NON_STR_KEYS
    try:
        return orjson.dumps(obj, default=default, option=options)
    except TypeError as e:
        # Usually caused by UTF surrogate characters
        logger.debug(f"Orjson serialization failed: {repr(e)}. Falling back to json.")
        encoded = json.dumps(obj, default=_simple_default, ensure_ascii=True).encode("utf-8")
        try:
            # Re-encode through orjson; surrogateescape lets valid payloads round-trip.
            return orjson.dumps(orjson.loads(encoded.decode("utf-8", errors="surrogateescape")))
        except orjson.JSONDecodeError:
            return _elide_surrogates(encoded)
154+
155+
156+
def _dumps_json(obj: Any, depth: int = 0, serialize_py: bool = True) -> bytes:
    """Serialize an object to JSON-formatted bytes.

    Parameters
    ----------
    obj : Any
        The object to serialize.
    depth : int, default=0
        Current recursion depth; forwarded to the fallback serializer so it
        stops descending at _MAX_DEPTH.
    serialize_py : bool, default=True
        Whether the fallback serializer may inspect __dict__/__slots__ of
        otherwise non-serializable objects.

    Returns
    -------
    bytes
        The JSON-formatted payload.
    """
    # NOTE: the previous docstring documented a nonexistent `default`
    # parameter and claimed a str return; the function returns bytes.
    return _dumps_json_single(obj, functools.partial(_serialize_json, depth=depth, serialize_py=serialize_py))
170+
171+
172+
T = TypeVar("T")
173+
174+
175+
def _middle_copy(val: T, memo: Dict[int, Any], max_depth: int = 4, _depth: int = 0) -> T:
176+
cls = type(val)
177+
178+
copier = getattr(cls, "__deepcopy__", None)
179+
if copier is not None:
180+
try:
181+
return copier(memo)
182+
except BaseException:
183+
pass
184+
if _depth >= max_depth:
185+
return val
186+
if isinstance(val, dict):
187+
return {_middle_copy(k, memo, max_depth, _depth + 1): _middle_copy(v, memo, max_depth, _depth + 1) for k, v in val.items()} # type: ignore[return-value]
188+
if isinstance(val, list):
189+
return [_middle_copy(item, memo, max_depth, _depth + 1) for item in val] # type: ignore[return-value]
190+
if isinstance(val, tuple):
191+
return tuple(_middle_copy(item, memo, max_depth, _depth + 1) for item in val) # type: ignore[return-value]
192+
if isinstance(val, set):
193+
return {_middle_copy(item, memo, max_depth, _depth + 1) for item in val} # type: ignore[return-value]
194+
195+
return val
196+
197+
198+
def deepish_copy(val: T) -> T:
    """Deep copy a value with a compromise for uncopyable objects.

    Args:
        val: The value to be deep copied.
    Returns:
        The deep copied value.
    """
    memo: Dict[int, Any] = {}
    try:
        return copy.deepcopy(val, memo)
    except BaseException as exc:
        # Generators, locks, etc. cannot be copied and raise a TypeError
        # (mentioning pickling, since the dunder methods are re-used for
        # copying). Compromise: structurally copy what we can.
        logger.debug("Failed to deepcopy input: %s", repr(exc))
        return _middle_copy(val, memo)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"
66
[tool.poetry]
77
name = "parea-ai"
88
packages = [{ include = "parea" }]
9-
version = "0.2.194"
9+
version = "0.2.195"
1010
description = "Parea python sdk"
1111
readme = "README.md"
1212
authors = ["joel-parea-ai <[email protected]>"]

0 commit comments

Comments
 (0)