15 changes: 9 additions & 6 deletions langfuse/_client/client.py
```diff
@@ -2809,18 +2809,21 @@ async def _process_experiment_item(
                 # Link to dataset run if this is a dataset item
                 if hasattr(item, "id") and hasattr(item, "dataset_id"):
                     try:
-                        # Use sync API to avoid event loop issues when run_async_safely
-                        # creates multiple event loops across different threads
-                        dataset_run_item = await asyncio.to_thread(
-                            self.api.dataset_run_items.create,
-                            request=CreateDatasetRunItemRequest(
+                        # Use sync API instead of async API due to httpx.AsyncClient performance issues
+                        # at high concurrency (100+ concurrent requests). The async API exhibits cumulative
+                        # performance degradation where each subsequent request takes progressively longer
+                        # (~100ms more per request), causing items to complete in 800ms, 1000ms, 1200ms, etc.,
+                        # even though the server processes each request in ~300ms. Using the sync API blocks
+                        # during creation but avoids the httpx async bottleneck, ensuring correct timing.
+                        dataset_run_item = self.api.dataset_run_items.create(
+                            CreateDatasetRunItemRequest(
                                 runName=experiment_run_name,
                                 runDescription=experiment_description,
                                 metadata=experiment_metadata,
                                 datasetItemId=item.id,  # type: ignore
                                 traceId=trace_id,
                                 observationId=span.id,
                             ),
                         )
```
Comment on lines +2818 to 2827
Contributor
**logic:** calling sync API `self.api.dataset_run_items.create()` directly inside an async function blocks the event loop. When running 100+ concurrent items via `asyncio.gather()` (line 2690), this blocking call defeats the purpose of async concurrency. Each blocked call prevents other coroutines from executing, effectively serializing execution and degrading performance worse than the original `asyncio.to_thread()` approach.

Use `asyncio.to_thread()` to offload the blocking call:

Suggested change
```diff
-                        dataset_run_item = self.api.dataset_run_items.create(
-                            CreateDatasetRunItemRequest(
-                                runName=experiment_run_name,
-                                runDescription=experiment_description,
-                                metadata=experiment_metadata,
-                                datasetItemId=item.id,  # type: ignore
-                                traceId=trace_id,
-                                observationId=span.id,
-                            ),
-                        )
+                        dataset_run_item = await asyncio.to_thread(
+                            self.api.dataset_run_items.create,
+                            CreateDatasetRunItemRequest(
+                                runName=experiment_run_name,
+                                runDescription=experiment_description,
+                                metadata=experiment_metadata,
+                                datasetItemId=item.id,  # type: ignore
+                                traceId=trace_id,
+                                observationId=span.id,
+                            )
+                        )
```
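To see why the reviewer's objection holds, here is a minimal, self-contained sketch (not Langfuse code; `blocking_call` with a `time.sleep` stands in for the synchronous `dataset_run_items.create` HTTP request) comparing a blocking call made directly inside coroutines against the same call offloaded with `asyncio.to_thread()`:

```python
import asyncio
import time


def blocking_call(delay: float) -> str:
    # Stand-in for a synchronous API request such as dataset_run_items.create.
    time.sleep(delay)
    return "ok"


async def run_direct(n: int, delay: float) -> float:
    # Calling the sync function directly inside a coroutine blocks the event
    # loop, so the "concurrent" tasks actually execute one after another.
    async def item() -> str:
        return blocking_call(delay)

    start = time.perf_counter()
    await asyncio.gather(*(item() for _ in range(n)))
    return time.perf_counter() - start


async def run_offloaded(n: int, delay: float) -> float:
    # asyncio.to_thread moves the blocking call onto a worker thread, so the
    # event loop stays free to schedule all tasks concurrently.
    async def item() -> str:
        return await asyncio.to_thread(blocking_call, delay)

    start = time.perf_counter()
    await asyncio.gather(*(item() for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    direct = asyncio.run(run_direct(10, 0.05))       # roughly n * delay: serialized
    offloaded = asyncio.run(run_offloaded(10, 0.05))  # far less, thread pool permitting
    print(f"direct: {direct:.2f}s, offloaded: {offloaded:.2f}s")
```

With 10 tasks of 50 ms each, the direct variant takes roughly the sum of all the sleeps because each call stalls the loop, while the offloaded variant completes in a fraction of that time. This mirrors the PR: the sync call may sidestep an httpx async bottleneck, but inside `asyncio.gather()` it trades one serialization problem for another unless it is wrapped in `asyncio.to_thread()`.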
dataset_run_id = dataset_run_item.dataset_run_id