Skip to content

Commit e6c37e9

Browse files
committed
refactor: enhance PostgreSQL model definitions and integrate dual-write service for task assignments and user roles
1 parent b7cfd53 commit e6c37e9

17 files changed

+823
-732
lines changed

todo/migrations/0001_initial_setup.py

Lines changed: 365 additions & 198 deletions
Large diffs are not rendered by default.

todo/repositories/task_assignment_repository.py

Lines changed: 118 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,45 @@
66
from todo.models.task_assignment import TaskAssignmentModel
77
from todo.repositories.common.mongo_repository import MongoRepository
88
from todo.models.common.pyobjectid import PyObjectId
9+
from todo.services.enhanced_dual_write_service import EnhancedDualWriteService
910

1011

1112
class TaskAssignmentRepository(MongoRepository):
1213
collection_name = TaskAssignmentModel.collection_name
1314

1415
@classmethod
1516
def create(cls, task_assignment: TaskAssignmentModel) -> TaskAssignmentModel:
16-
"""
17-
Creates a new task assignment.
18-
"""
1917
collection = cls.get_collection()
2018
task_assignment.created_at = datetime.now(timezone.utc)
2119
task_assignment.updated_at = None
2220

2321
task_assignment_dict = task_assignment.model_dump(mode="json", by_alias=True, exclude_none=True)
2422
insert_result = collection.insert_one(task_assignment_dict)
2523
task_assignment.id = insert_result.inserted_id
24+
25+
dual_write_service = EnhancedDualWriteService()
26+
task_assignment_data = {
27+
"task_id": str(task_assignment.task_id),
28+
"assignee_id": str(task_assignment.assignee_id),
29+
"user_type": task_assignment.user_type,
30+
"team_id": str(task_assignment.team_id) if task_assignment.team_id else None,
31+
"is_active": task_assignment.is_active,
32+
"created_by": str(task_assignment.created_by),
33+
"updated_by": str(task_assignment.updated_by) if task_assignment.updated_by else None,
34+
"created_at": task_assignment.created_at,
35+
"updated_at": task_assignment.updated_at,
36+
}
37+
38+
dual_write_success = dual_write_service.create_document(
39+
collection_name="task_assignments", data=task_assignment_data, mongo_id=str(task_assignment.id)
40+
)
41+
42+
if not dual_write_success:
43+
import logging
44+
45+
logger = logging.getLogger(__name__)
46+
logger.warning(f"Failed to sync task assignment {task_assignment.id} to Postgres")
47+
2648
return task_assignment
2749

2850
@classmethod
@@ -124,11 +146,13 @@ def update_assignment(
124146

125147
@classmethod
126148
def delete_assignment(cls, task_id: str, user_id: str) -> bool:
127-
"""
128-
Soft delete a task assignment by setting is_active to False.
129-
"""
130149
collection = cls.get_collection()
131150
try:
151+
# Get current assignment first
152+
current_assignment = cls.get_by_task_id(task_id)
153+
if not current_assignment:
154+
return False
155+
132156
# Try with ObjectId first
133157
result = collection.update_one(
134158
{"task_id": ObjectId(task_id), "is_active": True},
@@ -152,17 +176,45 @@ def delete_assignment(cls, task_id: str, user_id: str) -> bool:
152176
}
153177
},
154178
)
179+
180+
if result.modified_count > 0:
181+
# Sync to PostgreSQL
182+
dual_write_service = EnhancedDualWriteService()
183+
assignment_data = {
184+
"task_id": str(current_assignment.task_id),
185+
"assignee_id": str(current_assignment.assignee_id),
186+
"user_type": current_assignment.user_type,
187+
"team_id": str(current_assignment.team_id) if current_assignment.team_id else None,
188+
"is_active": False,
189+
"created_by": str(current_assignment.created_by),
190+
"updated_by": str(user_id),
191+
"created_at": current_assignment.created_at,
192+
"updated_at": datetime.now(timezone.utc),
193+
}
194+
195+
dual_write_success = dual_write_service.update_document(
196+
collection_name="task_assignments", data=assignment_data, mongo_id=str(current_assignment.id)
197+
)
198+
199+
if not dual_write_success:
200+
import logging
201+
202+
logger = logging.getLogger(__name__)
203+
logger.warning(f"Failed to sync task assignment deletion {current_assignment.id} to Postgres")
204+
155205
return result.modified_count > 0
156206
except Exception:
157207
return False
158208

