diff --git a/.env.example b/.env.example index 0d7567a6..f10a6d49 100644 --- a/.env.example +++ b/.env.example @@ -29,4 +29,10 @@ CORS_ALLOWED_ORIGINS='http://localhost:3000,http://localhost:8000' SWAGGER_UI_PATH='/api/schema' -ADMIN_EMAILS = "admin@gmail.com,admin2@gmail.com" \ No newline at end of file +ADMIN_EMAILS = "admin@gmail.com,admin2@gmail.com" + +POSTGRES_HOST=postgres +POSTGRES_PORT=5432 +POSTGRES_DB=todo_postgres +POSTGRES_USER=todo_user +POSTGRES_PASSWORD=todo_password \ No newline at end of file diff --git a/README.md b/README.md index 75ee856f..47c26147 100644 --- a/README.md +++ b/README.md @@ -148,4 +148,4 @@ - If port 5678 is in use, specify a different port with `--debug-port` - Ensure VS Code Python extension is installed - Check that breakpoints are set in the correct files -- Verify the debug server shows "Debug server listening on port 5678" +- Verify the debug server shows "Debug server listening on port 5678" \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index bc48b581..12690018 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,23 +2,65 @@ services: django-app: build: . 
container_name: todo-django-app - command: python -Xfrozen_modules=off manage.py runserver_debug 0.0.0.0:8000 --debug-port 5678 + command: > + sh -c " + python manage.py makemigrations && + python manage.py migrate && + python manage.py shell -c 'from todo_project.db.init import initialize_database; initialize_database()' && + python manage.py runserver 0.0.0.0:8000 + " environment: MONGODB_URI: mongodb://db:27017 DB_NAME: todo-app PYTHONUNBUFFERED: 1 PYDEVD_DISABLE_FILE_VALIDATION: 1 + # PostgreSQL Configuration + POSTGRES_HOST: postgres + POSTGRES_PORT: 5432 + POSTGRES_DB: todo_postgres + POSTGRES_USER: todo_user + POSTGRES_PASSWORD: todo_password volumes: - .:/app ports: - "8000:8000" - "5678:5678" # Debug port depends_on: - - db - - mongo-init + db: + condition: service_started + mongo-init: + condition: service_completed_successfully + postgres: + condition: service_healthy stdin_open: true tty: true + postgres: + image: postgres:15 + container_name: todo-postgres + environment: + POSTGRES_DB: todo_postgres + POSTGRES_USER: todo_user + POSTGRES_PASSWORD: todo_password + POSTGRES_HOST_AUTH_METHOD: trust + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + - ./init-scripts:/docker-entrypoint-initdb.d + healthcheck: + test: + [ + "CMD-SHELL", + "pg_isready -U todo_user -d todo_postgres && echo 'Postgres healthcheck passed'", + ] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + + + db: image: mongo:latest command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27017"] @@ -28,7 +70,14 @@ services: volumes: - ./mongo_data:/data/db healthcheck: - test: ["CMD", "mongosh", "--quiet", "--eval", "if (db.runCommand({ping:1}).ok) process.exit(0); else process.exit(1)"] + test: + [ + "CMD", + "mongosh", + "--quiet", + "--eval", + "if (db.runCommand({ping:1}).ok) process.exit(0); else process.exit(1)", + ] interval: 10s timeout: 5s retries: 5 @@ -67,3 +116,6 @@ services: depends_on: - db - mongo-init + +volumes: + postgres_data: diff 
--git a/docs/DUAL_WRITE_SYSTEM.md b/docs/DUAL_WRITE_SYSTEM.md new file mode 100644 index 00000000..043993d8 --- /dev/null +++ b/docs/DUAL_WRITE_SYSTEM.md @@ -0,0 +1,324 @@ +# Dual-Write System: MongoDB to Postgres + +## Overview + +The dual-write system ensures that all data written to MongoDB is also persisted in a PostgreSQL database with a well-defined schema. This system is designed to enable future migration from MongoDB to Postgres with minimal operational risk and code changes. + +## Architecture + +### Components + +1. **Postgres Models** (`todo/models/postgres/`) + - Mirror MongoDB collections with normalized schema + - Include sync metadata for tracking sync status + - Use `mongo_id` field to maintain reference to MongoDB documents + +2. **Dual-Write Service** (`todo/services/dual_write_service.py`) + - Core service for writing to both databases + - Handles data transformation between MongoDB and Postgres + - Records sync failures for alerting + +3. **Enhanced Dual-Write Service** (`todo/services/enhanced_dual_write_service.py`) + - Extends base service with batch operations + - Provides enhanced monitoring and metrics + - Supports batch operation processing + +4. **Abstract Repository Pattern** (`todo/repositories/abstract_repository.py`) + - Defines interface for data access operations + - Enables seamless switching between databases in the future + - Provides consistent API across different storage backends + +5. 
**Postgres Repositories** (`todo/repositories/postgres_repository.py`) + - Concrete implementations of abstract repositories + - Handle Postgres-specific operations + - Maintain compatibility with existing MongoDB repositories + +## Configuration + +### Environment Variables + +```bash +# Dual-Write Configuration +DUAL_WRITE_ENABLED=True # Enable/disable dual-write +DUAL_WRITE_RETRY_ATTEMPTS=3 # Number of retry attempts +DUAL_WRITE_RETRY_DELAY=5 # Delay between retries (seconds) + +# Postgres Configuration +POSTGRES_HOST=localhost +POSTGRES_PORT=5432 +POSTGRES_DB=todo_postgres +POSTGRES_USER=todo_user +POSTGRES_PASSWORD=todo_password +``` + +### Django Settings + +The system automatically configures Django to use Postgres as the primary database while maintaining MongoDB connectivity through the existing `DatabaseManager`. + +## Usage + +### Basic Usage + +```python +from todo.services.enhanced_dual_write_service import EnhancedDualWriteService + +# Initialize the service +dual_write_service = EnhancedDualWriteService() + +# Create a document (writes to both MongoDB and Postgres) +success = dual_write_service.create_document( + collection_name='users', + data=user_data, + mongo_id=str(user_id) +) + +# Update a document +success = dual_write_service.update_document( + collection_name='users', + mongo_id=str(user_id), + data=updated_data +) + +# Delete a document +success = dual_write_service.delete_document( + collection_name='users', + mongo_id=str(user_id) +) +``` + +### Batch Operations + +```python +# Perform multiple operations in batch +operations = [ + { + 'collection_name': 'users', + 'data': user_data, + 'mongo_id': str(user_id), + 'operation': 'create' + }, + { + 'collection_name': 'tasks', + 'data': task_data, + 'mongo_id': str(task_id), + 'operation': 'update' + } +] + +success = dual_write_service.batch_operations(operations) +``` + +## Data Mapping + +### MongoDB to Postgres Schema + +| MongoDB Collection | Postgres Table | Key Fields | 
+|-------------------|----------------|------------| +| `users` | `postgres_users` | `google_id`, `email_id`, `name` | +| `tasks` | `postgres_tasks` | `title`, `status`, `priority`, `created_by` | +| `teams` | `postgres_teams` | `name`, `invite_code`, `created_by` | +| `labels` | `postgres_labels` | `name`, `color` | +| `roles` | `postgres_roles` | `name`, `permissions` | +| `task_assignments` | `postgres_task_assignments` | `task_mongo_id`, `user_mongo_id` | +| `watchlists` | `postgres_watchlists` | `name`, `user_mongo_id` | +| `user_team_details` | `postgres_user_team_details` | `user_id`, `team_id` | +| `user_roles` | `postgres_user_roles` | `user_mongo_id`, `role_mongo_id` | +| `audit_logs` | `postgres_audit_logs` | `action`, `collection_name`, `document_id` | + +### Field Transformations + +- **ObjectId Fields**: Converted to strings (24 characters) +- **Nested Objects**: Flattened or stored in separate tables +- **Arrays**: Stored in junction tables (e.g., `PostgresTaskLabel`) +- **Timestamps**: Preserved as-is +- **Enums**: Mapped to Postgres choices + +## Sync Status Tracking + +Each Postgres record includes sync metadata: + +```python +class SyncMetadata: + sync_status: str # 'SYNCED', 'PENDING', 'FAILED' + sync_error: str # Error message if sync failed + last_sync_at: datetime # Last successful sync timestamp +``` + +## Error Handling and Alerting + +### Sync Failures + +The system automatically records sync failures: + +```python +# Get sync failures +failures = dual_write_service.get_sync_failures() + +# Get sync metrics +metrics = dual_write_service.get_sync_metrics() +``` + +### Alerting + +- **Immediate Logging**: All failures are logged with ERROR level +- **Critical Alerts**: Logged with CRITICAL level for immediate attention +- **Failure Tracking**: Maintains list of recent failures for monitoring + +### Retry Mechanism + +- **Automatic Retries**: Failed operations are automatically retried +- **Configurable Attempts**: Set via 
`DUAL_WRITE_RETRY_ATTEMPTS` +- **Exponential Backoff**: Delay increases between retry attempts +- **Manual Retry**: Failed operations can be manually retried + +## Monitoring and Health Checks + +### Metrics + +```python +# Get comprehensive sync metrics +metrics = dual_write_service.get_sync_metrics() + +# Check sync status of specific document +status = dual_write_service.get_sync_status('users', str(user_id)) +``` + +## Future Migration Path + +### Phase 1: Dual-Write (Current) +- All writes go to both MongoDB and Postgres +- Reads continue from MongoDB +- Postgres schema is validated and optimized + +### Phase 2: Read Migration +- Gradually shift read operations to Postgres +- Use feature flags to control read source +- Monitor performance and data consistency + +### Phase 3: Full Migration +- All operations use Postgres +- MongoDB becomes read-only backup +- Eventually decommission MongoDB + +### Code Changes Required + +The abstract repository pattern minimizes code changes: + +```python +# Current: MongoDB repository +from todo.repositories.user_repository import UserRepository +user_repo = UserRepository() + +# Future: Postgres repository (minimal code change) +from todo.repositories.postgres_repository import PostgresUserRepository +user_repo = PostgresUserRepository() + +# Same interface, different implementation +user = user_repo.get_by_email("user@example.com") +``` + +## Performance Considerations + +### Synchronous Operations +- **Pros**: Immediate consistency, simple error handling +- **Cons**: Higher latency, potential for MongoDB write failures + +### Batch Operations +- **Pros**: Reduced database round trips, better throughput +- **Cons**: Potential for partial failures + +## Security + +### Data Privacy +- All sensitive data is encrypted in transit +- Postgres connections use SSL +- Access controls are maintained across both databases + +### Audit Trail +- All operations are logged in audit logs +- Sync failures are tracked for compliance +- Data 
integrity is maintained through transactions + +## Testing + +### Unit Tests +- Test individual components in isolation +- Mock external dependencies +- Verify data transformation logic + +### Integration Tests +- Test end-to-end sync operations +- Verify data consistency between databases +- Test failure scenarios and recovery + +### Performance Tests +- Measure sync latency under load +- Test batch operation efficiency + +## Troubleshooting + +### Common Issues + +1. **Postgres Connection Failures** + - Check database credentials and network connectivity + - Verify Postgres service is running + - Check firewall settings + +2. **Sync Failures** + - Review sync error logs + - Check data transformation logic + - Verify Postgres schema matches expectations + +3. **Performance Issues** + - Monitor sync latency + - Optimize batch operation sizes + - Monitor database performance + +### Debug Commands + +```python +# Enable debug logging +import logging +logging.getLogger('todo.services.dual_write_service').setLevel(logging.DEBUG) + +# Check sync status +status = dual_write_service.get_sync_status('users', str(user_id)) +print(f"Sync status: {status}") + +# Get recent failures +failures = dual_write_service.get_sync_failures() +for failure in failures: + print(f"Collection: {failure['collection']}, ID: {failure['mongo_id']}") +``` + +## Deployment + +### Prerequisites +- PostgreSQL 15+ with appropriate extensions +- MongoDB 7+ (existing) +- Python 3.9+ with required packages + +### Setup Steps +1. Create Postgres database and user +2. Run Django migrations +3. Configure environment variables +4. 
Verify sync operations + +### Production Considerations +- Use connection pooling for Postgres +- Set up monitoring and alerting +- Implement backup and recovery procedures + +## Support and Maintenance + +### Regular Maintenance +- Monitor sync metrics and failures +- Review and optimize Postgres performance +- Update sync logic as schema evolves +- Clean up old sync failure records + +### Updates and Upgrades +- Test sync operations after schema changes +- Verify data consistency after updates +- Monitor performance impact of changes +- Update documentation as needed diff --git a/requirements.txt b/requirements.txt index 73117832..15450069 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,7 +10,7 @@ filelock==3.16.1 gunicorn==23.0.0 identify==2.6.1 nodeenv==1.9.1 -platformdirs==4.3.6 +platformdirs==4.3.8 pydantic==2.10.1 pydantic_core==2.27.1 pymongo==4.10.1 @@ -28,3 +28,4 @@ email-validator==2.2.0 testcontainers[mongodb]==4.10.0 drf-spectacular==0.28.0 debugpy==1.8.14 +psycopg2-binary==2.9.9 diff --git a/todo/management/commands/sync_postgres_tables.py b/todo/management/commands/sync_postgres_tables.py new file mode 100644 index 00000000..32f160b7 --- /dev/null +++ b/todo/management/commands/sync_postgres_tables.py @@ -0,0 +1,31 @@ +from django.core.management.base import BaseCommand +from todo.services.postgres_sync_service import PostgresSyncService + + +class Command(BaseCommand): + help = "Synchronize labels and roles PostgreSQL tables with MongoDB data" + + def add_arguments(self, parser): + parser.add_argument( + "--force", + action="store_true", + help="Force sync even if tables already have data", + ) + + def handle(self, *args, **options): + self.stdout.write(self.style.SUCCESS("Starting PostgreSQL table synchronization for labels and roles...")) + + try: + postgres_sync_service = PostgresSyncService() + + if options["force"]: + self.stdout.write("Force sync enabled - will sync all tables regardless of existing data") + + success = 
postgres_sync_service.sync_all_tables() + + if success: + self.stdout.write(self.style.SUCCESS("PostgreSQL table synchronization completed successfully!")) + else: + self.stdout.write(self.style.ERROR("Some PostgreSQL table synchronizations failed!")) + except Exception as e: + self.stdout.write(self.style.ERROR(f"PostgreSQL table synchronization failed: {str(e)}")) diff --git a/todo/migrations/0001_initial_setup.py b/todo/migrations/0001_initial_setup.py new file mode 100644 index 00000000..19d10164 --- /dev/null +++ b/todo/migrations/0001_initial_setup.py @@ -0,0 +1,448 @@ +# Generated by Django 5.1.5 on 2025-08-23 18:54 + +import django.db.models.deletion +import django.utils.timezone +from django.db import migrations, models + + +class Migration(migrations.Migration): + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="PostgresAuditLog", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("mongo_id", models.CharField(blank=True, max_length=24, null=True, unique=True)), + ("task_id", models.CharField(blank=True, max_length=24, null=True)), + ("team_id", models.CharField(blank=True, max_length=24, null=True)), + ("previous_executor_id", models.CharField(blank=True, max_length=24, null=True)), + ("new_executor_id", models.CharField(blank=True, max_length=24, null=True)), + ("spoc_id", models.CharField(blank=True, max_length=24, null=True)), + ("action", models.CharField(max_length=100)), + ("timestamp", models.DateTimeField(default=django.utils.timezone.now)), + ("status_from", models.CharField(blank=True, max_length=20, null=True)), + ("status_to", models.CharField(blank=True, max_length=20, null=True)), + ("assignee_from", models.CharField(blank=True, max_length=24, null=True)), + ("assignee_to", models.CharField(blank=True, max_length=24, null=True)), + ("performed_by", models.CharField(blank=True, max_length=24, null=True)), + ("last_sync_at", 
models.DateTimeField(auto_now=True)), + ( + "sync_status", + models.CharField( + choices=[("SYNCED", "Synced"), ("PENDING", "Pending"), ("FAILED", "Failed")], + default="SYNCED", + max_length=20, + ), + ), + ("sync_error", models.TextField(blank=True, null=True)), + ], + options={ + "db_table": "postgres_audit_logs", + "indexes": [ + models.Index(fields=["mongo_id"], name="postgres_au_mongo_i_e01883_idx"), + models.Index(fields=["task_id"], name="postgres_au_task_id_76f799_idx"), + models.Index(fields=["team_id"], name="postgres_au_team_id_aaca90_idx"), + models.Index(fields=["action"], name="postgres_au_action_582248_idx"), + models.Index(fields=["performed_by"], name="postgres_au_perform_f08d1f_idx"), + models.Index(fields=["timestamp"], name="postgres_au_timesta_ee4eef_idx"), + models.Index(fields=["sync_status"], name="postgres_au_sync_st_b7b811_idx"), + ], + }, + ), + migrations.CreateModel( + name="PostgresLabel", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("mongo_id", models.CharField(blank=True, max_length=24, null=True, unique=True)), + ("name", models.CharField(max_length=100, unique=True)), + ("color", models.CharField(default="#000000", max_length=7)), + ("description", models.TextField(blank=True, null=True)), + ("created_at", models.DateTimeField(default=django.utils.timezone.now)), + ("updated_at", models.DateTimeField(blank=True, null=True)), + ("last_sync_at", models.DateTimeField(auto_now=True)), + ( + "sync_status", + models.CharField( + choices=[("SYNCED", "Synced"), ("PENDING", "Pending"), ("FAILED", "Failed")], + default="SYNCED", + max_length=20, + ), + ), + ("sync_error", models.TextField(blank=True, null=True)), + ], + options={ + "db_table": "postgres_labels", + "indexes": [ + models.Index(fields=["mongo_id"], name="postgres_la_mongo_i_f36969_idx"), + models.Index(fields=["name"], name="postgres_la_name_25bdde_idx"), + models.Index(fields=["sync_status"], 
name="postgres_la_sync_st_f795eb_idx"), + ], + }, + ), + migrations.CreateModel( + name="PostgresRole", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("mongo_id", models.CharField(blank=True, max_length=24, null=True, unique=True)), + ("name", models.CharField(max_length=100, unique=True)), + ("description", models.TextField(blank=True, null=True)), + ("permissions", models.JSONField(default=dict)), + ("created_at", models.DateTimeField(default=django.utils.timezone.now)), + ("updated_at", models.DateTimeField(blank=True, null=True)), + ("last_sync_at", models.DateTimeField(auto_now=True)), + ( + "sync_status", + models.CharField( + choices=[("SYNCED", "Synced"), ("PENDING", "Pending"), ("FAILED", "Failed")], + default="SYNCED", + max_length=20, + ), + ), + ("sync_error", models.TextField(blank=True, null=True)), + ], + options={ + "db_table": "postgres_roles", + "indexes": [ + models.Index(fields=["mongo_id"], name="postgres_ro_mongo_i_018753_idx"), + models.Index(fields=["name"], name="postgres_ro_name_ef794d_idx"), + models.Index(fields=["sync_status"], name="postgres_ro_sync_st_9386cc_idx"), + ], + }, + ), + migrations.CreateModel( + name="PostgresTask", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("mongo_id", models.CharField(blank=True, max_length=24, null=True, unique=True)), + ("display_id", models.CharField(blank=True, max_length=100, null=True)), + ("title", models.CharField(max_length=500)), + ("description", models.TextField(blank=True, null=True)), + ("priority", models.IntegerField(default=3)), + ("status", models.CharField(default="TODO", max_length=20)), + ("is_acknowledged", models.BooleanField(default=False)), + ("is_deleted", models.BooleanField(default=False)), + ("started_at", models.DateTimeField(blank=True, null=True)), + ("due_at", models.DateTimeField(blank=True, null=True)), + ("created_at", 
models.DateTimeField(default=django.utils.timezone.now)), + ("updated_at", models.DateTimeField(blank=True, null=True)), + ("created_by", models.CharField(max_length=24)), + ("updated_by", models.CharField(blank=True, max_length=24, null=True)), + ("last_sync_at", models.DateTimeField(auto_now=True)), + ( + "sync_status", + models.CharField( + choices=[("SYNCED", "Synced"), ("PENDING", "Pending"), ("FAILED", "Failed")], + default="SYNCED", + max_length=20, + ), + ), + ("sync_error", models.TextField(blank=True, null=True)), + ], + options={ + "db_table": "postgres_tasks", + "indexes": [ + models.Index(fields=["mongo_id"], name="postgres_ta_mongo_i_4bcd8b_idx"), + models.Index(fields=["display_id"], name="postgres_ta_display_0f1eae_idx"), + models.Index(fields=["status"], name="postgres_ta_status_ae228e_idx"), + models.Index(fields=["priority"], name="postgres_ta_priorit_6ea8ac_idx"), + models.Index(fields=["created_by"], name="postgres_ta_created_a5359a_idx"), + models.Index(fields=["due_at"], name="postgres_ta_due_at_45ae89_idx"), + models.Index(fields=["sync_status"], name="postgres_ta_sync_st_e67786_idx"), + ], + }, + ), + migrations.CreateModel( + name="PostgresDeferredDetails", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("deferred_at", models.DateTimeField(blank=True, null=True)), + ("deferred_till", models.DateTimeField(blank=True, null=True)), + ("deferred_by", models.CharField(blank=True, max_length=24, null=True)), + ( + "task", + models.OneToOneField( + on_delete=django.db.models.deletion.CASCADE, + related_name="deferred_details", + to="todo.postgrestask", + ), + ), + ], + options={ + "db_table": "postgres_deferred_details", + }, + ), + migrations.CreateModel( + name="PostgresTaskAssignment", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("mongo_id", models.CharField(blank=True, max_length=24, null=True, 
unique=True)), + ("task_mongo_id", models.CharField(max_length=24)), + ("assignee_id", models.CharField(max_length=24)), + ( + "user_type", + models.CharField( + choices=[("user", "User"), ("team", "Team")], + max_length=10, + ), + ), + ("team_id", models.CharField(blank=True, max_length=24, null=True)), + ("is_active", models.BooleanField(default=True)), + ("created_at", models.DateTimeField(default=django.utils.timezone.now)), + ("updated_at", models.DateTimeField(blank=True, null=True)), + ("created_by", models.CharField(max_length=24)), + ("updated_by", models.CharField(blank=True, max_length=24, null=True)), + ("last_sync_at", models.DateTimeField(auto_now=True)), + ( + "sync_status", + models.CharField( + choices=[("SYNCED", "Synced"), ("PENDING", "Pending"), ("FAILED", "Failed")], + default="SYNCED", + max_length=20, + ), + ), + ("sync_error", models.TextField(blank=True, null=True)), + ], + options={ + "db_table": "postgres_task_assignments", + "indexes": [ + models.Index(fields=["mongo_id"], name="postgres_ta_mongo_i_326fa9_idx"), + models.Index(fields=["task_mongo_id"], name="postgres_ta_task_mo_95ca3b_idx"), + models.Index(fields=["assignee_id"], name="postgres_ta_assignee_95ca3b_idx"), + models.Index(fields=["user_type"], name="postgres_ta_user_typ_d13fa3_idx"), + models.Index(fields=["team_id"], name="postgres_ta_team_id_a0605f_idx"), + models.Index(fields=["is_active"], name="postgres_ta_is_acti_8b9698_idx"), + models.Index(fields=["sync_status"], name="postgres_ta_sync_st_385c3f_idx"), + ], + }, + ), + migrations.CreateModel( + name="PostgresTeam", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("mongo_id", models.CharField(blank=True, max_length=24, null=True, unique=True)), + ("name", models.CharField(max_length=100)), + ("description", models.TextField(blank=True, null=True)), + ("invite_code", models.CharField(max_length=100, unique=True)), + ("poc_id", 
models.CharField(blank=True, max_length=24, null=True)), + ("created_by", models.CharField(max_length=24)), + ("updated_by", models.CharField(max_length=24)), + ("is_deleted", models.BooleanField(default=False)), + ("created_at", models.DateTimeField(default=django.utils.timezone.now)), + ("updated_at", models.DateTimeField(default=django.utils.timezone.now)), + ("last_sync_at", models.DateTimeField(auto_now=True)), + ( + "sync_status", + models.CharField( + choices=[("SYNCED", "Synced"), ("PENDING", "Pending"), ("FAILED", "Failed")], + default="SYNCED", + max_length=20, + ), + ), + ("sync_error", models.TextField(blank=True, null=True)), + ], + options={ + "db_table": "postgres_teams", + "indexes": [ + models.Index(fields=["mongo_id"], name="postgres_te_mongo_i_abc268_idx"), + models.Index(fields=["invite_code"], name="postgres_te_invite__980f9f_idx"), + models.Index(fields=["created_by"], name="postgres_te_created_8f28f6_idx"), + models.Index(fields=["sync_status"], name="postgres_te_sync_st_19c6d6_idx"), + ], + }, + ), + migrations.CreateModel( + name="PostgresTeamCreationInviteCode", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("mongo_id", models.CharField(blank=True, max_length=24, null=True, unique=True)), + ("code", models.CharField(max_length=100, unique=True)), + ("description", models.TextField(blank=True, null=True)), + ("created_by", models.CharField(max_length=24)), + ("used_by", models.CharField(blank=True, max_length=24, null=True)), + ("is_used", models.BooleanField(default=False)), + ("created_at", models.DateTimeField(default=django.utils.timezone.now)), + ("used_at", models.DateTimeField(blank=True, null=True)), + ("last_sync_at", models.DateTimeField(auto_now=True)), + ( + "sync_status", + models.CharField( + choices=[("SYNCED", "Synced"), ("PENDING", "Pending"), ("FAILED", "Failed")], + default="SYNCED", + max_length=20, + ), + ), + ("sync_error", models.TextField(blank=True, 
null=True)), + ], + options={ + "db_table": "postgres_team_creation_invite_codes", + "indexes": [ + models.Index(fields=["mongo_id"], name="postgres_te_mongo_i_9b5218_idx"), + models.Index(fields=["code"], name="postgres_te_code_e912c2_idx"), + models.Index(fields=["created_by"], name="postgres_te_created_cc1648_idx"), + models.Index(fields=["is_used"], name="postgres_te_is_used_23eea1_idx"), + models.Index(fields=["sync_status"], name="postgres_te_sync_st_0225fb_idx"), + ], + }, + ), + migrations.CreateModel( + name="PostgresUser", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("mongo_id", models.CharField(blank=True, max_length=24, null=True, unique=True)), + ("google_id", models.CharField(max_length=255, unique=True)), + ("email_id", models.EmailField(max_length=254, unique=True)), + ("name", models.CharField(max_length=255)), + ("picture", models.URLField(blank=True, max_length=500, null=True)), + ("created_at", models.DateTimeField(default=django.utils.timezone.now)), + ("updated_at", models.DateTimeField(blank=True, null=True)), + ("last_sync_at", models.DateTimeField(auto_now=True)), + ( + "sync_status", + models.CharField( + choices=[("SYNCED", "Synced"), ("PENDING", "Pending"), ("FAILED", "Failed")], + default="SYNCED", + max_length=20, + ), + ), + ("sync_error", models.TextField(blank=True, null=True)), + ], + options={ + "db_table": "postgres_users", + "indexes": [ + models.Index(fields=["mongo_id"], name="postgres_us_mongo_i_b7de3d_idx"), + models.Index(fields=["google_id"], name="postgres_us_google__842c47_idx"), + models.Index(fields=["email_id"], name="postgres_us_email_i_fde0e2_idx"), + models.Index(fields=["sync_status"], name="postgres_us_sync_st_4b81bc_idx"), + ], + }, + ), + migrations.CreateModel( + name="PostgresUserRole", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("mongo_id", models.CharField(blank=True, 
max_length=24, null=True, unique=True)), + ("user_id", models.CharField(max_length=24)), + ("role_name", models.CharField(max_length=50)), + ("scope", models.CharField(max_length=20)), + ("team_id", models.CharField(blank=True, max_length=24, null=True)), + ("is_active", models.BooleanField(default=True)), + ("created_at", models.DateTimeField(default=django.utils.timezone.now)), + ("created_by", models.CharField(default="system", max_length=24)), + ("last_sync_at", models.DateTimeField(auto_now=True)), + ( + "sync_status", + models.CharField( + choices=[("SYNCED", "Synced"), ("PENDING", "Pending"), ("FAILED", "Failed")], + default="SYNCED", + max_length=20, + ), + ), + ("sync_error", models.TextField(blank=True, null=True)), + ], + options={ + "db_table": "postgres_user_roles", + "indexes": [ + models.Index(fields=["mongo_id"], name="postgres_us_mongo_i_a0b3f8_idx"), + models.Index(fields=["user_id"], name="postgres_us_user_id_e6b62a_idx"), + models.Index(fields=["role_name"], name="postgres_us_role_na_7ec8fa_idx"), + models.Index(fields=["scope"], name="postgres_us_scope_f92854_idx"), + models.Index(fields=["team_id"], name="postgres_us_team_id_90ff18_idx"), + models.Index(fields=["is_active"], name="postgres_us_is_acti_558107_idx"), + models.Index(fields=["sync_status"], name="postgres_us_sync_st_58315c_idx"), + ], + "unique_together": {("user_id", "role_name", "scope", "team_id")}, + }, + ), + migrations.CreateModel( + name="PostgresUserTeamDetails", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("mongo_id", models.CharField(blank=True, max_length=24, null=True, unique=True)), + ("user_id", models.CharField(max_length=24)), + ("team_id", models.CharField(max_length=24)), + ("created_by", models.CharField(max_length=24)), + ("updated_by", models.CharField(max_length=24)), + ("is_active", models.BooleanField(default=True)), + ("created_at", 
models.DateTimeField(default=django.utils.timezone.now)), + ("updated_at", models.DateTimeField(default=django.utils.timezone.now)), + ("last_sync_at", models.DateTimeField(auto_now=True)), + ( + "sync_status", + models.CharField( + choices=[("SYNCED", "Synced"), ("PENDING", "Pending"), ("FAILED", "Failed")], + default="SYNCED", + max_length=20, + ), + ), + ("sync_error", models.TextField(blank=True, null=True)), + ], + options={ + "db_table": "postgres_user_team_details", + "indexes": [ + models.Index(fields=["mongo_id"], name="postgres_us_mongo_i_c533ba_idx"), + models.Index(fields=["user_id"], name="postgres_us_user_id_50613a_idx"), + models.Index(fields=["team_id"], name="postgres_us_team_id_468318_idx"), + models.Index(fields=["is_active"], name="postgres_us_is_acti_a58a6c_idx"), + models.Index(fields=["sync_status"], name="postgres_us_sync_st_bbef4a_idx"), + ], + "unique_together": {("user_id", "team_id")}, + }, + ), + migrations.CreateModel( + name="PostgresWatchlist", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("mongo_id", models.CharField(blank=True, max_length=24, null=True, unique=True)), + ("task_id", models.CharField(max_length=24)), + ("user_id", models.CharField(max_length=24)), + ("is_active", models.BooleanField(default=True)), + ("created_by", models.CharField(max_length=24)), + ("created_at", models.DateTimeField(default=django.utils.timezone.now)), + ("updated_by", models.CharField(blank=True, max_length=24, null=True)), + ("updated_at", models.DateTimeField(blank=True, null=True)), + ("last_sync_at", models.DateTimeField(auto_now=True)), + ( + "sync_status", + models.CharField( + choices=[("SYNCED", "Synced"), ("PENDING", "Pending"), ("FAILED", "Failed")], + default="SYNCED", + max_length=20, + ), + ), + ("sync_error", models.TextField(blank=True, null=True)), + ], + options={ + "db_table": "postgres_watchlist", + "indexes": [ + models.Index(fields=["mongo_id"], 
name="postgres_wa_mongo_i_5c0868_idx"), + models.Index(fields=["task_id"], name="postgres_wa_task_id_adb0e4_idx"), + models.Index(fields=["user_id"], name="postgres_wa_user_id_71c384_idx"), + models.Index(fields=["is_active"], name="postgres_wa_is_acti_ae4d9b_idx"), + models.Index(fields=["sync_status"], name="postgres_wa_sync_st_29bd9a_idx"), + models.Index(fields=["user_id", "task_id"], name="postgres_wa_user_id_c1421a_idx"), + ], + "unique_together": {("user_id", "task_id")}, + }, + ), + migrations.CreateModel( + name="PostgresTaskLabel", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("label_mongo_id", models.CharField(max_length=24)), + ( + "task", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, related_name="task_labels", to="todo.postgrestask" + ), + ), + ], + options={ + "db_table": "postgres_task_labels", + "indexes": [models.Index(fields=["label_mongo_id"], name="postgres_ta_label_m_8f146d_idx")], + "unique_together": {("task", "label_mongo_id")}, + }, + ), + ] diff --git a/todo/migrations/0002_rename_postgres_ta_assignee_95ca3b_idx_postgres_ta_assigne_f1c6e7_idx_and_more.py b/todo/migrations/0002_rename_postgres_ta_assignee_95ca3b_idx_postgres_ta_assigne_f1c6e7_idx_and_more.py new file mode 100644 index 00000000..d402a6e8 --- /dev/null +++ b/todo/migrations/0002_rename_postgres_ta_assignee_95ca3b_idx_postgres_ta_assigne_f1c6e7_idx_and_more.py @@ -0,0 +1,32 @@ +# Generated by Django 5.1.5 on 2025-08-28 21:22 + +from django.db import migrations + + +class Migration(migrations.Migration): + dependencies = [ + ("todo", "0001_initial_setup"), + ] + + operations = [ + migrations.RenameIndex( + model_name="postgrestaskassignment", + new_name="postgres_ta_assigne_f1c6e7_idx", + old_name="postgres_ta_assignee_95ca3b_idx", + ), + migrations.RenameIndex( + model_name="postgrestaskassignment", + new_name="postgres_ta_user_ty_5664c0_idx", + 
old_name="postgres_ta_user_typ_d13fa3_idx", + ), + migrations.RenameIndex( + model_name="postgrestaskassignment", + new_name="postgres_ta_team_id_982105_idx", + old_name="postgres_ta_team_id_a0605f_idx", + ), + migrations.RenameIndex( + model_name="postgrestaskassignment", + new_name="postgres_ta_is_acti_8882a6_idx", + old_name="postgres_ta_is_acti_8b9698_idx", + ), + ] diff --git a/todo/migrations/__init__.py b/todo/migrations/__init__.py new file mode 100644 index 00000000..c4696eb1 --- /dev/null +++ b/todo/migrations/__init__.py @@ -0,0 +1 @@ +# This file makes the migrations directory a Python package diff --git a/todo/models/postgres/__init__.py b/todo/models/postgres/__init__.py new file mode 100644 index 00000000..7f961f4d --- /dev/null +++ b/todo/models/postgres/__init__.py @@ -0,0 +1,28 @@ +# Postgres models package for dual-write system + +from .user import PostgresUser +from .task import PostgresTask, PostgresTaskLabel, PostgresDeferredDetails +from .team import PostgresTeam, PostgresUserTeamDetails +from .label import PostgresLabel +from .role import PostgresRole +from .task_assignment import PostgresTaskAssignment +from .watchlist import PostgresWatchlist +from .user_role import PostgresUserRole +from .audit_log import PostgresAuditLog +from .team_creation_invite_code import PostgresTeamCreationInviteCode + +__all__ = [ + "PostgresUser", + "PostgresTask", + "PostgresTaskLabel", + "PostgresDeferredDetails", + "PostgresTeam", + "PostgresUserTeamDetails", + "PostgresLabel", + "PostgresRole", + "PostgresTaskAssignment", + "PostgresWatchlist", + "PostgresUserRole", + "PostgresAuditLog", + "PostgresTeamCreationInviteCode", +] diff --git a/todo/models/postgres/audit_log.py b/todo/models/postgres/audit_log.py new file mode 100644 index 00000000..c57b281c --- /dev/null +++ b/todo/models/postgres/audit_log.py @@ -0,0 +1,46 @@ +from django.db import models +from django.utils import timezone + + +class PostgresAuditLog(models.Model): + mongo_id = 
models.CharField(max_length=24, unique=True, null=True, blank=True) + + task_id = models.CharField(max_length=24, null=True, blank=True) + team_id = models.CharField(max_length=24, null=True, blank=True) + previous_executor_id = models.CharField(max_length=24, null=True, blank=True) + new_executor_id = models.CharField(max_length=24, null=True, blank=True) + spoc_id = models.CharField(max_length=24, null=True, blank=True) + action = models.CharField(max_length=100) + timestamp = models.DateTimeField(default=timezone.now) + status_from = models.CharField(max_length=20, null=True, blank=True) + status_to = models.CharField(max_length=20, null=True, blank=True) + assignee_from = models.CharField(max_length=24, null=True, blank=True) + assignee_to = models.CharField(max_length=24, null=True, blank=True) + performed_by = models.CharField(max_length=24, null=True, blank=True) + + last_sync_at = models.DateTimeField(auto_now=True) + sync_status = models.CharField( + max_length=20, + choices=[ + ("SYNCED", "Synced"), + ("PENDING", "Pending"), + ("FAILED", "Failed"), + ], + default="SYNCED", + ) + sync_error = models.TextField(null=True, blank=True) + + class Meta: + db_table = "postgres_audit_logs" + indexes = [ + models.Index(fields=["mongo_id"]), + models.Index(fields=["task_id"]), + models.Index(fields=["team_id"]), + models.Index(fields=["action"]), + models.Index(fields=["performed_by"]), + models.Index(fields=["timestamp"]), + models.Index(fields=["sync_status"]), + ] + + def __str__(self): + return f"{self.action} on task {self.task_id}" diff --git a/todo/models/postgres/label.py b/todo/models/postgres/label.py new file mode 100644 index 00000000..128ad9ad --- /dev/null +++ b/todo/models/postgres/label.py @@ -0,0 +1,50 @@ +from django.db import models +from django.utils import timezone + + +class PostgresLabel(models.Model): + """ + Postgres model for labels. 
+ """ + + # MongoDB ObjectId as string for reference + mongo_id = models.CharField(max_length=24, unique=True, null=True, blank=True) + + # Label fields + name = models.CharField(max_length=100, unique=True) + color = models.CharField(max_length=7, default="#000000") # Hex color code + description = models.TextField(null=True, blank=True) + + # Timestamps + created_at = models.DateTimeField(default=timezone.now) + updated_at = models.DateTimeField(null=True, blank=True) + + # Sync metadata + last_sync_at = models.DateTimeField(auto_now=True) + sync_status = models.CharField( + max_length=20, + choices=[ + ("SYNCED", "Synced"), + ("PENDING", "Pending"), + ("FAILED", "Failed"), + ], + default="SYNCED", + ) + sync_error = models.TextField(null=True, blank=True) + + class Meta: + db_table = "postgres_labels" + indexes = [ + models.Index(fields=["mongo_id"]), + models.Index(fields=["name"]), + models.Index(fields=["sync_status"]), + ] + + def __str__(self): + return self.name + + def save(self, *args, **kwargs): + if not self.pk: # New instance + self.created_at = timezone.now() + self.updated_at = timezone.now() + super().save(*args, **kwargs) diff --git a/todo/models/postgres/role.py b/todo/models/postgres/role.py new file mode 100644 index 00000000..363de6bd --- /dev/null +++ b/todo/models/postgres/role.py @@ -0,0 +1,50 @@ +from django.db import models +from django.utils import timezone + + +class PostgresRole(models.Model): + """ + Postgres model for roles. 
+ """ + + # MongoDB ObjectId as string for reference + mongo_id = models.CharField(max_length=24, unique=True, null=True, blank=True) + + # Role fields + name = models.CharField(max_length=100, unique=True) + description = models.TextField(null=True, blank=True) + permissions = models.JSONField(default=dict) # Store permissions as JSON + + # Timestamps + created_at = models.DateTimeField(default=timezone.now) + updated_at = models.DateTimeField(null=True, blank=True) + + # Sync metadata + last_sync_at = models.DateTimeField(auto_now=True) + sync_status = models.CharField( + max_length=20, + choices=[ + ("SYNCED", "Synced"), + ("PENDING", "Pending"), + ("FAILED", "Failed"), + ], + default="SYNCED", + ) + sync_error = models.TextField(null=True, blank=True) + + class Meta: + db_table = "postgres_roles" + indexes = [ + models.Index(fields=["mongo_id"]), + models.Index(fields=["name"]), + models.Index(fields=["sync_status"]), + ] + + def __str__(self): + return self.name + + def save(self, *args, **kwargs): + if not self.pk: # New instance + self.created_at = timezone.now() + self.updated_at = timezone.now() + super().save(*args, **kwargs) diff --git a/todo/models/postgres/task.py b/todo/models/postgres/task.py new file mode 100644 index 00000000..8b833fa8 --- /dev/null +++ b/todo/models/postgres/task.py @@ -0,0 +1,99 @@ +from django.db import models +from django.utils import timezone + + +class PostgresTask(models.Model): + """ + Postgres model for tasks, mirroring MongoDB TaskModel structure. + This enables future migration from MongoDB to Postgres. 
+ """ + + # MongoDB ObjectId as string for reference + mongo_id = models.CharField(max_length=24, unique=True, null=True, blank=True) + + # Task fields + display_id = models.CharField(max_length=100, null=True, blank=True) + title = models.CharField(max_length=500) + description = models.TextField(null=True, blank=True) + + # Store the same format as MongoDB (integer for priority, string for status) + priority = models.IntegerField(default=3) # 1=HIGH, 2=MEDIUM, 3=LOW + status = models.CharField(max_length=20, default="TODO") + + # Boolean fields + is_acknowledged = models.BooleanField(default=False) + is_deleted = models.BooleanField(default=False) + + # Timestamps + started_at = models.DateTimeField(null=True, blank=True) + due_at = models.DateTimeField(null=True, blank=True) + created_at = models.DateTimeField(default=timezone.now) + updated_at = models.DateTimeField(null=True, blank=True) + + # References (as strings for now, will be foreign keys in future) + created_by = models.CharField(max_length=24) # MongoDB ObjectId as string + updated_by = models.CharField(max_length=24, null=True, blank=True) + + # Sync metadata + last_sync_at = models.DateTimeField(auto_now=True) + sync_status = models.CharField( + max_length=20, + choices=[ + ("SYNCED", "Synced"), + ("PENDING", "Pending"), + ("FAILED", "Failed"), + ], + default="SYNCED", + ) + sync_error = models.TextField(null=True, blank=True) + + class Meta: + db_table = "postgres_tasks" + indexes = [ + models.Index(fields=["mongo_id"]), + models.Index(fields=["display_id"]), + models.Index(fields=["status"]), + models.Index(fields=["priority"]), + models.Index(fields=["created_by"]), + models.Index(fields=["due_at"]), + models.Index(fields=["sync_status"]), + ] + + def __str__(self): + return f"{self.title} ({self.display_id or 'N/A'})" + + def save(self, *args, **kwargs): + if not self.pk: # New instance + self.created_at = timezone.now() + self.updated_at = timezone.now() + super().save(*args, **kwargs) + + 
+class PostgresTaskLabel(models.Model): + """ + Junction table for task-label relationships. + """ + + task = models.ForeignKey(PostgresTask, on_delete=models.CASCADE, related_name="task_labels") + label_mongo_id = models.CharField(max_length=24) # MongoDB ObjectId as string + + class Meta: + db_table = "postgres_task_labels" + unique_together = ["task", "label_mongo_id"] + indexes = [ + models.Index(fields=["label_mongo_id"]), + ] + + +class PostgresDeferredDetails(models.Model): + """ + Model for deferred task details. + """ + + task = models.OneToOneField(PostgresTask, on_delete=models.CASCADE, related_name="deferred_details") + deferred_at = models.DateTimeField(null=True, blank=True) + deferred_till = models.DateTimeField(null=True, blank=True) + deferred_by = models.CharField(max_length=24, null=True, blank=True) # MongoDB ObjectId as string + + class Meta: + db_table = "postgres_deferred_details" diff --git a/todo/models/postgres/task_assignment.py b/todo/models/postgres/task_assignment.py new file mode 100644 index 00000000..e341ac8f --- /dev/null +++ b/todo/models/postgres/task_assignment.py @@ -0,0 +1,62 @@ +from django.db import models +from django.utils import timezone + + +class PostgresTaskAssignment(models.Model): + """ + Postgres model for task assignments. 
+ """ + + # MongoDB ObjectId as string for reference + mongo_id = models.CharField(max_length=24, unique=True, null=True, blank=True) + + # Assignment fields + task_mongo_id = models.CharField(max_length=24) # MongoDB ObjectId as string + assignee_id = models.CharField(max_length=24) # MongoDB ObjectId as string (user or team ID) + user_type = models.CharField(max_length=10, choices=[("user", "User"), ("team", "Team")]) # user or team + team_id = models.CharField( + max_length=24, null=True, blank=True + ) # MongoDB ObjectId as string (only for team assignments) + is_active = models.BooleanField(default=True) # Match MongoDB approach + + # Timestamps + created_at = models.DateTimeField(default=timezone.now) + updated_at = models.DateTimeField(null=True, blank=True) + + # References + created_by = models.CharField(max_length=24) # MongoDB ObjectId as string + updated_by = models.CharField(max_length=24, null=True, blank=True) # MongoDB ObjectId as string + + # Sync metadata + last_sync_at = models.DateTimeField(auto_now=True) + sync_status = models.CharField( + max_length=20, + choices=[ + ("SYNCED", "Synced"), + ("PENDING", "Pending"), + ("FAILED", "Failed"), + ], + default="SYNCED", + ) + sync_error = models.TextField(null=True, blank=True) + + class Meta: + db_table = "postgres_task_assignments" + indexes = [ + models.Index(fields=["mongo_id"]), + models.Index(fields=["task_mongo_id"]), + models.Index(fields=["assignee_id"]), + models.Index(fields=["user_type"]), + models.Index(fields=["team_id"]), + models.Index(fields=["is_active"]), + models.Index(fields=["sync_status"]), + ] + + def __str__(self): + return f"Task {self.task_mongo_id} assigned to {self.user_type} {self.assignee_id}" + + def save(self, *args, **kwargs): + if not self.pk: # New instance + self.created_at = timezone.now() + self.updated_at = timezone.now() + super().save(*args, **kwargs) diff --git a/todo/models/postgres/team.py b/todo/models/postgres/team.py new file mode 100644 index 
00000000..b71ec4f6 --- /dev/null +++ b/todo/models/postgres/team.py @@ -0,0 +1,115 @@ +from django.db import models +from django.utils import timezone + + +class PostgresTeam(models.Model): + """ + Postgres model for teams, mirroring MongoDB TeamModel structure. + This enables future migration from MongoDB to Postgres. + """ + + # MongoDB ObjectId as string for reference + mongo_id = models.CharField(max_length=24, unique=True, null=True, blank=True) + + # Team fields + name = models.CharField(max_length=100) + description = models.TextField(null=True, blank=True) + invite_code = models.CharField(max_length=100, unique=True) + + # References (as strings for now, will be foreign keys in future) + poc_id = models.CharField(max_length=24, null=True, blank=True) # MongoDB ObjectId as string + created_by = models.CharField(max_length=24) # MongoDB ObjectId as string + updated_by = models.CharField(max_length=24) # MongoDB ObjectId as string + + # Boolean fields + is_deleted = models.BooleanField(default=False) + + # Timestamps + created_at = models.DateTimeField(default=timezone.now) + updated_at = models.DateTimeField(default=timezone.now) + + # Sync metadata + last_sync_at = models.DateTimeField(auto_now=True) + sync_status = models.CharField( + max_length=20, + choices=[ + ("SYNCED", "Synced"), + ("PENDING", "Pending"), + ("FAILED", "Failed"), + ], + default="SYNCED", + ) + sync_error = models.TextField(null=True, blank=True) + + class Meta: + db_table = "postgres_teams" + indexes = [ + models.Index(fields=["mongo_id"]), + models.Index(fields=["invite_code"]), + models.Index(fields=["created_by"]), + models.Index(fields=["sync_status"]), + ] + + def __str__(self): + return self.name + + def save(self, *args, **kwargs): + if not self.pk: # New instance + self.created_at = timezone.now() + self.updated_at = timezone.now() + super().save(*args, **kwargs) + + +class PostgresUserTeamDetails(models.Model): + """ + Postgres model for user-team relationships, mirroring 
MongoDB UserTeamDetailsModel structure. + """ + + # MongoDB ObjectId as string for reference + mongo_id = models.CharField(max_length=24, unique=True, null=True, blank=True) + + # References (as strings for now, will be foreign keys in future) + user_id = models.CharField(max_length=24) # MongoDB ObjectId as string + team_id = models.CharField(max_length=24) # MongoDB ObjectId as string + created_by = models.CharField(max_length=24) # MongoDB ObjectId as string + updated_by = models.CharField(max_length=24) # MongoDB ObjectId as string + + # Boolean fields + is_active = models.BooleanField(default=True) + + # Timestamps + created_at = models.DateTimeField(default=timezone.now) + updated_at = models.DateTimeField(default=timezone.now) + + # Sync metadata + last_sync_at = models.DateTimeField(auto_now=True) + sync_status = models.CharField( + max_length=20, + choices=[ + ("SYNCED", "Synced"), + ("PENDING", "Pending"), + ("FAILED", "Failed"), + ], + default="SYNCED", + ) + sync_error = models.TextField(null=True, blank=True) + + class Meta: + db_table = "postgres_user_team_details" + unique_together = ["user_id", "team_id"] + indexes = [ + models.Index(fields=["mongo_id"]), + models.Index(fields=["user_id"]), + models.Index(fields=["team_id"]), + models.Index(fields=["is_active"]), + models.Index(fields=["sync_status"]), + ] + + def __str__(self): + return f"User {self.user_id} in Team {self.team_id}" + + def save(self, *args, **kwargs): + if not self.pk: # New instance + self.created_at = timezone.now() + self.updated_at = timezone.now() + super().save(*args, **kwargs) diff --git a/todo/models/postgres/team_creation_invite_code.py b/todo/models/postgres/team_creation_invite_code.py new file mode 100644 index 00000000..827c4757 --- /dev/null +++ b/todo/models/postgres/team_creation_invite_code.py @@ -0,0 +1,56 @@ +from django.db import models +from django.utils import timezone + + +class PostgresTeamCreationInviteCode(models.Model): + """ + Postgres model for team 
creation invite codes, mirroring MongoDB TeamCreationInviteCodeModel structure. + This enables future migration from MongoDB to Postgres. + """ + + # MongoDB ObjectId as string for reference + mongo_id = models.CharField(max_length=24, unique=True, null=True, blank=True) + + # Invite code fields + code = models.CharField(max_length=100, unique=True) + description = models.TextField(null=True, blank=True) + + # User references + created_by = models.CharField(max_length=24) + used_by = models.CharField(max_length=24, null=True, blank=True) + + # Status and timestamps + is_used = models.BooleanField(default=False) + created_at = models.DateTimeField(default=timezone.now) + used_at = models.DateTimeField(null=True, blank=True) + + # Sync metadata + last_sync_at = models.DateTimeField(auto_now=True) + sync_status = models.CharField( + max_length=20, + choices=[ + ("SYNCED", "Synced"), + ("PENDING", "Pending"), + ("FAILED", "Failed"), + ], + default="SYNCED", + ) + sync_error = models.TextField(null=True, blank=True) + + class Meta: + db_table = "postgres_team_creation_invite_codes" + indexes = [ + models.Index(fields=["mongo_id"]), + models.Index(fields=["code"]), + models.Index(fields=["created_by"]), + models.Index(fields=["is_used"]), + models.Index(fields=["sync_status"]), + ] + + def __str__(self): + return f"Invite Code: {self.code} ({'Used' if self.is_used else 'Unused'})" + + def save(self, *args, **kwargs): + if not self.pk: # New instance + self.created_at = timezone.now() + super().save(*args, **kwargs) diff --git a/todo/models/postgres/user.py b/todo/models/postgres/user.py new file mode 100644 index 00000000..4fff61b9 --- /dev/null +++ b/todo/models/postgres/user.py @@ -0,0 +1,53 @@ +from django.db import models +from django.utils import timezone + + +class PostgresUser(models.Model): + """ + Postgres model for users, mirroring MongoDB UserModel structure. + This enables future migration from MongoDB to Postgres. 
+ """ + + # MongoDB ObjectId as string for reference + mongo_id = models.CharField(max_length=24, unique=True, null=True, blank=True) + + # User fields + google_id = models.CharField(max_length=255, unique=True) + email_id = models.EmailField(unique=True) + name = models.CharField(max_length=255) + picture = models.URLField(max_length=500, null=True, blank=True) + + # Timestamps + created_at = models.DateTimeField(default=timezone.now) + updated_at = models.DateTimeField(null=True, blank=True) + + # Sync metadata + last_sync_at = models.DateTimeField(auto_now=True) + sync_status = models.CharField( + max_length=20, + choices=[ + ("SYNCED", "Synced"), + ("PENDING", "Pending"), + ("FAILED", "Failed"), + ], + default="SYNCED", + ) + sync_error = models.TextField(null=True, blank=True) + + class Meta: + db_table = "postgres_users" + indexes = [ + models.Index(fields=["mongo_id"]), + models.Index(fields=["google_id"]), + models.Index(fields=["email_id"]), + models.Index(fields=["sync_status"]), + ] + + def __str__(self): + return f"{self.name} ({self.email_id})" + + def save(self, *args, **kwargs): + if not self.pk: # New instance + self.created_at = timezone.now() + self.updated_at = timezone.now() + super().save(*args, **kwargs) diff --git a/todo/models/postgres/user_role.py b/todo/models/postgres/user_role.py new file mode 100644 index 00000000..9358e1f8 --- /dev/null +++ b/todo/models/postgres/user_role.py @@ -0,0 +1,42 @@ +from django.db import models +from django.utils import timezone + + +class PostgresUserRole(models.Model): + mongo_id = models.CharField(max_length=24, unique=True, null=True, blank=True) + + user_id = models.CharField(max_length=24) + role_name = models.CharField(max_length=50) + scope = models.CharField(max_length=20) + team_id = models.CharField(max_length=24, null=True, blank=True) + is_active = models.BooleanField(default=True) + created_at = models.DateTimeField(default=timezone.now) + created_by = models.CharField(max_length=24, 
default="system") + + last_sync_at = models.DateTimeField(auto_now=True) + sync_status = models.CharField( + max_length=20, + choices=[ + ("SYNCED", "Synced"), + ("PENDING", "Pending"), + ("FAILED", "Failed"), + ], + default="SYNCED", + ) + sync_error = models.TextField(null=True, blank=True) + + class Meta: + db_table = "postgres_user_roles" + unique_together = ["user_id", "role_name", "scope", "team_id"] + indexes = [ + models.Index(fields=["mongo_id"]), + models.Index(fields=["user_id"]), + models.Index(fields=["role_name"]), + models.Index(fields=["scope"]), + models.Index(fields=["team_id"]), + models.Index(fields=["is_active"]), + models.Index(fields=["sync_status"]), + ] + + def __str__(self): + return f"User {self.user_id} has Role {self.role_name} ({self.scope})" diff --git a/todo/models/postgres/watchlist.py b/todo/models/postgres/watchlist.py new file mode 100644 index 00000000..777242de --- /dev/null +++ b/todo/models/postgres/watchlist.py @@ -0,0 +1,59 @@ +from django.db import models +from django.utils import timezone + + +class PostgresWatchlist(models.Model): + """ + Postgres model for watchlists that matches MongoDB schema. + This represents a user watching a specific task. 
+ """ + + # MongoDB ObjectId as string for reference + mongo_id = models.CharField(max_length=24, unique=True, null=True, blank=True) + + # Core watchlist fields matching MongoDB schema + task_id = models.CharField(max_length=24) # MongoDB ObjectId as string + user_id = models.CharField(max_length=24) # MongoDB ObjectId as string + is_active = models.BooleanField(default=True) + + # Audit fields + created_by = models.CharField(max_length=24) # MongoDB ObjectId as string + created_at = models.DateTimeField(default=timezone.now) + updated_by = models.CharField(max_length=24, null=True, blank=True) # MongoDB ObjectId as string + updated_at = models.DateTimeField(null=True, blank=True) + + # Sync metadata for dual write system + last_sync_at = models.DateTimeField(auto_now=True) + sync_status = models.CharField( + max_length=20, + choices=[ + ("SYNCED", "Synced"), + ("PENDING", "Pending"), + ("FAILED", "Failed"), + ], + default="SYNCED", + ) + sync_error = models.TextField(null=True, blank=True) + + class Meta: + db_table = "postgres_watchlist" + indexes = [ + models.Index(fields=["mongo_id"]), + models.Index(fields=["task_id"]), + models.Index(fields=["user_id"]), + models.Index(fields=["is_active"]), + models.Index(fields=["sync_status"]), + # Composite index for efficient queries + models.Index(fields=["user_id", "task_id"]), + ] + # Ensure unique user-task combination + unique_together = ["user_id", "task_id"] + + def __str__(self): + return f"Watchlist: User {self.user_id} -> Task {self.task_id}" + + def save(self, *args, **kwargs): + if not self.pk: # New instance + self.created_at = timezone.now() + self.updated_at = timezone.now() + super().save(*args, **kwargs) diff --git a/todo/repositories/abstract_repository.py b/todo/repositories/abstract_repository.py new file mode 100644 index 00000000..aae7a3a0 --- /dev/null +++ b/todo/repositories/abstract_repository.py @@ -0,0 +1,200 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, List, 
Optional, TypeVar, Generic +from pydantic import BaseModel + +T = TypeVar("T", bound=BaseModel) + + +class AbstractRepository(ABC, Generic[T]): + """ + Abstract repository interface that defines the contract for data access. + This enables seamless switching between MongoDB and Postgres in the future. + """ + + @abstractmethod + def create(self, data: Dict[str, Any]) -> T: + """Create a new document/record.""" + pass + + @abstractmethod + def get_by_id(self, id: str) -> Optional[T]: + """Get a document/record by ID.""" + pass + + @abstractmethod + def get_all(self, filters: Optional[Dict[str, Any]] = None, skip: int = 0, limit: int = 100) -> List[T]: + """Get all documents/records with optional filtering and pagination.""" + pass + + @abstractmethod + def update(self, id: str, data: Dict[str, Any]) -> Optional[T]: + """Update a document/record by ID.""" + pass + + @abstractmethod + def delete(self, id: str) -> bool: + """Delete a document/record by ID.""" + pass + + @abstractmethod + def count(self, filters: Optional[Dict[str, Any]] = None) -> int: + """Count documents/records with optional filtering.""" + pass + + @abstractmethod + def exists(self, id: str) -> bool: + """Check if a document/record exists by ID.""" + pass + + +class AbstractUserRepository(AbstractRepository[T]): + """Abstract repository for user operations.""" + + @abstractmethod + def get_by_email(self, email: str) -> Optional[T]: + """Get user by email address.""" + pass + + @abstractmethod + def get_by_google_id(self, google_id: str) -> Optional[T]: + """Get user by Google ID.""" + pass + + +class AbstractTaskRepository(AbstractRepository[T]): + """Abstract repository for task operations.""" + + @abstractmethod + def get_by_user( + self, user_id: str, filters: Optional[Dict[str, Any]] = None, skip: int = 0, limit: int = 100 + ) -> List[T]: + """Get tasks by user ID.""" + pass + + @abstractmethod + def get_by_team( + self, team_id: str, filters: Optional[Dict[str, Any]] = None, skip: int = 0, 
limit: int = 100 + ) -> List[T]: + """Get tasks by team ID.""" + pass + + @abstractmethod + def get_by_status( + self, status: str, filters: Optional[Dict[str, Any]] = None, skip: int = 0, limit: int = 100 + ) -> List[T]: + """Get tasks by status.""" + pass + + @abstractmethod + def get_by_priority( + self, priority: str, filters: Optional[Dict[str, Any]] = None, skip: int = 0, limit: int = 100 + ) -> List[T]: + """Get tasks by priority.""" + pass + + +class AbstractTeamRepository(AbstractRepository[T]): + """Abstract repository for team operations.""" + + @abstractmethod + def get_by_invite_code(self, invite_code: str) -> Optional[T]: + """Get team by invite code.""" + pass + + @abstractmethod + def get_by_user(self, user_id: str) -> List[T]: + """Get teams by user ID.""" + pass + + +class AbstractLabelRepository(AbstractRepository[T]): + """Abstract repository for label operations.""" + + @abstractmethod + def get_by_name(self, name: str) -> Optional[T]: + """Get label by name.""" + pass + + +class AbstractRoleRepository(AbstractRepository[T]): + """Abstract repository for role operations.""" + + @abstractmethod + def get_by_name(self, name: str) -> Optional[T]: + """Get role by name.""" + pass + + +class AbstractTaskAssignmentRepository(AbstractRepository[T]): + """Abstract repository for task assignment operations.""" + + @abstractmethod + def get_by_task(self, task_id: str) -> List[T]: + """Get assignments by task ID.""" + pass + + @abstractmethod + def get_by_user(self, user_id: str) -> List[T]: + """Get assignments by user ID.""" + pass + + @abstractmethod + def get_by_team(self, team_id: str) -> List[T]: + """Get assignments by team ID.""" + pass + + +class AbstractWatchlistRepository(AbstractRepository[T]): + """Abstract repository for watchlist operations.""" + + @abstractmethod + def get_by_user(self, user_id: str) -> List[T]: + """Get watchlists by user ID.""" + pass + + +class AbstractUserRoleRepository(AbstractRepository[T]): + """Abstract repository 
for user role operations.""" + + @abstractmethod + def get_by_user(self, user_id: str) -> List[T]: + """Get user roles by user ID.""" + pass + + @abstractmethod + def get_by_team(self, team_id: str) -> List[T]: + """Get user roles by team ID.""" + pass + + +class AbstractUserTeamDetailsRepository(AbstractRepository[T]): + """Abstract repository for user team details operations.""" + + @abstractmethod + def get_by_user(self, user_id: str) -> List[T]: + """Get user team details by user ID.""" + pass + + @abstractmethod + def get_by_team(self, team_id: str) -> List[T]: + """Get user team details by team ID.""" + pass + + +class AbstractAuditLogRepository(AbstractRepository[T]): + """Abstract repository for audit log operations.""" + + @abstractmethod + def get_by_user(self, user_id: str, skip: int = 0, limit: int = 100) -> List[T]: + """Get audit logs by user ID.""" + pass + + @abstractmethod + def get_by_collection(self, collection_name: str, skip: int = 0, limit: int = 100) -> List[T]: + """Get audit logs by collection name.""" + pass + + @abstractmethod + def get_by_action(self, action: str, skip: int = 0, limit: int = 100) -> List[T]: + """Get audit logs by action.""" + pass diff --git a/todo/repositories/audit_log_repository.py b/todo/repositories/audit_log_repository.py index d92f4627..8f94ed1b 100644 --- a/todo/repositories/audit_log_repository.py +++ b/todo/repositories/audit_log_repository.py @@ -1,6 +1,7 @@ from todo.models.audit_log import AuditLogModel from todo.repositories.common.mongo_repository import MongoRepository from datetime import datetime, timezone +from todo.services.enhanced_dual_write_service import EnhancedDualWriteService class AuditLogRepository(MongoRepository): @@ -13,6 +14,33 @@ def create(cls, audit_log: AuditLogModel) -> AuditLogModel: audit_log_dict = audit_log.model_dump(mode="json", by_alias=True, exclude_none=True) insert_result = collection.insert_one(audit_log_dict) audit_log.id = insert_result.inserted_id + + 
dual_write_service = EnhancedDualWriteService() + audit_log_data = { + "task_id": str(audit_log.task_id) if audit_log.task_id else None, + "team_id": str(audit_log.team_id) if audit_log.team_id else None, + "previous_executor_id": str(audit_log.previous_executor_id) if audit_log.previous_executor_id else None, + "new_executor_id": str(audit_log.new_executor_id) if audit_log.new_executor_id else None, + "spoc_id": str(audit_log.spoc_id) if audit_log.spoc_id else None, + "action": audit_log.action, + "timestamp": audit_log.timestamp, + "status_from": audit_log.status_from, + "status_to": audit_log.status_to, + "assignee_from": str(audit_log.assignee_from) if audit_log.assignee_from else None, + "assignee_to": str(audit_log.assignee_to) if audit_log.assignee_to else None, + "performed_by": str(audit_log.performed_by) if audit_log.performed_by else None, + } + + dual_write_success = dual_write_service.create_document( + collection_name="audit_logs", data=audit_log_data, mongo_id=str(audit_log.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync audit log {audit_log.id} to Postgres") + return audit_log @classmethod diff --git a/todo/repositories/postgres_repository.py b/todo/repositories/postgres_repository.py new file mode 100644 index 00000000..09ea78c3 --- /dev/null +++ b/todo/repositories/postgres_repository.py @@ -0,0 +1,304 @@ +from typing import Any, Dict, List, Optional, Type +from django.db import models +from django.core.exceptions import ObjectDoesNotExist + +from todo.repositories.abstract_repository import AbstractRepository +from todo.models.postgres import ( + PostgresUser, + PostgresTask, + PostgresTeam, + PostgresUserTeamDetails, + PostgresLabel, + PostgresRole, + PostgresTaskAssignment, + PostgresWatchlist, + PostgresUserRole, + PostgresAuditLog, +) + + +class BasePostgresRepository(AbstractRepository): + """ + Base Postgres repository implementation. 
+ Provides common CRUD operations for Postgres models. + """ + + def __init__(self, model_class: Type[models.Model]): + self.model_class = model_class + + def create(self, data: Dict[str, Any]) -> Any: + """Create a new record in Postgres.""" + try: + instance = self.model_class.objects.create(**data) + return instance + except Exception as e: + raise Exception(f"Failed to create record: {str(e)}") + + def get_by_id(self, id: str) -> Optional[Any]: + """Get a record by ID (using mongo_id field).""" + try: + return self.model_class.objects.get(mongo_id=id) + except ObjectDoesNotExist: + return None + + def get_all(self, filters: Optional[Dict[str, Any]] = None, skip: int = 0, limit: int = 100) -> List[Any]: + """Get all records with optional filtering and pagination.""" + queryset = self.model_class.objects.all() + + if filters: + queryset = self._apply_filters(queryset, filters) + + return list(queryset[skip : skip + limit]) + + def update(self, id: str, data: Dict[str, Any]) -> Optional[Any]: + """Update a record by ID.""" + try: + instance = self.model_class.objects.get(mongo_id=id) + for field, value in data.items(): + if hasattr(instance, field): + setattr(instance, field, value) + instance.save() + return instance + except ObjectDoesNotExist: + return None + + def delete(self, id: str) -> bool: + """Delete a record by ID.""" + try: + instance = self.model_class.objects.get(mongo_id=id) + instance.delete() + return True + except ObjectDoesNotExist: + return False + + def count(self, filters: Optional[Dict[str, Any]] = None) -> int: + """Count records with optional filtering.""" + queryset = self.model_class.objects.all() + + if filters: + queryset = self._apply_filters(queryset, filters) + + return queryset.count() + + def exists(self, id: str) -> bool: + """Check if a record exists by ID.""" + return self.model_class.objects.filter(mongo_id=id).exists() + + def _apply_filters(self, queryset, filters: Dict[str, Any]): + """Apply filters to a queryset.""" + for 
field, value in filters.items(): + if hasattr(self.model_class, field): + if isinstance(value, dict): + # Handle complex filters like {'gte': value, 'lte': value} + for operator, operator_value in value.items(): + if operator == "gte": + queryset = queryset.filter(**{f"{field}__gte": operator_value}) + elif operator == "lte": + queryset = queryset.filter(**{f"{field}__lte": operator_value}) + elif operator == "contains": + queryset = queryset.filter(**{f"{field}__icontains": operator_value}) + elif operator == "in": + queryset = queryset.filter(**{f"{field}__in": operator_value}) + else: + queryset = queryset.filter(**{field: value}) + + return queryset + + +class PostgresUserRepository(BasePostgresRepository, AbstractRepository): + """Postgres repository for user operations.""" + + def __init__(self): + super().__init__(PostgresUser) + + def get_by_email(self, email: str) -> Optional[PostgresUser]: + """Get user by email address.""" + try: + return PostgresUser.objects.get(email_id=email) + except ObjectDoesNotExist: + return None + + def get_by_google_id(self, google_id: str) -> Optional[PostgresUser]: + """Get user by Google ID.""" + try: + return PostgresUser.objects.get(google_id=google_id) + except ObjectDoesNotExist: + return None + + +class PostgresTaskRepository(BasePostgresRepository, AbstractRepository): + """Postgres repository for task operations.""" + + def __init__(self): + super().__init__(PostgresTask) + + def get_by_user( + self, user_id: str, filters: Optional[Dict[str, Any]] = None, skip: int = 0, limit: int = 100 + ) -> List[PostgresTask]: + """Get tasks by user ID.""" + queryset = PostgresTask.objects.filter(created_by=user_id) + + if filters: + queryset = self._apply_filters(queryset, filters) + + return list(queryset[skip : skip + limit]) + + def get_by_team( + self, team_id: str, filters: Optional[Dict[str, Any]] = None, skip: int = 0, limit: int = 100 + ) -> List[PostgresTask]: + """Get tasks by team ID.""" + # This would need to be 
implemented based on your team-task relationship + # For now, returning empty list + return [] + + def get_by_status( + self, status: str, filters: Optional[Dict[str, Any]] = None, skip: int = 0, limit: int = 100 + ) -> List[PostgresTask]: + """Get tasks by status.""" + queryset = PostgresTask.objects.filter(status=status) + + if filters: + queryset = self._apply_filters(queryset, filters) + + return list(queryset[skip : skip + limit]) + + def get_by_priority( + self, priority: str, filters: Optional[Dict[str, Any]] = None, skip: int = 0, limit: int = 100 + ) -> List[PostgresTask]: + """Get tasks by priority.""" + queryset = PostgresTask.objects.filter(priority=priority) + + if filters: + queryset = self._apply_filters(queryset, filters) + + return list(queryset[skip : skip + limit]) + + +class PostgresTeamRepository(BasePostgresRepository, AbstractRepository): + """Postgres repository for team operations.""" + + def __init__(self): + super().__init__(PostgresTeam) + + def get_by_invite_code(self, invite_code: str) -> Optional[PostgresTeam]: + """Get team by invite code.""" + try: + return PostgresTeam.objects.get(invite_code=invite_code) + except ObjectDoesNotExist: + return None + + def get_by_user(self, user_id: str) -> List[PostgresTeam]: + """Get teams by user ID.""" + # Get teams where user is a member + user_teams = PostgresUserTeamDetails.objects.filter(user_id=user_id, is_active=True).values_list( + "team_id", flat=True + ) + + return list(PostgresTeam.objects.filter(mongo_id__in=user_teams)) + + +class PostgresLabelRepository(BasePostgresRepository, AbstractRepository): + """Postgres repository for label operations.""" + + def __init__(self): + super().__init__(PostgresLabel) + + def get_by_name(self, name: str) -> Optional[PostgresLabel]: + """Get label by name.""" + try: + return PostgresLabel.objects.get(name=name) + except ObjectDoesNotExist: + return None + + +class PostgresRoleRepository(BasePostgresRepository, AbstractRepository): + """Postgres 
repository for role operations.""" + + def __init__(self): + super().__init__(PostgresRole) + + def get_by_name(self, name: str) -> Optional[PostgresRole]: + """Get role by name.""" + try: + return PostgresRole.objects.get(name=name) + except ObjectDoesNotExist: + return None + + +class PostgresTaskAssignmentRepository(BasePostgresRepository, AbstractRepository): + """Postgres repository for task assignment operations.""" + + def __init__(self): + super().__init__(PostgresTaskAssignment) + + def get_by_task(self, task_id: str) -> List[PostgresTaskAssignment]: + """Get assignments by task ID.""" + return list(PostgresTaskAssignment.objects.filter(task_mongo_id=task_id)) + + def get_by_user(self, user_id: str) -> List[PostgresTaskAssignment]: + """Get assignments by user ID.""" + return list(PostgresTaskAssignment.objects.filter(user_mongo_id=user_id)) + + def get_by_team(self, team_id: str) -> List[PostgresTaskAssignment]: + """Get assignments by team ID.""" + return list(PostgresTaskAssignment.objects.filter(team_mongo_id=team_id)) + + +class PostgresWatchlistRepository(BasePostgresRepository, AbstractRepository): + """Postgres repository for watchlist operations.""" + + def __init__(self): + super().__init__(PostgresWatchlist) + + def get_by_user(self, user_id: str) -> List[PostgresWatchlist]: + """Get watchlists by user ID.""" + return list(PostgresWatchlist.objects.filter(user_mongo_id=user_id)) + + +class PostgresUserRoleRepository(BasePostgresRepository, AbstractRepository): + """Postgres repository for user role operations.""" + + def __init__(self): + super().__init__(PostgresUserRole) + + def get_by_user(self, user_id: str) -> List[PostgresUserRole]: + """Get user roles by user ID.""" + return list(PostgresUserRole.objects.filter(user_mongo_id=user_id)) + + def get_by_team(self, team_id: str) -> List[PostgresUserRole]: + """Get user roles by team ID.""" + return list(PostgresUserRole.objects.filter(team_mongo_id=team_id)) + + +class 
PostgresUserTeamDetailsRepository(BasePostgresRepository, AbstractRepository): + """Postgres repository for user team details operations.""" + + def __init__(self): + super().__init__(PostgresUserTeamDetails) + + def get_by_user(self, user_id: str) -> List[PostgresUserTeamDetails]: + """Get user team details by user ID.""" + return list(PostgresUserTeamDetails.objects.filter(user_id=user_id)) + + def get_by_team(self, team_id: str) -> List[PostgresUserTeamDetails]: + """Get user team details by team ID.""" + return list(PostgresUserTeamDetails.objects.filter(team_id=team_id)) + + +class PostgresAuditLogRepository(BasePostgresRepository, AbstractRepository): + """Postgres repository for audit log operations.""" + + def __init__(self): + super().__init__(PostgresAuditLog) + + def get_by_user(self, user_id: str, skip: int = 0, limit: int = 100) -> List[PostgresAuditLog]: + """Get audit logs by user ID.""" + return list(PostgresAuditLog.objects.filter(user_mongo_id=user_id)[skip : skip + limit]) + + def get_by_collection(self, collection_name: str, skip: int = 0, limit: int = 100) -> List[PostgresAuditLog]: + """Get audit logs by collection name.""" + return list(PostgresAuditLog.objects.filter(collection_name=collection_name)[skip : skip + limit]) + + def get_by_action(self, action: str, skip: int = 0, limit: int = 100) -> List[PostgresAuditLog]: + """Get audit logs by action.""" + return list(PostgresAuditLog.objects.filter(action=action)[skip : skip + limit]) diff --git a/todo/repositories/task_assignment_repository.py b/todo/repositories/task_assignment_repository.py index fdbf0b1c..773d9328 100644 --- a/todo/repositories/task_assignment_repository.py +++ b/todo/repositories/task_assignment_repository.py @@ -6,6 +6,7 @@ from todo.models.task_assignment import TaskAssignmentModel from todo.repositories.common.mongo_repository import MongoRepository from todo.models.common.pyobjectid import PyObjectId +from todo.services.enhanced_dual_write_service import 
EnhancedDualWriteService class TaskAssignmentRepository(MongoRepository): @@ -13,9 +14,6 @@ class TaskAssignmentRepository(MongoRepository): @classmethod def create(cls, task_assignment: TaskAssignmentModel) -> TaskAssignmentModel: - """ - Creates a new task assignment. - """ collection = cls.get_collection() task_assignment.created_at = datetime.now(timezone.utc) task_assignment.updated_at = None @@ -23,6 +21,30 @@ def create(cls, task_assignment: TaskAssignmentModel) -> TaskAssignmentModel: task_assignment_dict = task_assignment.model_dump(mode="json", by_alias=True, exclude_none=True) insert_result = collection.insert_one(task_assignment_dict) task_assignment.id = insert_result.inserted_id + + dual_write_service = EnhancedDualWriteService() + task_assignment_data = { + "task_mongo_id": str(task_assignment.task_id), + "assignee_id": str(task_assignment.assignee_id), + "user_type": task_assignment.user_type, + "team_id": str(task_assignment.team_id) if task_assignment.team_id else None, + "is_active": task_assignment.is_active, + "created_at": task_assignment.created_at, + "updated_at": task_assignment.updated_at, + "created_by": str(task_assignment.created_by), + "updated_by": str(task_assignment.updated_by) if task_assignment.updated_by else None, + } + + dual_write_success = dual_write_service.create_document( + collection_name="task_assignments", data=task_assignment_data, mongo_id=str(task_assignment.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync task assignment {task_assignment.id} to Postgres") + return task_assignment @classmethod @@ -108,6 +130,31 @@ def update_assignment( }, ) + # Sync deactivation to PostgreSQL + if current_assignment: + dual_write_service = EnhancedDualWriteService() + deactivation_data = { + "task_mongo_id": str(current_assignment.task_id), + "assignee_id": str(current_assignment.assignee_id), + "user_type": current_assignment.user_type, + "team_id": 
str(current_assignment.team_id) if current_assignment.team_id else None, + "is_active": False, + "created_at": current_assignment.created_at, + "updated_at": datetime.now(timezone.utc), + "created_by": str(current_assignment.created_by), + "updated_by": str(user_id), + } + + dual_write_success = dual_write_service.update_document( + collection_name="task_assignments", data=deactivation_data, mongo_id=str(current_assignment.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync task assignment deactivation {current_assignment.id} to Postgres") + new_assignment = TaskAssignmentModel( _id=PyObjectId(), task_id=PyObjectId(task_id), @@ -124,11 +171,13 @@ def update_assignment( @classmethod def delete_assignment(cls, task_id: str, user_id: str) -> bool: - """ - Soft delete a task assignment by setting is_active to False. - """ collection = cls.get_collection() try: + # Get current assignment first + current_assignment = cls.get_by_task_id(task_id) + if not current_assignment: + return False + # Try with ObjectId first result = collection.update_one( {"task_id": ObjectId(task_id), "is_active": True}, @@ -152,17 +201,45 @@ def delete_assignment(cls, task_id: str, user_id: str) -> bool: } }, ) + + if result.modified_count > 0: + # Sync to PostgreSQL + dual_write_service = EnhancedDualWriteService() + assignment_data = { + "task_mongo_id": str(current_assignment.task_id), + "assignee_id": str(current_assignment.assignee_id), + "user_type": current_assignment.user_type, + "team_id": str(current_assignment.team_id) if current_assignment.team_id else None, + "is_active": False, + "created_at": current_assignment.created_at, + "updated_at": datetime.now(timezone.utc), + "created_by": str(current_assignment.created_by), + "updated_by": str(user_id), + } + + dual_write_success = dual_write_service.update_document( + collection_name="task_assignments", data=assignment_data, 
mongo_id=str(current_assignment.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync task assignment deletion {current_assignment.id} to Postgres") + return result.modified_count > 0 except Exception: return False @classmethod def update_executor(cls, task_id: str, executor_id: str, user_id: str) -> bool: - """ - Update the executor_id for the active assignment of the given task_id. - """ collection = cls.get_collection() try: + # Get current assignment first + current_assignment = cls.get_by_task_id(task_id) + if not current_assignment: + return False + result = collection.update_one( {"task_id": ObjectId(task_id), "is_active": True}, { @@ -187,17 +264,45 @@ def update_executor(cls, task_id: str, executor_id: str, user_id: str) -> bool: } }, ) + + if result.modified_count > 0: + # Sync to PostgreSQL + dual_write_service = EnhancedDualWriteService() + assignment_data = { + "task_mongo_id": str(current_assignment.task_id), + "assignee_id": str(executor_id), + "user_type": "user", + "team_id": str(current_assignment.team_id) if current_assignment.team_id else None, + "is_active": current_assignment.is_active, + "created_at": current_assignment.created_at, + "updated_at": datetime.now(timezone.utc), + "created_by": str(current_assignment.created_by), + "updated_by": str(user_id), + } + + dual_write_success = dual_write_service.update_document( + collection_name="task_assignments", data=assignment_data, mongo_id=str(current_assignment.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync task assignment update {current_assignment.id} to Postgres") + return result.modified_count > 0 except Exception: return False @classmethod def deactivate_by_task_id(cls, task_id: str, user_id: str) -> bool: - """ - Deactivate all assignments for a specific task by setting is_active to False. 
- """ collection = cls.get_collection() try: + # Get all active assignments for this task + active_assignments = cls.get_by_task_id(task_id) + if not active_assignments: + return False + # Try with ObjectId first result = collection.update_many( {"task_id": ObjectId(task_id), "is_active": True}, @@ -221,6 +326,32 @@ def deactivate_by_task_id(cls, task_id: str, user_id: str) -> bool: } }, ) + + if result.modified_count > 0: + # Sync to PostgreSQL for each assignment + dual_write_service = EnhancedDualWriteService() + assignment_data = { + "task_mongo_id": str(active_assignments.task_id), + "assignee_id": str(active_assignments.assignee_id), + "user_type": active_assignments.user_type, + "team_id": str(active_assignments.team_id) if active_assignments.team_id else None, + "is_active": False, + "created_at": active_assignments.created_at, + "updated_at": datetime.now(timezone.utc), + "created_by": str(active_assignments.created_by), + "updated_by": str(user_id), + } + + dual_write_success = dual_write_service.update_document( + collection_name="task_assignments", data=assignment_data, mongo_id=str(active_assignments.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync task assignment deactivation {active_assignments.id} to Postgres") + return result.modified_count > 0 except Exception: return False diff --git a/todo/repositories/task_repository.py b/todo/repositories/task_repository.py index b2bb862f..038d6540 100644 --- a/todo/repositories/task_repository.py +++ b/todo/repositories/task_repository.py @@ -16,6 +16,8 @@ TaskStatus, ) from todo.repositories.team_repository import UserTeamDetailsRepository +from todo.services.enhanced_dual_write_service import EnhancedDualWriteService +from todo.models.postgres import PostgresTask, PostgresDeferredDetails class TaskRepository(MongoRepository): @@ -206,10 +208,43 @@ def create(cls, task: TaskModel) -> TaskModel: task.createdAt = 
datetime.now(timezone.utc) task.updatedAt = None + # Ensure createdAt is properly set + if not task.createdAt: + task.createdAt = datetime.now(timezone.utc) + task_dict = task.model_dump(mode="json", by_alias=True, exclude_none=True) insert_result = tasks_collection.insert_one(task_dict, session=session) task.id = insert_result.inserted_id + + dual_write_service = EnhancedDualWriteService() + + task_data = { + "title": task.title, + "description": task.description, + "priority": task.priority, + "status": task.status, + "displayId": task.displayId, + "isAcknowledged": task.isAcknowledged, + "isDeleted": task.isDeleted, + "startedAt": task.startedAt, + "dueAt": task.dueAt, + "createdAt": task.createdAt or datetime.now(timezone.utc), + "updatedAt": task.updatedAt, + "createdBy": str(task.createdBy), + "updatedBy": str(task.updatedBy) if task.updatedBy else None, + } + + dual_write_success = dual_write_service.create_document( + collection_name="tasks", data=task_data, mongo_id=str(task.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync task {task.id} to Postgres") + return task except Exception as e: @@ -259,9 +294,6 @@ def delete_by_id(cls, task_id: ObjectId, user_id: str) -> TaskModel | None: @classmethod def update(cls, task_id: str, update_data: dict) -> TaskModel | None: - """ - Updates a specific task by its ID with the given data. 
- """ if not isinstance(update_data, dict): raise ValueError("update_data must be a dictionary.") @@ -281,7 +313,40 @@ def update(cls, task_id: str, update_data: dict) -> TaskModel | None: ) if updated_task_doc: - return TaskModel(**updated_task_doc) + task_model = TaskModel(**updated_task_doc) + + dual_write_service = EnhancedDualWriteService() + task_data = { + "title": task_model.title, + "description": task_model.description, + "priority": task_model.priority, + "status": task_model.status, + "displayId": task_model.displayId, + "isAcknowledged": task_model.isAcknowledged, + "isDeleted": task_model.isDeleted, + "startedAt": task_model.startedAt, + "dueAt": task_model.dueAt, + "createdAt": task_model.createdAt, + "updatedAt": task_model.updatedAt, + "createdBy": str(task_model.createdBy), + "updatedBy": str(task_model.updatedBy) if task_model.updatedBy else None, + } + + dual_write_success = dual_write_service.update_document( + collection_name="tasks", data=task_data, mongo_id=str(task_model.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync task update {task_model.id} to Postgres") + + # Handle deferred details if present in update_data + if "deferredDetails" in update_data: + cls._handle_deferred_details_sync(task_id, update_data["deferredDetails"]) + + return task_model return None @classmethod @@ -307,3 +372,30 @@ def get_by_ids(cls, task_ids: List[str]) -> List[TaskModel]: object_ids = [ObjectId(task_id) for task_id in task_ids] cursor = tasks_collection.find({"_id": {"$in": object_ids}}) return [TaskModel(**doc) for doc in cursor] + + @classmethod + def _handle_deferred_details_sync(cls, task_id: str, deferred_details: dict) -> None: + """Handle deferred details synchronization to PostgreSQL""" + try: + postgres_task = PostgresTask.objects.get(mongo_id=task_id) + + if deferred_details: + deferred_details_data = { + "task": postgres_task, + "deferred_at": 
deferred_details.get("deferredAt"), + "deferred_till": deferred_details.get("deferredTill"), + "deferred_by": str(deferred_details.get("deferredBy")), + } + + PostgresDeferredDetails.objects.update_or_create(task=postgres_task, defaults=deferred_details_data) + else: + # Remove deferred details if None + PostgresDeferredDetails.objects.filter(task=postgres_task).delete() + + except PostgresTask.DoesNotExist: + pass + except Exception as e: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync deferred details to PostgreSQL for task {task_id}: {str(e)}") diff --git a/todo/repositories/team_creation_invite_code_repository.py b/todo/repositories/team_creation_invite_code_repository.py index 34a46a12..09f3dea2 100644 --- a/todo/repositories/team_creation_invite_code_repository.py +++ b/todo/repositories/team_creation_invite_code_repository.py @@ -4,6 +4,7 @@ from todo.repositories.common.mongo_repository import MongoRepository from todo.models.team_creation_invite_code import TeamCreationInviteCodeModel from todo.repositories.user_repository import UserRepository +from todo.services.enhanced_dual_write_service import EnhancedDualWriteService class TeamCreationInviteCodeRepository(MongoRepository): @@ -32,19 +33,64 @@ def validate_and_consume_code(cls, code: str, used_by: str) -> Optional[dict]: {"$set": {"is_used": True, "used_by": used_by, "used_at": current_time.isoformat()}}, return_document=True, ) + + if result: + # Sync the update to PostgreSQL + dual_write_service = EnhancedDualWriteService() + invite_code_data = { + "code": result["code"], + "description": result.get("description"), + "is_used": True, + "created_by": str(result["created_by"]), + "used_by": str(used_by), + "created_at": result.get("created_at"), + "used_at": current_time, + } + + dual_write_success = dual_write_service.update_document( + collection_name="team_creation_invite_codes", data=invite_code_data, mongo_id=str(result["_id"]) + ) + + if not 
dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync team creation invite code update {result['_id']} to Postgres") + return result except Exception as e: raise Exception(f"Error validating and consuming code: {e}") @classmethod def create(cls, team_invite_code: TeamCreationInviteCodeModel) -> TeamCreationInviteCodeModel: - """Create a new team invite code.""" collection = cls.get_collection() team_invite_code.created_at = datetime.now(timezone.utc) code_dict = team_invite_code.model_dump(mode="json", by_alias=True, exclude_none=True) insert_result = collection.insert_one(code_dict) team_invite_code.id = insert_result.inserted_id + + dual_write_service = EnhancedDualWriteService() + invite_code_data = { + "code": team_invite_code.code, + "description": team_invite_code.description, + "is_used": team_invite_code.is_used, + "created_by": str(team_invite_code.created_by), + "used_by": str(team_invite_code.used_by) if team_invite_code.used_by else None, + "created_at": team_invite_code.created_at, + "used_at": team_invite_code.used_at, + } + + dual_write_success = dual_write_service.create_document( + collection_name="team_creation_invite_codes", data=invite_code_data, mongo_id=str(team_invite_code.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync team creation invite code {team_invite_code.id} to Postgres") + return team_invite_code @classmethod diff --git a/todo/repositories/team_repository.py b/todo/repositories/team_repository.py index 02f3b967..31769287 100644 --- a/todo/repositories/team_repository.py +++ b/todo/repositories/team_repository.py @@ -5,6 +5,7 @@ from todo.models.team import TeamModel, UserTeamDetailsModel from todo.repositories.common.mongo_repository import MongoRepository +from todo.services.enhanced_dual_write_service import EnhancedDualWriteService class TeamRepository(MongoRepository): @@ -22,6 +23,30 @@ 
def create(cls, team: TeamModel) -> TeamModel: team_dict = team.model_dump(mode="json", by_alias=True, exclude_none=True) insert_result = teams_collection.insert_one(team_dict) team.id = insert_result.inserted_id + + dual_write_service = EnhancedDualWriteService() + team_data = { + "name": team.name, + "description": team.description, + "invite_code": team.invite_code, + "poc_id": str(team.poc_id) if team.poc_id else None, + "created_by": str(team.created_by), + "updated_by": str(team.updated_by), + "is_deleted": team.is_deleted, + "created_at": team.created_at, + "updated_at": team.updated_at, + } + + dual_write_success = dual_write_service.create_document( + collection_name="teams", data=team_data, mongo_id=str(team.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync team {team.id} to Postgres") + return team @classmethod @@ -113,6 +138,28 @@ def create(cls, user_team: UserTeamDetailsModel) -> UserTeamDetailsModel: user_team_dict = user_team.model_dump(mode="json", by_alias=True, exclude_none=True) insert_result = collection.insert_one(user_team_dict) user_team.id = insert_result.inserted_id + + dual_write_service = EnhancedDualWriteService() + user_team_data = { + "user_id": str(user_team.user_id), + "team_id": str(user_team.team_id), + "created_by": str(user_team.created_by), + "updated_by": str(user_team.updated_by), + "is_active": user_team.is_active, + "created_at": user_team.created_at, + "updated_at": user_team.updated_at, + } + + dual_write_success = dual_write_service.create_document( + collection_name="user_team_details", data=user_team_data, mongo_id=str(user_team.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync user team details {user_team.id} to Postgres") + return user_team @classmethod @@ -136,6 +183,28 @@ def create_many(cls, user_teams: list[UserTeamDetailsModel]) -> list[UserTeamDet for 
i, user_team in enumerate(user_teams): user_team.id = insert_result.inserted_ids[i] + dual_write_service = EnhancedDualWriteService() + for user_team in user_teams: + user_team_data = { + "user_id": str(user_team.user_id), + "team_id": str(user_team.team_id), + "created_by": str(user_team.created_by), + "updated_by": str(user_team.updated_by), + "is_active": user_team.is_active, + "created_at": user_team.created_at, + "updated_at": user_team.updated_at, + } + + dual_write_success = dual_write_service.create_document( + collection_name="user_team_details", data=user_team_data, mongo_id=str(user_team.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync user team details {user_team.id} to Postgres") + return user_teams @classmethod @@ -208,6 +277,10 @@ def remove_user_from_team(cls, team_id: str, user_id: str, updated_by_user_id: s """ collection = cls.get_collection() try: + current_relationship = collection.find_one({"team_id": team_id, "user_id": user_id, "is_active": True}) + if not current_relationship: + return False + result = collection.update_one( {"team_id": team_id, "user_id": user_id, "is_active": True}, { @@ -218,6 +291,29 @@ def remove_user_from_team(cls, team_id: str, user_id: str, updated_by_user_id: s } }, ) + + if result.modified_count > 0: + dual_write_service = EnhancedDualWriteService() + user_team_data = { + "user_id": str(current_relationship["user_id"]), + "team_id": str(current_relationship["team_id"]), + "is_active": False, + "created_by": str(current_relationship["created_by"]), + "updated_by": str(updated_by_user_id), + "created_at": current_relationship["created_at"], + "updated_at": datetime.now(timezone.utc), + } + + dual_write_success = dual_write_service.update_document( + collection_name="user_team_details", data=user_team_data, mongo_id=str(current_relationship["_id"]) + ) + + if not dual_write_success: + import logging + + logger = 
logging.getLogger(__name__) + logger.warning(f"Failed to sync user team removal {current_relationship['_id']} to Postgres") + return result.modified_count > 0 except Exception: return False diff --git a/todo/repositories/user_repository.py b/todo/repositories/user_repository.py index 5ca60cf9..a4e2ffe0 100644 --- a/todo/repositories/user_repository.py +++ b/todo/repositories/user_repository.py @@ -8,6 +8,7 @@ from todo_project.db.config import DatabaseManager from todo.constants.messages import RepositoryErrors from todo.exceptions.auth_exceptions import UserNotFoundException, APIException +from todo.services.enhanced_dual_write_service import EnhancedDualWriteService class UserRepository: @@ -67,7 +68,29 @@ def create_or_update(cls, user_data: dict) -> UserModel: if not result: raise APIException(RepositoryErrors.USER_OPERATION_FAILED) - return UserModel(**result) + user_model = UserModel(**result) + + dual_write_service = EnhancedDualWriteService() + user_data_for_postgres = { + "name": user_model.name, + "email_id": user_model.email_id, + "google_id": user_model.google_id, + "picture": user_model.picture, + "created_at": user_model.created_at, + "updated_at": user_model.updated_at, + } + + dual_write_success = dual_write_service.create_document( + collection_name="users", data=user_data_for_postgres, mongo_id=str(user_model.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync user {user_model.id} to Postgres") + + return user_model except Exception as e: if isinstance(e, APIException): diff --git a/todo/repositories/user_role_repository.py b/todo/repositories/user_role_repository.py index 36db79db..c5c93afc 100644 --- a/todo/repositories/user_role_repository.py +++ b/todo/repositories/user_role_repository.py @@ -6,6 +6,7 @@ from todo.models.user_role import UserRoleModel from todo.repositories.common.mongo_repository import MongoRepository from todo.constants.role import RoleScope, 
RoleName +from todo.services.enhanced_dual_write_service import EnhancedDualWriteService logger = logging.getLogger(__name__) @@ -38,6 +39,28 @@ def create(cls, user_role: UserRoleModel) -> UserRoleModel: user_role_dict = user_role.model_dump(mode="json", by_alias=True, exclude_none=True) result = collection.insert_one(user_role_dict) user_role.id = result.inserted_id + + dual_write_service = EnhancedDualWriteService() + user_role_data = { + "user_id": user_role.user_id, + "role_name": user_role.role_name.value if hasattr(user_role.role_name, "value") else user_role.role_name, + "scope": user_role.scope.value if hasattr(user_role.scope, "value") else user_role.scope, + "team_id": user_role.team_id, + "is_active": user_role.is_active, + "created_at": user_role.created_at, + "created_by": user_role.created_by, + } + + dual_write_success = dual_write_service.create_document( + collection_name="user_roles", data=user_role_data, mongo_id=str(user_role.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync user role {user_role.id} to Postgres") + return user_role @classmethod @@ -65,6 +88,27 @@ def get_user_roles( roles.append(UserRoleModel(**doc)) return roles + @classmethod + def get_by_user_role_scope_team(cls, user_id: str, role_id: str, scope: str, team_id: Optional[str] = None): + collection = cls.get_collection() + + try: + object_id = ObjectId(role_id) + except Exception: + return None + + query = {"_id": object_id, "user_id": user_id, "scope": scope, "is_active": True} + + if scope == "TEAM" and team_id: + query["team_id"] = team_id + elif scope == "GLOBAL": + query["team_id"] = None + + result = collection.find_one(query) + if result: + return UserRoleModel(**result) + return None + @classmethod def assign_role( cls, user_id: str, role_name: "RoleName", scope: "RoleScope", team_id: Optional[str] = None @@ -90,6 +134,32 @@ def remove_role_by_id(cls, user_id: str, role_id: str, scope: 
str, team_id: Opti elif scope == "GLOBAL": query["team_id"] = None + current_role = collection.find_one(query) + if not current_role: + return False + result = collection.update_one(query, {"$set": {"is_active": False}}) + if result.modified_count > 0: + dual_write_service = EnhancedDualWriteService() + user_role_data = { + "user_id": str(current_role["user_id"]), + "role_name": current_role["role_name"], + "scope": current_role["scope"], + "team_id": str(current_role["team_id"]) if current_role.get("team_id") else None, + "is_active": False, + "created_at": current_role["created_at"], + "created_by": str(current_role["created_by"]), + } + + dual_write_success = dual_write_service.update_document( + collection_name="user_roles", data=user_role_data, mongo_id=str(current_role["_id"]) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync user role removal {current_role['_id']} to Postgres") + return result.modified_count > 0 diff --git a/todo/repositories/user_team_details_repository.py b/todo/repositories/user_team_details_repository.py index d65689e5..e44c2935 100644 --- a/todo/repositories/user_team_details_repository.py +++ b/todo/repositories/user_team_details_repository.py @@ -1,10 +1,36 @@ from bson import ObjectId from todo.repositories.common.mongo_repository import MongoRepository +from todo.services.enhanced_dual_write_service import EnhancedDualWriteService class UserTeamDetailsRepository(MongoRepository): collection_name = "user_team_details" + @classmethod + def get_by_user_and_team(cls, user_id: str, team_id: str): + collection = cls.get_collection() + try: + user_id_obj = ObjectId(user_id) + except Exception: + user_id_obj = user_id + try: + team_id_obj = ObjectId(team_id) + except Exception: + team_id_obj = team_id + + queries = [ + {"user_id": user_id_obj, "team_id": team_id_obj}, + {"user_id": user_id, "team_id": team_id_obj}, + {"user_id": user_id_obj, "team_id": team_id}, + 
{"user_id": user_id, "team_id": team_id}, + ] + + for query in queries: + result = collection.find_one(query) + if result: + return result + return None + @classmethod def remove_member_from_team(cls, user_id: str, team_id: str) -> bool: collection = cls.get_collection() @@ -23,9 +49,20 @@ def remove_member_from_team(cls, user_id: str, team_id: str) -> bool: {"user_id": user_id, "team_id": team_id}, ] for query in queries: - print(f"DEBUG: Trying user_team_details delete query: {query}") - result = collection.delete_one(query) - print(f"DEBUG: delete_one result: deleted={result.deleted_count}") - if result.deleted_count > 0: - return True + document = collection.find_one(query) + if document: + result = collection.delete_one(query) + if result.deleted_count > 0: + dual_write_service = EnhancedDualWriteService() + dual_write_success = dual_write_service.delete_document( + collection_name="user_team_details", mongo_id=str(document["_id"]) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync user team details deletion {document['_id']} to Postgres") + + return True return False diff --git a/todo/repositories/watchlist_repository.py b/todo/repositories/watchlist_repository.py index 7a33e7a1..36625216 100644 --- a/todo/repositories/watchlist_repository.py +++ b/todo/repositories/watchlist_repository.py @@ -6,6 +6,7 @@ from todo.models.watchlist import WatchlistModel from todo.dto.watchlist_dto import WatchlistDTO from bson import ObjectId +from todo.services.enhanced_dual_write_service import EnhancedDualWriteService def _convert_objectids_to_str(obj): @@ -39,6 +40,28 @@ def create(cls, watchlist_model: WatchlistModel) -> WatchlistModel: doc.pop("_id", None) insert_result = cls.get_collection().insert_one(doc) watchlist_model.id = str(insert_result.inserted_id) + + dual_write_service = EnhancedDualWriteService() + watchlist_data = { + "task_id": str(watchlist_model.taskId), + "user_id": 
str(watchlist_model.userId), + "is_active": watchlist_model.isActive, + "created_by": str(watchlist_model.createdBy), + "created_at": watchlist_model.createdAt, + "updated_by": str(watchlist_model.updatedBy) if watchlist_model.updatedBy else None, + "updated_at": watchlist_model.updatedAt, + } + + dual_write_success = dual_write_service.create_document( + collection_name="watchlists", data=watchlist_data, mongo_id=str(watchlist_model.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync watchlist {watchlist_model.id} to Postgres") + return watchlist_model @classmethod @@ -306,6 +329,11 @@ def update(cls, taskId: ObjectId, isActive: bool, userId: ObjectId) -> dict: Update the watchlist status of a task. """ watchlist_collection = cls.get_collection() + + current_watchlist = cls.get_by_user_and_task(str(userId), str(taskId)) + if not current_watchlist: + return None + update_result = watchlist_collection.update_one( {"userId": str(userId), "taskId": str(taskId)}, { @@ -317,6 +345,28 @@ def update(cls, taskId: ObjectId, isActive: bool, userId: ObjectId) -> dict: }, ) + if update_result.modified_count > 0: + dual_write_service = EnhancedDualWriteService() + watchlist_data = { + "task_id": str(current_watchlist.taskId), + "user_id": str(current_watchlist.userId), + "is_active": isActive, + "created_by": str(current_watchlist.createdBy), + "created_at": current_watchlist.createdAt, + "updated_by": str(userId), + "updated_at": datetime.now(timezone.utc), + } + + dual_write_success = dual_write_service.update_document( + collection_name="watchlists", data=watchlist_data, mongo_id=str(current_watchlist.id) + ) + + if not dual_write_success: + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Failed to sync watchlist update {current_watchlist.id} to Postgres") + if update_result.modified_count == 0: return None return update_result diff --git 
a/todo/services/dual_write_service.py b/todo/services/dual_write_service.py new file mode 100644 index 00000000..a1c1dbc0 --- /dev/null +++ b/todo/services/dual_write_service.py @@ -0,0 +1,500 @@ +import logging +from typing import Any, Dict, List +from django.db import transaction +from django.utils import timezone + +from todo.models.postgres import ( + PostgresUser, + PostgresTask, + PostgresTaskLabel, + PostgresTeam, + PostgresUserTeamDetails, + PostgresLabel, + PostgresRole, + PostgresTaskAssignment, + PostgresWatchlist, + PostgresUserRole, + PostgresAuditLog, + PostgresTeamCreationInviteCode, +) + +logger = logging.getLogger(__name__) + + +class DualWriteService: + """ + Service for dual-write operations to MongoDB and Postgres. + Ensures data consistency across both databases. + """ + + # Mapping of MongoDB collection names to Postgres models + COLLECTION_MODEL_MAP = { + "users": PostgresUser, + "tasks": PostgresTask, + "teams": PostgresTeam, + "labels": PostgresLabel, + "roles": PostgresRole, + "task_assignments": PostgresTaskAssignment, + "watchlists": PostgresWatchlist, + "user_team_details": PostgresUserTeamDetails, + "user_roles": PostgresUserRole, + "audit_logs": PostgresAuditLog, + "team_creation_invite_codes": PostgresTeamCreationInviteCode, + } + + def __init__(self): + self.sync_failures = [] + + def create_document(self, collection_name: str, data: Dict[str, Any], mongo_id: str) -> bool: + """ + Create a document in both MongoDB and Postgres. 
+ + Args: + collection_name: Name of the MongoDB collection + data: Document data + mongo_id: MongoDB ObjectId as string + + Returns: + bool: True if both writes succeeded, False otherwise + """ + try: + # First, write to MongoDB (this should already be done by the calling code) + # Then, write to Postgres + postgres_model = self._get_postgres_model(collection_name) + if not postgres_model: + logger.error(f"No Postgres model found for collection: {collection_name}") + return False + + # Transform data for Postgres + postgres_data = self._transform_data_for_postgres(collection_name, data, mongo_id) + + # Write to Postgres + with transaction.atomic(): + # Extract labels before creating the task + labels = postgres_data.pop("labels", []) if collection_name == "tasks" else [] + + postgres_instance = postgres_model.objects.create(**postgres_data) + + # Handle labels for tasks + if collection_name == "tasks" and labels: + self._sync_task_labels(postgres_instance, labels) + + logger.info(f"Successfully synced {collection_name}:{mongo_id} to Postgres") + return True + + except Exception as e: + error_msg = f"Failed to sync {collection_name}:{mongo_id} to Postgres: {str(e)}" + logger.error(error_msg) + self._record_sync_failure(collection_name, mongo_id, error_msg) + return False + + def update_document(self, collection_name: str, mongo_id: str, data: Dict[str, Any]) -> bool: + """ + Update a document in both MongoDB and Postgres. 
+ + Args: + collection_name: Name of the MongoDB collection + mongo_id: MongoDB ObjectId as string + data: Updated document data + + Returns: + bool: True if both updates succeeded, False otherwise + """ + try: + postgres_model = self._get_postgres_model(collection_name) + if not postgres_model: + logger.error(f"No Postgres model found for collection: {collection_name}") + return False + + # Transform data for Postgres + postgres_data = self._transform_data_for_postgres(collection_name, data, mongo_id) + + # Update in Postgres + with transaction.atomic(): + # Extract labels before updating the task + labels = postgres_data.pop("labels", []) if collection_name == "tasks" else [] + + postgres_instance = postgres_model.objects.get(mongo_id=mongo_id) + preserve_fields = {"created_at", "mongo_id"} + + for field, value in postgres_data.items(): + if hasattr(postgres_instance, field) and field not in preserve_fields: + setattr(postgres_instance, field, value) + + postgres_instance.sync_status = "SYNCED" + postgres_instance.sync_error = None + postgres_instance.save() + + # Handle labels for tasks + if collection_name == "tasks": + self._sync_task_labels(postgres_instance, labels) + + logger.info(f"Successfully updated {collection_name}:{mongo_id} in Postgres") + return True + + except postgres_model.DoesNotExist: + # Document doesn't exist in Postgres, create it + return self.create_document(collection_name, data, mongo_id) + except Exception as e: + error_msg = f"Failed to update {collection_name}:{mongo_id} in Postgres: {str(e)}" + logger.error(error_msg) + self._record_sync_failure(collection_name, mongo_id, error_msg) + return False + + def delete_document(self, collection_name: str, mongo_id: str) -> bool: + """ + Delete a document from both MongoDB and Postgres. 
+ + Args: + collection_name: Name of the MongoDB collection + mongo_id: MongoDB ObjectId as string + + Returns: + bool: True if both deletes succeeded, False otherwise + """ + try: + postgres_model = self._get_postgres_model(collection_name) + if not postgres_model: + logger.error(f"No Postgres model found for collection: {collection_name}") + return False + + # Soft delete in Postgres (mark as deleted) + with transaction.atomic(): + postgres_instance = postgres_model.objects.get(mongo_id=mongo_id) + if hasattr(postgres_instance, "is_deleted"): + postgres_instance.is_deleted = True + postgres_instance.sync_status = "SYNCED" + postgres_instance.sync_error = None + postgres_instance.save() + else: + # If no soft delete field, actually delete the record + postgres_instance.delete() + + logger.info(f"Successfully deleted {collection_name}:{mongo_id} from Postgres") + return True + + except postgres_model.DoesNotExist: + logger.warning(f"Document {collection_name}:{mongo_id} not found in Postgres for deletion") + return True # Consider this a success since the goal is achieved + except Exception as e: + error_msg = f"Failed to delete {collection_name}:{mongo_id} from Postgres: {str(e)}" + logger.error(error_msg) + self._record_sync_failure(collection_name, mongo_id, error_msg) + return False + + def _get_postgres_model(self, collection_name: str): + """Get the corresponding Postgres model for a MongoDB collection.""" + return self.COLLECTION_MODEL_MAP.get(collection_name) + + def _transform_data_for_postgres(self, collection_name: str, data: Dict[str, Any], mongo_id: str) -> Dict[str, Any]: + """ + Transform MongoDB document data to Postgres model format. 
+ + Args: + collection_name: Name of the MongoDB collection + data: MongoDB document data + mongo_id: MongoDB ObjectId as string + + Returns: + Dict: Transformed data for Postgres + """ + # Start with basic sync metadata + postgres_data = { + "mongo_id": mongo_id, + "sync_status": "SYNCED", + "sync_error": None, + } + + # Handle special cases for different collections + if collection_name == "tasks": + postgres_data.update(self._transform_task_data(data)) + elif collection_name == "teams": + postgres_data.update(self._transform_team_data(data)) + elif collection_name == "users": + postgres_data.update(self._transform_user_data(data)) + elif collection_name == "labels": + postgres_data.update(self._transform_label_data(data)) + elif collection_name == "roles": + postgres_data.update(self._transform_role_data(data)) + elif collection_name == "task_assignments": + postgres_data.update(self._transform_task_assignment_data(data)) + elif collection_name == "watchlists": + postgres_data.update(self._transform_watchlist_data(data)) + elif collection_name == "user_team_details": + postgres_data.update(self._transform_user_team_details_data(data)) + elif collection_name == "user_roles": + postgres_data.update(self._transform_user_role_data(data)) + elif collection_name == "audit_logs": + postgres_data.update(self._transform_audit_log_data(data)) + elif collection_name == "team_creation_invite_codes": + postgres_data.update(self._transform_team_creation_invite_code_data(data)) + else: + # Generic transformation for unknown collections + postgres_data.update(self._transform_generic_data(data)) + + return postgres_data + + def _transform_task_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform task data for Postgres.""" + # Handle priority enum conversion + priority = data.get("priority", 3) + if hasattr(priority, "value"): # If it's an enum, get its value + priority = priority.value + + # Handle status enum conversion + status = data.get("status", "TODO") + if 
hasattr(status, "value"): # If it's an enum, get its value + status = status.value + + return { + "display_id": data.get("displayId"), + "title": data.get("title"), + "description": data.get("description"), + "priority": priority, # Store as integer like MongoDB + "status": status, # Store as string value like MongoDB + "is_acknowledged": data.get("isAcknowledged", False), + "is_deleted": data.get("isDeleted", False), + "started_at": data.get("startedAt"), + "due_at": data.get("dueAt"), + "created_at": data.get("createdAt"), + "updated_at": data.get("updatedAt"), + "created_by": str(data.get("createdBy", "")), + "updated_by": str(data.get("updatedBy", "")) if data.get("updatedBy") else None, + "labels": data.get("labels", []), # Include labels for processing + } + + def _transform_team_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform team data for Postgres.""" + return { + "name": data.get("name"), + "description": data.get("description"), + "invite_code": data.get("invite_code"), + "poc_id": str(data.get("poc_id", "")) if data.get("poc_id") else None, + "created_by": str(data.get("created_by", "")), + "updated_by": str(data.get("updated_by", "")), + "is_deleted": data.get("is_deleted", False), + "created_at": data.get("created_at"), + "updated_at": data.get("updated_at"), + } + + def _transform_user_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform user data for Postgres.""" + return { + "google_id": data.get("google_id"), + "email_id": data.get("email_id"), + "name": data.get("name"), + "picture": data.get("picture"), + "created_at": data.get("created_at"), + "updated_at": data.get("updated_at"), + } + + def _transform_label_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform label data for Postgres.""" + return { + "name": data.get("name"), + "color": data.get("color", "#000000"), + "description": data.get("description"), + "created_at": data.get("createdAt"), + "updated_at": data.get("updatedAt"), + } + + def 
_transform_role_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform role data for Postgres.""" + return { + "name": data.get("name"), + "description": data.get("description"), + "permissions": data.get("permissions", {}), + "created_at": data.get("created_at"), + "updated_at": data.get("updated_at"), + } + + def _transform_task_assignment_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform task assignment data for Postgres.""" + return { + "task_mongo_id": str(data.get("task_mongo_id", "")), + "assignee_id": str(data.get("assignee_id", "")), + "user_type": data.get("user_type", "user"), + "team_id": str(data.get("team_id", "")) if data.get("team_id") else None, + "is_active": data.get("is_active", True), + "created_at": data.get("created_at"), + "updated_at": data.get("updated_at"), + "created_by": str(data.get("created_by", "")), + "updated_by": str(data.get("updated_by", "")) if data.get("updated_by") else None, + } + + def _transform_watchlist_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform watchlist data for Postgres.""" + return { + "task_id": str(data.get("task_id", "")), + "user_id": str(data.get("user_id", "")), + "is_active": data.get("is_active", True), + "created_by": str(data.get("created_by", "")), + "updated_by": str(data.get("updated_by", "")) if data.get("updated_by") else None, + "created_at": data.get("created_at"), + "updated_at": data.get("updated_at"), + } + + def _sync_task_labels(self, postgres_task, labels: List[str]): + """ + Sync task labels to PostgresTaskLabel junction table. 
+ + Args: + postgres_task: PostgresTask instance + labels: List of label MongoDB ObjectIds as strings + """ + try: + # Clear existing labels for this task + PostgresTaskLabel.objects.filter(task=postgres_task).delete() + + # Add new labels + for label_mongo_id in labels: + if label_mongo_id: # Skip empty labels + PostgresTaskLabel.objects.create(task=postgres_task, label_mongo_id=str(label_mongo_id)) + + logger.info(f"Successfully synced {len(labels)} labels for task {postgres_task.mongo_id}") + + except Exception as e: + logger.error(f"Failed to sync labels for task {postgres_task.mongo_id}: {str(e)}") + # Don't fail the entire operation, just log the error + + def _sync_task_assignment_update(self, task_mongo_id: str, new_assignment_data: Dict[str, Any]): + """ + Handle task assignment updates by deactivating old records and creating new ones. + This mirrors MongoDB's approach of soft deletes. + + Args: + task_mongo_id: MongoDB ObjectId of the task as string + new_assignment_data: Data for the new assignment + """ + try: + # Deactivate all existing assignments for this task + PostgresTaskAssignment.objects.filter(task_mongo_id=task_mongo_id).update( + status="REJECTED", # Mark as rejected instead of deleting + updated_at=timezone.now(), + ) + + # Create new assignment + PostgresTaskAssignment.objects.create( + mongo_id=new_assignment_data.get("mongo_id"), + task_mongo_id=new_assignment_data.get("task_mongo_id"), + user_mongo_id=new_assignment_data.get("user_mongo_id"), + team_mongo_id=new_assignment_data.get("team_mongo_id"), + status=new_assignment_data.get("status", "ASSIGNED"), + assigned_at=new_assignment_data.get("assigned_at"), + started_at=new_assignment_data.get("started_at"), + completed_at=new_assignment_data.get("completed_at"), + created_at=new_assignment_data.get("created_at"), + updated_at=new_assignment_data.get("updated_at"), + assigned_by=new_assignment_data.get("assigned_by"), + updated_by=new_assignment_data.get("updated_by"), + ) + + 
logger.info(f"Successfully synced task assignment update for task {task_mongo_id}") + + except Exception as e: + logger.error(f"Failed to sync task assignment update for task {task_mongo_id}: {str(e)}") + # Don't fail the entire operation, just log the error + + def _transform_user_team_details_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform user team details data for Postgres.""" + return { + "user_id": str(data.get("user_id", "")), + "team_id": str(data.get("team_id", "")), + "is_active": data.get("is_active", True), + "created_by": str(data.get("created_by", "")), + "updated_by": str(data.get("updated_by", "")), + "created_at": data.get("created_at"), + "updated_at": data.get("updated_at"), + } + + def _transform_user_role_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + return { + "user_id": str(data.get("user_id", "")), + "role_name": data.get("role_name"), + "scope": data.get("scope"), + "team_id": str(data.get("team_id", "")) if data.get("team_id") else None, + "is_active": data.get("is_active", True), + "created_at": data.get("created_at"), + "created_by": str(data.get("created_by", "")), + } + + def _transform_audit_log_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + return { + "task_id": str(data.get("task_id", "")) if data.get("task_id") else None, + "team_id": str(data.get("team_id", "")) if data.get("team_id") else None, + "previous_executor_id": str(data.get("previous_executor_id", "")) + if data.get("previous_executor_id") + else None, + "new_executor_id": str(data.get("new_executor_id", "")) if data.get("new_executor_id") else None, + "spoc_id": str(data.get("spoc_id", "")) if data.get("spoc_id") else None, + "action": data.get("action"), + "timestamp": data.get("timestamp"), + "status_from": data.get("status_from"), + "status_to": data.get("status_to"), + "assignee_from": str(data.get("assignee_from", "")) if data.get("assignee_from") else None, + "assignee_to": str(data.get("assignee_to", "")) if 
data.get("assignee_to") else None, + "performed_by": str(data.get("performed_by", "")) if data.get("performed_by") else None, + } + + def _transform_team_creation_invite_code_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transform team creation invite code data for Postgres.""" + return { + "code": data.get("code"), + "description": data.get("description"), + "created_by": str(data.get("created_by", "")), + "used_by": str(data.get("used_by", "")) if data.get("used_by") else None, + "is_used": data.get("is_used", False), + "created_at": data.get("created_at"), + "used_at": data.get("used_at"), + } + + def _transform_generic_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Generic transformation for unknown collections.""" + # Convert MongoDB field names to snake_case and handle basic types + transformed = {} + for key, value in data.items(): + if key == "_id": + continue # Skip MongoDB _id field + + # Convert camelCase to snake_case + snake_key = "".join(["_" + c.lower() if c.isupper() else c for c in key]).lstrip("_") + + # Handle ObjectId conversion + if hasattr(value, "__str__") and len(str(value)) == 24: + transformed[snake_key] = str(value) + else: + transformed[snake_key] = value + + return transformed + + def _record_sync_failure(self, collection_name: str, mongo_id: str, error: str): + """Record a sync failure for alerting purposes.""" + failure_record = { + "collection": collection_name, + "mongo_id": mongo_id, + "error": error, + "timestamp": timezone.now(), + } + self.sync_failures.append(failure_record) + + # Log the failure + logger.error(f"Sync failure recorded: {failure_record}") + + # TODO: Implement alerting mechanism (email, Slack, etc.) + self._send_alert(failure_record) + + def _send_alert(self, failure_record: Dict[str, Any]): + """Send alert for sync failure.""" + # TODO: Implement actual alerting (email, Slack, etc.) + logger.critical(f"ALERT: Sync failure detected - {failure_record}") + + # For now, just log. 
In production, this would send emails/Slack messages + pass + + def get_sync_failures(self) -> list: + """Get list of recent sync failures.""" + return self.sync_failures.copy() + + def clear_sync_failures(self): + """Clear the sync failures list.""" + self.sync_failures.clear() diff --git a/todo/services/enhanced_dual_write_service.py b/todo/services/enhanced_dual_write_service.py new file mode 100644 index 00000000..cc321880 --- /dev/null +++ b/todo/services/enhanced_dual_write_service.py @@ -0,0 +1,179 @@ +import logging +from typing import Any, Dict, Optional +from django.conf import settings + +from todo.services.dual_write_service import DualWriteService + +logger = logging.getLogger(__name__) + + +class EnhancedDualWriteService(DualWriteService): + """ + Enhanced dual-write service that provides additional functionality. + Extends the base DualWriteService with batch operations and enhanced monitoring. + """ + + def __init__(self): + super().__init__() + self.enabled = getattr(settings, "DUAL_WRITE_ENABLED", True) + + def create_document(self, collection_name: str, data: Dict[str, Any], mongo_id: str) -> bool: + """ + Create a document in both MongoDB and Postgres. + """ + if not self.enabled: + logger.debug("Dual-write is disabled, skipping Postgres sync") + return True + + return super().create_document(collection_name, data, mongo_id) + + def update_document(self, collection_name: str, mongo_id: str, data: Dict[str, Any]) -> bool: + """ + Update a document in both MongoDB and Postgres. + """ + if not self.enabled: + logger.debug("Dual-write is disabled, skipping Postgres sync") + return True + + return super().update_document(collection_name, mongo_id, data) + + def delete_document(self, collection_name: str, mongo_id: str) -> bool: + """ + Delete a document from both MongoDB and Postgres. 
+ """ + if not self.enabled: + logger.debug("Dual-write is disabled, skipping Postgres sync") + return True + + return super().delete_document(collection_name, mongo_id) + + def batch_operations(self, operations: list) -> bool: + """ + Perform multiple operations in batch. + """ + if not self.enabled: + logger.debug("Dual-write is disabled, skipping Postgres sync") + return True + + return self._batch_operations_sync(operations) + + def _batch_operations_sync(self, operations: list) -> bool: + """Perform batch operations synchronously.""" + success_count = 0 + failure_count = 0 + + for op in operations: + try: + collection_name = op["collection_name"] + data = op.get("data", {}) + mongo_id = op["mongo_id"] + operation = op["operation"] + + if operation == "create": + success = super().create_document(collection_name, data, mongo_id) + elif operation == "update": + success = super().update_document(collection_name, mongo_id, data) + elif operation == "delete": + success = super().delete_document(collection_name, mongo_id) + else: + logger.error(f"Unknown operation: {operation}") + failure_count += 1 + continue + + if success: + success_count += 1 + else: + failure_count += 1 + + except Exception as e: + logger.error(f"Error processing operation {op}: {str(e)}") + failure_count += 1 + + logger.info(f"Batch sync completed. Success: {success_count}, Failures: {failure_count}") + return failure_count == 0 + + def get_sync_status(self, collection_name: str, mongo_id: str) -> Optional[str]: + """ + Get the sync status of a document in Postgres. 
+ + Args: + collection_name: Name of the MongoDB collection + mongo_id: MongoDB ObjectId as string + + Returns: + str: Sync status or None if not found + """ + try: + postgres_model = self._get_postgres_model(collection_name) + if not postgres_model: + return None + + instance = postgres_model.objects.get(mongo_id=mongo_id) + return instance.sync_status + except postgres_model.DoesNotExist: + return None + except Exception as e: + logger.error(f"Error getting sync status for {collection_name}:{mongo_id}: {str(e)}") + return None + + def get_sync_metrics(self) -> Dict[str, Any]: + """ + Get metrics about sync operations. + + Returns: + Dict: Sync metrics + """ + try: + metrics = { + "total_failures": len(self.sync_failures), + "failures_by_collection": {}, + "recent_failures": self.sync_failures[-10:] if self.sync_failures else [], + "enabled": self.enabled, + } + + # Count failures by collection + for failure in self.sync_failures: + collection = failure["collection"] + if collection not in metrics["failures_by_collection"]: + metrics["failures_by_collection"][collection] = 0 + metrics["failures_by_collection"][collection] += 1 + + return metrics + except Exception as e: + logger.error(f"Error getting sync metrics: {str(e)}") + return {} + + def retry_failed_sync(self, collection_name: str, mongo_id: str) -> bool: + """ + Retry a failed sync operation. 
+ + Args: + collection_name: Name of the MongoDB collection + mongo_id: MongoDB ObjectId as string + + Returns: + bool: True if retry was successful, False otherwise + """ + try: + # Find the failure record + failure_record = None + for failure in self.sync_failures: + if failure["collection"] == collection_name and failure["mongo_id"] == mongo_id: + failure_record = failure + break + + if not failure_record: + logger.warning(f"No failure record found for {collection_name}:{mongo_id}") + return False + + # Remove from failures list + self.sync_failures.remove(failure_record) + + # Retry the operation (this would need the original data) + # For now, just log the retry attempt + logger.info(f"Retrying sync for {collection_name}:{mongo_id}") + + return True + except Exception as e: + logger.error(f"Error retrying failed sync for {collection_name}:{mongo_id}: {str(e)}") + return False diff --git a/todo/services/postgres_sync_service.py b/todo/services/postgres_sync_service.py new file mode 100644 index 00000000..1e6a79be --- /dev/null +++ b/todo/services/postgres_sync_service.py @@ -0,0 +1,228 @@ +import logging +from django.db import connection +from django.conf import settings + +from todo_project.db.config import DatabaseManager +from todo.services.dual_write_service import DualWriteService + +logger = logging.getLogger(__name__) + + +class PostgresSyncService: + """ + Service to synchronize PostgreSQL tables with MongoDB data. + Checks if tables exist and copies data from MongoDB if needed. + Currently handles labels and roles tables only. + """ + + def __init__(self): + self.db_manager = DatabaseManager() + self.dual_write_service = DualWriteService() + self.enabled = getattr(settings, "POSTGRES_SYNC_ENABLED", True) + + def sync_all_tables(self) -> bool: + """ + Synchronize labels and roles PostgreSQL tables with MongoDB data. 
+ + Returns: + bool: True if all syncs completed successfully, False otherwise + """ + if not self.enabled: + logger.info("PostgreSQL sync is disabled, skipping") + return True + + logger.info("Starting PostgreSQL table synchronization for labels and roles") + logger.info(f"PostgreSQL sync enabled: {self.enabled}") + + sync_operations = [ + ("labels", self._sync_labels_table), + ("roles", self._sync_roles_table), + ] + + success_count = 0 + total_operations = len(sync_operations) + + for table_name, sync_func in sync_operations: + try: + logger.info(f"Syncing table: {table_name}") + if sync_func(): + logger.info(f"Successfully synced table: {table_name}") + success_count += 1 + else: + logger.error(f"Failed to sync table: {table_name}") + except Exception as e: + logger.error(f"Error syncing table {table_name}: {str(e)}") + + logger.info(f"PostgreSQL sync completed - {success_count}/{total_operations} tables synced successfully") + return success_count == total_operations + + def _check_table_exists(self, table_name: str) -> bool: + """ + Check if a PostgreSQL table exists. + + Args: + table_name: Name of the table to check + + Returns: + bool: True if table exists, False otherwise + """ + try: + with connection.cursor() as cursor: + cursor.execute( + """ + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = %s + ); + """, + [table_name], + ) + return cursor.fetchone()[0] + except Exception as e: + logger.error(f"Error checking if table {table_name} exists: {str(e)}") + return False + + def _get_mongo_collection_count(self, collection_name: str) -> int: + """ + Get the count of documents in a MongoDB collection. 
+ + Args: + collection_name: Name of the MongoDB collection + + Returns: + int: Number of documents in the collection + """ + try: + collection = self.db_manager.get_collection(collection_name) + + # Labels use isDeleted field for soft deletes + if collection_name == "labels": + return collection.count_documents({"isDeleted": {"$ne": True}}) + else: + # For roles and other collections without soft delete, count all documents + return collection.count_documents({}) + + except Exception as e: + logger.error(f"Error getting count for collection {collection_name}: {str(e)}") + return 0 + + def _get_postgres_table_count(self, table_name: str) -> int: + """ + Get the count of records in a PostgreSQL table. + + Args: + table_name: Name of the PostgreSQL table + + Returns: + int: Number of records in the table + """ + try: + with connection.cursor() as cursor: + cursor.execute(f"SELECT COUNT(*) FROM {table_name};") + return cursor.fetchone()[0] + except Exception as e: + logger.error(f"Error getting count for table {table_name}: {str(e)}") + return 0 + + def _sync_labels_table(self) -> bool: + """Synchronize the labels table.""" + table_name = "postgres_labels" + + if not self._check_table_exists(table_name): + logger.warning(f"Table {table_name} does not exist, skipping sync") + return True + + mongo_count = self._get_mongo_collection_count("labels") + postgres_count = self._get_postgres_table_count(table_name) + + if postgres_count >= mongo_count: + logger.info(f"Labels table already has {postgres_count} records, MongoDB has {mongo_count}. 
Skipping sync.") + return True + + logger.info(f"Syncing labels: MongoDB has {mongo_count} records, PostgreSQL has {postgres_count} records") + logger.info(f"Will sync {mongo_count - postgres_count} labels to PostgreSQL") + + try: + collection = self.db_manager.get_collection("labels") + labels = collection.find({"isDeleted": {"$ne": True}}) + + synced_count = 0 + for label in labels: + try: + # Check if label already exists in PostgreSQL + from todo.models.postgres.label import PostgresLabel + + existing = PostgresLabel.objects.filter(mongo_id=str(label["_id"])).first() + if existing: + continue + + # Transform data for PostgreSQL + postgres_data = self.dual_write_service._transform_label_data(label) + postgres_data["mongo_id"] = str(label["_id"]) + postgres_data["sync_status"] = "SYNCED" + + logger.debug(f"Creating label in PostgreSQL: {postgres_data}") + + # Create in PostgreSQL + PostgresLabel.objects.create(**postgres_data) + synced_count += 1 + + except Exception as e: + logger.error(f"Error syncing label {label.get('_id')}: {str(e)}") + continue + + logger.info(f"Successfully synced {synced_count} labels to PostgreSQL") + return True + + except Exception as e: + logger.error(f"Error syncing labels table: {str(e)}") + return False + + def _sync_roles_table(self) -> bool: + """Synchronize the roles table.""" + table_name = "postgres_roles" + + if not self._check_table_exists(table_name): + logger.warning(f"Table {table_name} does not exist, skipping sync") + return True + + mongo_count = self._get_mongo_collection_count("roles") + postgres_count = self._get_postgres_table_count(table_name) + + if postgres_count >= mongo_count: + logger.info(f"Roles table already has {postgres_count} records, MongoDB has {mongo_count}. 
Skipping sync.") + return True + + logger.info(f"Syncing roles: MongoDB has {mongo_count} records, PostgreSQL has {postgres_count} records") + + try: + collection = self.db_manager.get_collection("roles") + roles = collection.find({}) + + synced_count = 0 + for role in roles: + try: + from todo.models.postgres.role import PostgresRole + + existing = PostgresRole.objects.filter(mongo_id=str(role["_id"])).first() + if existing: + continue + + postgres_data = self.dual_write_service._transform_role_data(role) + postgres_data["mongo_id"] = str(role["_id"]) + postgres_data["sync_status"] = "SYNCED" + + PostgresRole.objects.create(**postgres_data) + synced_count += 1 + + except Exception as e: + logger.error(f"Error syncing role {role.get('_id')}: {str(e)}") + continue + + logger.info(f"Successfully synced {synced_count} roles to PostgreSQL") + return True + + except Exception as e: + logger.error(f"Error syncing roles table: {str(e)}") + return False diff --git a/todo/services/task_assignment_service.py b/todo/services/task_assignment_service.py index 592c5579..5b530f6b 100644 --- a/todo/services/task_assignment_service.py +++ b/todo/services/task_assignment_service.py @@ -3,13 +3,13 @@ from todo.dto.task_assignment_dto import TaskAssignmentResponseDTO, CreateTaskAssignmentDTO from todo.dto.responses.create_task_assignment_response import CreateTaskAssignmentResponse from todo.models.common.pyobjectid import PyObjectId +from todo.models.task_assignment import TaskAssignmentModel from todo.repositories.task_assignment_repository import TaskAssignmentRepository from todo.repositories.task_repository import TaskRepository from todo.repositories.user_repository import UserRepository from todo.repositories.team_repository import TeamRepository from todo.exceptions.user_exceptions import UserNotFoundException from todo.exceptions.task_exceptions import TaskNotFoundException -from todo.models.task_assignment import TaskAssignmentModel from todo.dto.task_assignment_dto import 
TaskAssignmentDTO from todo.models.audit_log import AuditLogModel from todo.repositories.audit_log_repository import AuditLogRepository @@ -58,6 +58,7 @@ def create_task_assignment(cls, dto: CreateTaskAssignmentDTO, user_id: str) -> C if not updated_assignment: raise ValueError("Failed to update task assignment") assignment = updated_assignment + else: # Create new assignment task_assignment = TaskAssignmentModel( diff --git a/todo_project/__init__.py b/todo_project/__init__.py index 84a4d93e..25c41747 100644 --- a/todo_project/__init__.py +++ b/todo_project/__init__.py @@ -1 +1 @@ -# Added this because without this file Django isn't able to auto detect the test files +# Django project initialization diff --git a/todo_project/db/init.py b/todo_project/db/init.py index e19cecbc..63ceb2e2 100644 --- a/todo_project/db/init.py +++ b/todo_project/db/init.py @@ -2,6 +2,7 @@ import time from todo_project.db.config import DatabaseManager from todo_project.db.migrations import run_all_migrations +from todo.services.postgres_sync_service import PostgresSyncService logger = logging.getLogger(__name__) @@ -50,6 +51,16 @@ def initialize_database(max_retries=5, retry_delay=2): if not migrations_success: logger.warning("Some database migrations failed, but continuing with initialization") + try: + postgres_sync_service = PostgresSyncService() + postgres_sync_success = postgres_sync_service.sync_all_tables() + if not postgres_sync_success: + logger.warning("Some PostgreSQL table synchronizations failed, but continuing with initialization") + else: + logger.info("PostgreSQL table synchronization completed successfully") + except Exception as e: + logger.warning(f"PostgreSQL table synchronization failed: {str(e)}, but continuing with initialization") + logger.info("Database initialization completed successfully") return True except Exception as e: diff --git a/todo_project/settings/base.py b/todo_project/settings/base.py index 049117e2..49281c06 100644 --- 
a/todo_project/settings/base.py +++ b/todo_project/settings/base.py @@ -19,6 +19,13 @@ MONGODB_URI = os.getenv("MONGODB_URI") DB_NAME = os.getenv("DB_NAME") +# Postgres Configuration +POSTGRES_HOST = os.getenv("POSTGRES_HOST", "localhost") +POSTGRES_PORT = os.getenv("POSTGRES_PORT", "5432") +POSTGRES_DB = os.getenv("POSTGRES_DB", "todo_postgres") +POSTGRES_USER = os.getenv("POSTGRES_USER", "todo_user") +POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "todo_password") + INSTALLED_APPS = [ "django.contrib.staticfiles", "corsheaders", @@ -27,6 +34,9 @@ "todo", "django.contrib.auth", "django.contrib.contenttypes", + "django.contrib.sessions", + "django.contrib.messages", + "django.contrib.admin", ] MIDDLEWARE = [ @@ -34,6 +44,8 @@ "django.middleware.security.SecurityMiddleware", "django.contrib.sessions.middleware.SessionMiddleware", "django.middleware.csrf.CsrfViewMiddleware", + "django.contrib.auth.middleware.AuthenticationMiddleware", + "django.contrib.messages.middleware.MessageMiddleware", "django.middleware.common.CommonMiddleware", "todo.middlewares.jwt_auth.JWTAuthenticationMiddleware", "django.middleware.clickjacking.XFrameOptionsMiddleware", @@ -126,7 +138,7 @@ "PRIVATE_KEY": os.getenv("PRIVATE_KEY"), "PUBLIC_KEY": os.getenv("PUBLIC_KEY"), "ACCESS_TOKEN_LIFETIME": int(os.getenv("ACCESS_LIFETIME", "3600")), - "REFRESH_TOKEN_LIFETIME": int(os.getenv("REFRESH_LIFETIME", "604800")), + "REFRESH_TOKEN_LIFETIME": int(os.getenv("REFRESH_TOKEN_LIFETIME", "604800")), } COOKIE_SETTINGS = { @@ -149,12 +161,34 @@ }, } -DATABASES = { - "default": { - "ENGINE": "django.db.backends.sqlite3", - "NAME": BASE_DIR / "db.sqlite3", +# Database Configuration +# Only configure PostgreSQL if not in testing mode +if not TESTING: + DATABASES = { + "default": { + "ENGINE": "django.db.backends.postgresql", + "NAME": POSTGRES_DB, + "USER": POSTGRES_USER, + "PASSWORD": POSTGRES_PASSWORD, + "HOST": POSTGRES_HOST, + "PORT": POSTGRES_PORT, + "OPTIONS": { + "sslmode": "prefer", + }, + } } 
-} +else: + DATABASES = { + "default": { + "ENGINE": "django.db.backends.sqlite3", + "NAME": ":memory:", + } + } + +# Dual-Write Configuration +DUAL_WRITE_ENABLED = os.getenv("DUAL_WRITE_ENABLED", "True").lower() == "true" +DUAL_WRITE_RETRY_ATTEMPTS = int(os.getenv("DUAL_WRITE_RETRY_ATTEMPTS", "3")) +DUAL_WRITE_RETRY_DELAY = int(os.getenv("DUAL_WRITE_RETRY_DELAY", "5")) # seconds PUBLIC_PATHS = [ "/favicon.ico", diff --git a/todo_project/settings/test.py b/todo_project/settings/test.py new file mode 100644 index 00000000..8fd9be81 --- /dev/null +++ b/todo_project/settings/test.py @@ -0,0 +1,10 @@ +from .base import * + +DUAL_WRITE_ENABLED = False + +# Remove PostgreSQL database configuration for tests +# This prevents Django from trying to connect to PostgreSQL +DATABASES = {} + +# Use MongoDB only for tests +# The tests will use testcontainers to spin up their own MongoDB instance