Skip to content

Commit d898d7c

Browse files
feat: async knowledge support (#4023)
* feat: add async support for tools, add async tool tests * chore: improve tool decorator typing * fix: ensure _run backward compat * chore: update docs * chore: make docstrings a little more readable * feat: add async execution support to agent executor * chore: add tests * feat: add aiosqlite dep; regenerate lockfile * feat: add async ops to memory feat; create tests * feat: async knowledge support; add tests * chore: regenerate lockfile
1 parent f04c40b commit d898d7c

14 files changed

+545
-94
lines changed

lib/crewai/src/crewai/knowledge/knowledge.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ def __init__(
3232
sources: list[BaseKnowledgeSource],
3333
embedder: EmbedderConfig | None = None,
3434
storage: KnowledgeStorage | None = None,
35-
**data,
36-
):
35+
**data: object,
36+
) -> None:
3737
super().__init__(**data)
3838
if storage:
3939
self.storage = storage
@@ -75,3 +75,44 @@ def reset(self) -> None:
7575
self.storage.reset()
7676
else:
7777
raise ValueError("Storage is not initialized.")
78+
79+
async def aquery(
80+
self, query: list[str], results_limit: int = 5, score_threshold: float = 0.6
81+
) -> list[SearchResult]:
82+
"""Query across all knowledge sources asynchronously.
83+
84+
Args:
85+
query: List of query strings.
86+
results_limit: Maximum number of results to return.
87+
score_threshold: Minimum similarity score for results.
88+
89+
Returns:
90+
The top results matching the query.
91+
92+
Raises:
93+
ValueError: If storage is not initialized.
94+
"""
95+
if self.storage is None:
96+
raise ValueError("Storage is not initialized.")
97+
98+
return await self.storage.asearch(
99+
query,
100+
limit=results_limit,
101+
score_threshold=score_threshold,
102+
)
103+
104+
async def aadd_sources(self) -> None:
105+
"""Add all knowledge sources to storage asynchronously."""
106+
try:
107+
for source in self.sources:
108+
source.storage = self.storage
109+
await source.aadd()
110+
except Exception as e:
111+
raise e
112+
113+
async def areset(self) -> None:
114+
"""Reset the knowledge base asynchronously."""
115+
if self.storage:
116+
await self.storage.areset()
117+
else:
118+
raise ValueError("Storage is not initialized.")

lib/crewai/src/crewai/knowledge/source/base_file_knowledge_source.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from abc import ABC, abstractmethod
22
from pathlib import Path
3+
from typing import Any
34

45
from pydantic import Field, field_validator
56

@@ -25,7 +26,10 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
2526
safe_file_paths: list[Path] = Field(default_factory=list)
2627

2728
@field_validator("file_path", "file_paths", mode="before")
28-
def validate_file_path(cls, v, info): # noqa: N805
29+
@classmethod
30+
def validate_file_path(
31+
cls, v: Path | list[Path] | str | list[str] | None, info: Any
32+
) -> Path | list[Path] | str | list[str] | None:
2933
"""Validate that at least one of file_path or file_paths is provided."""
3034
# Single check if both are None, O(1) instead of nested conditions
3135
if (
@@ -38,7 +42,7 @@ def validate_file_path(cls, v, info): # noqa: N805
3842
raise ValueError("Either file_path or file_paths must be provided")
3943
return v
4044

41-
def model_post_init(self, _):
45+
def model_post_init(self, _: Any) -> None:
4246
"""Post-initialization method to load content."""
4347
self.safe_file_paths = self._process_file_paths()
4448
self.validate_content()
@@ -48,7 +52,7 @@ def model_post_init(self, _):
4852
def load_content(self) -> dict[Path, str]:
4953
"""Load and preprocess file content. Should be overridden by subclasses. Assume that the file path is relative to the project root in the knowledge directory."""
5054

51-
def validate_content(self):
55+
def validate_content(self) -> None:
5256
"""Validate the paths."""
5357
for path in self.safe_file_paths:
5458
if not path.exists():
@@ -65,13 +69,20 @@ def validate_content(self):
6569
color="red",
6670
)
6771

