Skip to content

Commit 017f5df

Browse files
committed
feat: add PostgreSQL synchronization service and management command for labels and roles
1 parent 58786ae commit 017f5df

File tree

6 files changed

+286
-7
lines changed

6 files changed

+286
-7
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from django.core.management.base import BaseCommand
2+
from todo.services.postgres_sync_service import PostgresSyncService
3+
4+
5+
class Command(BaseCommand):
6+
help = "Synchronize labels and roles PostgreSQL tables with MongoDB data"
7+
8+
def add_arguments(self, parser):
9+
parser.add_argument(
10+
"--force",
11+
action="store_true",
12+
help="Force sync even if tables already have data",
13+
)
14+
15+
def handle(self, *args, **options):
16+
self.stdout.write(self.style.SUCCESS("Starting PostgreSQL table synchronization for labels and roles..."))
17+
18+
try:
19+
postgres_sync_service = PostgresSyncService()
20+
21+
if options["force"]:
22+
self.stdout.write("Force sync enabled - will sync all tables regardless of existing data")
23+
24+
success = postgres_sync_service.sync_all_tables()
25+
26+
if success:
27+
self.stdout.write(self.style.SUCCESS("PostgreSQL table synchronization completed successfully!"))
28+
else:
29+
self.stdout.write(self.style.ERROR("Some PostgreSQL table synchronizations failed!"))
30+
except Exception as e:
31+
self.stdout.write(self.style.ERROR(f"PostgreSQL table synchronization failed: {str(e)}"))

todo/repositories/team_creation_invite_code_repository.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def validate_and_consume_code(cls, code: str, used_by: str) -> Optional[dict]:
3333
{"$set": {"is_used": True, "used_by": used_by, "used_at": current_time.isoformat()}},
3434
return_document=True,
3535
)
36-
36+
3737
if result:
3838
# Sync the update to PostgreSQL
3939
dual_write_service = EnhancedDualWriteService()
@@ -48,16 +48,15 @@ def validate_and_consume_code(cls, code: str, used_by: str) -> Optional[dict]:
4848
}
4949

5050
dual_write_success = dual_write_service.update_document(
51-
collection_name="team_creation_invite_codes",
52-
data=invite_code_data,
53-
mongo_id=str(result["_id"])
51+
collection_name="team_creation_invite_codes", data=invite_code_data, mongo_id=str(result["_id"])
5452
)
5553

5654
if not dual_write_success:
5755
import logging
56+
5857
logger = logging.getLogger(__name__)
5958
logger.warning(f"Failed to sync team creation invite code update {result['_id']} to Postgres")
60-
59+
6160
return result
6261
except Exception as e:
6362
raise Exception(f"Error validating and consuming code: {e}")

todo/services/dual_write_service.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,8 +294,8 @@ def _transform_label_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
294294
"name": data.get("name"),
295295
"color": data.get("color", "#000000"),
296296
"description": data.get("description"),
297-
"created_at": data.get("created_at"),
298-
"updated_at": data.get("updated_at"),
297+
"created_at": data.get("createdAt"),
298+
"updated_at": data.get("updatedAt"),
299299
}
300300