159209
@classmethod
160210
def update_executor(cls, task_id: str, executor_id: str, user_id: str) -> bool:
161-
"""
162-
Update the executor_id for the active assignment of the given task_id.
163-
"""
164211
collection = cls.get_collection()
165212
try:
213+
# Get current assignment first
214+
current_assignment = cls.get_by_task_id(task_id)
215+
if not current_assignment:
216+
return False
217+
166218
result = collection.update_one(
167219
{"task_id": ObjectId(task_id), "is_active": True},
168220
{
@@ -187,17 +239,45 @@ def update_executor(cls, task_id: str, executor_id: str, user_id: str) -> bool:
187239
}
188240
},
189241
)
242+
243+
if result.modified_count > 0:
244+
# Sync to PostgreSQL
245+
dual_write_service = EnhancedDualWriteService()
246+
assignment_data = {
247+
"task_id": str(current_assignment.task_id),
248+
"assignee_id": str(executor_id),
249+
"user_type": "user",
250+
"team_id": str(current_assignment.team_id) if current_assignment.team_id else None,
251+
"is_active": current_assignment.is_active,
252+
"created_by": str(current_assignment.created_by),
253+
"updated_by": str(user_id),
254+
"created_at": current_assignment.created_at,
255+
"updated_at": datetime.now(timezone.utc),
256+
}
257+
258+
dual_write_success = dual_write_service.update_document(
259+
collection_name="task_assignments", data=assignment_data, mongo_id=str(current_assignment.id)
260+
)
261+
262+
if not dual_write_success:
263+
import logging
264+
265+
logger = logging.getLogger(__name__)
266+
logger.warning(f"Failed to sync task assignment update {current_assignment.id} to Postgres")
267+
190268
return result.modified_count > 0
191269
except Exception:
192270
return False
193271

194272
@classmethod
195273
def deactivate_by_task_id(cls, task_id: str, user_id: str) -> bool:
196-
"""
197-
Deactivate all assignments for a specific task by setting is_active to False.
198-
"""
199274
collection = cls.get_collection()
200275
try:
276+
# Get all active assignments for this task
277+
active_assignments = cls.get_by_task_id(task_id)
278+
if not active_assignments:
279+
return False
280+
201281
# Try with ObjectId first
202282
result = collection.update_many(
203283
{"task_id": ObjectId(task_id), "is_active": True},
@@ -221,6 +301,32 @@ def deactivate_by_task_id(cls, task_id: str, user_id: str) -> bool:
221301
}
222302
},
223303
)
304+
305+
if result.modified_count > 0:
306+
# Sync to PostgreSQL for each assignment
307+
dual_write_service = EnhancedDualWriteService()
308+
assignment_data = {
309+
"task_id": str(active_assignments.task_id),
310+
"assignee_id": str(active_assignments.assignee_id),
311+
"user_type": active_assignments.user_type,
312+
"team_id": str(active_assignments.team_id) if active_assignments.team_id else None,
313+
"is_active": False,
314+
"created_by": str(active_assignments.created_by),
315+
"updated_by": str(user_id),
316+
"created_at": active_assignments.created_at,
317+
"updated_at": datetime.now(timezone.utc),
318+
}
319+
320+
dual_write_success = dual_write_service.update_document(
321+
collection_name="task_assignments", data=assignment_data, mongo_id=str(active_assignments.id)
322+
)
323+
324+
if not dual_write_success:
325+
import logging
326+
327+
logger = logging.getLogger(__name__)
328+
logger.warning(f"Failed to sync task assignment deactivation {active_assignments.id} to Postgres")
329+
224330
return result.modified_count > 0
225331
except Exception:
226332
return False

todo/repositories/task_repository.py

Lines changed: 85 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
TaskStatus,
1717
)
1818
from todo.repositories.team_repository import UserTeamDetailsRepository
19+
from todo.services.enhanced_dual_write_service import EnhancedDualWriteService
20+
from todo.models.postgres import PostgresTask, PostgresDeferredDetails
1921

2022

2123
class TaskRepository(MongoRepository):
@@ -210,6 +212,31 @@ def create(cls, task: TaskModel) -> TaskModel:
210212
insert_result = tasks_collection.insert_one(task_dict, session=session)
211213

212214
task.id = insert_result.inserted_id
215+
216+
dual_write_service = EnhancedDualWriteService()
217+
task_data = {
218+
"title": task.title,
219+
"description": task.description,
220+
"priority": task.priority,
221+
"status": task.status,
222+
"display_id": task.displayId,
223+
"created_by": str(task.createdBy),
224+
"updated_by": str(task.updatedBy) if task.updatedBy else None,
225+
"is_deleted": task.isDeleted,
226+
"created_at": task.createdAt,
227+
"updated_at": task.updatedAt,
228+
}
229+
230+
dual_write_success = dual_write_service.create_document(
231+
collection_name="tasks", data=task_data, mongo_id=str(task.id)
232+
)
233+
234+
if not dual_write_success:
235+
import logging
236+
237+
logger = logging.getLogger(__name__)
238+
logger.warning(f"Failed to sync task {task.id} to Postgres")
239+
213240
return task
214241

215242
except Exception as e:
@@ -259,9 +286,6 @@ def delete_by_id(cls, task_id: ObjectId, user_id: str) -> TaskModel | None:
259286

260287
@classmethod
261288
def update(cls, task_id: str, update_data: dict) -> TaskModel | None:
262-
"""
263-
Updates a specific task by its ID with the given data.
264-
"""
265289
if not isinstance(update_data, dict):
266290
raise ValueError("update_data must be a dictionary.")
267291