68-
def _save_documents(self):
72+
def _save_documents(self) -> None:
6973
"""Save the documents to the storage."""
7074
if self.storage:
7175
self.storage.save(self.chunks)
7276
else:
7377
raise ValueError("No storage found to save documents.")
7478

79+
async def _asave_documents(self) -> None:
80+
"""Save the documents to the storage asynchronously."""
81+
if self.storage:
82+
await self.storage.asave(self.chunks)
83+
else:
84+
raise ValueError("No storage found to save documents.")
85+
7586
def convert_to_path(self, path: Path | str) -> Path:
7687
"""Convert a path to a Path object."""
7788
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path

lib/crewai/src/crewai/knowledge/source/base_knowledge_source.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,32 @@ def _chunk_text(self, text: str) -> list[str]:
3939
for i in range(0, len(text), self.chunk_size - self.chunk_overlap)
4040
]
4141

42-
def _save_documents(self):
43-
"""
44-
Save the documents to the storage.
42+
def _save_documents(self) -> None:
43+
"""Save the documents to the storage.
44+
4545
This method should be called after the chunks and embeddings are generated.
46+
47+
Raises:
48+
ValueError: If no storage is configured.
4649
"""
4750
if self.storage:
4851
self.storage.save(self.chunks)
4952
else:
5053
raise ValueError("No storage found to save documents.")
54+
55+
@abstractmethod
56+
async def aadd(self) -> None:
57+
"""Process content, chunk it, compute embeddings, and save them asynchronously."""
58+
59+
async def _asave_documents(self) -> None:
60+
"""Save the documents to the storage asynchronously.
61+
62+
This method should be called after the chunks and embeddings are generated.
63+
64+
Raises:
65+
ValueError: If no storage is configured.
66+
"""
67+
if self.storage:
68+
await self.storage.asave(self.chunks)
69+
else:
70+
raise ValueError("No storage found to save documents.")

lib/crewai/src/crewai/knowledge/source/crew_docling_source.py

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,24 @@
22

33
from collections.abc import Iterator
44
from pathlib import Path
5+
from typing import TYPE_CHECKING, Any
56
from urllib.parse import urlparse
67

78

89
try:
9-
from docling.datamodel.base_models import ( # type: ignore[import-not-found]
10-
InputFormat,
11-
)
12-
from docling.document_converter import ( # type: ignore[import-not-found]
13-
DocumentConverter,
14-
)
15-
from docling.exceptions import ConversionError # type: ignore[import-not-found]
16-
from docling_core.transforms.chunker.hierarchical_chunker import ( # type: ignore[import-not-found]
17-
HierarchicalChunker,
18-
)
19-
from docling_core.types.doc.document import ( # type: ignore[import-not-found]
20-
DoclingDocument,
21-
)
10+
from docling.datamodel.base_models import InputFormat
11+
from docling.document_converter import DocumentConverter
12+
from docling.exceptions import ConversionError
13+
from docling_core.transforms.chunker.hierarchical_chunker import HierarchicalChunker
14+
from docling_core.types.doc.document import DoclingDocument
2215

2316
DOCLING_AVAILABLE = True
2417
except ImportError:
2518
DOCLING_AVAILABLE = False
19+
# Provide type stubs for when docling is not available
20+
if TYPE_CHECKING:
21+
from docling.document_converter import DocumentConverter
22+
from docling_core.types.doc.document import DoclingDocument
2623

2724
from pydantic import Field
2825

@@ -32,11 +29,13 @@
3229

3330

3431
class CrewDoclingSource(BaseKnowledgeSource):
35-
"""Default Source class for converting documents to markdown or json
36-
This will auto support PDF, DOCX, and TXT, XLSX, Images, and HTML files without any additional dependencies and follows the docling package as the source of truth.
32+
"""Default Source class for converting documents to markdown or json.
33+
34+
This will auto support PDF, DOCX, and TXT, XLSX, Images, and HTML files without
35+
any additional dependencies and follows the docling package as the source of truth.
3736
"""
3837