301301
def _transform_role_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
import logging
2+
from django.db import connection
3+
from django.conf import settings
4+
5+
from todo_project.db.config import DatabaseManager
6+
from todo.services.dual_write_service import DualWriteService
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class PostgresSyncService:
12+
"""
13+
Service to synchronize PostgreSQL tables with MongoDB data.
14+
Checks if tables exist and copies data from MongoDB if needed.
15+
Currently handles labels and roles tables only.
16+
"""
17+
18+
def __init__(self):
19+
self.db_manager = DatabaseManager()
20+
self.dual_write_service = DualWriteService()
21+
self.enabled = getattr(settings, "POSTGRES_SYNC_ENABLED", True)
22+
23+
def sync_all_tables(self) -> bool:
24+
"""
25+
Synchronize labels and roles PostgreSQL tables with MongoDB data.
26+
27+
Returns:
28+
bool: True if all syncs completed successfully, False otherwise
29+
"""
30+
if not self.enabled:
31+
logger.info("PostgreSQL sync is disabled, skipping")
32+
return True
33+
34+
logger.info("Starting PostgreSQL table synchronization for labels and roles")
35+
logger.info(f"PostgreSQL sync enabled: {self.enabled}")
36+
37+
sync_operations = [
38+
("labels", self._sync_labels_table),
39+
("roles", self._sync_roles_table),
40+
]
41+
42+
success_count = 0
43+
total_operations = len(sync_operations)
44+
45+
for table_name, sync_func in sync_operations:
46+
try:
47+
logger.info(f"Syncing table: {table_name}")
48+
if sync_func():
49+
logger.info(f"Successfully synced table: {table_name}")
50+
success_count += 1
51+
else:
52+
logger.error(f"Failed to sync table: {table_name}")
53+
except Exception as e:
54+
logger.error(f"Error syncing table {table_name}: {str(e)}")
55+
56+
logger.info(f"PostgreSQL sync completed - {success_count}/{total_operations} tables synced successfully")
57+
return success_count == total_operations
58+
59+
def _check_table_exists(self, table_name: str) -> bool:
60+
"""
61+
Check if a PostgreSQL table exists.
62+
63+
Args:
64+
table_name: Name of the table to check
65+
66+
Returns:
67+
bool: True if table exists, False otherwise
68+
"""
69+
try:
70+
with connection.cursor() as cursor:
71+
cursor.execute(
72+
"""
73+
SELECT EXISTS (
74+
SELECT FROM information_schema.tables
75+
WHERE table_schema = 'public'
76+
AND table_name = %s
77+
);
78+
""",
79+
[table_name],
80+
)
81+
return cursor.fetchone()[0]
82+
except Exception as e:
83+
logger.error(f"Error checking if table {table_name} exists: {str(e)}")
84+
return False
85+
86+
def _get_mongo_collection_count(self, collection_name: str) -> int:
87+
"""
88+
Get the count of documents in a MongoDB collection.
89+
90+
Args:
91+
collection_name: Name of the MongoDB collection
92+
93+
Returns:
94+
int: Number of documents in the collection
95+
"""
96+
try:
97+
collection = self.db_manager.get_collection(collection_name)
98+
99+
# Labels use isDeleted field for soft deletes
100+
if collection_name == "labels":
101+
return collection.count_documents({"isDeleted": {"$ne": True}})
102+
else:
103+
# For roles and other collections without soft delete, count all documents
104+
return collection.count_documents({})
105+
106+
except Exception as e:
107+
logger.error(f"Error getting count for collection {collection_name}: {str(e)}")
108+
return 0
109+
110+
def _get_postgres_table_count(self, table_name: str) -> int:
111+
"""
112+
Get the count of records in a PostgreSQL table.
113+
114+
Args:
115+
table_name: Name of the PostgreSQL table
116+
117+
Returns:
118+
int: Number of records in the table
119+
"""
120+
try:
121+
with connection.cursor() as cursor:
122+
cursor.execute(f"SELECT COUNT(*) FROM {table_name};")
123+
return cursor.fetchone()[0]
124+
except Exception as e:
125+
logger.error(f"Error getting count for table {table_name}: {str(e)}")
126+
return 0
127+
128+
def _sync_labels_table(self) -> bool:
129+
"""Synchronize the labels table."""
130+
table_name = "postgres_labels"
131+
132+
if not self._check_table_exists(table_name):
133+
logger.warning(f"Table {table_name} does not exist, skipping sync")
134+
return True
135+
136+
mongo_count = self._get_mongo_collection_count("labels")
137+
postgres_count = self._get_postgres_table_count(table_name)
138+
139+
if postgres_count >= mongo_count:
140+
logger.info(f"Labels table already has {postgres_count} records, MongoDB has {mongo_count}. Skipping sync.")
141+
return True
142+
143+
logger.info(f"Syncing labels: MongoDB has {mongo_count} records, PostgreSQL has {postgres_count} records")
144+
logger.info(f"Will sync {mongo_count - postgres_count} labels to PostgreSQL")
145+
146+
try:
147+
collection = self.db_manager.get_collection("labels")
148+
labels = collection.find({"isDeleted": {"$ne": True}})
149+
150+
synced_count = 0
151+
for label in labels:
152+
try:
153+
# Check if label already exists in PostgreSQL
154+
from todo.models.postgres.label import PostgresLabel
155+
156+
existing = PostgresLabel.objects.filter(mongo_id=str(label["_id"])).first()
157+
if existing:
158+
continue
159+
160+
# Transform data for PostgreSQL
161+
postgres_data = self.dual_write_service._transform_label_data(label)
162+
postgres_data["mongo_id"] = str(label["_id"])
163+
postgres_data["sync_status"] = "SYNCED"
164+
165+
logger.debug(f"Creating label in PostgreSQL: {postgres_data}")
166+
167+
# Create in PostgreSQL
168+
PostgresLabel.objects.create(**postgres_data)
169+
synced_count += 1
170+
171+
except Exception as e:
172+
logger.error(f"Error syncing label {label.get('_id')}: {str(e)}")
173+
continue
174+
175+
logger.info(f"Successfully synced {synced_count} labels to PostgreSQL")
176+
return True
177+
178+
except Exception as e:
179+
logger.error(f"Error syncing labels table: {str(e)}")
180+
return False
181+
182+
def _sync_roles_table(self) -> bool:
183+
"""Synchronize the roles table."""
184+
table_name = "postgres_roles"
185+
186+
if not self._check_table_exists(table_name):
187+
logger.warning(f"Table {table_name} does not exist, skipping sync")
188+
return True
189+
190+
mongo_count = self._get_mongo_collection_count("roles")
191+
postgres_count = self._get_postgres_table_count(table_name)
192+
193+
if postgres_count >= mongo_count:
194+
logger.info(f"Roles table already has {postgres_count} records, MongoDB has {mongo_count}. Skipping sync.")
195+
return True
196+
197+
logger.info(f"Syncing roles: MongoDB has {mongo_count} records, PostgreSQL has {postgres_count} records")
198+
199+
try:
200+
collection = self.db_manager.get_collection("roles")
201+
roles = collection.find({})
202+
203+
synced_count = 0
204+
for role in roles:
205+
try:
206+
from todo.models.postgres.role import PostgresRole
207+
208+
existing = PostgresRole.objects.filter(mongo_id=str(role["_id"])).first()
209+
if existing:
210+
continue
211+
212+
postgres_data = self.dual_write_service._transform_role_data(role)
213+
postgres_data["mongo_id"] = str(role["_id"])
214+
postgres_data["sync_status"] = "SYNCED"
215+
216+
PostgresRole.objects.create(**postgres_data)
217+
synced_count += 1
218+
219+
except Exception as e:
220+
logger.error(f"Error syncing role {role.get('_id')}: {str(e)}")
221+
continue
222+
223+
logger.info(f"Successfully synced {synced_count} roles to PostgreSQL")
224+
return True
225+
226+
except Exception as e:
227+
logger.error(f"Error syncing roles table: {str(e)}")
228+
return False

