
Conversation

@AnujChhikara (Member) commented Aug 25, 2025

Date: 25 Aug 2025

Developer Name: @AnujChhikara


Issue Ticket Number

Description

Documentation Updated?

  • Yes
  • No

Under Feature Flag

  • Yes
  • No

Database Changes

  • Yes
  • No

Breaking Changes

  • Yes
  • No

Development Tested?

  • Yes
  • No

Screenshots

Screenshot 1

Test Coverage

Screenshot 1

Additional Notes

Description by Korbit AI

What change is being made?

Introduce a dual-write mechanism to the codebase to synchronize data between MongoDB and PostgreSQL, including corresponding data models, service classes, repositories, migration files, and configuration updates.

Why are these changes being made?

The changes aim to facilitate a future migration from MongoDB to PostgreSQL by ensuring data consistency and reliability across both databases. This approach minimizes operational risks and enables a systematic transition through dual-writing and eventual read migration. The synchronization is monitored with appropriate error handling, retries, and system metrics to maintain data integrity and performance.
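
At a high level, each repository write follows the same pattern: write MongoDB first, then mirror to Postgres on a best-effort basis. A minimal sketch (the wrapper function below is illustrative; create_document follows the service call shape described in this PR):

    import logging

    logger = logging.getLogger(__name__)

    def create_with_dual_write(tasks_collection, dual_write_service, payload: dict) -> str:
        # MongoDB stays the source of truth, so it is written first.
        mongo_id = str(tasks_collection.insert_one(payload).inserted_id)

        # Best-effort mirror to Postgres: failures are logged (and retried by
        # the service), never allowed to fail the primary write.
        if not dual_write_service.create_document("tasks", payload, mongo_id):
            logger.warning(f"Failed to sync task {mongo_id} to Postgres")

        return mongo_id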


iamitprakash and others added 17 commits August 22, 2025 13:28
…zation

- Added comprehensive documentation for the dual-write feature in README.md
- Updated requirements.txt to include psycopg2-binary for PostgreSQL support
- Modified Django settings to configure PostgreSQL as the primary database
- Introduced dual-write operations with error handling and monitoring capabilities
- Enhanced project structure for future migration paths and Docker development setup
…te service for task assignments and user roles
…data handling and enhance dual-write service integration
@AnujChhikara self-assigned this Aug 25, 2025

korbit-ai bot commented Aug 25, 2025

Korbit doesn't automatically review large (3000+ lines changed) pull requests such as this one. If you want me to review anyway, use /korbit-review.

@coderabbitai bot left a comment

Actionable comments posted: 98

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (15)
todo/repositories/abstract_repository.py (1)

184-201: AbstractAuditLogRepository interface mismatches PostgresAuditLog fields

The concrete Postgres implementation in todo/repositories/postgres_repository.py invokes filters on fields that don’t exist on the PostgresAuditLog model:

  • get_by_user(...) filters on user_mongo_id, but the Django model defines performed_by instead.
  • get_by_collection(...) filters on collection_name, yet no such column exists on PostgresAuditLog.

As-is, these calls will raise runtime errors. To resolve:

  • Rename and adjust get_by_user to get_by_performed_by (and filter on performed_by).
  • Either remove or redefine get_by_collection to target a valid field (e.g. task_id or team_id), or update the model to include the intended column.
  • Update the abstract interface (AbstractAuditLogRepository) to match the revised method names and signatures.

Please make these critical fixes before merging.
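
A sketch of the aligned pair under the proposed renames (the import path assumes the PR's todo.models.postgres package exports):

    from abc import ABC, abstractmethod
    from typing import List

    from todo.models.postgres import PostgresAuditLog

    class AbstractAuditLogRepository(ABC):
        @abstractmethod
        def get_by_performed_by(self, performed_by: str) -> List[PostgresAuditLog]:
            """Fetch audit entries by the actor recorded on the model."""

    class PostgresAuditLogRepository(AbstractAuditLogRepository):
        def get_by_performed_by(self, performed_by: str) -> List[PostgresAuditLog]:
            # Filter on performed_by, the field the Django model actually
            # defines, rather than the nonexistent user_mongo_id.
            return list(PostgresAuditLog.objects.filter(performed_by=performed_by))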

todo/services/task_assignment_service.py (1)

63-73: Guard against creation failures; exclusivity is already enforced

  • After calling TaskAssignmentRepository.create(...), add a check to surface failures instead of returning a response with missing identifiers:

     assignment = TaskAssignmentRepository.create(task_assignment)
     if not assignment:
         raise ValueError("Failed to create task assignment")
  • The repository’s update_assignment method (in todo/repositories/task_assignment_repository.py, lines 90–109) already deactivates any existing active assignments—first by matching on ObjectId(task_id), then on the string task_id—before creating the new assignment, ensuring that a task cannot remain assigned to both a team and an individual simultaneously.

todo/repositories/team_creation_invite_code_repository.py (2)

31-35: Use ReturnDocument.AFTER instead of a bare boolean for return_document

PyMongo defines ReturnDocument.BEFORE/AFTER for this parameter; passing a bare True leans on the enum's internal values and obscures intent. Pass ReturnDocument.AFTER explicitly, and store used_at as a datetime rather than an ISO string to match the rest of the collection.

Apply this diff:

-            result = collection.find_one_and_update(
+            result = collection.find_one_and_update(
                 {"code": code, "is_used": False},
-                {"$set": {"is_used": True, "used_by": used_by, "used_at": current_time.isoformat()}},
-                return_document=True,
+                {"$set": {"is_used": True, "used_by": used_by, "used_at": current_time}},
+                return_document=ReturnDocument.AFTER,
             )

And add the missing import at the top of the file:

+from pymongo import ReturnDocument

122-131: Consistent API shape for created_by/used_by in list response

created_by falls back to {}, while used_by is None when unavailable. This asymmetry complicates clients. Recommend returning None for both when absent.

-                enhanced_code = {
+                enhanced_code = {
                     "id": str(code["_id"]),
                     "code": code["code"],
                     "description": code.get("description"),
                     "created_at": code.get("created_at"),
                     "used_at": code.get("used_at"),
                     "is_used": code.get("is_used", False),
-                    "created_by": created_by_user or {},
-                    "used_by": used_by_user,
+                    "created_by": created_by_user or None,
+                    "used_by": used_by_user or None,
                 }
todo/repositories/team_repository.py (3)

80-106: Missing Postgres dual-write on team update

The create path dual-writes, but update doesn’t. Postgres will drift from Mongo. Add an update_document call after a successful Mongo update.

         updated_doc = teams_collection.find_one_and_update(
             {"_id": ObjectId(team_id), "is_deleted": False},
             {"$set": update_data},
             return_document=ReturnDocument.AFTER,
         )
 
-        if updated_doc:
-            return TeamModel(**updated_doc)
+        if updated_doc:
+            # Sync update to Postgres
+            dual_write_service = EnhancedDualWriteService()
+            team_data = {
+                "name": updated_doc.get("name"),
+                "description": updated_doc.get("description"),
+                "invite_code": updated_doc.get("invite_code"),
+                "poc_id": str(updated_doc.get("poc_id")) if updated_doc.get("poc_id") else None,
+                "created_by": str(updated_doc.get("created_by")),
+                "updated_by": str(updated_doc.get("updated_by")),
+                "is_deleted": updated_doc.get("is_deleted", False),
+                "created_at": updated_doc.get("created_at"),
+                "updated_at": updated_doc.get("updated_at"),
+            }
+            dual_write_success = dual_write_service.update_document(
+                collection_name="teams", mongo_id=str(updated_doc["_id"]), data=team_data
+            )
+            if not dual_write_success:
+                logger.warning(f"Failed to sync team update {updated_doc['_id']} to Postgres")
+            return TeamModel(**updated_doc)

Ensure a module-level logger is available (see separate comment).


332-347: Reactivation path returns stale data and skips Postgres sync

When reactivating an existing relationship, you update Mongo but return the pre-update document and don’t sync Postgres. Use find_one_and_update with ReturnDocument.AFTER and call update_document.

-            if not existing_relationship.get("is_active", True):
-                collection.update_one(
-                    {"_id": existing_relationship["_id"]},
-                    {
-                        "$set": {
-                            "is_active": True,
-                            "role_id": role_id,
-                            "updated_by": created_by_user_id,
-                            "updated_at": datetime.now(timezone.utc),
-                        }
-                    },
-                )
-                return UserTeamDetailsModel(**existing_relationship)
+            if not existing_relationship.get("is_active", True):
+                updated = collection.find_one_and_update(
+                    {"_id": existing_relationship["_id"]},
+                    {
+                        "$set": {
+                            "is_active": True,
+                            "role_id": role_id,
+                            "updated_by": created_by_user_id,
+                            "updated_at": datetime.now(timezone.utc),
+                        }
+                    },
+                    return_document=ReturnDocument.AFTER,
+                )
+                if updated:
+                    dual_write_service = EnhancedDualWriteService()
+                    payload = {
+                        "user_id": str(updated["user_id"]),
+                        "team_id": str(updated["team_id"]),
+                        "is_active": True,
+                        "created_by": str(updated["created_by"]),
+                        "updated_by": str(updated["updated_by"]),
+                        "created_at": updated["created_at"],
+                        "updated_at": updated["updated_at"],
+                    }
+                    if not dual_write_service.update_document(
+                        collection_name="user_team_details", mongo_id=str(updated["_id"]), data=payload
+                    ):
+                        logger.warning(f"Failed to sync user team reactivation {updated['_id']} to Postgres")
+                    return UserTeamDetailsModel(**updated)
+                return UserTeamDetailsModel(**existing_relationship)

280-296: Optional: collapse read-then-update into a single atomic operation

You read the active relationship, then update it. This is two round trips and susceptible to races. Consider find_one_and_update with ReturnDocument.BEFORE/AFTER to both assert is_active and get the prior doc in one call (you already did this pattern elsewhere).

If you want, I can draft a patch to switch remove_user_from_team to a single atomic call with the same dual-write behavior.
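
For illustration, the single-call shape could look like this (a sketch, not the PR's code):

    from datetime import datetime, timezone

    from pymongo import ReturnDocument

    def deactivate_membership(collection, user_id, team_id, updated_by):
        # Matching on is_active=True asserts the precondition and performs the
        # update atomically; BEFORE hands back the prior document in one call.
        return collection.find_one_and_update(
            {"user_id": user_id, "team_id": team_id, "is_active": True},
            {
                "$set": {
                    "is_active": False,
                    "updated_by": updated_by,
                    "updated_at": datetime.now(timezone.utc),
                }
            },
            return_document=ReturnDocument.BEFORE,
        )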

todo/repositories/watchlist_repository.py (3)

28-36: Normalize ID string conversions for model initialization

You convert updatedBy to str when present, but not createdBy. Be consistent to avoid Pydantic coercion surprises.

         if doc:
             # Convert ObjectId fields to strings for the model
-            if "updatedBy" in doc and doc["updatedBy"]:
+            if "updatedBy" in doc and doc["updatedBy"]:
                 doc["updatedBy"] = str(doc["updatedBy"])
+            if "createdBy" in doc and doc["createdBy"]:
+                doc["createdBy"] = str(doc["createdBy"])
             return WatchlistModel(**doc)

317-325: Inconsistent type for updatedBy: store as str for consistency

Elsewhere IDs are stored as strings in watchlists. Here you’re writing an ObjectId. Standardize on strings to match queries and pipelines that rely on $toObjectId later.

         update_result = watchlist_collection.update_one(
             {"userId": str(userId), "taskId": str(taskId)},
             {
                 "$set": {
                     "isActive": isActive,
                     "updatedAt": datetime.now(timezone.utc),
-                    "updatedBy": userId,
+                    "updatedBy": str(userId),
                 }
             },
         )

307-353: Refactor update() return annotation in watchlist_repository.py

The update method is currently declared as returning a dict but actually returns a pymongo.results.UpdateResult or None. Please adjust the signature and imports as follows:

• File: todo/repositories/watchlist_repository.py
• Location: method definition on line ~307

Suggested diff:

-from datetime import datetime, timezone
-from typing import List, Tuple
-from typing import Optional
+from datetime import datetime, timezone
+from typing import List, Tuple, Optional
+from pymongo.results import UpdateResult

 class WatchlistRepository(MongoRepository):
@@
-    @classmethod
-    def update(cls, taskId: ObjectId, isActive: bool, userId: ObjectId) -> dict:
+    @classmethod
+    def update(cls, taskId: ObjectId, isActive: bool, userId: ObjectId) -> Optional[UpdateResult]:
         """
         Update the watchlist status of a task.
         """
  • Import UpdateResult from pymongo.results so the annotation resolves at runtime.
  • Use Optional[UpdateResult] to accurately reflect that the method may return None.
todo/repositories/task_repository.py (1)

269-293: Soft-delete is not dual-written; Postgres gets out of sync.

delete_by_id marks isDeleted=True in Mongo but never syncs to Postgres. Add a dual-write update to keep Postgres consistent.

         deleted_task_data = tasks_collection.find_one_and_update(
             {"_id": task_id},
             {
                 "$set": {
                     "isDeleted": True,
                     "updatedAt": datetime.now(timezone.utc),
                     "updatedBy": user_id,
                 }
             },
             return_document=ReturnDocument.AFTER,
         )
 
-        if deleted_task_data:
-            return TaskModel(**deleted_task_data)
+        if deleted_task_data:
+            # Dual-write soft delete to Postgres
+            try:
+                dual_write_service = EnhancedDualWriteService()
+                dw_payload = {
+                    "isDeleted": True,
+                    "updatedAt": deleted_task_data["updatedAt"],
+                    "updatedBy": str(user_id),
+                }
+                dual_write_service.update_document(
+                    collection_name="tasks",
+                    mongo_id=str(task_id),
+                    data=dw_payload,
+                )
+            except Exception:
+                # Best-effort: do not block Mongo success
+                import logging
+                logging.getLogger(__name__).warning(f"Failed to sync task soft-delete {task_id} to Postgres")
+            return TaskModel(**deleted_task_data)
         return None
todo/repositories/task_assignment_repository.py (2)

158-166: Potential crash: PyObjectId(None) for team_id when user_type != "team".

Constructing PyObjectId with None will raise. Guard the conversion.

             new_assignment = TaskAssignmentModel(
                 _id=PyObjectId(),
                 task_id=PyObjectId(task_id),
                 assignee_id=PyObjectId(assignee_id),
                 user_type=user_type,
                 created_by=PyObjectId(user_id),
                 updated_by=None,
-                team_id=PyObjectId(team_id),
+                team_id=PyObjectId(team_id) if team_id else None,
             )

243-266: Type drift: storing assignee_id/updated_by as strings; use ObjectId consistently.

Mixed types in Mongo will break queries and necessitate fallbacks. Keep ObjectId for both update paths.

             result = collection.update_one(
                 {"task_id": ObjectId(task_id), "is_active": True},
                 {
                     "$set": {
-                        "assignee_id": executor_id,
+                        "assignee_id": ObjectId(executor_id),
                         "user_type": "user",
-                        "updated_by": user_id,
+                        "updated_by": ObjectId(user_id),
                         "updated_at": datetime.now(timezone.utc),
                     }
                 },
             )
             if result.modified_count == 0:
                 # Try with string if ObjectId doesn't work
                 result = collection.update_one(
                     {"task_id": task_id, "is_active": True},
                     {
                         "$set": {
-                            "assignee_id": executor_id,
+                            "assignee_id": ObjectId(executor_id),
                             "user_type": "user",
-                            "updated_by": user_id,
+                            "updated_by": ObjectId(user_id),
                             "updated_at": datetime.now(timezone.utc),
                         }
                     },
                 )
docker-compose.yml (1)

93-95: Connect mongo-express to the replica set

The Mongo container runs with --replSet rs0. Point mongo-express at the replica set URI for correctness.

Apply this diff:

-      ME_CONFIG_MONGODB_URL: mongodb://db:27017/
+      ME_CONFIG_MONGODB_URL: mongodb://db:27017/?replicaSet=rs0
todo_project/settings/base.py (1)

239-239: Safe default for CORS_ALLOWED_ORIGINS

Accessing os.getenv("CORS_ALLOWED_ORIGINS").split(",") will raise when the env var is unset. Provide a safe default.

Apply this diff:

-CORS_ALLOWED_ORIGINS = os.getenv("CORS_ALLOWED_ORIGINS").split(",")
+_cors = os.getenv("CORS_ALLOWED_ORIGINS", "")
+CORS_ALLOWED_ORIGINS = [o for o in _cors.split(",") if o] if _cors else []
♻️ Duplicate comments (1)
todo/repositories/task_assignment_repository.py (1)

90-131: Honor “exclusive assignment” rule from past learnings during update.

Per retrieved learnings, updates should ensure exclusivity by deactivating all active assignments before creating a new one. You do two update_many calls (ObjectId + string), which is correct; ensure no race by reading with majority write concern if available.

Would you like me to add a transactional pattern (session with retry) around both updates and the subsequent create to minimize race windows?
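
For reference, a hedged sketch of that transactional pattern (MongoDB transactions require the replica set that docker-compose already configures via --replSet rs0; names are illustrative):

    def reassign_in_transaction(client, collection, task_id, new_assignment: dict) -> None:
        with client.start_session() as session:
            def txn(s):
                # Deactivate every active assignment, then create the new one;
                # readers never observe two active assignments for the task.
                collection.update_many(
                    {"task_id": task_id, "is_active": True},
                    {"$set": {"is_active": False}},
                    session=s,
                )
                collection.insert_one(new_assignment, session=s)

            # with_transaction commits for us and retries transient errors.
            session.with_transaction(txn)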

@coderabbitai bot left a comment

Review continued from previous batch...

@coderabbitai bot left a comment

Review continued from previous batch...

Copy link

coderabbitai bot commented Aug 25, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbit review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Summary by CodeRabbit

  • New Features

    • Added optional PostgreSQL support with dual-write synchronization alongside MongoDB.
    • Introduced a command to sync PostgreSQL tables from existing data.
    • Enabled Django admin, sessions, and messages.
  • Documentation

    • Added comprehensive guide on the dual-write architecture and migration path.
    • Minor README formatting update.
  • Chores

    • Updated environment variables sample for PostgreSQL.
    • Added PostgreSQL service to docker-compose with persistent storage.
    • Automatic Postgres table sync triggered during app initialization.
    • Updated dependencies (including PostgreSQL driver).
  • Tests

    • Added test settings disabling dual-write and database setup for faster tests.

Walkthrough

Adds PostgreSQL integration: models, initial migration, repositories, services for dual-write from MongoDB, and sync utilities. Updates settings, docker-compose, and requirements. Introduces management command and initialization sync. Provides extensive documentation for the dual-write system. Minor README formatting and small service import/order tweaks.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Env & Orchestration<br>/.env.example, /docker-compose.yml, /README.md | Adds Postgres env vars (note: colon syntax used), introduces postgres service and volume in compose, wires app env/dependencies; README formatting-only tweak. |
| Dependencies<br>/requirements.txt | Adds psycopg2-binary; bumps platformdirs. |
| Settings & Init<br>/todo_project/settings/base.py, /todo_project/settings/test.py, /todo_project/db/init.py, /todo_project/__init__.py | Switches default DB to Postgres (non-test), adds dual-write flags, sessions/messages/admin middleware/apps; test settings disable dual-write and DB; init runs Postgres sync after migrations; comment update. |
| Docs<br>/docs/DUAL_WRITE_SYSTEM.md | New doc detailing Mongo↔Postgres dual-write architecture, config, operations, monitoring, and migration path. |
| Migrations<br>/todo/migrations/0001_initial_setup.py, /todo/migrations/__init__.py | Initial Postgres schema migration creating models and indexes; package init file added. |
| Postgres Models<br>/todo/models/postgres/* | New Postgres ORM models (User, Task, DeferredDetails, TaskLabel, Team, UserTeamDetails, Label, Role, TaskAssignment, Watchlist, UserRole, AuditLog, TeamCreationInviteCode) and package exports. |
| Repositories — Abstract & Postgres<br>/todo/repositories/abstract_repository.py, /todo/repositories/postgres_repository.py | Introduces abstract repository interfaces and Postgres repository implementations with CRUD, filtering, and domain lookups. |
| Repositories — Dual-write Integration<br>/todo/repositories/audit_log_repository.py, /todo/repositories/task_repository.py, /todo/repositories/task_assignment_repository.py, /todo/repositories/watchlist_repository.py, /todo/repositories/team_repository.py, /todo/repositories/team_creation_invite_code_repository.py, /todo/repositories/user_repository.py, /todo/repositories/user_role_repository.py, /todo/repositories/user_team_details_repository.py | Adds EnhancedDualWriteService calls to create/update/delete across collections; adds helper methods in some repos; logs warnings on sync failures. |
| Services — Dual-write<br>/todo/services/dual_write_service.py, /todo/services/enhanced_dual_write_service.py | New DualWriteService with collection→model mapping, transforms, and task label sync; EnhancedDualWriteService adds enable flag, batch ops, metrics, status, and retry. |
| Service — Postgres Sync<br>/todo/services/postgres_sync_service.py | New service to bulk sync Mongo data (labels, roles) into Postgres with checks and logging. |
| Management Command<br>/todo/management/commands/sync_postgres_tables.py | Adds Django command to trigger Postgres sync with optional --force. |
| Minor Service Tweak<br>/todo/services/task_assignment_service.py | Import reordering and minor formatting; no logic change. |
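
The management command row above implies a conventional BaseCommand; a rough sketch of its shape (whether sync_all_tables() takes a force argument is an assumption):

    from django.core.management.base import BaseCommand

    from todo.services.postgres_sync_service import PostgresSyncService

    class Command(BaseCommand):
        help = "Sync PostgreSQL tables from existing MongoDB data"

        def add_arguments(self, parser):
            parser.add_argument("--force", action="store_true",
                                help="Re-sync even if tables already look populated")

        def handle(self, *args, **options):
            ok = PostgresSyncService().sync_all_tables(force=options["force"])
            if ok:
                self.stdout.write(self.style.SUCCESS("Postgres sync complete"))
            else:
                self.stderr.write(self.style.ERROR("Postgres sync failed"))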

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Client
  participant Repo as Mongo Repository
  participant Mongo as MongoDB
  participant DWS as EnhancedDualWriteService
  participant PG as Postgres (Django ORM)

  rect rgb(235,245,255)
    note over Client,PG: Create flow (example: Task)
    Client->>Repo: create(payload)
    Repo->>Mongo: insertOne(payload)
    Mongo-->>Repo: insertedId
    Repo->>DWS: create_document("tasks", mapped, mongo_id)
    alt Dual-write enabled
      DWS->>PG: ORM create/related sync
      PG-->>DWS: success/failure
      DWS-->>Repo: bool
    else Disabled
      DWS-->>Repo: skipped (true)
    end
    Repo-->>Client: TaskModel
  end
sequenceDiagram
  autonumber
  participant Repo as Mongo Repository
  participant Mongo as MongoDB
  participant DWS as EnhancedDualWriteService
  participant PG as Postgres (Django ORM)

  rect rgb(245,235,255)
    note over Repo,PG: Update with deferred details
    Repo->>Mongo: updateOne(filter, update)
    Mongo-->>Repo: modifiedCount
    alt modifiedCount > 0
      Repo->>DWS: update_document("tasks", mongo_id, mapped)
      DWS->>PG: ORM update
      PG-->>DWS: ok
      opt deferredDetails present
        Repo->>PG: upsert PostgresDeferredDetails
        PG-->>Repo: ok
      end
    end
  end
sequenceDiagram
  autonumber
  participant Django as Django Init
  participant PSS as PostgresSyncService
  participant Mongo as MongoDB
  participant PG as Postgres

  rect rgb(235,255,245)
    note over Django,PG: App startup sync (labels, roles)
    Django->>PSS: sync_all_tables()
    PSS->>PG: check table exists
    PSS->>Mongo: count collections
    PSS->>PG: count tables
    alt needs sync
      PSS->>Mongo: iterate documents
      loop docs
        PSS->>PG: create if not exists (by mongo_id)
      end
    else skip
      PSS-->>Django: no-op
    end
    PSS-->>Django: boolean result
  end
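
The startup path in the third diagram suggests a guard so a Postgres outage cannot block boot; a hedged sketch of such a hook for todo_project/db/init.py (the function name and call site are assumptions):

    import logging

    from todo.services.postgres_sync_service import PostgresSyncService

    logger = logging.getLogger(__name__)

    def run_postgres_startup_sync() -> None:
        # Best-effort: app startup should degrade to Mongo-only operation
        # rather than crash if Postgres is unreachable.
        try:
            PostgresSyncService().sync_all_tables()
        except Exception:
            logger.exception("Postgres startup sync failed; continuing with MongoDB only")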

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes


Suggested reviewers

  • iamitprakash
  • prakashchoudhary07
  • vinit717
  • lakshayman
  • pankajjs

Poem

In burrows of bytes I thump with delight,
Two warrens now—Mongo’s left, Postgres right.
I hop, I copy, I sync without fright,
Labels and roles gleam tidy and bright.
With whiskered wisdom and ears at full height,
I dual-write dreams into tables by night. 🐇✨


Comment on lines +371 to +373
PostgresTaskAssignment.objects.filter(task_mongo_id=task_mongo_id).update(
    status="REJECTED",  # Mark as rejected instead of deleting
    updated_at=timezone.now(),

There appears to be a schema mismatch in the _sync_task_assignment_update method. The code is attempting to update a status field to "REJECTED", but according to the PostgresTaskAssignment model definition in this PR, this model doesn't have a status field. Instead, it uses an is_active boolean field for tracking assignment state.

The update should be modified to:

PostgresTaskAssignment.objects.filter(task_mongo_id=task_mongo_id).update(
    is_active=False,  # Mark as inactive instead of using status
    updated_at=timezone.now(),
)

This would align with the model's schema and maintain consistency with how assignment state is tracked throughout the application.

Suggested change

-PostgresTaskAssignment.objects.filter(task_mongo_id=task_mongo_id).update(
-    status="REJECTED",  # Mark as rejected instead of deleting
-    updated_at=timezone.now(),
+PostgresTaskAssignment.objects.filter(task_mongo_id=task_mongo_id).update(
+    is_active=False,  # Mark as inactive instead of using status
+    updated_at=timezone.now(),

Spotted by Diamond


@coderabbitai bot left a comment

Actionable comments posted: 34

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (8)
todo/services/task_assignment_service.py (5)

46-53: Make audit “unassign” logging best-effort (don’t fail the main flow) and de-risk typos

Recommendation:

  • Wrap audit log writes in try/except so logging failures don’t break assignment updates.
  • Centralize action names as constants or an Enum to avoid string typos.
-            if existing_assignment.user_type == "team":
-                AuditLogRepository.create(
-                    AuditLogModel(
-                        task_id=existing_assignment.task_id,
-                        team_id=existing_assignment.assignee_id,
-                        action="unassigned_from_team",
-                        performed_by=PyObjectId(user_id),
-                    )
-                )
+            if existing_assignment.user_type == "team":
+                try:
+                    AuditLogRepository.create(
+                        AuditLogModel(
+                            task_id=existing_assignment.task_id,
+                            team_id=existing_assignment.assignee_id,
+                            action="unassigned_from_team",  # TODO: replace with AuditAction.UNASSIGNED_FROM_TEAM
+                            performed_by=PyObjectId(user_id),
+                        )
+                    )
+                except Exception:
+                    # TODO: replace with project logger
+                    # logger.exception("Failed to write audit log for team unassignment")
+                    pass

75-83: Apply the same best-effort pattern to “assigned_to_team” logging and consider action constants

Mirror the try/except used for unassign logs and avoid raw string actions.

-        if assignment.user_type == "team":
-            AuditLogRepository.create(
-                AuditLogModel(
-                    task_id=assignment.task_id,
-                    team_id=assignment.assignee_id,
-                    action="assigned_to_team",
-                    performed_by=PyObjectId(user_id),
-                )
-            )
+        if assignment.user_type == "team":
+            try:
+                AuditLogRepository.create(
+                    AuditLogModel(
+                        task_id=assignment.task_id,
+                        team_id=assignment.assignee_id,
+                        action="assigned_to_team",  # TODO: replace with AuditAction.ASSIGNED_TO_TEAM
+                        performed_by=PyObjectId(user_id),
+                    )
+                )
+            except Exception:
+                # logger.exception("Failed to write audit log for team assignment")
+                pass

85-97: Remove or track the commented “legacy” block

Commented code risks rot. Either:

  • Remove it, or
  • Add a TODO with an issue reference/owner, or
  • Hide behind a feature flag if it must linger.

41-60: Ensure TaskAssignment dual-write enforces single active assignment and atomic consistency

The current implementation in TaskAssignmentRepository.update_assignment does not appear to:

  • Deactivate any prior active assignments for that task in Mongo (is_active = false) before marking the new one active.
  • Mirror that deactivation in Postgres (and the subsequent new assignment insert/update) within the same logical operation.
  • Enforce via a database constraint that only one “active” assignment per task can exist in Postgres.

Mandatory fixes:

  • In TaskAssignmentRepository.update_assignment (around todo/repositories/task_assignment_repository.py:90–120), add an explicit deactivation step that sets is_active = False on all existing assignments for the given task_id in Mongo, then create/update the new assignment record (and use the dual-write service to sync both the deactivation and the new activation to Postgres).
  • In your Django migrations (e.g. todo/migrations/0001_initial_setup.py), add a partial unique index on postgres_task_assignments over (task_mongo_id) where is_active = true, or a unique_together + CheckConstraint equivalent, to enforce “one active assignment per task” at the database level.
  • Introduce idempotency support on your dual-write paths (e.g. outbox table entries or idempotency keys in EnhancedDualWriteService) so that retries don’t create duplicate or conflicting records; a minimal key-derivation sketch follows this list.
  • Document the write ordering and failure-recovery strategy in docs/DUAL_WRITE_SYSTEM.md (e.g., “Mongo deactivation → PG deactivation → Mongo creation → PG creation with transaction boundaries, backed by an outbox that retries on failure”).
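
A minimal, illustrative key derivation for that idempotency point (helper and naming are not from the PR):

    import hashlib
    import json

    def idempotency_key(collection: str, mongo_id: str, operation: str, data: dict) -> str:
        # Identical (collection, id, operation, payload) -> identical key; a
        # unique index on the stored key turns replayed syncs into no-ops.
        payload = json.dumps(data, sort_keys=True, default=str)
        raw = f"{collection}:{mongo_id}:{operation}:{payload}"
        return hashlib.sha256(raw.encode()).hexdigest()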

35-39: Add and map TeamNotFoundException for consistent 404 handling

The codebase currently lacks a TeamNotFoundException and a corresponding API error message, so replacing the ValueError won’t work until we introduce and wire up a new exception type. Please:

  • Create the exception
    + todo/exceptions/team_exceptions.py
    +-------------------------------
    + from todo.constants.messages import ApiErrors
    +
    + class TeamNotFoundException(Exception):
    +     def __init__(self, team_id: str | None = None, message_template: str = ApiErrors.TEAM_NOT_FOUND):
    +         if team_id:
    +             message = message_template.format(team_id)
    +         else:
    +             message = message_template
    +         super().__init__(message)
  • Add the “team not found” API error constant
     todo/constants/messages.py
     ---------------------------
     class ApiErrors:
         USER_NOT_FOUND = "User not found: {}"
         TASK_NOT_FOUND = "Task not found: {}"
    +    TEAM_NOT_FOUND = "Team not found: {}"
  • Wire it into the global exception handler
     todo/exceptions/exception_handler.py
     ------------------------------------
         elif isinstance(exc, TaskNotFoundException):
             status_code = status.HTTP_404_NOT_FOUND
             error_list.append(ApiErrorDetail(detail=str(exc)))
    +    elif isinstance(exc, TeamNotFoundException):
    +        status_code = status.HTTP_404_NOT_FOUND
    +        error_list.append(ApiErrorDetail(detail=str(exc)))
  • Update the service to raise the new exception
     todo/services/task_assignment_service.py
     ----------------------------------------
         elif dto.user_type == "team":
             assignee = TeamRepository.get_by_id(dto.assignee_id)
             if not assignee:
    -             raise ValueError(f"Team not found: {dto.assignee_id}")
    +             raise TeamNotFoundException(dto.assignee_id)
  • Don’t forget to add:
    from todo.exceptions.team_exceptions import TeamNotFoundException

These changes will ensure missing‐team errors use a domain‐specific exception and return HTTP 404 in line with users/tasks.

todo/repositories/team_creation_invite_code_repository.py (2)

31-35: Pass ReturnDocument.AFTER, not a bare True

PyMongo's find_one_and_update takes the ReturnDocument enum (BEFORE/AFTER) for this parameter; pass the enum explicitly rather than a bare boolean, which leans on internal values and obscures intent.

+from pymongo.collection import ReturnDocument
@@
-            result = collection.find_one_and_update(
+            result = collection.find_one_and_update(
                 {"code": code, "is_used": False},
-                {"$set": {"is_used": True, "used_by": used_by, "used_at": current_time.isoformat()}},
-                return_document=True,
+                {"$set": {"is_used": True, "used_by": used_by, "used_at": current_time}},
+                return_document=ReturnDocument.AFTER,
             )

31-35: Ensure used_by is stored as ObjectId in Mongo; normalize created_at for dual-write

Keep Mongo types consistent (ObjectId) and send normalized datetimes to Postgres to avoid serialization errors.

 from datetime import datetime, timezone
+from bson import ObjectId
@@
-            result = collection.find_one_and_update(
+            # normalize used_by to ObjectId where possible
+            try:
+                used_by_value = ObjectId(used_by) if used_by else None
+            except Exception:
+                used_by_value = used_by  # fallback to existing behavior
+            result = collection.find_one_and_update(
                 {"code": code, "is_used": False},
-                {"$set": {"is_used": True, "used_by": used_by, "used_at": current_time}},
+                {"$set": {"is_used": True, "used_by": used_by_value, "used_at": current_time}},
                 return_document=ReturnDocument.AFTER,
             )
@@
-                invite_code_data = {
+                # normalize created_at (legacy records may be strings)
+                _created_at = result.get("created_at")
+                if isinstance(_created_at, str):
+                    _created_at = datetime.fromisoformat(_created_at.replace("Z", "+00:00"))
+                invite_code_data = {
                     "code": result["code"],
                     "description": result.get("description"),
                     "is_used": True,
                     "created_by": str(result["created_by"]),
-                    "used_by": str(used_by),
-                    "created_at": result.get("created_at"),
+                    "used_by": str(used_by) if used_by else None,
+                    "created_at": _created_at,
                     "used_at": current_time,
                 }

Also applies to: 41-48

todo/repositories/user_repository.py (1)

1-12: Use a module-level logger for dual-write warnings

Avoid inline imports; define once and reuse.

 from datetime import datetime, timezone
 from typing import Optional, List
 from pymongo.collection import ReturnDocument
 from pymongo import ASCENDING
+import logging
+logger = logging.getLogger(__name__)
@@
-            if not dual_write_success:
-                import logging
-
-                logger = logging.getLogger(__name__)
-                logger.warning(f"Failed to sync user {user_model.id} to Postgres")
+            if not dual_write_success:
+                logger.warning(f"Failed to sync user {user_model.id} to Postgres")

Also applies to: 87-92

@@ -1 +1 @@
# Added this because without this file Django isn't able to auto detect the test files
# Django project initialization

🧹 Nitpick (assertive)

Prefer a module docstring over a comment for introspection and tooling

Switching to a docstring makes the message accessible via todo_project.__doc__ and improves consistency with Python conventions.

-# Django project initialization
+"""Django project initialization."""
🤖 Prompt for AI Agents
In todo_project/__init__.py around lines 1 to 1, replace the top-line comment
with a module-level docstring so the description becomes available via
todo_project.__doc__; change the single-line comment into a quoted string
literal (triple quotes if multi-line) as the first statement in the file to
follow Python conventions and enable introspection and tooling to read the
module doc.

Comment on lines +141 to 142
"REFRESH_TOKEN_LIFETIME": int(os.getenv("REFRESH_TOKEN_LIFETIME", "604800")),
}

⚠️ Potential issue

Env var mismatch for refresh token lifetime between TESTING and non-TESTING
Non-testing uses REFRESH_TOKEN_LIFETIME (Line 141) while TESTING uses REFRESH_LIFETIME (Line 133). This creates confusing behavior and broken overrides in CI. Standardize on one key.

The non-testing block already reads the standard key; update the TESTING block (outside the shown range) to match:

-        "REFRESH_TOKEN_LIFETIME": int(os.getenv("REFRESH_LIFETIME", "604800")),
+        "REFRESH_TOKEN_LIFETIME": int(os.getenv("REFRESH_TOKEN_LIFETIME", "604800")),
🤖 Prompt for AI Agents
In todo_project/settings/base.py around lines 141-142, the non-TESTING config
uses REFRESH_TOKEN_LIFETIME while the TESTING block (line ~133) uses
REFRESH_LIFETIME; standardize on REFRESH_TOKEN_LIFETIME by replacing
REFRESH_LIFETIME with REFRESH_TOKEN_LIFETIME in the TESTING block so both
environments read the same env var name and default value.

Comment on lines +178 to +219
name="PostgresTaskAssignment",
fields=[
("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("mongo_id", models.CharField(blank=True, max_length=24, null=True, unique=True)),
("task_mongo_id", models.CharField(max_length=24)),
("assignee_id", models.CharField(max_length=24)),
(
"user_type",
models.CharField(
choices=[("user", "User"), ("team", "Team")],
max_length=10,
),
),
("team_id", models.CharField(blank=True, max_length=24, null=True)),
("is_active", models.BooleanField(default=True)),
("created_at", models.DateTimeField(default=django.utils.timezone.now)),
("updated_at", models.DateTimeField(blank=True, null=True)),
("created_by", models.CharField(max_length=24)),
("updated_by", models.CharField(blank=True, max_length=24, null=True)),
("last_sync_at", models.DateTimeField(auto_now=True)),
(
"sync_status",
models.CharField(
choices=[("SYNCED", "Synced"), ("PENDING", "Pending"), ("FAILED", "Failed")],
default="SYNCED",
max_length=20,
),
),
("sync_error", models.TextField(blank=True, null=True)),
],
options={
"db_table": "postgres_task_assignments",
"indexes": [
models.Index(fields=["mongo_id"], name="postgres_ta_mongo_i_326fa9_idx"),
models.Index(fields=["task_mongo_id"], name="postgres_ta_task_mo_95ca3b_idx"),
models.Index(fields=["assignee_id"], name="postgres_ta_assignee_95ca3b_idx"),
models.Index(fields=["user_type"], name="postgres_ta_user_typ_d13fa3_idx"),
models.Index(fields=["team_id"], name="postgres_ta_team_id_a0605f_idx"),
models.Index(fields=["is_active"], name="postgres_ta_is_acti_8b9698_idx"),
models.Index(fields=["sync_status"], name="postgres_ta_sync_st_385c3f_idx"),
],
},

🛠️ Refactor suggestion

Add unique active-assignment constraint (+ team_id check) to PostgresTaskAssignment

Mirrors model-level constraints to protect data integrity at the DB layer.

         migrations.CreateModel(
             name="PostgresTaskAssignment",
             fields=[
@@
             ],
             options={
                 "db_table": "postgres_task_assignments",
                 "indexes": [
@@
                     models.Index(fields=["sync_status"], name="postgres_ta_sync_st_385c3f_idx"),
                 ],
+                "constraints": [
+                    models.UniqueConstraint(
+                        fields=("task_mongo_id", "assignee_id", "user_type"),
+                        condition=models.Q(("is_active", True)),
+                        name="uniq_active_assignment_per_assignee",
+                    ),
+                    models.CheckConstraint(
+                        check=~models.Q(("user_type", "team"), ("team_id__isnull", True)),
+                        name="team_assignment_requires_team_id",
+                    ),
+                ],
             },
         ),
🤖 Prompt for AI Agents
In todo/migrations/0001_initial_setup.py around lines 178 to 219, the DB-level
unique constraint for active assignments is missing; add a UniqueConstraint that
enforces uniqueness of (task_mongo_id, assignee_id, user_type, team_id) only
when is_active is True. Update the migration to include a UniqueConstraint (e.g.
models.UniqueConstraint(fields=["task_mongo_id","assignee_id","user_type","team_id"],
condition=models.Q(is_active=True), name="postgres_ta_active_unique_...")) in
the model options (or as an AddConstraint operation), and ensure you reference
models.Q in the migration so the partial (conditional) uniqueness is applied and
the team_id check is included.

Comment on lines +9 to +32
    @classmethod
    def get_by_user_and_team(cls, user_id: str, team_id: str):
        collection = cls.get_collection()
        try:
            user_id_obj = ObjectId(user_id)
        except Exception:
            user_id_obj = user_id
        try:
            team_id_obj = ObjectId(team_id)
        except Exception:
            team_id_obj = team_id

        queries = [
            {"user_id": user_id_obj, "team_id": team_id_obj},
            {"user_id": user_id, "team_id": team_id_obj},
            {"user_id": user_id_obj, "team_id": team_id},
            {"user_id": user_id, "team_id": team_id},
        ]

        for query in queries:
            result = collection.find_one(query)
            if result:
                return result
        return None

🧹 Nitpick (assertive)

Add a compound index on (user_id, team_id) in Mongo

Frequent lookups by this pair (with $in) will otherwise scan the collection. Add a migration/init to create an index: [("user_id", 1), ("team_id", 1)].
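
A small sketch of that initialization step (the index name is illustrative):

    from pymongo import ASCENDING

    def ensure_user_team_index(collection) -> None:
        # create_index is idempotent: a no-op when the index already exists.
        collection.create_index(
            [("user_id", ASCENDING), ("team_id", ASCENDING)],
            name="user_id_1_team_id_1",
            background=True,
        )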

🤖 Prompt for AI Agents
In todo/repositories/user_team_details_repository.py around lines 9 to 32, the
repository performs frequent lookups by (user_id, team_id) and needs a compound
index to avoid collection scans; add a DB migration or initialization step that
calls the collection.create_index with the compound key [("user_id", 1),
("team_id", 1)] (set background=True so it doesn't block, and consider
unique=True only if the data model guarantees one document per (user_id,
team_id)); ensure the migration runs at service startup or part of your
migrations pipeline so the index exists before heavy queries run.

Comment on lines +135 to +143
        except postgres_model.DoesNotExist:
            # Document doesn't exist in Postgres, create it
            return self.create_document(collection_name, data, mongo_id)
        except Exception as e:
            error_msg = f"Failed to update {collection_name}:{mongo_id} in Postgres: {str(e)}"
            logger.error(error_msg)
            self._record_sync_failure(collection_name, mongo_id, error_msg)
            return False


🛠️ Refactor suggestion

⚠️ Potential issue

Catch ObjectDoesNotExist instead of model-scoped DoesNotExist

Avoids referencing postgres_model in except and works uniformly across models.

+from django.core.exceptions import ObjectDoesNotExist
@@
-        except postgres_model.DoesNotExist:
+        except ObjectDoesNotExist:
             # Document doesn't exist in Postgres, create it
             return self.create_document(collection_name, data, mongo_id)
@@
-        except postgres_model.DoesNotExist:
+        except ObjectDoesNotExist:
             logger.warning(f"Document {collection_name}:{mongo_id} not found in Postgres for deletion")
             return True  # Consider this a success since the goal is achieved

Also applies to: 176-183

🤖 Prompt for AI Agents
In todo/services/dual_write_service.py around lines 135-143, the except clause
currently catches postgres_model.DoesNotExist; replace that with the generic
ObjectDoesNotExist (imported from django.core.exceptions) so the handler works
uniformly across models; update the import at the top of the file to include
ObjectDoesNotExist and change the other similar block at lines 176-183 to use
ObjectDoesNotExist as well, leaving the other Exception catch and error logging
unchanged.

Comment on lines +266 to +279
    def _transform_team_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform team data for Postgres."""
        return {
            "name": data.get("name"),
            "description": data.get("description"),
            "invite_code": data.get("invite_code"),
            "poc_id": str(data.get("poc_id", "")) if data.get("poc_id") else None,
            "created_by": str(data.get("created_by", "")),
            "updated_by": str(data.get("updated_by", "")),
            "is_deleted": data.get("is_deleted", False),
            "created_at": data.get("created_at"),
            "updated_at": data.get("updated_at"),
        }


🧹 Nitpick (assertive)

Inconsistent transform input casing for teams (snake vs camel)

Team transform expects snake_case (invite_code, created_by) while tasks/labels use camelCase (createdAt). Standardize or accept both to reduce surprises.

I can patch _transform_team_data to read both variants, e.g.:

return {
    "name": data.get("name"),
    "description": data.get("description"),
    "invite_code": data.get("invite_code") or data.get("inviteCode"),
    "poc_id": str(data.get("poc_id") or data.get("pocId")) if (data.get("poc_id") or data.get("pocId")) else None,
    "created_by": str(data.get("created_by") or data.get("createdBy", "")),
    "updated_by": str(data.get("updated_by") or data.get("updatedBy", "")),
    ...
}

Want me to apply this pattern consistently across all transforms?

🤖 Prompt for AI Agents
In todo/services/dual_write_service.py around lines 266 to 279, the team
transform only reads snake_case keys while other transforms accept camelCase,
causing inconsistencies; update _transform_team_data to accept both snake_case
and camelCase for each field (e.g., invite_code or inviteCode, poc_id or pocId,
created_by or createdBy, updated_by or updatedBy, created_at or createdAt,
updated_at or updatedAt), prefer the snake_case value when present, convert
poc_id/created_by/updated_by to strings with the same None/default logic as now,
and apply this same dual-key pattern consistently to the other transform
functions so all transforms accept both casings.

Comment on lines +119 to +145
    def get_sync_metrics(self) -> Dict[str, Any]:
        """
        Get metrics about sync operations.

        Returns:
            Dict: Sync metrics
        """
        try:
            metrics = {
                "total_failures": len(self.sync_failures),
                "failures_by_collection": {},
                "recent_failures": self.sync_failures[-10:] if self.sync_failures else [],
                "enabled": self.enabled,
            }

            # Count failures by collection
            for failure in self.sync_failures:
                collection = failure["collection"]
                if collection not in metrics["failures_by_collection"]:
                    metrics["failures_by_collection"][collection] = 0
                metrics["failures_by_collection"][collection] += 1

            return metrics
        except Exception as e:
            logger.error(f"Error getting sync metrics: {str(e)}")
            return {}


🧹 Nitpick (assertive)

Sanitize recent_failures in metrics to avoid leaking payloads

If failures later store operation/data, do not expose raw data in metrics. Return a redacted subset.

-            metrics = {
-                "total_failures": len(self.sync_failures),
-                "failures_by_collection": {},
-                "recent_failures": self.sync_failures[-10:] if self.sync_failures else [],
-                "enabled": self.enabled,
-            }
+            def _redact(fr):
+                keys = ("collection", "mongo_id", "error", "timestamp", "operation")
+                return {k: fr.get(k) for k in keys if k in fr}
+            metrics = {
+                "total_failures": len(self.sync_failures),
+                "failures_by_collection": {},
+                "recent_failures": [_redact(fr) for fr in (self.sync_failures[-10:] if self.sync_failures else [])],
+                "enabled": self.enabled,
+            }
🤖 Prompt for AI Agents
In todo/services/enhanced_dual_write_service.py around lines 119 to 145, the
current get_sync_metrics returns recent_failures containing full failure objects
which can leak operation/data; update the function to redact recent_failures by
mapping each failure to a safe subset (e.g., timestamp, collection, error
message/summary, and an optional non-sensitive identifier or hashed id) and
explicitly omit any fields like "operation", "data", "payload" or other raw
content; implement this by building a small helper or inline mapping that
picks/derives only the allowed keys for the last 10 failures before returning
metrics, and keep logging unchanged for exceptions.

Comment on lines +70 to +85
            with connection.cursor() as cursor:
                cursor.execute(
                    """
                    SELECT EXISTS (
                        SELECT FROM information_schema.tables
                        WHERE table_schema = 'public'
                        AND table_name = %s
                    );
                    """,
                    [table_name],
                )
                return cursor.fetchone()[0]
        except Exception as e:
            logger.error(f"Error checking if table {table_name} exists: {str(e)}")
            return False


🧹 Nitpick (assertive)

Error handling and logging are good; consider wrapping per-table operations in a transaction

Optional: Surround bulk creates with transaction.atomic() to ensure atomicity per table.

Also applies to: 110-127

🤖 Prompt for AI Agents
In todo/services/postgres_sync_service.py around lines 70-85 (also apply same
change to 110-127), per-table bulk create/check operations aren’t wrapped in a
transaction; surround each table's bulk create/update block with a
transaction.atomic() context so that all DB changes for that table are committed
or rolled back together, ensuring atomicity and preventing partial writes — add
the transaction.atomic() context manager around the per-table operation and move
existing cursor/execute logic inside it.
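
A minimal illustration of that wrapper (names are not from the PR):

    import logging

    from django.db import transaction

    logger = logging.getLogger(__name__)

    def sync_table_atomically(table_name: str, sync_fn) -> bool:
        try:
            # Everything sync_fn writes for this table commits or rolls back
            # as a unit, so a mid-sync failure cannot leave partial rows.
            with transaction.atomic():
                sync_fn()
            return True
        except Exception:
            logger.exception(f"Sync of {table_name} rolled back")
            return False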

@iamitprakash merged commit 781100f into develop Aug 28, 2025
2 checks passed
@iamitprakash deleted the pg-migration branch August 28, 2025 21:30