fix(experiments): move to sync run item creation to avoid latency issues #1444
base: main
1 file reviewed, 1 comment
```python
dataset_run_item = self.api.dataset_run_items.create(
    CreateDatasetRunItemRequest(
        runName=experiment_run_name,
        runDescription=experiment_description,
        metadata=experiment_metadata,
        datasetItemId=item.id,  # type: ignore
        traceId=trace_id,
        observationId=span.id,
    ),
)
)
```
**logic:** Calling the sync API `self.api.dataset_run_items.create()` directly inside an async function blocks the event loop. When 100+ items run concurrently via `asyncio.gather()` (line 2690), each blocking call prevents other coroutines from executing, which effectively serializes execution, defeats the purpose of async concurrency, and performs worse than the original `asyncio.to_thread()` approach.
Use `asyncio.to_thread()` to offload the blocking call:
```diff
- dataset_run_item = self.api.dataset_run_items.create(
-     CreateDatasetRunItemRequest(
-         runName=experiment_run_name,
-         runDescription=experiment_description,
-         metadata=experiment_metadata,
-         datasetItemId=item.id,  # type: ignore
-         traceId=trace_id,
-         observationId=span.id,
-     ),
- )
- )
+ dataset_run_item = await asyncio.to_thread(
+     self.api.dataset_run_items.create,
+     CreateDatasetRunItemRequest(
+         runName=experiment_run_name,
+         runDescription=experiment_description,
+         metadata=experiment_metadata,
+         datasetItemId=item.id,  # type: ignore
+         traceId=trace_id,
+         observationId=span.id,
+     )
+ )
```
Path: langfuse/_client/client.py
Line: 2818:2827
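The effect described in the review comment can be reproduced in isolation. The sketch below is not the Langfuse code: `blocking_create` is a hypothetical stand-in that uses `time.sleep` to imitate a synchronous HTTP call, and the item count and delay are arbitrary. It times the same `asyncio.gather()` fan-out twice, once calling the blocking function directly and once through `asyncio.to_thread()`.

```python
import asyncio
import time


def blocking_create(delay: float) -> str:
    """Hypothetical stand-in for a synchronous HTTP call (not the real API)."""
    time.sleep(delay)
    return "created"


async def process_item_blocking(delay: float) -> str:
    # Direct sync call: the event loop cannot run anything else until it returns.
    return blocking_create(delay)


async def process_item_offloaded(delay: float) -> str:
    # The call runs in a worker thread, so the event loop stays free.
    return await asyncio.to_thread(blocking_create, delay)


async def timed(worker, n: int, delay: float) -> float:
    """Run n workers through asyncio.gather() and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(worker(delay) for _ in range(n)))
    return time.perf_counter() - start


def compare(n: int = 8, delay: float = 0.05) -> tuple[float, float]:
    """Return (elapsed with direct sync calls, elapsed with asyncio.to_thread)."""
    blocking = asyncio.run(timed(process_item_blocking, n, delay))
    offloaded = asyncio.run(timed(process_item_offloaded, n, delay))
    return blocking, offloaded


if __name__ == "__main__":
    blocking, offloaded = compare()
    print(f"direct sync calls: {blocking:.2f}s, asyncio.to_thread: {offloaded:.2f}s")
```

With direct calls the elapsed time grows roughly as `n * delay`, because each coroutine holds the loop for its full sleep. With `asyncio.to_thread()` the sleeps overlap, up to the size of the default thread pool.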
Important
Switches from async to sync API for dataset run item creation in `client.py` to address performance issues at high concurrency.
`dataset_run_items.create` in `_process_experiment_item()` in `client.py`.
`httpx.AsyncClient` at high concurrency (100+ requests).
This description was created for 3d570eb.
Disclaimer: Experimental PR review
Greptile Summary
`asyncio.to_thread`) with direct blocking sync call in `_process_experiment_item`
Confidence Score: 1/5
`asyncio.to_thread()` with a direct blocking sync call inside an async function that runs 100+ items concurrently via `asyncio.gather()`, which blocks the event loop and defeats async concurrency
Important Files Changed
Sequence Diagram
```mermaid
sequenceDiagram
    participant User
    participant Langfuse
    participant ProcessItem as "_process_experiment_item"
    participant API as "dataset_run_items.create"
    participant Server as "Langfuse Server"
    User->>Langfuse: "run_experiment(data, task)"
    Langfuse->>Langfuse: "asyncio.gather() 100+ items"
    loop For each item concurrently
        Langfuse->>ProcessItem: "await _process_experiment_item(item)"
        ProcessItem->>ProcessItem: "Check if item has dataset_id"
        ProcessItem->>API: "self.api.dataset_run_items.create(request)"
        Note over API: "Blocking sync call (blocks event loop)"
        API->>Server: "HTTP POST /dataset_run_items"
        Server-->>API: "Response (~300ms)"
        API-->>ProcessItem: "dataset_run_item"
        ProcessItem->>ProcessItem: "Execute task(item)"
        ProcessItem-->>Langfuse: "ExperimentItemResult"
    end
    Langfuse-->>User: "ExperimentResult"
```
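As a rough back-of-envelope check on the diagram, the sketch below estimates the time spent only on run item creation, using the figures quoted above (100 items, about 300 ms per response). The helper `estimate_ms` is hypothetical. The worker count of 32 is an assumption, chosen because CPython caps the default thread pool used by `asyncio.to_thread()` at 32 workers; the real pool may be smaller on machines with few cores. Task execution time and scheduling overhead are ignored.

```python
import math


def estimate_ms(items: int, latency_ms: int, workers: int) -> int:
    """Wall-clock estimate when at most `workers` calls are in flight at once."""
    batches = math.ceil(items / workers)
    return batches * latency_ms


# Blocking sync calls on the event loop: one call in flight at a time.
print(estimate_ms(100, 300, workers=1))   # 30000 ms, about 30 s

# Offloaded to a thread pool, assuming 32 workers are available.
print(estimate_ms(100, 300, workers=32))  # 1200 ms, about 1.2 s
```

Under these assumptions the serialized path spends about 30 seconds on creation calls alone, compared with roughly 1.2 seconds when the calls overlap in worker threads.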