@@ -281,7 +305,37 @@ def update(cls, task_id: str, update_data: dict) -> TaskModel | None:
281305
)
282306

283307
if updated_task_doc:
284-
return TaskModel(**updated_task_doc)
308+
task_model = TaskModel(**updated_task_doc)
309+
310+
dual_write_service = EnhancedDualWriteService()
311+
task_data = {
312+
"title": task_model.title,
313+
"description": task_model.description,
314+
"priority": task_model.priority,
315+
"status": task_model.status,
316+
"display_id": task_model.displayId,
317+
"created_by": str(task_model.createdBy),
318+
"updated_by": str(task_model.updatedBy) if task_model.updatedBy else None,
319+
"is_deleted": task_model.isDeleted,
320+
"created_at": task_model.createdAt,
321+
"updated_at": task_model.updatedAt,
322+
}
323+
324+
dual_write_success = dual_write_service.update_document(
325+
collection_name="tasks", data=task_data, mongo_id=str(task_model.id)
326+
)
327+
328+
if not dual_write_success:
329+
import logging
330+
331+
logger = logging.getLogger(__name__)
332+
logger.warning(f"Failed to sync task update {task_model.id} to Postgres")
333+
334+
# Handle deferred details if present in update_data
335+
if "deferredDetails" in update_data:
336+
cls._handle_deferred_details_sync(task_id, update_data["deferredDetails"])
337+
338+
return task_model
285339
return None
286340

287341
@classmethod
@@ -307,3 +361,30 @@ def get_by_ids(cls, task_ids: List[str]) -> List[TaskModel]:
307361
object_ids = [ObjectId(task_id) for task_id in task_ids]
308362
cursor = tasks_collection.find({"_id": {"$in": object_ids}})
309363
return [TaskModel(**doc) for doc in cursor]
364+
365+
@classmethod
366+
def _handle_deferred_details_sync(cls, task_id: str, deferred_details: dict) -> None:
367+
"""Handle deferred details synchronization to PostgreSQL"""
368+
try:
369+
postgres_task = PostgresTask.objects.get(mongo_id=task_id)
370+
371+
if deferred_details:
372+
deferred_details_data = {
373+
"task": postgres_task,
374+
"deferred_at": deferred_details.get("deferredAt"),
375+
"deferred_till": deferred_details.get("deferredTill"),
376+
"deferred_by": str(deferred_details.get("deferredBy")),
377+
}
378+
379+
PostgresDeferredDetails.objects.update_or_create(task=postgres_task, defaults=deferred_details_data)
380+
else:
381+
# Remove deferred details if None
382+
PostgresDeferredDetails.objects.filter(task=postgres_task).delete()
383+
384+
except PostgresTask.DoesNotExist:
385+
pass
386+
except Exception as e:
387+
import logging
388+
389+
logger = logging.getLogger(__name__)
390+
logger.warning(f"Failed to sync deferred details to PostgreSQL for task {task_id}: {str(e)}")

todo/repositories/team_creation_invite_code_repository.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from todo.repositories.common.mongo_repository import MongoRepository
55
from todo.models.team_creation_invite_code import TeamCreationInviteCodeModel
66
from todo.repositories.user_repository import UserRepository
7+
from todo.services.enhanced_dual_write_service import EnhancedDualWriteService
78

89

910
class TeamCreationInviteCodeRepository(MongoRepository):
@@ -38,13 +39,34 @@ def validate_and_consume_code(cls, code: str, used_by: str) -> Optional[dict]:
3839

3940
@classmethod
4041
def create(cls, team_invite_code: TeamCreationInviteCodeModel) -> TeamCreationInviteCodeModel:
41-
"""Create a new team invite code."""
4242
collection = cls.get_collection()
4343
team_invite_code.created_at = datetime.now(timezone.utc)
4444

4545
code_dict = team_invite_code.model_dump(mode="json", by_alias=True, exclude_none=True)
4646
insert_result = collection.insert_one(code_dict)
4747
team_invite_code.id = insert_result.inserted_id
48+
49+
dual_write_service = EnhancedDualWriteService()
50+
invite_code_data = {
51+
"code": team_invite_code.code,
52+
"description": team_invite_code.description,
53+
"is_used": team_invite_code.is_used,
54+
"created_by": str(team_invite_code.created_by),
55+
"used_by": str(team_invite_code.used_by) if team_invite_code.used_by else None,
56+
"created_at": team_invite_code.created_at,
57+
"used_at": team_invite_code.used_at,
58+
}
59+
60+
dual_write_success = dual_write_service.create_document(
61+
collection_name="team_creation_invite_codes", data=invite_code_data, mongo_id=str(team_invite_code.id)
62+
)
63+
64+
if not dual_write_success:
65+
import logging
66+
67+
logger = logging.getLogger(__name__)
68+
logger.warning(f"Failed to sync team creation invite code {team_invite_code.id} to Postgres")
69+
4870
return team_invite_code
4971

5072
@classmethod

0 commit comments

Comments
 (0)