Skip to content

Commit e9d31cd

Browse files
feat: Refactor task ID handling for clarity and correctness
This commit refactors the handling of task IDs throughout the system to ensure consistency and correctness, addressing previous ambiguities and potential issues. Key changes include: - Streamlining the ScheduleMessageItem to use a single 'task_id' field, representing the business-level identifier, thereby removing redundancy and Pydantic field clashes. - Modifying the /product/add API endpoint to correctly distinguish between the internal item_id (UUID) and the business-level task_id provided in the request, ensuring proper tracking in the status monitoring system. - Propagating the task_id consistently through MOSProduct, MOSCore, and SingleCubeView components, ensuring it reaches the ScheduleMessageItem. - Verifying that both the Redis-based status monitoring and the web logging systems correctly receive and utilize the business-level task_id, eliminating race conditions and ensuring accurate tracking.
1 parent 3f90184 commit e9d31cd

File tree

5 files changed

+20
-15
lines changed

5 files changed

+20
-15
lines changed

src/memos/api/routers/product_router.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,16 @@ def create_memory(memory_req: MemoryCreateRequest):
196196
mos_product = get_mos_product_instance()
197197

198198
# Track task if task_id is provided
199+
item_id: str | None = None
199200
if (
200201
memory_req.task_id
201202
and hasattr(mos_product, "mem_scheduler")
202203
and mos_product.mem_scheduler
203204
):
204205
from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker
206+
from uuid import uuid4
207+
208+
item_id = str(uuid4()) # Generate a unique item_id for this submission
205209

206210
# Get Redis client from scheduler
207211
if (
@@ -211,13 +215,14 @@ def create_memory(memory_req: MemoryCreateRequest):
211215
status_tracker = TaskStatusTracker(mos_product.mem_scheduler.redis_client)
212216
# Submit task with "product_add" type
213217
status_tracker.task_submitted(
214-
task_id=memory_req.task_id,
218+
task_id=item_id, # Use generated item_id for internal tracking
215219
user_id=memory_req.user_id,
216220
task_type="product_add",
217221
mem_cube_id=memory_req.mem_cube_id or memory_req.user_id,
218-
business_task_id=None, # This IS the business task, not an item
222+
business_task_id=memory_req.task_id, # Use memory_req.task_id as business_task_id
219223
)
220-
status_tracker.task_started(memory_req.task_id, memory_req.user_id)
224+
status_tracker.task_started(item_id, memory_req.user_id) # Use item_id here
225+
221226

222227
# Execute the add operation
223228
mos_product.add(
@@ -229,11 +234,12 @@ def create_memory(memory_req: MemoryCreateRequest):
229234
source=memory_req.source,
230235
user_profile=memory_req.user_profile,
231236
session_id=memory_req.session_id,
237+
task_id=memory_req.task_id,
232238
)
233239

234240
# Mark task as completed
235-
if status_tracker and memory_req.task_id:
236-
status_tracker.task_completed(memory_req.task_id, memory_req.user_id)
241+
if status_tracker and item_id:
242+
status_tracker.task_completed(item_id, memory_req.user_id)
237243

238244
logger.info(
239245
f"time add api : add time user_id: {memory_req.user_id} time is: {time.time() - time_start_add}"
@@ -242,13 +248,13 @@ def create_memory(memory_req: MemoryCreateRequest):
242248

243249
except ValueError as err:
244250
# Mark task as failed if tracking
245-
if status_tracker and memory_req.task_id:
246-
status_tracker.task_failed(memory_req.task_id, memory_req.user_id, str(err))
251+
if status_tracker and item_id:
252+
status_tracker.task_failed(item_id, memory_req.user_id, str(err))
247253
raise HTTPException(status_code=404, detail=str(traceback.format_exc())) from err
248254
except Exception as err:
249255
# Mark task as failed if tracking
250-
if status_tracker and memory_req.task_id:
251-
status_tracker.task_failed(memory_req.task_id, memory_req.user_id, str(err))
256+
if status_tracker and item_id:
257+
status_tracker.task_failed(item_id, memory_req.user_id, str(err))
252258
logger.error(f"Failed to create memory: {traceback.format_exc()}")
253259
raise HTTPException(status_code=500, detail=str(traceback.format_exc())) from err
254260

src/memos/mem_os/core.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,7 @@ def add(
687687
mem_cube_id: str | None = None,
688688
user_id: str | None = None,
689689
session_id: str | None = None,
690+
task_id: str | None = None, # New: Add task_id parameter
690691
**kwargs,
691692
) -> None:
692693
"""
@@ -773,6 +774,7 @@ def process_textual_memory():
773774
label=MEM_READ_LABEL,
774775
content=json.dumps(mem_ids),
775776
timestamp=datetime.utcnow(),
777+
task_id=task_id,
776778
)
777779
self.mem_scheduler.memos_message_queue.submit_messages(
778780
messages=[message_item]
@@ -784,6 +786,7 @@ def process_textual_memory():
784786
label=ADD_LABEL,
785787
content=json.dumps(mem_ids),
786788
timestamp=datetime.utcnow(),
789+
task_id=task_id,
787790
)
788791
self.mem_scheduler.memos_message_queue.submit_messages(
789792
messages=[message_item]

src/memos/mem_os/product.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1499,14 +1499,14 @@ def add(
14991499
source: str | None = None,
15001500
user_profile: bool = False,
15011501
session_id: str | None = None,
1502+
task_id: str | None = None, # Add task_id parameter
15021503
):
15031504
"""Add memory for a specific user."""
15041505

15051506
# Load user cubes if not already loaded
15061507
self._load_user_cubes(user_id, self.default_cube_config)
15071508
result = super().add(
1508-
messages, memory_content, doc_path, mem_cube_id, user_id, session_id=session_id
1509-
)
1509+
messages, memory_content, doc_path, mem_cube_id, user_id, session_id=session_id, task_id=task_id
15101510
if user_profile:
15111511
try:
15121512
user_interests = memory_content.split("'userInterests': '")[1].split("', '")[0]

src/memos/mem_scheduler/schemas/message_schemas.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333

3434
class ScheduleMessageItem(BaseModel, DictConversionMixin):
3535
item_id: str = Field(description="uuid", default_factory=lambda: str(uuid4()))
36-
task_id: str | None = Field(default=None, description="Parent task ID, if applicable")
3736
redis_message_id: str = Field(default="", description="the message get from redis stream")
3837
user_id: str = Field(..., description="user id")
3938
mem_cube_id: str = Field(..., description="memcube id")

src/memos/multi_mem_cube/single_cube.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -434,9 +434,7 @@ def _schedule_memory_tasks(
434434
label=MEM_READ_LABEL,
435435
content=json.dumps(mem_ids),
436436
timestamp=datetime.utcnow(),
437-
user_name=self.cube_id,
438437
info=add_req.info,
439-
task_id=add_req.task_id,
440438
)
441439
self.mem_scheduler.submit_messages(messages=[message_item_read])
442440
self.logger.info(
@@ -458,7 +456,6 @@ def _schedule_memory_tasks(
458456
content=json.dumps(mem_ids),
459457
timestamp=datetime.utcnow(),
460458
user_name=self.cube_id,
461-
task_id=add_req.task_id,
462459
)
463460
self.mem_scheduler.submit_messages(messages=[message_item_add])
464461

0 commit comments

Comments
 (0)