Skip to content
3 changes: 2 additions & 1 deletion todo/models/task_assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ class TaskAssignmentModel(Document):
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime | None = None
executor_id: PyObjectId | None = None # User within the team who is executing the task
original_team_id: PyObjectId | None = None # Track the original team when reassigned from team to user

@validator("task_id", "assignee_id", "created_by", "updated_by")
@validator("task_id", "assignee_id", "created_by", "updated_by", "original_team_id")
def validate_object_ids(cls, v):
if v is None:
return v
Expand Down
7 changes: 7 additions & 0 deletions todo/repositories/task_assignment_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ def update_assignment(
"""
collection = cls.get_collection()
try:
current_assignment = cls.get_by_task_id(task_id)
original_team_id = None

if current_assignment and current_assignment.user_type == "team" and user_type == "user":
original_team_id = current_assignment.assignee_id

# Deactivate current assignment if exists (try both ObjectId and string)
collection.update_many(
{"task_id": ObjectId(task_id), "is_active": True},
Expand Down Expand Up @@ -102,6 +108,7 @@ def update_assignment(
user_type=user_type,
created_by=PyObjectId(user_id),
updated_by=None,
original_team_id=original_team_id,
)

return cls.create(new_assignment)
Expand Down
86 changes: 77 additions & 9 deletions todo/repositories/task_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import List
from bson import ObjectId
from pymongo import ReturnDocument
import logging

from todo.exceptions.task_exceptions import TaskNotFoundException
from todo.models.task import TaskModel
Expand Down Expand Up @@ -40,17 +39,52 @@ def list(
status_filter: str = None,
) -> List[TaskModel]:
tasks_collection = cls.get_collection()
logger = logging.getLogger(__name__)

base_filter = cls._build_status_filter(status_filter)

if team_id:
logger.debug(f"TaskRepository.list: team_id={team_id}")
# Get tasks directly assigned to the team
team_assignments = TaskAssignmentRepository.get_by_assignee_id(team_id, "team")
team_task_ids = [assignment.task_id for assignment in team_assignments]
logger.debug(f"TaskRepository.list: team_task_ids={team_task_ids}")
query_filter = {"$and": [base_filter, {"_id": {"$in": team_task_ids}}]}
logger.debug(f"TaskRepository.list: query_filter={query_filter}")
direct_team_task_ids = [assignment.task_id for assignment in team_assignments]

# Get tasks assigned to team members (with original_team_id filtering for team isolation)
from todo.repositories.team_repository import UserTeamDetailsRepository

team_member_ids = UserTeamDetailsRepository.get_users_by_team_id(team_id)
member_task_ids = []

if team_member_ids:
# Only get tasks where original_team_id matches current team (for team isolation)
member_assignments = list(
TaskAssignmentRepository.get_collection().find(
{
"$and": [
{
"$or": [
{
"assignee_id": {
"$in": [ObjectId(member_id) for member_id in team_member_ids]
}
},
{"assignee_id": {"$in": team_member_ids}},
]
},
{"user_type": "user"},
{"is_active": True},
{
"$or": [
{"original_team_id": ObjectId(team_id)},
{"original_team_id": team_id},
]
},
]
}
)
)
member_task_ids = [ObjectId(assignment["task_id"]) for assignment in member_assignments]

all_team_task_ids = list(set(direct_team_task_ids + member_task_ids))
query_filter = {"$and": [base_filter, {"_id": {"$in": all_team_task_ids}}]}
elif user_id:
assigned_task_ids = cls._get_assigned_task_ids_for_user(user_id)
query_filter = {"$and": [base_filter, {"_id": {"$in": assigned_task_ids}}]}
Expand Down Expand Up @@ -108,9 +142,43 @@ def count(cls, user_id: str = None, team_id: str = None, status_filter: str = No
base_filter = cls._build_status_filter(status_filter)

if team_id:
# Get tasks directly assigned to the team
team_assignments = TaskAssignmentRepository.get_by_assignee_id(team_id, "team")
team_task_ids = [assignment.task_id for assignment in team_assignments]
query_filter = {"$and": [base_filter, {"_id": {"$in": team_task_ids}}]}
direct_team_task_ids = [assignment.task_id for assignment in team_assignments]

# Get tasks assigned to team members (with original_team_id filtering for team isolation)
from todo.repositories.team_repository import UserTeamDetailsRepository

team_member_ids = UserTeamDetailsRepository.get_users_by_team_id(team_id)
member_task_ids = []

if team_member_ids:
# Only get tasks where original_team_id matches current team (for team isolation)

member_assignments = TaskAssignmentRepository.get_collection().find(
{
"$and": [
{
"$or": [
{"assignee_id": {"$in": [ObjectId(member_id) for member_id in team_member_ids]}},
{"assignee_id": {"$in": team_member_ids}},
]
},
{"user_type": "user"},
{"is_active": True},
{
"$or": [
{"original_team_id": ObjectId(team_id)},
{"original_team_id": team_id},
]
},
]
}
)
member_task_ids = [ObjectId(assignment["task_id"]) for assignment in member_assignments]

all_team_task_ids = list(set(direct_team_task_ids + member_task_ids))
query_filter = {"$and": [base_filter, {"_id": {"$in": all_team_task_ids}}]}
elif user_id:
assigned_task_ids = cls._get_assigned_task_ids_for_user(user_id)
query_filter = {
Expand Down
20 changes: 5 additions & 15 deletions todo/views/task_assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,26 +222,16 @@ def patch(self, request: Request, task_id: str):
{"error": f"User {executor_id} is not a member of the team."}, status=status.HTTP_400_BAD_REQUEST
)

# Update executor_id
try:
updated = TaskAssignmentRepository.update_executor(task_id, executor_id, user["user_id"])
if not updated:
# Get more details about why it failed
import traceback

print(
f"DEBUG: update_executor failed for task_id={task_id}, executor_id={executor_id}, user_id={user['user_id']}"
)
print(f"DEBUG: assignment details: {assignment}")
updated_assignment = TaskAssignmentRepository.update_assignment(
task_id, executor_id, "user", user["user_id"]
)
if not updated_assignment:
return Response(
{"error": "Failed to update executor. Check server logs for details."},
{"error": "Failed to update assignment."},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
except Exception as e:
print(f"DEBUG: Exception in update_executor: {str(e)}")
import traceback

traceback.print_exc()
return Response(
{"error": f"Exception during update: {str(e)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
Expand Down