feat(memory): add RedisStorageBackend for distributed production environments#5919
feat(memory): add RedisStorageBackend for distributed production environments#5919irfaan101 wants to merge 3 commits into
Conversation
📝 WalkthroughWalkthroughAdds RedisStorageBackend: runtime Redis client, JSON storage of MemoryRecord (ISO datetime parsing), save/get/update/delete by IDs, count/reset via scanning, many search/list/scope methods raise NotImplementedError, and async wrappers that offload to threads. ChangesRedis Storage Backend Implementation
🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/crewai/src/crewai/memory/storage/backend.py`:
- Around line 196-199: Change the bare re-raise in the ImportError handler to
chain the original exception; in the except ImportError block (around
RedisStorageBackend import/setup), capture the exception (e.g., except
ImportError as e) and re-raise the new ImportError with the help message using
"raise ImportError(... ) from e" so the original traceback is preserved and Ruff
B904 is satisfied.
- Around line 245-259: The delete method currently only handles record_ids and
silently returns 0 for any other filters, which hides misuse; update
delete(self, ...) in the backend to validate inputs and fail fast: if any of
scope_prefix, categories, older_than, or metadata_filter are provided (i.e., not
None or non-empty) and record_ids is not used, raise a clear exception (e.g.,
NotImplementedError or ValueError) indicating those filters are unsupported;
keep the existing behavior for record_ids by mapping each id with self._get_key
and calling self.client.delete(*keys_to_delete) as before, but do not silently
return 0 for unsupported combinations.
- Around line 219-226: The fallback JSON serialization only converts created_at
but may leave other datetime fields (e.g., last_accessed) un-serialized causing
json.dumps to fail; update the fallback logic around record_data =
record.__dict__.copy() to scan record_data for any values that are datetime
instances (at least handle "last_accessed" and any other datetime fields) and
replace them with .isoformat() (or a str) before calling
json.dumps(record_data), referencing the record_data dict, the
"created_at"/"last_accessed" keys, and the json.dumps call so the change is
applied in the same fallback serialization path.
- Around line 293-304: The count and reset methods ignore the scope_prefix
parameter and always operate on "crewai:memory:*"; update both methods (count
and reset in backend.py) to build the Redis key pattern using the provided
scope_prefix when non-None (e.g., "crewai:memory:{scope_prefix}:*" or similar
consistent namespacing used elsewhere) and fall back to the global pattern when
scope_prefix is None; use that pattern with scan_iter in count and to assemble
keys_to_delete in reset, and ensure you properly handle/sanitize scope_prefix
values so reset only deletes keys matching the computed pattern.
- Around line 260-291: The placeholder read methods (search, list_records,
get_scope_info, list_scopes, list_categories) currently return empty defaults
and should fail fast instead; update each of these functions in backend.py to
raise a clear NotImplementedError (or RuntimeError) with a message indicating
the method is unimplemented (include the function name like search,
list_records, get_scope_info, list_scopes, list_categories) so callers
immediately see missing backend implementations rather than silently receiving
empty results.
- Around line 307-342: The async methods asave, asearch, and adelete currently
call their synchronous counterparts (save, search, delete) which perform
blocking Redis I/O; change these async wrappers to offload the blocking work to
a thread executor (e.g., asyncio.to_thread or loop.run_in_executor) so the event
loop isn’t blocked, i.e., await asyncio.to_thread(self.save, records) in asave,
await asyncio.to_thread(self.delete, scope_prefix, categories, record_ids,
older_than, metadata_filter) in adelete, and update asearch to await
asyncio.to_thread(self.search, query_embedding, scope_prefix, categories,
metadata_filter, limit, min_score) (or similar) so any future synchronous Redis
calls inside search/save/delete run off the event loop.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: b8fdb0b1-6d63-4bec-b018-e0026a07fb18
📒 Files selected for processing (1)
lib/crewai/src/crewai/memory/storage/backend.py
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
lib/crewai/src/crewai/memory/storage/backend.py (1)
203-209:⚠️ Potential issue | 🟠 Major | ⚡ Quick winAdd Redis socket timeouts to prevent indefinite hangs.
RedisStorageBackend.__init__createsredis.Redis(...)withoutsocket_connect_timeout/socket_timeout(defaults toNonein redis-py), so connection/command calls can wait unboundedly on network problems. Add explicit timeouts.Minimal guardrail
self.client = redis.Redis( host=host, port=port, db=db, password=password, decode_responses=True, + socket_connect_timeout=5, + socket_timeout=5, )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/memory/storage/backend.py` around lines 203 - 209, The Redis client in RedisStorageBackend.__init__ is created without socket timeouts, which can cause indefinite hangs; update the self.client = redis.Redis(...) construction to pass explicit socket_connect_timeout and socket_timeout values (or accept them as constructor parameters with sensible defaults) so both connection attempts and command responses have bounded timeouts, and ensure any higher-level call sites/handlers can handle timeout exceptions from redis (e.g., redis.exceptions.TimeoutError).
🧹 Nitpick comments (1)
lib/crewai/src/crewai/memory/storage/backend.py (1)
325-329: ⚡ Quick winKeep
reset()streaming end-to-end.
scan_iter()avoids blocking Redis, but rebuilding the entire result set intokeys_to_deletereintroduces an O(N) client-side spike. Deleting in bounded batches keeps reset predictable on large datasets.Batch delete example
- keys_to_delete = [ - key for key in self.client.scan_iter("crewai:memory:*") - ] - if keys_to_delete: - self.client.delete(*keys_to_delete) + batch: list[str] = [] + for key in self.client.scan_iter("crewai:memory:*"): + batch.append(key) + if len(batch) == 500: + self.client.delete(*batch) + batch.clear() + if batch: + self.client.delete(*batch)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/memory/storage/backend.py` around lines 325 - 329, The reset() implementation currently collects all keys from self.client.scan_iter("crewai:memory:*") into keys_to_delete which reintroduces an O(N) client-side spike; instead stream deletions in bounded batches: iterate scan_iter, accumulate keys into a small batch (e.g., BATCH_SIZE = 500–1000), and when batch is full call self.client.delete(*batch) (or self.client.unlink(*batch) for non-blocking removal) then clear the batch and continue, finally deleting any remaining keys after the loop; update reset() to use this pattern referencing scan_iter, keys_to_delete (or replace with local batch), and self.client.delete/unlink.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/crewai/src/crewai/memory/storage/backend.py`:
- Around line 253-262: The current guard in RedisStorageBackend.delete
incorrectly blocks calls that provide record_ids for rooted memories
(UnifiedMemory.forget passes scope_prefix=self.root_scope); update the condition
so Delete still raises for unsupported filters (categories, older_than,
metadata_filter) but allows the record_ids path even when scope_prefix is set.
Concretely, in RedisStorageBackend.delete adjust the if-statement to only raise
if any of categories, older_than, or metadata_filter are provided (or if
record_ids is absent when scope_prefix is present), ensuring calls with
record_ids and a non-None scope_prefix (as used by UnifiedMemory.forget and
root_scope) proceed normally.
---
Outside diff comments:
In `@lib/crewai/src/crewai/memory/storage/backend.py`:
- Around line 203-209: The Redis client in RedisStorageBackend.__init__ is
created without socket timeouts, which can cause indefinite hangs; update the
self.client = redis.Redis(...) construction to pass explicit
socket_connect_timeout and socket_timeout values (or accept them as constructor
parameters with sensible defaults) so both connection attempts and command
responses have bounded timeouts, and ensure any higher-level call sites/handlers
can handle timeout exceptions from redis (e.g., redis.exceptions.TimeoutError).
---
Nitpick comments:
In `@lib/crewai/src/crewai/memory/storage/backend.py`:
- Around line 325-329: The reset() implementation currently collects all keys
from self.client.scan_iter("crewai:memory:*") into keys_to_delete which
reintroduces an O(N) client-side spike; instead stream deletions in bounded
batches: iterate scan_iter, accumulate keys into a small batch (e.g., BATCH_SIZE
= 500–1000), and when batch is full call self.client.delete(*batch) (or
self.client.unlink(*batch) for non-blocking removal) then clear the batch and
continue, finally deleting any remaining keys after the loop; update reset() to
use this pattern referencing scan_iter, keys_to_delete (or replace with local
batch), and self.client.delete/unlink.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 6dc23853-b4f2-48a2-bee3-99989e8f5045
📒 Files selected for processing (1)
lib/crewai/src/crewai/memory/storage/backend.py
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
lib/crewai/src/crewai/memory/storage/backend.py (1)
1-11:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winFix formatting to pass CI.
The pipeline is failing because
ruff format --checkreports this file needs reformatting. Run:uv run ruff format lib/crewai/src/crewai/memory/storage/backend.py🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/memory/storage/backend.py` around lines 1 - 11, The file backend.py is failing ruff format checks; run the formatter on lib/crewai/src/crewai/memory/storage/backend.py (e.g., uv run ruff format lib/crewai/src/crewai/memory/storage/backend.py) or apply equivalent auto-formatting so imports and whitespace match the project's ruff/format settings and then commit the reformatted file; target the top-level module string and imports shown in backend.py to verify the changes.
🧹 Nitpick comments (1)
lib/crewai/src/crewai/memory/storage/backend.py (1)
229-232: 💤 Low valueConsider handling "Z" suffix in datetime parsing for interoperability.
datetime.fromisoformat()doesn't handle the "Z" UTC designator common in ISO 8601 strings. If records are imported from external sources, parsing could fail. The LanceDB backend handles this pattern.♻️ Proposed fix for robustness
for dt_field in ("created_at", "last_accessed"): if dt_field in raw_data and isinstance(raw_data[dt_field], str): - raw_data[dt_field] = datetime.fromisoformat(raw_data[dt_field]) + dt_str = raw_data[dt_field].replace("Z", "+00:00") + raw_data[dt_field] = datetime.fromisoformat(dt_str)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/memory/storage/backend.py` around lines 229 - 232, The datetime parsing for raw_data fields ("created_at", "last_accessed") uses datetime.fromisoformat which fails on ISO strings ending with "Z"; update the parsing logic in the backend (the section handling raw_data and dt_field) to detect a trailing "Z" on the string and convert it to an equivalent offset (e.g., replace trailing "Z" with "+00:00") before calling datetime.fromisoformat so UTC-designated timestamps parse correctly and remain timezone-aware.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@lib/crewai/src/crewai/memory/storage/backend.py`:
- Around line 1-11: The file backend.py is failing ruff format checks; run the
formatter on lib/crewai/src/crewai/memory/storage/backend.py (e.g., uv run ruff
format lib/crewai/src/crewai/memory/storage/backend.py) or apply equivalent
auto-formatting so imports and whitespace match the project's ruff/format
settings and then commit the reformatted file; target the top-level module
string and imports shown in backend.py to verify the changes.
---
Nitpick comments:
In `@lib/crewai/src/crewai/memory/storage/backend.py`:
- Around line 229-232: The datetime parsing for raw_data fields ("created_at",
"last_accessed") uses datetime.fromisoformat which fails on ISO strings ending
with "Z"; update the parsing logic in the backend (the section handling raw_data
and dt_field) to detect a trailing "Z" on the string and convert it to an
equivalent offset (e.g., replace trailing "Z" with "+00:00") before calling
datetime.fromisoformat so UTC-designated timestamps parse correctly and remain
timezone-aware.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 15304b16-1bd9-4ee4-91b7-0dd580868453
📒 Files selected for processing (1)
lib/crewai/src/crewai/memory/storage/backend.py
Description
This PR introduces
RedisStorageBackendinsidecrewai/memory/storage/backend.pyto support enterprise production environments running distributed asynchronous workers (e.g., Kubernetes, Celery worker fleets).Highlights
StorageBackendabstract protocol (including async overrides).model_dump_json).scan_iter()cursor iteration instead of blockingkeys()calls for count/reset structures.Related to #5802
Summary by CodeRabbit
New Features
Limitations