Skip to content

Commit f04c40b

Browse files
feat: async memory support
Adds async support for tools with tests, async execution in the agent executor, and async operations for memory (with aiosqlite). Improves tool decorator typing, ensures _run backward compatibility, updates docs and docstrings, adds tests, and regenerates lockfiles.
1 parent c456e5c commit f04c40b

File tree

11 files changed

+1604
-139
lines changed

11 files changed

+1604
-139
lines changed

lib/crewai/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ dependencies = [
3838
"pydantic-settings~=2.10.1",
3939
"mcp~=1.16.0",
4040
"uv~=0.9.13",
41+
"aiosqlite~=0.21.0",
4142
]
4243

4344
[project.urls]

lib/crewai/src/crewai/memory/contextual/contextual_memory.py

Lines changed: 120 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import asyncio
34
from typing import TYPE_CHECKING
45

56
from crewai.memory import (
@@ -16,6 +17,8 @@
1617

1718

1819
class ContextualMemory:
20+
"""Aggregates and retrieves context from multiple memory sources."""
21+
1922
def __init__(
2023
self,
2124
stm: ShortTermMemory,
@@ -46,9 +49,14 @@ def __init__(
4649
self.exm.task = self.task
4750

4851
def build_context_for_task(self, task: Task, context: str) -> str:
49-
"""
50-
Automatically builds a minimal, highly relevant set of contextual information
51-
for a given task.
52+
"""Build contextual information for a task synchronously.
53+
54+
Args:
55+
task: The task to build context for.
56+
context: Additional context string.
57+
58+
Returns:
59+
Formatted context string from all memory sources.
5260
"""
5361
query = f"{task.description} {context}".strip()
5462

@@ -63,6 +71,31 @@ def build_context_for_task(self, task: Task, context: str) -> str:
6371
]
6472
return "\n".join(filter(None, context_parts))
6573

74+
async def abuild_context_for_task(self, task: Task, context: str) -> str:
75+
"""Build contextual information for a task asynchronously.
76+
77+
Args:
78+
task: The task to build context for.
79+
context: Additional context string.
80+
81+
Returns:
82+
Formatted context string from all memory sources.
83+
"""
84+
query = f"{task.description} {context}".strip()
85+
86+
if query == "":
87+
return ""
88+
89+
# Fetch all contexts concurrently
90+
results = await asyncio.gather(
91+
self._afetch_ltm_context(task.description),
92+
self._afetch_stm_context(query),
93+
self._afetch_entity_context(query),
94+
self._afetch_external_context(query),
95+
)
96+
97+
return "\n".join(filter(None, results))
98+
6699
def _fetch_stm_context(self, query: str) -> str:
67100
"""
68101
Fetches recent relevant insights from STM related to the task's description and expected_output,
@@ -135,3 +168,87 @@ def _fetch_external_context(self, query: str) -> str:
135168
f"- {result['content']}" for result in external_memories
136169
)
137170
return f"External memories:\n{formatted_memories}"
171+
172+
async def _afetch_stm_context(self, query: str) -> str:
173+
"""Fetch recent relevant insights from STM asynchronously.
174+
175+
Args:
176+
query: The search query.
177+
178+
Returns:
179+
Formatted insights as bullet points, or empty string if none found.
180+
"""
181+
if self.stm is None:
182+
return ""
183+
184+
stm_results = await self.stm.asearch(query)
185+
formatted_results = "\n".join(
186+
[f"- {result['content']}" for result in stm_results]
187+
)
188+
return f"Recent Insights:\n{formatted_results}" if stm_results else ""
189+
190+
async def _afetch_ltm_context(self, task: str) -> str | None:
191+
"""Fetch historical data from LTM asynchronously.
192+
193+
Args:
194+
task: The task description to search for.
195+
196+
Returns:
197+
Formatted historical data as bullet points, or None if none found.
198+
"""
199+
if self.ltm is None:
200+
return ""
201+
202+
ltm_results = await self.ltm.asearch(task, latest_n=2)
203+
if not ltm_results:
204+
return None
205+
206+
formatted_results = [
207+
suggestion
208+
for result in ltm_results
209+
for suggestion in result["metadata"]["suggestions"]
210+
]
211+
formatted_results = list(dict.fromkeys(formatted_results))
212+
formatted_results = "\n".join([f"- {result}" for result in formatted_results]) # type: ignore # Incompatible types in assignment (expression has type "str", variable has type "list[str]")
213+
214+
return f"Historical Data:\n{formatted_results}" if ltm_results else ""
215+
216+
async def _afetch_entity_context(self, query: str) -> str:
217+
"""Fetch relevant entity information asynchronously.
218+
219+
Args:
220+
query: The search query.
221+
222+
Returns:
223+
Formatted entity information as bullet points, or empty string if none found.
224+
"""
225+
if self.em is None:
226+
return ""
227+
228+
em_results = await self.em.asearch(query)
229+
formatted_results = "\n".join(
230+
[f"- {result['content']}" for result in em_results]
231+
)
232+
return f"Entities:\n{formatted_results}" if em_results else ""
233+
234+
async def _afetch_external_context(self, query: str) -> str:
235+
"""Fetch relevant information from External Memory asynchronously.
236+
237+
Args:
238+
query: The search query.
239+
240+
Returns:
241+
Formatted information as bullet points, or empty string if none found.
242+
"""
243+
if self.exm is None:
244+
return ""
245+
246+
external_memories = await self.exm.asearch(query)
247+
248+
if not external_memories:
249+
return ""
250+
251+
formatted_memories = "\n".join(
252+
f"- {result['content']}" for result in external_memories
253+
)
254+
return f"External memories:\n{formatted_memories}"

lib/crewai/src/crewai/memory/entity/entity_memory.py

Lines changed: 181 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,13 @@ class EntityMemory(Memory):
2626

2727
_memory_provider: str | None = PrivateAttr()
2828

29-
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
29+
def __init__(
30+
self,
31+
crew: Any = None,
32+
embedder_config: Any = None,
33+
storage: Any = None,
34+
path: str | None = None,
35+
) -> None:
3036
memory_provider = None
3137
if embedder_config and isinstance(embedder_config, dict):
3238
memory_provider = embedder_config.get("provider")
@@ -43,7 +49,7 @@ def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
4349
if embedder_config and isinstance(embedder_config, dict)
4450
else None
4551
)
46-
storage = Mem0Storage(type="short_term", crew=crew, config=config)
52+
storage = Mem0Storage(type="short_term", crew=crew, config=config) # type: ignore[no-untyped-call]
4753
else:
4854
storage = (
4955
storage
@@ -170,7 +176,17 @@ def search(
170176
query: str,
171177
limit: int = 5,
172178
score_threshold: float = 0.6,
173-
):
179+
) -> list[Any]:
180+
"""Search entity memory for relevant entries.
181+
182+
Args:
183+
query: The search query.
184+
limit: Maximum number of results to return.
185+
score_threshold: Minimum similarity score for results.
186+
187+
Returns:
188+
List of matching memory entries.
189+
"""
174190
crewai_event_bus.emit(
175191
self,
176192
event=MemoryQueryStartedEvent(
@@ -217,6 +233,168 @@ def search(
217233
)
218234
raise
219235

236+
async def asave(
237+
self,
238+
value: EntityMemoryItem | list[EntityMemoryItem],
239+
metadata: dict[str, Any] | None = None,
240+
) -> None:
241+
"""Save entity items asynchronously.
242+
243+
Args:
244+
value: Single EntityMemoryItem or list of EntityMemoryItems to save.
245+
metadata: Optional metadata dict (not used, for signature compatibility).
246+
"""
247+
if not value:
248+
return
249+
250+
items = value if isinstance(value, list) else [value]
251+
is_batch = len(items) > 1
252+
253+
metadata = {"entity_count": len(items)} if is_batch else items[0].metadata
254+
crewai_event_bus.emit(
255+
self,
256+
event=MemorySaveStartedEvent(
257+
metadata=metadata,
258+
source_type="entity_memory",
259+
from_agent=self.agent,
260+
from_task=self.task,
261+
),
262+
)
263+
264+
start_time = time.time()
265+
saved_count = 0
266+
errors: list[str | None] = []
267+
268+
async def save_single_item(item: EntityMemoryItem) -> tuple[bool, str | None]:
269+
"""Save a single item asynchronously."""
270+
try:
271+
if self._memory_provider == "mem0":
272+
data = f"""
273+
Remember details about the following entity:
274+
Name: {item.name}
275+
Type: {item.type}
276+
Entity Description: {item.description}
277+
"""
278+
else:
279+
data = f"{item.name}({item.type}): {item.description}"
280+
281+
await super(EntityMemory, self).asave(data, item.metadata)
282+
return True, None
283+
except Exception as e:
284+
return False, f"{item.name}: {e!s}"
285+
286+
try:
287+
for item in items:
288+
success, error = await save_single_item(item)
289+
if success:
290+
saved_count += 1
291+
else:
292+
errors.append(error)
293+
294+
if is_batch:
295+
emit_value = f"Saved {saved_count} entities"
296+
metadata = {"entity_count": saved_count, "errors": errors}
297+
else:
298+
emit_value = f"{items[0].name}({items[0].type}): {items[0].description}"
299+
metadata = items[0].metadata
300+
301+
crewai_event_bus.emit(
302+
self,
303+
event=MemorySaveCompletedEvent(
304+
value=emit_value,
305+
metadata=metadata,
306+
save_time_ms=(time.time() - start_time) * 1000,
307+
source_type="entity_memory",
308+
from_agent=self.agent,
309+
from_task=self.task,
310+
),
311+
)
312+
313+
if errors:
314+
raise Exception(
315+
f"Partial save: {len(errors)} failed out of {len(items)}"
316+
)
317+
318+
except Exception as e:
319+
fail_metadata = (
320+
{"entity_count": len(items), "saved": saved_count}
321+
if is_batch
322+
else items[0].metadata
323+
)
324+
crewai_event_bus.emit(
325+
self,
326+
event=MemorySaveFailedEvent(
327+
metadata=fail_metadata,
328+
error=str(e),
329+
source_type="entity_memory",
330+
from_agent=self.agent,
331+
from_task=self.task,
332+
),
333+
)
334+
raise
335+
336+
async def asearch(
337+
self,
338+
query: str,
339+
limit: int = 5,
340+
score_threshold: float = 0.6,
341+
) -> list[Any]:
342+
"""Search entity memory asynchronously.
343+
344+
Args:
345+
query: The search query.
346+
limit: Maximum number of results to return.
347+
score_threshold: Minimum similarity score for results.
348+
349+
Returns:
350+
List of matching memory entries.
351+
"""
352+
crewai_event_bus.emit(
353+
self,
354+
event=MemoryQueryStartedEvent(
355+
query=query,
356+
limit=limit,
357+
score_threshold=score_threshold,
358+
source_type="entity_memory",
359+
from_agent=self.agent,
360+
from_task=self.task,
361+
),
362+
)
363+
364+
start_time = time.time()
365+
try:
366+
results = await super().asearch(
367+
query=query, limit=limit, score_threshold=score_threshold
368+
)
369+
370+
crewai_event_bus.emit(
371+
self,
372+
event=MemoryQueryCompletedEvent(
373+
query=query,
374+
results=results,
375+
limit=limit,
376+
score_threshold=score_threshold,
377+
query_time_ms=(time.time() - start_time) * 1000,
378+
source_type="entity_memory",
379+
from_agent=self.agent,
380+
from_task=self.task,
381+
),
382+
)
383+
384+
return results
385+
except Exception as e:
386+
crewai_event_bus.emit(
387+
self,
388+
event=MemoryQueryFailedEvent(
389+
query=query,
390+
limit=limit,
391+
score_threshold=score_threshold,
392+
error=str(e),
393+
source_type="entity_memory",
394+
),
395+
)
396+
raise
397+
220398
def reset(self) -> None:
221399
try:
222400
self.storage.reset()

0 commit comments

Comments
 (0)