8
8
from todo .models .common .pyobjectid import PyObjectId
9
9
from todo .constants .task import TaskStatus
10
10
from todo .services .enhanced_dual_write_service import EnhancedDualWriteService
11
+ from todo .repositories .audit_log_repository import AuditLogRepository , AuditLogModel
11
12
12
13
13
14
class TaskAssignmentRepository (MongoRepository ):
@@ -362,58 +363,103 @@ def reassign_tasks_from_user_to_team(cls, user_id: str, team_id: str, performed_
362
363
"""
363
364
Reassign all tasks of user to team
364
365
"""
365
- client = cls .get_client ()
366
366
collection = cls .get_collection ()
367
-
367
+ client = cls . get_client ()
368
368
with client .start_session () as session :
369
369
try :
370
370
with session .start_transaction ():
371
371
now = datetime .now (timezone .utc )
372
- pipeline = [
372
+ user_task_assignments = list (
373
+ collection .find (
374
+ {
375
+ "$and" : [
376
+ {"is_active" : True },
377
+ {
378
+ "$or" : [{"assignee_id" : user_id }, {"assignee_id" : ObjectId (user_id )}],
379
+ },
380
+ {"$or" : [{"team_id" : team_id }, {"team_id" : ObjectId (team_id )}]},
381
+ ]
382
+ },
383
+ session = session ,
384
+ )
385
+ )
386
+ if not user_task_assignments :
387
+ return 0
388
+ active_user_task_assignments_ids = [
389
+ ObjectId (assignment ["task_id" ]) for assignment in user_task_assignments
390
+ ]
391
+
392
+ from todo .repositories .task_repository import TaskRepository
393
+
394
+ tasks_collection = TaskRepository .get_collection ()
395
+ active_tasks = list (
396
+ tasks_collection .find (
397
+ {
398
+ "_id" : {"$in" : active_user_task_assignments_ids },
399
+ "status" : {"$ne" : TaskStatus .DONE .value },
400
+ },
401
+ session = session ,
402
+ )
403
+ )
404
+ not_done_tasks_ids = [str (tasks ["_id" ]) for tasks in active_tasks ]
405
+ tasks_to_reset_status_ids = []
406
+ tasks_to_clear_deferred_ids = []
407
+ for tasks in active_tasks :
408
+ if tasks ["status" ] == TaskStatus .IN_PROGRESS .value :
409
+ tasks_to_reset_status_ids .append (tasks ["_id" ])
410
+ elif tasks .get ("deferredDetails" ) is not None :
411
+ tasks_to_clear_deferred_ids .append (tasks ["_id" ])
412
+
413
+ collection .update_many (
414
+ {
415
+ "task_id" : {"$in" : not_done_tasks_ids },
416
+ },
373
417
{
374
- "$match " : {
375
- "team_id " : { "$in" : [ team_id , ObjectId (team_id )]} ,
376
- "assignee_id " : { "$in" : [ user_id , ObjectId ( user_id )]} ,
377
- "user_type " : "user" ,
378
- "is_active " : True ,
418
+ "$set " : {
419
+ "assignee_id " : ObjectId (team_id ),
420
+ "user_type " : "team" ,
421
+ "updated_at " : now ,
422
+ "updated_by " : ObjectId ( performed_by_user_id ) ,
379
423
}
380
424
},
425
+ session = session ,
426
+ )
427
+
428
+ for assignment in user_task_assignments :
429
+ AuditLogRepository .create (
430
+ AuditLogModel (
431
+ task_id = PyObjectId (assignment ["task_id" ]),
432
+ team_id = PyObjectId (team_id ),
433
+ action = "assigned_to_team" ,
434
+ performed_by = PyObjectId (performed_by_user_id ),
435
+ )
436
+ )
437
+
438
+ tasks_collection .update_many (
439
+ {"_id" : {"$in" : tasks_to_reset_status_ids }},
381
440
{
382
- "$addFields" : {
383
- "task_id_obj" : {
384
- "$convert" : {
385
- "input" : "$task_id" ,
386
- "to" : "objectId" ,
387
- "onError" : "$task_id" ,
388
- "onNull" : None ,
389
- }
390
- }
441
+ "$set" : {
442
+ "status" : TaskStatus .TODO .value ,
443
+ "updated_at" : now ,
444
+ "updated_by" : ObjectId (performed_by_user_id ),
391
445
}
392
446
},
447
+ session = session ,
448
+ )
449
+ tasks_collection .update_many (
450
+ {"_id" : {"$in" : tasks_to_clear_deferred_ids }},
393
451
{
394
- "$lookup " : {
395
- "from " : "tasks" ,
396
- "localField " : "task_id_obj" ,
397
- "foreignField " : "_id" ,
398
- "as " : "task_info" ,
452
+ "$set " : {
453
+ "status " : TaskStatus . TODO . value ,
454
+ "deferredDetails " : None ,
455
+ "updated_at " : now ,
456
+ "updated_by " : ObjectId ( performed_by_user_id ) ,
399
457
}
400
458
},
401
- {"$unwind" : "$task_info" },
402
- {"$match" : {"task_info.status" : {"$ne" : TaskStatus .DONE .value }}},
403
- ]
404
- user_task_assignments = list (collection .aggregate (pipeline ))
405
- if not user_task_assignments :
406
- return 0
407
- not_done_task_assignment_ids = [assignment ["_id" ] for assignment in user_task_assignments ]
408
- not_done_task_ids = [assignment ["task_id_obj" ] for assignment in user_task_assignments ]
409
- collection .update_many (
410
- {"_id" : {"$in" : not_done_task_assignment_ids }},
411
- {"$set" : {"is_active" : False , "updated_by" : ObjectId (performed_by_user_id ), "updated_at" : now }},
412
459
session = session ,
413
460
)
414
461
415
- new_assignments = []
416
-
462
+ tasks_by_id = {task ["_id" ]: task for task in active_tasks }
417
463
operations = []
418
464
dual_write_service = EnhancedDualWriteService ()
419
465
for assignment in user_task_assignments :
@@ -424,95 +470,54 @@ def reassign_tasks_from_user_to_team(cls, user_id: str, team_id: str, performed_
424
470
"mongo_id" : assignment ["_id" ],
425
471
"data" : {
426
472
"task_mongo_id" : str (assignment ["task_id" ]),
427
- "assignee_id" : str (assignment ["assignee_id " ]),
428
- "user_type" : assignment [ "user_type" ] ,
473
+ "assignee_id" : str (assignment ["team_id " ]),
474
+ "user_type" : "team" ,
429
475
"team_id" : str (assignment ["team_id" ]),
430
- "is_active" : False ,
476
+ "is_active" : True ,
431
477
"created_at" : assignment ["created_at" ],
432
- "updated_at" : datetime .now (timezone .utc ),
433
478
"created_by" : str (assignment ["created_by" ]),
479
+ "updated_at" : datetime .now (timezone .utc ),
434
480
"updated_by" : str (performed_by_user_id ),
435
481
},
436
482
}
437
483
)
438
- new_assignment_id = PyObjectId ()
439
- new_assignments .append (
440
- TaskAssignmentModel (
441
- _id = new_assignment_id ,
442
- task_id = assignment ["task_id_obj" ],
443
- assignee_id = PyObjectId (team_id ),
444
- user_type = "team" ,
445
- created_by = PyObjectId (performed_by_user_id ),
446
- updated_by = None ,
447
- team_id = PyObjectId (team_id ),
448
- ).model_dump (mode = "json" , by_alias = True , exclude_none = True )
449
- )
450
- operations .append (
451
- {
452
- "collection_name" : "task_assignments" ,
453
- "operation" : "create" ,
454
- "mongo_id" : new_assignment_id ,
455
- "data" : {
456
- "task_mongo_id" : str (assignment ["task_id" ]),
457
- "assignee_id" : str (team_id ),
458
- "user_type" : "team" ,
459
- "team_id" : str (team_id ),
460
- "is_active" : True ,
461
- "created_at" : datetime .now (timezone .utc ),
462
- "updated_at" : None ,
463
- "created_by" : str (performed_by_user_id ),
464
- "updated_by" : None ,
465
- },
466
- }
467
- )
468
- task_details = assignment ["task_info" ]
469
- operations .append (
470
- {
471
- "collection_name" : "tasks" ,
472
- "operation" : "update" ,
473
- "mongo_id" : assignment ["task_id" ],
474
- "data" : {
475
- "title" : task_details .get ("title" ),
476
- "description" : task_details .get ("description" ),
477
- "priority" : task_details .get ("priority" ),
478
- "status" : TaskStatus .TODO ,
479
- "displayId" : task_details .get ("displayId" ),
480
- "deferredDetails" : None ,
481
- "isAcknowledged" : task_details .get ("isAcknowledged" , False ),
482
- "isDeleted" : task_details .get ("isDeleted" , False ),
483
- "startedAt" : task_details .get ("startedAt" ),
484
- "dueAt" : task_details .get ("dueAt" ),
485
- "createdAt" : task_details .get ("createdAt" ),
486
- "updatedAt" : datetime .now (timezone .utc ),
487
- "createdBy" : str (task_details .get ("createdBy" )),
488
- "updated_by" : str (performed_by_user_id ),
489
- },
490
- }
491
- )
492
-
493
- if new_assignments :
494
- collection .insert_many (new_assignments , session = session )
495
- from todo .repositories .task_repository import TaskRepository
496
-
497
- tasks_collection = TaskRepository .get_collection ()
498
- tasks_collection .update_many (
499
- {"_id" : {"$in" : not_done_task_ids }},
500
- {
501
- "$set" : {
502
- "status" : TaskStatus .TODO .value ,
503
- "deferredDetails" : None ,
504
- "updatedAt" : datetime .now (timezone .utc ),
484
+ if (
485
+ assignment ["task_id" ] in tasks_to_clear_deferred_ids
486
+ or assignment ["task_id" ] in tasks_to_reset_status_ids
487
+ ):
488
+ task = tasks_by_id [assignment ["task_id" ]]
489
+ operations .append (
490
+ {
491
+ "collection_name" : "tasks" ,
492
+ "operation" : "update" ,
493
+ "mongo_id" : assignment ["task_id" ],
494
+ "data" : {
495
+ "title" : task .get ("title" ),
496
+ "description" : task .get ("description" ),
497
+ "priority" : task .get ("priority" ),
498
+ "status" : TaskStatus .TODO ,
499
+ "displayId" : task .get ("displayId" ),
500
+ "deferredDetails" : None ,
501
+ "isAcknowledged" : task .get ("isAcknowledged" , False ),
502
+ "isDeleted" : task .get ("isDeleted" , False ),
503
+ "startedAt" : task .get ("startedAt" ),
504
+ "dueAt" : task .get ("dueAt" ),
505
+ "createdAt" : task .get ("createdAt" ),
506
+ "createdBy" : str (task .get ("createdBy" )),
507
+ "updatedAt" : datetime .now (timezone .utc ),
508
+ "updated_by" : str (performed_by_user_id ),
509
+ },
505
510
}
506
- },
507
- session = session ,
508
- )
509
- dual_write_success = dual_write_service .batch_operations (operations )
511
+ )
512
+
513
+ dual_write_success = dual_write_service .batch_operations (operations )
514
+ if not dual_write_success :
515
+ import logging
510
516
511
- if not dual_write_success :
512
- import logging
517
+ logger = logging . getLogger ( __name__ )
518
+ logger . warning ( "Failed to sync task reassignments to Postgres" )
513
519
514
- logger = logging .getLogger (__name__ )
515
- logger .warning ("Failed to sync task reassignments to Postgres" )
516
- return len (new_assignments )
520
+ return len (user_task_assignments )
521
+ return len (user_task_assignments )
517
522
except Exception :
518
523
return 0
0 commit comments