39-
def __init__(self, *args, **kwargs):
38+
def __init__(self, *args: Any, **kwargs: Any) -> None:
4039
if not DOCLING_AVAILABLE:
4140
raise ImportError(
4241
"The docling package is required to use CrewDoclingSource. "
@@ -66,7 +65,7 @@ def __init__(self, *args, **kwargs):
6665
)
6766
)
6867

69-
def model_post_init(self, _) -> None:
68+
def model_post_init(self, _: Any) -> None:
7069
if self.file_path:
7170
self._logger.log(
7271
"warning",
@@ -99,6 +98,15 @@ def add(self) -> None:
9998
self.chunks.extend(list(new_chunks_iterable))
10099
self._save_documents()
101100

101+
async def aadd(self) -> None:
102+
"""Add docling content asynchronously."""
103+
if self.content is None:
104+
return
105+
for doc in self.content:
106+
new_chunks_iterable = self._chunk_doc(doc)
107+
self.chunks.extend(list(new_chunks_iterable))
108+
await self._asave_documents()
109+
102110
def _convert_source_to_docling_documents(self) -> list[DoclingDocument]:
103111
conv_results_iter = self.document_converter.convert_all(self.safe_file_paths)
104112
return [result.document for result in conv_results_iter]

lib/crewai/src/crewai/knowledge/source/csv_knowledge_source.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,15 @@ def add(self) -> None:
3131
self.chunks.extend(new_chunks)
3232
self._save_documents()
3333

34+
async def aadd(self) -> None:
35+
"""Add CSV file content asynchronously."""
36+
content_str = (
37+
str(self.content) if isinstance(self.content, dict) else self.content
38+
)
39+
new_chunks = self._chunk_text(content_str)
40+
self.chunks.extend(new_chunks)
41+
await self._asave_documents()
42+
3443
def _chunk_text(self, text: str) -> list[str]:
3544
"""Utility method to split text into chunks."""
3645
return [

lib/crewai/src/crewai/knowledge/source/excel_knowledge_source.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from pathlib import Path
2+
from types import ModuleType
3+
from typing import Any
24

35
from pydantic import Field, field_validator
46

@@ -26,7 +28,10 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
2628
safe_file_paths: list[Path] = Field(default_factory=list)
2729

2830
@field_validator("file_path", "file_paths", mode="before")
29-
def validate_file_path(cls, v, info): # noqa: N805
31+
@classmethod
32+
def validate_file_path(
33+
cls, v: Path | list[Path] | str | list[str] | None, info: Any
34+
) -> Path | list[Path] | str | list[str] | None:
3035
"""Validate that at least one of file_path or file_paths is provided."""
3136
# Single check if both are None, O(1) instead of nested conditions
3237
if (
@@ -69,7 +74,7 @@ def _process_file_paths(self) -> list[Path]:
6974

7075
return [self.convert_to_path(path) for path in path_list]
7176

72-
def validate_content(self):
77+
def validate_content(self) -> None:
7378
"""Validate the paths."""
7479
for path in self.safe_file_paths:
7580
if not path.exists():
@@ -86,7 +91,7 @@ def validate_content(self):
8691
color="red",
8792
)
8893

89-
def model_post_init(self, _) -> None:
94+
def model_post_init(self, _: Any) -> None:
9095
if self.file_path:
9196
self._logger.log(
9297
"warning",
@@ -128,12 +133,12 @@ def convert_to_path(self, path: Path | str) -> Path:
128133
"""Convert a path to a Path object."""
129134
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path
130135

131-
def _import_dependencies(self):
136+
def _import_dependencies(self) -> ModuleType:
132137
"""Dynamically import dependencies."""
133138
try:
134-
import pandas as pd # type: ignore[import-untyped,import-not-found]
139+
import pandas as pd # type: ignore[import-untyped]
135140

136-
return pd
141+
return pd # type: ignore[no-any-return]
137142
except ImportError as e:
138143
missing_package = str(e).split()[-1]
139144
raise ImportError(
@@ -159,6 +164,20 @@ def add(self) -> None:
159164
self.chunks.extend(new_chunks)
160165
self._save_documents()
161166

167+
async def aadd(self) -> None:
168+
"""Add Excel file content asynchronously."""
169+
content_str = ""
170+
for value in self.content.values():
171+
if isinstance(value, dict):
172+
for sheet_value in value.values():
173+
content_str += str(sheet_value) + "\n"
174+
else:
175+
content_str += str(value) + "\n"
176+
177+
new_chunks = self._chunk_text(content_str)
178+
self.chunks.extend(new_chunks)
179+
await self._asave_documents()
180+
162181
def _chunk_text(self, text: str) -> list[str]:
163182
"""Utility method to split text into chunks."""
164183
return [

lib/crewai/src/crewai/knowledge/source/json_knowledge_source.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ def add(self) -> None:
4444
self.chunks.extend(new_chunks)
4545
self._save_documents()
4646

47+
async def aadd(self) -> None:
48+
"""Add JSON file content asynchronously."""
49+
content_str = (
50+
str(self.content) if isinstance(self.content, dict) else self.content
51+
)
52+
new_chunks = self._chunk_text(content_str)
53+
self.chunks.extend(new_chunks)
54+
await self._asave_documents()
55+
4756
def _chunk_text(self, text: str) -> list[str]:
4857
"""Utility method to split text into chunks."""
4958
return [

lib/crewai/src/crewai/knowledge/source/pdf_knowledge_source.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from pathlib import Path
2+
from types import ModuleType
23

34
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
45

@@ -23,7 +24,7 @@ def load_content(self) -> dict[Path, str]:
2324
content[path] = text
2425
return content
2526

26-
def _import_pdfplumber(self):
27+
def _import_pdfplumber(self) -> ModuleType:
2728
"""Dynamically import pdfplumber."""
2829
try:
2930
import pdfplumber
@@ -44,6 +45,13 @@ def add(self) -> None:
4445
self.chunks.extend(new_chunks)
4546
self._save_documents()
4647

48+
async def aadd(self) -> None:
49+
"""Add PDF file content asynchronously."""
50+
for text in self.content.values():
51+
new_chunks = self._chunk_text(text)
52+
self.chunks.extend(new_chunks)
53+
await self._asave_documents()
54+
4755
def _chunk_text(self, text: str) -> list[str]:
4856
"""Utility method to split text into chunks."""
4957
return [

lib/crewai/src/crewai/knowledge/source/string_knowledge_source.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Any
2+
13
from pydantic import Field
24

35
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
@@ -9,11 +11,11 @@ class StringKnowledgeSource(BaseKnowledgeSource):
911
content: str = Field(...)
1012
collection_name: str | None = Field(default=None)
1113

12-
def model_post_init(self, _):
14+
def model_post_init(self, _: Any) -> None:
1315
"""Post-initialization method to validate content."""
1416
self.validate_content()
1517

16-
def validate_content(self):
18+
def validate_content(self) -> None:
1719
"""Validate string content."""
1820
if not isinstance(self.content, str):
1921
raise ValueError("StringKnowledgeSource only accepts string content")
@@ -24,6 +26,12 @@ def add(self) -> None:
2426
self.chunks.extend(new_chunks)
2527
self._save_documents()
2628

29+
async def aadd(self) -> None:
30+
"""Add string content asynchronously."""
31+
new_chunks = self._chunk_text(self.content)
32+
self.chunks.extend(new_chunks)
33+
await self._asave_documents()
34+
2735
def _chunk_text(self, text: str) -> list[str]:
2836
"""Utility method to split text into chunks."""
2937
return [

0 commit comments

Comments
 (0)