todo_project/db/init.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import time
33
from todo_project.db.config import DatabaseManager
44
from todo_project.db.migrations import run_all_migrations
5+
from todo.services.postgres_sync_service import PostgresSyncService
56

67
logger = logging.getLogger(__name__)
78

@@ -50,6 +51,16 @@ def initialize_database(max_retries=5, retry_delay=2):
5051
if not migrations_success:
5152
logger.warning("Some database migrations failed, but continuing with initialization")
5253

54+
try:
55+
postgres_sync_service = PostgresSyncService()
56+
postgres_sync_success = postgres_sync_service.sync_all_tables()
57+
if not postgres_sync_success:
58+
logger.warning("Some PostgreSQL table synchronizations failed, but continuing with initialization")
59+
else:
60+
logger.info("PostgreSQL table synchronization completed successfully")
61+
except Exception as e:
62+
logger.warning(f"PostgreSQL table synchronization failed: {str(e)}, but continuing with initialization")
63+
5364
logger.info("Database initialization completed successfully")
5465
return True
5566
except Exception as e:

todo_project/settings/test.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from .base import *
2+
3+
DUAL_WRITE_ENABLED = False
4+
5+
# Remove PostgreSQL database configuration for tests
6+
# This prevents Django from trying to connect to PostgreSQL
7+
DATABASES = {}
8+
9+
# Use MongoDB only for tests
10+
# The tests will use testcontainers to spin up their own MongoDB instance

0 commit comments

Comments
 (0)