|
6 | 6 | from todo.models.task_assignment import TaskAssignmentModel
|
7 | 7 | from todo.repositories.common.mongo_repository import MongoRepository
|
8 | 8 | from todo.models.common.pyobjectid import PyObjectId
|
| 9 | +from todo.constants.task import TaskStatus |
9 | 10 | from todo.services.enhanced_dual_write_service import EnhancedDualWriteService
|
| 11 | +from todo.repositories.audit_log_repository import AuditLogRepository, AuditLogModel |
10 | 12 |
|
11 | 13 |
|
12 | 14 | class TaskAssignmentRepository(MongoRepository):
|
@@ -355,3 +357,167 @@ def deactivate_by_task_id(cls, task_id: str, user_id: str) -> bool:
|
355 | 357 | return result.modified_count > 0
|
356 | 358 | except Exception:
|
357 | 359 | return False
|
| 360 | + |
| 361 | + @classmethod |
| 362 | + def reassign_tasks_from_user_to_team(cls, user_id: str, team_id: str, performed_by_user_id: str) -> bool: |
| 363 | + """ |
| 364 | + Reassign all tasks of user to team |
| 365 | + """ |
| 366 | + collection = cls.get_collection() |
| 367 | + client = cls.get_client() |
| 368 | + with client.start_session() as session: |
| 369 | + try: |
| 370 | + with session.start_transaction(): |
| 371 | + now = datetime.now(timezone.utc) |
| 372 | + user_task_assignments = list( |
| 373 | + collection.find( |
| 374 | + { |
| 375 | + "$and": [ |
| 376 | + {"is_active": True}, |
| 377 | + { |
| 378 | + "$or": [{"assignee_id": user_id}, {"assignee_id": ObjectId(user_id)}], |
| 379 | + }, |
| 380 | + {"$or": [{"team_id": team_id}, {"team_id": ObjectId(team_id)}]}, |
| 381 | + ] |
| 382 | + }, |
| 383 | + session=session, |
| 384 | + ) |
| 385 | + ) |
| 386 | + if not user_task_assignments: |
| 387 | + return 0 |
| 388 | + active_user_task_assignments_ids = [ |
| 389 | + ObjectId(assignment["task_id"]) for assignment in user_task_assignments |
| 390 | + ] |
| 391 | + |
| 392 | + from todo.repositories.task_repository import TaskRepository |
| 393 | + |
| 394 | + tasks_collection = TaskRepository.get_collection() |
| 395 | + active_tasks = list( |
| 396 | + tasks_collection.find( |
| 397 | + { |
| 398 | + "_id": {"$in": active_user_task_assignments_ids}, |
| 399 | + "status": {"$ne": TaskStatus.DONE.value}, |
| 400 | + }, |
| 401 | + session=session, |
| 402 | + ) |
| 403 | + ) |
| 404 | + not_done_tasks_ids = [str(tasks["_id"]) for tasks in active_tasks] |
| 405 | + tasks_to_reset_status_ids = [] |
| 406 | + tasks_to_clear_deferred_ids = [] |
| 407 | + for tasks in active_tasks: |
| 408 | + if tasks["status"] == TaskStatus.IN_PROGRESS.value: |
| 409 | + tasks_to_reset_status_ids.append(tasks["_id"]) |
| 410 | + elif tasks.get("deferredDetails") is not None: |
| 411 | + tasks_to_clear_deferred_ids.append(tasks["_id"]) |
| 412 | + |
| 413 | + collection.update_many( |
| 414 | + { |
| 415 | + "task_id": {"$in": not_done_tasks_ids}, |
| 416 | + }, |
| 417 | + { |
| 418 | + "$set": { |
| 419 | + "assignee_id": team_id, |
| 420 | + "user_type": "team", |
| 421 | + "updated_at": now, |
| 422 | + "updated_by": ObjectId(performed_by_user_id), |
| 423 | + } |
| 424 | + }, |
| 425 | + session=session, |
| 426 | + ) |
| 427 | + |
| 428 | + for assignment in user_task_assignments: |
| 429 | + AuditLogRepository.create( |
| 430 | + AuditLogModel( |
| 431 | + task_id=PyObjectId(assignment["task_id"]), |
| 432 | + team_id=PyObjectId(team_id), |
| 433 | + action="assigned_to_team", |
| 434 | + performed_by=PyObjectId(performed_by_user_id), |
| 435 | + ) |
| 436 | + ) |
| 437 | + |
| 438 | + tasks_collection.update_many( |
| 439 | + {"_id": {"$in": tasks_to_reset_status_ids}}, |
| 440 | + { |
| 441 | + "$set": { |
| 442 | + "status": TaskStatus.TODO.value, |
| 443 | + "updated_at": now, |
| 444 | + "updated_by": ObjectId(performed_by_user_id), |
| 445 | + } |
| 446 | + }, |
| 447 | + session=session, |
| 448 | + ) |
| 449 | + tasks_collection.update_many( |
| 450 | + {"_id": {"$in": tasks_to_clear_deferred_ids}}, |
| 451 | + { |
| 452 | + "$set": { |
| 453 | + "status": TaskStatus.TODO.value, |
| 454 | + "deferredDetails": None, |
| 455 | + "updated_at": now, |
| 456 | + "updated_by": ObjectId(performed_by_user_id), |
| 457 | + } |
| 458 | + }, |
| 459 | + session=session, |
| 460 | + ) |
| 461 | + |
| 462 | + tasks_by_id = {task["_id"]: task for task in active_tasks} |
| 463 | + operations = [] |
| 464 | + dual_write_service = EnhancedDualWriteService() |
| 465 | + for assignment in user_task_assignments: |
| 466 | + operations.append( |
| 467 | + { |
| 468 | + "collection_name": "task_assignments", |
| 469 | + "operation": "update", |
| 470 | + "mongo_id": assignment["_id"], |
| 471 | + "data": { |
| 472 | + "task_mongo_id": str(assignment["task_id"]), |
| 473 | + "assignee_id": str(assignment["team_id"]), |
| 474 | + "user_type": "team", |
| 475 | + "team_id": str(assignment["team_id"]), |
| 476 | + "is_active": True, |
| 477 | + "created_at": assignment["created_at"], |
| 478 | + "created_by": str(assignment["created_by"]), |
| 479 | + "updated_at": datetime.now(timezone.utc), |
| 480 | + "updated_by": str(performed_by_user_id), |
| 481 | + }, |
| 482 | + } |
| 483 | + ) |
| 484 | + if ( |
| 485 | + assignment["task_id"] in tasks_to_clear_deferred_ids |
| 486 | + or assignment["task_id"] in tasks_to_reset_status_ids |
| 487 | + ): |
| 488 | + task = tasks_by_id[assignment["task_id"]] |
| 489 | + operations.append( |
| 490 | + { |
| 491 | + "collection_name": "tasks", |
| 492 | + "operation": "update", |
| 493 | + "mongo_id": assignment["task_id"], |
| 494 | + "data": { |
| 495 | + "title": task.get("title"), |
| 496 | + "description": task.get("description"), |
| 497 | + "priority": task.get("priority"), |
| 498 | + "status": TaskStatus.TODO.value, |
| 499 | + "displayId": task.get("displayId"), |
| 500 | + "deferredDetails": None, |
| 501 | + "isAcknowledged": task.get("isAcknowledged", False), |
| 502 | + "isDeleted": task.get("isDeleted", False), |
| 503 | + "startedAt": task.get("startedAt"), |
| 504 | + "dueAt": task.get("dueAt"), |
| 505 | + "createdAt": task.get("createdAt"), |
| 506 | + "createdBy": str(task.get("createdBy")), |
| 507 | + "updatedAt": datetime.now(timezone.utc), |
| 508 | + "updated_by": str(performed_by_user_id), |
| 509 | + }, |
| 510 | + } |
| 511 | + ) |
| 512 | + |
| 513 | + dual_write_success = dual_write_service.batch_operations(operations) |
| 514 | + if not dual_write_success: |
| 515 | + import logging |
| 516 | + |
| 517 | + logger = logging.getLogger(__name__) |
| 518 | + logger.warning("Failed to sync task reassignments to Postgres") |
| 519 | + |
| 520 | + return False |
| 521 | + return True |
| 522 | + except Exception: |
| 523 | + return False |
0 commit comments