Skip to content

Commit 8bf9841

Browse files
feat(tasks): improved retrieval
1 parent 67d57b2 commit 8bf9841

File tree

6 files changed

+54
-9
lines changed

6 files changed

+54
-9
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<p align="center">
2-
<img src="https://img.shields.io/badge/version-2.0.11--dev-blue?style=for-the-badge" alt="Version"/>
2+
<img src="https://img.shields.io/badge/version-2.1.0--dev-blue?style=for-the-badge" alt="Version"/>
33
<img src="https://img.shields.io/badge/python-3.11+-green?style=for-the-badge&logo=python&logoColor=white" alt="Python"/>
44
<img src="https://img.shields.io/badge/license-AGPLv3%20%2B%20Commons%20Clause-purple?style=for-the-badge" alt="License"/>
55
</p>

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "brainapi2"
3-
version = "2.0.11-dev"
3+
version = "2.1.0-dev"
44
description = "Version 2.x.x of the BrainAPI memory layer."
55
authors = [
66
{name = "Christian",email = "alch.infoemail@gmail.com"}

src/adapters/cache.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,9 @@ def delete(self, key: str, brain_id: str = "default") -> bool:
4949
Delete a value from the cache.
5050
"""
5151
return self.cache.delete(key, brain_id)
52+
53+
def get_task_keys(self, brain_id: str = "default") -> list[str]:
54+
"""
55+
Get all task keys for a given brain_id.
56+
"""
57+
return self.cache.get_task_keys(brain_id)

src/adapters/interfaces/cache.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,10 @@ def delete(self, key: str, brain_id: str) -> bool:
4747
Delete a value from the cache.
4848
"""
4949
raise NotImplementedError("delete method not implemented")
50+
51+
@abstractmethod
52+
def get_task_keys(self, brain_id: str) -> list[str]:
53+
"""
54+
Get all task keys for a given brain_id.
55+
"""
56+
raise NotImplementedError("get_task_keys method not implemented")

src/lib/redis/client.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ def set(
5757
Set a value in the cache with an expiration time.
5858
"""
5959
prefixed_key = self._get_key(key, brain_id)
60+
if key.startswith("task:"):
61+
self.client.sadd(f"{brain_id}:_task_index", key)
6062
return self.client.set(
6163
prefixed_key, value, **({"ex": expires_in} if expires_in else {})
6264
)
@@ -66,7 +68,16 @@ def delete(self, key: str, brain_id: str) -> bool:
6668
Delete a value from the cache.
6769
"""
6870
prefixed_key = self._get_key(key, brain_id)
71+
if key.startswith("task:"):
72+
self.client.srem(f"{brain_id}:_task_index", key)
6973
return self.client.delete(prefixed_key)
7074

75+
def get_task_keys(self, brain_id: str) -> list[str]:
76+
"""
77+
Get all task keys for a given brain_id.
78+
"""
79+
keys = self.client.smembers(f"{brain_id}:_task_index")
80+
return [k.decode("utf-8") if isinstance(k, bytes) else k for k in keys]
81+
7182

7283
_redis_client = RedisClient()

src/services/api/routes/tasks.py

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,37 @@
77
Modified By: the developer formerly known as Christian Nonis at <alch.infoemail@gmail.com>
88
-----
99
"""
10+
1011
import json
1112
from src.utils.logging import log
1213
from fastapi import APIRouter
1314
from src.services.kg_agent.main import cache_adapter
1415

1516
tasks_router = APIRouter(prefix="/tasks", tags=["tasks"])
1617

18+
19+
@tasks_router.get("/")
20+
async def get_tasks(brain_id: str = "default"):
21+
try:
22+
tasks = cache_adapter.get_task_keys(brain_id)
23+
results = []
24+
for task in tasks:
25+
str_result = cache_adapter.get(task, brain_id=brain_id)
26+
if str_result is None:
27+
continue
28+
result = json.loads(str_result)
29+
results.append(
30+
{"id": task.split(":")[-1], "status": result.get("status", "unknown")}
31+
)
32+
return {"tasks": results}
33+
except Exception as e:
34+
log(f"Error in get_tasks: {type(e).__name__}: {str(e)}")
35+
return {
36+
"status": "error",
37+
"result": {"error": str(e), "error_type": type(e).__name__},
38+
}
39+
40+
1741
@tasks_router.get("/{task_id}")
1842
async def get_task(task_id: str, brain_id: str = "default"):
1943
"""
@@ -25,7 +49,7 @@ async def get_task(task_id: str, brain_id: str = "default"):
2549
return {
2650
"task_id": task_id,
2751
"status": "pending",
28-
"result": {"message": "Task is still processing or not found"}
52+
"result": {"message": "Task is still processing or not found"},
2953
}
3054
if isinstance(str_result, bytes):
3155
result = json.loads(str_result.decode("utf-8"))
@@ -34,15 +58,12 @@ async def get_task(task_id: str, brain_id: str = "default"):
3458
return {
3559
"task_id": task_id,
3660
"status": result.get("status", "unknown"),
37-
"result": result
61+
"result": result,
3862
}
3963
except Exception as e:
4064
log(f"Error in get_task: {type(e).__name__}: {str(e)}")
4165
return {
4266
"task_id": task_id,
4367
"status": "error",
44-
"result": {
45-
"error": str(e),
46-
"error_type": type(e).__name__
47-
}
48-
}
68+
"result": {"error": str(e), "error_type": type(e).__name__},
69+
}

0 commit comments

Comments
 (0)