Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions memori/llm/_invoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,26 @@ async def invoke(self, **kwargs):
)

raw_response = await self._method(**kwargs)

# Check if streaming is enabled - if so, return a wrapped async generator
# that handles post-processing after the stream is consumed.
# This fixes the "cannot pickle '_thread.RLock' object" error when using
# OpenAI with Agno in streaming mode (Issue #214)
if kwargs.get("stream", False):
return self._wrap_stream(raw_response, kwargs, start)

self.handle_post_response(kwargs, start, raw_response)
return raw_response

async def _wrap_stream(self, stream, kwargs, start):
    """Re-yield every chunk of an async stream, then run post-processing.

    Each chunk's ``__dict__`` is folded into a running dict with
    ``merge_chunk`` before the chunk is handed to the consumer unchanged.
    Once the stream is exhausted, the merged dict is passed to
    ``handle_post_response`` in place of a regular (non-streaming) response.

    Args:
        stream: Async iterable of chunk objects; each chunk must expose
            ``__dict__``.
        kwargs: The keyword arguments of the original call, forwarded
            as-is to ``handle_post_response``.
        start: Forwarded as-is to ``handle_post_response`` (presumably the
            call's start time -- confirm against the caller).

    Yields:
        Each chunk from ``stream``, in order and unmodified.

    Note:
        ``handle_post_response`` sits after the loop, so it is only reached
        when the consumer drains the stream completely; it is not called if
        iteration stops early or the stream raises.
    """
    merged = {}
    async for chunk in stream:
        # Accumulate before yielding so the chunk is already recorded by the
        # time the consumer sees it.
        chunk_fields = chunk.__dict__
        merged = merge_chunk(merged, chunk_fields)
        yield chunk

    # Stream fully consumed: post-process the merged chunks.
    self.handle_post_response(kwargs, start, merged)


class InvokeAsyncIterator(BaseInvoke):
async def invoke(self, **kwargs):
Expand Down