diff --git a/.gitignore b/.gitignore index 8422f14..703b479 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,5 @@ __pycache__ *.dump /.logfire/ + +.DS_Store diff --git a/PARTITION_MIGRATION_GUIDE.md b/PARTITION_MIGRATION_GUIDE.md new file mode 100644 index 0000000..20ae547 --- /dev/null +++ b/PARTITION_MIGRATION_GUIDE.md @@ -0,0 +1,408 @@ +# PostgreSQL Partitioning Migration Guide + +## Overview + +This guide provides step-by-step instructions for migrating the `webhook_log` table from a standard deletion-based cleanup approach to PostgreSQL native partitioning. + +**Problem Solved:** The current deletion job deletes 200k+ rows/hour (4.8M/day) and causes database crashes with 3-4 minute freezes and massive WAL generation. + +**Solution:** Use PostgreSQL native table partitioning with daily partitions. Dropping old partitions is instant and generates minimal WAL. + +--- + +## Pre-Migration Checklist + +### 1. Test in Development/Staging First +```bash +# Ensure you're on development database +export TESTING=true # Or set in .env + +# Run migration +python -m chronos.scripts.migrate_to_partitioned_webhook_log +``` + +### 2. Verify Application Compatibility +- Test webhook logging still works +- Verify queries return expected data +- Check application logs for any errors + +### 3. Backup Production Database +```bash +# Create a backup before migration +pg_dump -h your-host -U your-user -d chronos > backup_before_partition_$(date +%Y%m%d).sql +``` + +### 4. Review Changes +- `chronos/sql_models.py` - WebhookLog model updated with composite primary key +- `chronos/worker.py` - Two new partition management jobs added, old delete job commented out +- `chronos/scripts/migrate_to_partitioned_webhook_log.py` - Migration script + +--- + +## Migration Procedure + +### Step 1: Choose Migration Window + +**Recommended:** 2:00 AM - 4:00 AM (lowest traffic) + +**Estimated Duration:** 20-30 minutes for ~72M rows (15 days at 200k/hour) + +### Step 2: Deploy Code Changes + +```bash +# Pull latest code with partition changes +git pull origin master + +# Install any new dependencies (if needed) +pip install -r requirements.txt + +# Restart services to load new code (but don't run migration yet) +# This deploys the new jobs but they won't run until migration completes +make restart-server +make restart-worker +``` + +**Important:** The new partition management jobs will fail gracefully if run before migration. They check for the existence of the SQL functions. + +### Step 3: Run Migration Script + +**Production Command:** +```bash +python -m chronos.scripts.migrate_to_partitioned_webhook_log +``` + +You'll be prompted to type `MIGRATE` to confirm. + +**What the script does:** +1. Renames `webhook_log` → `webhook_log_old` (preserves existing data) +2. Creates new partitioned `webhook_log` table with composite PK `(id, timestamp)` +3. Creates 45 daily partitions (15 days back + 30 days forward) +4. Migrates last 15 days of data in batches of 1,000 rows +5. Creates SQL functions for partition management: + - `create_future_webhook_log_partitions(days_ahead INT)` + - `drop_old_webhook_log_partitions(retention_days INT)` +6. 
Verifies row counts match + +### Step 4: Monitor Migration Progress + +The script logs progress to stdout and logfire (if configured): +- Partition creation status +- Data migration progress (rows migrated, percentage complete) +- Verification results + +**Expected Output:** +``` +================================================================================ +Starting PostgreSQL Partitioning Migration +================================================================================ + +[Step 1/6] Renaming existing webhook_log table... +✓ Table renamed: webhook_log -> webhook_log_old + +[Step 2/6] Creating new partitioned table... +✓ Partitioned table created + +[Step 3/6] Creating partitions... +Created partition: webhook_log_y2025m10d22 (2025-10-22 to 2025-10-23) +... +✓ All partitions created + +[Step 4/6] Migrating data from last 15 days... +Total rows to migrate: 72,000,000 +Migrated 1,000 / 72,000,000 rows (0.0%) +... +✓ Data migration complete + +[Step 5/6] Creating partition management functions... +✓ Management functions created + +[Step 6/6] Verifying migration... +Old table (last 15 days): 72,000,000 rows +New table: 72,000,000 rows +✓ Verification passed: row counts match + +================================================================================ +MIGRATION COMPLETED SUCCESSFULLY! +================================================================================ +``` + +### Step 5: Verify Migration Success + +**Check row counts:** +```sql +-- Count in new table +SELECT COUNT(*) FROM webhook_log; + +-- Count in old table (last 15 days) +SELECT COUNT(*) FROM webhook_log_old +WHERE timestamp >= NOW() - INTERVAL '15 days'; + +-- Should match! +``` + +**Verify partitions exist:** +```sql +SELECT tablename +FROM pg_tables +WHERE tablename LIKE 'webhook_log_y%' +ORDER BY tablename; +``` + +**Test application:** +- Generate a webhook log entry +- Query webhook logs via API +- Check logs for any errors + +### Step 6: Monitor for 24 Hours + +- Watch application logs for any errors +- Monitor database performance (should see massive improvement) +- Verify new partition jobs run successfully: + - 1:00 AM: Create future partitions job + - 2:00 AM: Drop old partitions job + +**Check job execution:** +```bash +# Check Redis for job locks +redis-cli +> GET create_future_partitions_job +> GET drop_old_partitions_job +``` + +### Step 7: Cleanup (After 7+ Days of Stability) + +Once you're confident the migration is stable: + +```sql +-- Drop the old table +DROP TABLE webhook_log_old CASCADE; +``` + +**Also clean up the commented code in worker.py:** +- Remove commented `delete_old_logs_job` code +- Remove commented `get_count` function +- Remove commented `_delete_old_logs_job` task + +--- + +## Rollback Procedure + +### If Migration Fails During Execution + +The script automatically rolls back on error: +1. Drops the new `webhook_log` table +2. Renames `webhook_log_old` back to `webhook_log` + +### Manual Rollback (If Needed) + +```sql +-- Connect to database +psql -h your-host -U your-user -d chronos + +-- Drop new table and restore old +DROP TABLE IF EXISTS webhook_log CASCADE; +ALTER TABLE webhook_log_old RENAME TO webhook_log; +``` + +### Rollback Code Changes + +```bash +# Revert code changes +git revert + +# Or manually: +# 1. Edit chronos/sql_models.py - remove composite PK, restore single PK +# 2. Edit chronos/worker.py - uncomment old delete job, remove new partition jobs +# 3. 
Restart services +``` + +--- + +## Post-Migration Benefits + +### Performance Improvements +- **No more database freezes:** Partition drops are instant (milliseconds vs 3-4 minutes) +- **Minimal WAL generation:** DROP TABLE vs DELETE millions of rows +- **Better query performance:** Partition pruning automatically limits scans to relevant partitions +- **No lock contention:** Dropping a partition doesn't lock the entire table + +### Operational Benefits +- **Safer operations:** Old table preserved for 7+ days +- **Automated management:** Jobs handle partition lifecycle automatically +- **Predictable performance:** No more surprise performance issues from deletions +- **Easy monitoring:** Check partition counts to verify job execution + +### Expected Metrics +- **Database CPU:** Should drop significantly during cleanup times +- **WAL generation:** Reduce from GB to MB during cleanup +- **Lock wait time:** Eliminate 3-4 minute database freezes +- **SSL SYSCALL EOF errors:** Should disappear completely + +--- + +## Troubleshooting + +### Migration Script Fails + +**Error: "Table webhook_log_old already exists"** +- Previous migration attempt may have failed +- Check if old table exists: `\dt webhook_log*` +- If migration failed, manually clean up and retry + +**Error: "Cannot create partition for past date"** +- System clock may be incorrect +- Verify server time: `date` +- Adjust partition date ranges if needed + +### New Jobs Failing + +**Error: "Function create_future_webhook_log_partitions does not exist"** +- Migration didn't complete successfully +- Verify functions exist: + ```sql + \df create_future_webhook_log_partitions + \df drop_old_webhook_log_partitions + ``` +- Re-run migration if needed + +**Error: "Partition already exists"** +- This is normal - function checks for existing partitions +- Job should complete successfully despite warning + +### Application Errors + +**Error: "Column 'id' is not part of primary key"** +- SQLModel not recognizing composite primary key +- Verify both `id` and `timestamp` have `primary_key=True` in model +- Restart application + +**Error: "No partition exists for value"** +- Trying to insert data for a date without a partition +- Run partition creation manually: + ```sql + SELECT create_future_webhook_log_partitions(30); + ``` + +--- + +## Monitoring Commands + +### Check Partition Count +```sql +SELECT COUNT(*) +FROM pg_tables +WHERE tablename LIKE 'webhook_log_y%'; +-- Should be around 45 (15 past + 30 future + today) +``` + +### View Partition Sizes +```sql +SELECT + tablename, + pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size +FROM pg_tables +WHERE tablename LIKE 'webhook_log_y%' +ORDER BY tablename DESC +LIMIT 20; +``` + +### Check Oldest and Newest Partitions +```sql +-- Oldest partition +SELECT MIN(tablename) FROM pg_tables WHERE tablename LIKE 'webhook_log_y%'; + +-- Newest partition +SELECT MAX(tablename) FROM pg_tables WHERE tablename LIKE 'webhook_log_y%'; +``` + +### Verify Data Distribution +```sql +SELECT + schemaname, + tablename, + n_live_tup as row_count +FROM pg_stat_user_tables +WHERE tablename LIKE 'webhook_log_y%' +ORDER BY tablename DESC +LIMIT 20; +``` + +### Monitor Job Execution +```bash +# Check Celery worker logs +tail -f /var/log/chronos/worker.log | grep partition + +# Check Redis for locks +redis-cli +> KEYS *partition* +``` + +--- + +## FAQs + +**Q: Can I run the migration during business hours?** +A: Not recommended. 
While the migration processes data in batches, it's safer to run during low-traffic periods (2-4 AM). + +**Q: What happens to data older than 15 days?** +A: It remains in `webhook_log_old` but is not migrated to the new partitioned table. You can drop `webhook_log_old` after verifying the migration. + +**Q: Can I change the retention period?** +A: Yes. Edit the `drop_old_partitions_job` in `worker.py` and change the parameter: +```python +sql = text("SELECT drop_old_webhook_log_partitions(30)") # 30 days instead of 15 +``` + +**Q: What if I need to query data older than 15 days?** +A: Query `webhook_log_old` directly until you're ready to drop it. + +**Q: Can I pause the migration?** +A: No. Once started, let it complete. It's designed to be atomic with rollback on failure. + +**Q: How do I manually create/drop partitions?** +```sql +-- Create future partitions +SELECT create_future_webhook_log_partitions(60); -- 60 days ahead + +-- Drop old partitions +SELECT drop_old_webhook_log_partitions(30); -- Older than 30 days +``` + +**Q: What happens if partition creation job fails?** +A: Webhook logging will fail for dates without partitions. Manually run: +```sql +SELECT create_future_webhook_log_partitions(30); +``` + +--- + +## Support + +If you encounter issues during migration: + +1. Check the troubleshooting section above +2. Review application logs and database logs +3. Verify all prerequisites are met +4. If needed, follow rollback procedure +5. Contact database administrator or development team + +**Key Files:** +- Migration script: `chronos/scripts/migrate_to_partitioned_webhook_log.py` +- Model changes: `chronos/sql_models.py` +- Job definitions: `chronos/worker.py` +- This guide: `PARTITION_MIGRATION_GUIDE.md` + +--- + +## Summary + +This migration solves critical database performance issues by replacing deletion-based cleanup with PostgreSQL native partitioning. The approach: + +✅ Eliminates 3-4 minute database freezes +✅ Reduces WAL generation from GB to MB +✅ Provides instant partition drops +✅ Maintains 15-day retention policy +✅ Includes automated partition management +✅ Preserves existing data for safe rollback + +**Next Steps:** Test in development, schedule production migration, monitor results. diff --git a/chronos/scripts/generate_seed_data.py b/chronos/scripts/generate_seed_data.py new file mode 100644 index 0000000..00db9ee --- /dev/null +++ b/chronos/scripts/generate_seed_data.py @@ -0,0 +1,321 @@ +""" +Seed data generator for testing partition migration. 
+ +Generates realistic webhook log data: +- Creates test webhook endpoint(s) +- Generates logs spanning 20 days (to test 15-day cutoff) +- Varies status codes, headers, and bodies +- Configurable number of rows + +Usage: + python -m chronos.scripts.generate_seed_data --rows 10000 + python -m chronos.scripts.generate_seed_data --rows 100000 --days 25 +""" + +import argparse +import random +from datetime import datetime, timedelta, UTC + +from sqlmodel import Session, select + +from chronos.db import engine, init_db +from chronos.sql_models import WebhookEndpoint, WebhookLog +from chronos.utils import app_logger + + +# Sample data for generating realistic webhook logs +SAMPLE_EVENTS = [ + 'job.created', 'job.updated', 'job.completed', 'job.cancelled', + 'client.created', 'client.updated', 'client.deleted', + 'invoice.created', 'invoice.paid', 'invoice.overdue', + 'lesson.scheduled', 'lesson.completed', 'lesson.cancelled', +] + +SAMPLE_STATUSES = [ + ('Success', [200, 201, 204]), + ('Client Error', [400, 404, 422]), + ('Server Error', [500, 502, 503]), + ('Timeout', [None]), +] + + +def generate_request_body(event_type: str) -> dict: + """Generate realistic webhook request body.""" + return { + 'event': event_type, + 'timestamp': datetime.now(UTC).isoformat(), + 'data': { + 'id': random.randint(1000, 9999), + 'branch_id': random.randint(1, 10), + 'status': random.choice(['active', 'pending', 'completed', 'cancelled']), + 'amount': round(random.uniform(10, 1000), 2) if 'invoice' in event_type else None, + } + } + + +def generate_response_body(status_code: int) -> dict: + """Generate realistic webhook response body.""" + if status_code and 200 <= status_code < 300: + return { + 'status': 'success', + 'message': 'Webhook processed successfully', + 'processed_at': datetime.now(UTC).isoformat(), + } + elif status_code and 400 <= status_code < 500: + return { + 'status': 'error', + 'message': 'Invalid webhook payload', + 'error_code': f'ERR_{status_code}', + } + elif status_code and status_code >= 500: + return { + 'status': 'error', + 'message': 'Internal server error', + 'error_code': f'ERR_{status_code}', + } + else: + return None + + +def create_test_endpoint(db: Session) -> WebhookEndpoint: + """Create or get a test webhook endpoint.""" + # Check if test endpoint exists + existing = db.exec( + select(WebhookEndpoint).where(WebhookEndpoint.tc_id == 99999) + ).first() + + if existing: + app_logger.info(f"Using existing test endpoint: {existing.id}") + return existing + + # Create new test endpoint + endpoint = WebhookEndpoint( + tc_id=99999, + name="Test Webhook Endpoint", + branch_id=1, + webhook_url="https://example.com/webhook", + api_key="test-api-key-12345", + active=True, + ) + + db.add(endpoint) + db.commit() + db.refresh(endpoint) + + app_logger.info(f"Created test endpoint: {endpoint.id}") + return endpoint + + +def generate_webhook_logs( + db: Session, + endpoint_id: int, + total_rows: int, + days_back: int = 20, + batch_size: int = 1000 +) -> None: + """ + Generate webhook log entries using raw psycopg2 for better performance. 
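+    Rows are generated in memory and written with one cursor.execute() per row, committing
+    once per batch; psycopg2.extras.execute_batch (or execute_values) could be swapped in
+    here if seed generation ever becomes a bottleneck.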
+ + Args: + db: Database session (only used for getting DSN) + endpoint_id: WebhookEndpoint ID to associate logs with + total_rows: Total number of logs to generate + days_back: How many days back to spread the data + batch_size: Number of rows to insert per batch + """ + import json as json_lib + import psycopg2 + + app_logger.info(f"Generating {total_rows:,} webhook logs spanning {days_back} days...") + + # Create a direct psycopg2 connection + from chronos.utils import settings + conn = psycopg2.connect(settings.pg_dsn) + cursor = conn.cursor() + + now = datetime.now(UTC) + values = [] + + sql = """ + INSERT INTO webhooklog ( + request_headers, request_body, response_headers, response_body, + status, status_code, timestamp, webhook_endpoint_id + ) + VALUES ( + %s::jsonb, %s::jsonb, %s::jsonb, %s::jsonb, + %s, %s, %s, %s + ) + """ + + for i in range(total_rows): + # Generate random timestamp within the date range + days_offset = random.uniform(0, days_back) + timestamp = now - timedelta(days=days_offset) + + # Pick random status and status code + status, status_codes = random.choice(SAMPLE_STATUSES) + status_code = random.choice(status_codes) + + # Generate request/response data + event_type = random.choice(SAMPLE_EVENTS) + request_body = generate_request_body(event_type) + response_body = generate_response_body(status_code) if status_code else None + + # Build value tuple for SQL + request_headers = { + 'User-Agent': 'TutorCruncher', + 'Content-Type': 'application/json', + 'webhook-signature': f'sha256_{random.randint(100000, 999999)}', + } + response_headers = { + 'Content-Type': 'application/json', + 'Server': 'nginx/1.18.0', + } if status_code else None + + values.append(( + json_lib.dumps(request_headers), + json_lib.dumps(request_body), + json_lib.dumps(response_headers) if response_headers else None, + json_lib.dumps(response_body) if response_body else None, + status, + status_code, + timestamp, + endpoint_id, + )) + + # Insert in batches + if len(values) >= batch_size or i == total_rows - 1: + for val in values: + cursor.execute(sql, val) + + conn.commit() + + progress = ((i + 1) / total_rows) * 100 + app_logger.info(f"Inserted {i + 1:,} / {total_rows:,} rows ({progress:.1f}%)") + + values = [] + + cursor.close() + conn.close() + + app_logger.info("✓ Seed data generation complete!") + + +def print_summary(db: Session, days_back: int) -> None: + """Print summary statistics about generated data.""" + from sqlalchemy import func, text + + app_logger.info("\n" + "=" * 60) + app_logger.info("DATA SUMMARY") + app_logger.info("=" * 60) + + # Total count + total = db.exec(select(func.count()).select_from(WebhookLog)).one() + app_logger.info(f"Total webhook logs: {total:,}") + + # Count by age + cutoff_date = datetime.now(UTC) - timedelta(days=15) + recent = db.exec( + select(func.count()) + .select_from(WebhookLog) + .where(WebhookLog.timestamp >= cutoff_date) + ).one() + old = total - recent + + app_logger.info(f" Last 15 days: {recent:,} rows (will be migrated)") + app_logger.info(f" Older than 15 days: {old:,} rows (will stay in webhook_log_old)") + + # Count by status + app_logger.info("\nBy Status:") + status_counts = db.exec( + text(""" + SELECT status, COUNT(*) as count + FROM webhooklog + GROUP BY status + ORDER BY count DESC + """) + ).all() + + for status, count in status_counts: + app_logger.info(f" {status}: {count:,}") + + # Date range + min_date = db.exec(select(func.min(WebhookLog.timestamp))).one() + max_date = db.exec(select(func.max(WebhookLog.timestamp))).one() + + 
app_logger.info(f"\nDate range:") + app_logger.info(f" Oldest: {min_date}") + app_logger.info(f" Newest: {max_date}") + + app_logger.info("=" * 60 + "\n") + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser( + description='Generate seed data for partition migration testing' + ) + parser.add_argument( + '--rows', + type=int, + default=10000, + help='Number of webhook log rows to generate (default: 10000)' + ) + parser.add_argument( + '--days', + type=int, + default=20, + help='Number of days to spread data across (default: 20)' + ) + parser.add_argument( + '--batch-size', + type=int, + default=1000, + help='Batch size for inserts (default: 1000)' + ) + parser.add_argument( + '--reset', + action='store_true', + help='Reset database tables before generating data' + ) + + args = parser.parse_args() + + app_logger.info("=" * 60) + app_logger.info("WEBHOOK LOG SEED DATA GENERATOR") + app_logger.info("=" * 60) + app_logger.info(f"Configuration:") + app_logger.info(f" Rows to generate: {args.rows:,}") + app_logger.info(f" Days to span: {args.days}") + app_logger.info(f" Batch size: {args.batch_size:,}") + app_logger.info(f" Reset tables: {args.reset}") + app_logger.info("=" * 60 + "\n") + + # Initialize database + if args.reset: + app_logger.info("Resetting database tables...") + init_db() + app_logger.info("✓ Tables reset\n") + + with Session(engine) as db: + # Create test endpoint + endpoint = create_test_endpoint(db) + + # Generate logs + generate_webhook_logs( + db=db, + endpoint_id=endpoint.id, + total_rows=args.rows, + days_back=args.days, + batch_size=args.batch_size, + ) + + # Print summary + print_summary(db, args.days) + + app_logger.info("Ready for migration testing!") + app_logger.info("Run: python -m chronos.scripts.migrate_to_partitioned_webhook_log\n") + + +if __name__ == "__main__": + main() diff --git a/chronos/scripts/migrate_to_partitioned_webhook_log.py b/chronos/scripts/migrate_to_partitioned_webhook_log.py new file mode 100644 index 0000000..9045a55 --- /dev/null +++ b/chronos/scripts/migrate_to_partitioned_webhook_log.py @@ -0,0 +1,459 @@ +""" +Migration script to convert webhook_log table from standard table to partitioned table. + +This script: +1. Renames existing webhook_log table to webhook_log_old +2. Creates new partitioned webhook_log table (daily partitions by timestamp) +3. Creates partitions for last 15 days + next 30 days +4. Migrates data from last 15 days in batches +5. Creates SQL functions for partition management + +Run during low traffic period (recommended: 2-4 AM). +Estimated duration: 20-30 minutes for ~72M rows. 
+ +Usage: + python -m chronos.scripts.migrate_to_partitioned_webhook_log +""" + +import sys +from datetime import datetime, timedelta, UTC +from typing import List, Tuple + +from sqlalchemy import text +from sqlmodel import Session + +from chronos.db import engine +from chronos.utils import app_logger, settings + +# Optional logfire import +try: + import logfire + # Configure logfire if available + if settings.logfire_token: + logfire.configure( + service_name='chronos-migration', + token=settings.logfire_token, + send_to_logfire=True, + ) +except ImportError: + # Create a no-op logfire object for testing + class NoOpLogfire: + class span: + def __init__(self, *args, **kwargs): + pass + def __enter__(self): + return self + def __exit__(self, *args): + pass + logfire = NoOpLogfire() + + +def get_partition_name(date: datetime) -> str: + """Generate partition table name for a given date.""" + return f"webhook_log_y{date.year}m{date.month:02d}d{date.day:02d}" + + +def get_partition_date_ranges(start_date: datetime, end_date: datetime) -> List[Tuple[datetime, datetime, str]]: + """ + Generate list of (start, end, name) tuples for daily partitions. + + Args: + start_date: First date to create partition for + end_date: Last date to create partition for + + Returns: + List of (range_start, range_end, partition_name) tuples + """ + partitions = [] + current_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0) + + while current_date <= end_date: + next_date = current_date + timedelta(days=1) + partition_name = get_partition_name(current_date) + partitions.append((current_date, next_date, partition_name)) + current_date = next_date + + return partitions + + +def create_partitioned_table(session: Session) -> None: + """ + Create the new partitioned webhook_log table structure. + + This creates the parent table partitioned by RANGE on timestamp. 
+ """ + app_logger.info("Creating partitioned webhook_log table...") + + create_table_sql = text(""" + CREATE TABLE webhooklog ( + id SERIAL, + request_headers JSONB, + request_body JSONB, + response_headers JSONB, + response_body JSONB, + status VARCHAR NOT NULL, + status_code INTEGER, + timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL, + webhook_endpoint_id INTEGER, + PRIMARY KEY (id, timestamp), + CONSTRAINT webhook_log_webhook_endpoint_id_fkey + FOREIGN KEY (webhook_endpoint_id) + REFERENCES webhookendpoint(id) + ) PARTITION BY RANGE (timestamp); + """) + + session.exec(create_table_sql) + + # Create indexes on the partitioned table + # These will be automatically created on each partition + create_indexes_sql = [ + text("CREATE INDEX ix_webhook_log_timestamp ON webhooklog (timestamp);"), + text("CREATE INDEX ix_webhook_log_webhook_endpoint_id ON webhooklog (webhook_endpoint_id);"), + ] + + for sql in create_indexes_sql: + session.exec(sql) + + app_logger.info("Partitioned table created successfully") + + +def create_partition(session: Session, range_start: datetime, range_end: datetime, partition_name: str) -> None: + """Create a single partition for the specified date range.""" + create_partition_sql = text(f""" + CREATE TABLE {partition_name} PARTITION OF webhooklog + FOR VALUES FROM ('{range_start.isoformat()}') TO ('{range_end.isoformat()}'); + """) + + session.exec(create_partition_sql) + app_logger.info(f"Created partition: {partition_name} ({range_start.date()} to {range_end.date()})") + + +def create_all_partitions(session: Session) -> None: + """Create partitions for last 15 days + next 30 days.""" + app_logger.info("Creating partitions...") + + now = datetime.now(UTC) + start_date = now - timedelta(days=15) + end_date = now + timedelta(days=30) + + partitions = get_partition_date_ranges(start_date, end_date) + + with logfire.span(f"Creating {len(partitions)} partitions"): + for range_start, range_end, partition_name in partitions: + create_partition(session, range_start, range_end, partition_name) + + app_logger.info(f"Created {len(partitions)} partitions successfully") + + +def migrate_data_in_batches(session: Session, batch_size: int = 1000) -> None: + """ + Migrate data from webhook_log_old to webhook_log in batches. + Only migrates data from the last 15 days. 
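+    Batches are copied with INSERT ... SELECT ordered by (timestamp, id) using LIMIT/OFFSET,
+    committing after each batch. OFFSET pagination re-reads the qualifying rows from the start
+    of the range on every batch, so the final batches of a very large migration are the slowest;
+    keyset pagination on (timestamp, id) would be a possible alternative.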
+ """ + app_logger.info("Starting data migration...") + + # Calculate cutoff date (15 days ago) + cutoff_date = datetime.now(UTC) - timedelta(days=15) + + # Get total count to migrate + count_sql = text(""" + SELECT COUNT(*) FROM webhooklog_old + WHERE timestamp >= :cutoff_date + """) + total_rows = session.exec(count_sql, {"cutoff_date": cutoff_date}).scalar() + + app_logger.info(f"Total rows to migrate: {total_rows:,}") + + if total_rows == 0: + app_logger.info("No data to migrate") + return + + # Migrate in batches + migrated = 0 + offset = 0 + + with logfire.span(f"Migrating {total_rows:,} rows in batches of {batch_size}"): + while offset < total_rows: + # Insert batch + insert_sql = text(f""" + INSERT INTO webhooklog + (id, request_headers, request_body, response_headers, response_body, + status, status_code, timestamp, webhook_endpoint_id) + SELECT + id, request_headers, request_body, response_headers, response_body, + status, status_code, timestamp, webhook_endpoint_id + FROM webhooklog_old + WHERE timestamp >= :cutoff_date + ORDER BY timestamp, id + LIMIT {batch_size} OFFSET {offset} + """) + + result = session.exec(insert_sql, {"cutoff_date": cutoff_date}) + batch_count = result.rowcount if hasattr(result, 'rowcount') else batch_size + + migrated += batch_count + offset += batch_size + + # Commit each batch + session.commit() + + # Log progress + progress_pct = (migrated / total_rows) * 100 + app_logger.info(f"Migrated {migrated:,} / {total_rows:,} rows ({progress_pct:.1f}%)") + + # Break if we got fewer rows than batch_size (end of data) + if batch_count < batch_size: + break + + app_logger.info(f"Data migration complete: {migrated:,} rows migrated") + + +def create_partition_management_functions(session: Session) -> None: + """ + Create SQL functions for automated partition management. + + Creates: + 1. create_future_webhook_log_partitions(days_ahead INT) + 2. 
drop_old_webhook_log_partitions(retention_days INT) + """ + app_logger.info("Creating partition management functions...") + + # Function to create future partitions + create_future_partitions_func = text(""" + CREATE OR REPLACE FUNCTION create_future_webhook_log_partitions(days_ahead INT) + RETURNS void AS $$ + DECLARE + partition_date DATE; + partition_name TEXT; + start_date TIMESTAMP; + end_date TIMESTAMP; + BEGIN + -- Create partitions for each day from tomorrow to days_ahead + FOR i IN 1..days_ahead LOOP + partition_date := CURRENT_DATE + i; + partition_name := 'webhooklog_y' || + TO_CHAR(partition_date, 'YYYY') || 'm' || + TO_CHAR(partition_date, 'MM') || 'd' || + TO_CHAR(partition_date, 'DD'); + + start_date := partition_date::TIMESTAMP; + end_date := (partition_date + INTERVAL '1 day')::TIMESTAMP; + + -- Check if partition already exists + IF NOT EXISTS ( + SELECT 1 FROM pg_tables + WHERE tablename = partition_name + ) THEN + EXECUTE format( + 'CREATE TABLE %I PARTITION OF webhooklog FOR VALUES FROM (%L) TO (%L)', + partition_name, + start_date, + end_date + ); + RAISE NOTICE 'Created partition: %', partition_name; + END IF; + END LOOP; + END; + $$ LANGUAGE plpgsql; + """) + + # Function to drop old partitions + drop_old_partitions_func = text(""" + CREATE OR REPLACE FUNCTION drop_old_webhook_log_partitions(retention_days INT) + RETURNS void AS $$ + DECLARE + partition_record RECORD; + partition_date DATE; + cutoff_date DATE; + BEGIN + cutoff_date := CURRENT_DATE - retention_days; + + -- Find and drop old partitions + FOR partition_record IN + SELECT tablename + FROM pg_tables + WHERE tablename LIKE 'webhooklog_y%' + ORDER BY tablename + LOOP + -- Extract date from partition name (format: webhook_log_yYYYYmMMdDD) + BEGIN + partition_date := TO_DATE( + SUBSTRING(partition_record.tablename FROM 'y(\d{4})m(\d{2})d(\d{2})'), + 'YYYYMMDD' + ); + + IF partition_date < cutoff_date THEN + EXECUTE format('DROP TABLE IF EXISTS %I', partition_record.tablename); + RAISE NOTICE 'Dropped partition: %', partition_record.tablename; + END IF; + EXCEPTION + WHEN OTHERS THEN + RAISE WARNING 'Could not process partition: %', partition_record.tablename; + END; + END LOOP; + END; + $$ LANGUAGE plpgsql; + """) + + session.exec(create_future_partitions_func) + session.exec(drop_old_partitions_func) + + app_logger.info("Partition management functions created successfully") + + +def verify_migration(session: Session) -> bool: + """ + Verify that migration was successful by comparing row counts. + + Returns: + True if verification passed, False otherwise + """ + app_logger.info("Verifying migration...") + + # Count rows in old table (last 15 days) + cutoff_date = datetime.now(UTC) - timedelta(days=15) + old_count_sql = text(""" + SELECT COUNT(*) FROM webhooklog_old + WHERE timestamp >= :cutoff_date + """) + old_count = session.exec(old_count_sql, {"cutoff_date": cutoff_date}).scalar() + + # Count rows in new table + new_count_sql = text("SELECT COUNT(*) FROM webhook_log") + new_count = session.exec(new_count_sql).scalar() + + app_logger.info(f"Old table (last 15 days): {old_count:,} rows") + app_logger.info(f"New table: {new_count:,} rows") + + if old_count == new_count: + app_logger.info("✓ Verification passed: row counts match") + return True + else: + app_logger.error(f"✗ Verification failed: row count mismatch ({old_count:,} vs {new_count:,})") + return False + + +def run_migration() -> None: + """ + Main migration function. Orchestrates the entire migration process. + + Steps: + 1. 
Rename webhook_log to webhook_log_old + 2. Create new partitioned webhook_log table + 3. Create all partitions (15 days back + 30 days forward) + 4. Migrate data from last 15 days + 5. Create partition management functions + 6. Verify migration success + + Raises: + Exception: If any step fails, rolls back the transaction + """ + app_logger.info("=" * 80) + app_logger.info("Starting PostgreSQL Partitioning Migration") + app_logger.info("=" * 80) + + if settings.testing: + app_logger.error("Cannot run migration in testing mode") + sys.exit(1) + + with Session(engine) as session: + try: + with logfire.span("Partitioning Migration"): + # Step 1: Rename existing table + app_logger.info("\n[Step 1/6] Renaming existing webhook_log table...") + rename_sql = text(""" + ALTER TABLE webhooklog RENAME TO webhook_log_old; + """) + session.exec(rename_sql) + session.commit() + app_logger.info("✓ Table renamed: webhook_log -> webhook_log_old") + + # Step 2: Create new partitioned table + app_logger.info("\n[Step 2/6] Creating new partitioned table...") + create_partitioned_table(session) + session.commit() + app_logger.info("✓ Partitioned table created") + + # Step 3: Create all partitions + app_logger.info("\n[Step 3/6] Creating partitions...") + create_all_partitions(session) + session.commit() + app_logger.info("✓ All partitions created") + + # Step 4: Migrate data + app_logger.info("\n[Step 4/6] Migrating data from last 15 days...") + migrate_data_in_batches(session, batch_size=1000) + app_logger.info("✓ Data migration complete") + + # Step 5: Create management functions + app_logger.info("\n[Step 5/6] Creating partition management functions...") + create_partition_management_functions(session) + session.commit() + app_logger.info("✓ Management functions created") + + # Step 6: Verify migration + app_logger.info("\n[Step 6/6] Verifying migration...") + verification_passed = verify_migration(session) + + if not verification_passed: + raise Exception("Migration verification failed - see logs above") + + app_logger.info("\n" + "=" * 80) + app_logger.info("MIGRATION COMPLETED SUCCESSFULLY!") + app_logger.info("=" * 80) + app_logger.info("\nNext steps:") + app_logger.info("1. Monitor application logs for any errors") + app_logger.info("2. Verify new partition management jobs run successfully") + app_logger.info("3. After 7 days of stability, you can drop webhook_log_old:") + app_logger.info(" DROP TABLE webhook_log_old CASCADE;") + + except Exception as e: + app_logger.error(f"\n{'=' * 80}") + app_logger.error("MIGRATION FAILED!") + app_logger.error(f"{'=' * 80}") + app_logger.error(f"Error: {e}") + app_logger.error("\nRolling back changes...") + + try: + session.rollback() + + # Attempt to restore original table + app_logger.error("Attempting to restore original table...") + restore_sql = text(""" + DROP TABLE IF EXISTS webhook_log CASCADE; + ALTER TABLE webhooklog_old RENAME TO webhooklog; + """) + session.exec(restore_sql) + session.commit() + app_logger.error("✓ Original table restored") + + except Exception as rollback_error: + app_logger.error(f"Rollback failed: {rollback_error}") + app_logger.error("Manual intervention required!") + app_logger.error("Run: DROP TABLE IF EXISTS webhook_log CASCADE; ALTER TABLE webhooklog_old RENAME TO webhooklog;") + + sys.exit(1) + + +if __name__ == "__main__": + # Safety check: require confirmation in production + print("\n" + "=" * 80) + print("PostgreSQL Webhook Log Partitioning Migration") + print("=" * 80) + print("\nThis script will:") + print(" 1. 
Rename webhook_log to webhook_log_old") + print(" 2. Create new partitioned webhook_log table") + print(" 3. Migrate last 15 days of data") + print(" 4. Create partition management functions") + print("\nEstimated duration: 20-30 minutes") + print("Recommended timing: During low traffic (2-4 AM)") + print("\n" + "=" * 80) + + confirmation = input("\nType 'MIGRATE' to proceed: ") + + if confirmation != "MIGRATE": + print("Migration cancelled.") + sys.exit(0) + + print("\nStarting migration...\n") + run_migration() diff --git a/chronos/sql_models.py b/chronos/sql_models.py index 50736fb..3a0df89 100644 --- a/chronos/sql_models.py +++ b/chronos/sql_models.py @@ -27,9 +27,20 @@ def __repr__(self): class WebhookLog(SQLModel, table=True): """ - The model for the webhook log table + The model for the webhook log table. + + NOTE: After migration, this table uses PostgreSQL native partitioning by RANGE on timestamp (daily partitions). + - Partitioned by: timestamp (daily) + - Retention: 15 days (older partitions are automatically dropped) + - Composite primary key: (id, timestamp) - required for partitioning + - Partition management: Automated via Celery scheduled jobs + + See chronos/scripts/migrate_to_partitioned_webhook_log.py for migration details. + + IMPORTANT: For testing migration, keep this as single PK (id only) until after migration runs. """ + # NOTE: Will become composite (id, timestamp) after migration id: Optional[int] = Field(default=None, primary_key=True) request_headers: Optional[dict] = Field(nullable=True, sa_type=JSONB) request_body: Optional[dict] = Field(nullable=True, sa_type=JSONB) @@ -37,7 +48,12 @@ class WebhookLog(SQLModel, table=True): response_body: Optional[dict] = Field(nullable=True, sa_type=JSONB) status: str status_code: Optional[int] - timestamp: datetime.datetime = Field(default_factory=datetime.datetime.utcnow, nullable=False, index=True) + timestamp: datetime.datetime = Field( + default_factory=datetime.datetime.utcnow, + nullable=False, + index=True + # Will become part of composite primary key after migration + ) webhook_endpoint_id: int | None = Field(default=None, foreign_key='webhookendpoint.id', index=True) diff --git a/chronos/worker.py b/chronos/worker.py index 700225a..322a389 100644 --- a/chronos/worker.py +++ b/chronos/worker.py @@ -160,7 +160,7 @@ def task_send_webhooks( else: branch_id = loaded_payload['branch_id'] - if qlength > 100: + if qlength > 1000: app_logger.error('Queue is too long. Check workers and speeds.') app_logger.info('Starting send webhook task for branch %s. qlength=%s.', branch_id, qlength) @@ -188,7 +188,11 @@ def task_send_webhooks( ) -DELETE_JOBS_KEY = 'delete_old_logs_job' +# DEPRECATED: Old deletion-based cleanup job +# This job has been replaced by partition-based management (see below). +# Kept for reference. Remove after migration is confirmed stable. + +# DELETE_JOBS_KEY = 'delete_old_logs_job' scheduler = AsyncIOScheduler(timezone=UTC) @@ -200,52 +204,135 @@ async def lifespan(app: FastAPI): scheduler.shutdown() -@scheduler.scheduled_job('interval', hours=1) -async def delete_old_logs_job(): +# @scheduler.scheduled_job('interval', minutes=1) +# async def delete_old_logs_job(): +# """ +# DEPRECATED: Replaced by partition management jobs. 
+# We run cron job at midnight every day that wipes all WebhookLogs older than 15 days +# """ +# if cache.get(DELETE_JOBS_KEY): +# return +# else: +# cache.set(DELETE_JOBS_KEY, 'True', ex=1200) +# _delete_old_logs_job.delay() + + +# def get_count(date_to_delete_before: datetime) -> int: +# """ +# Get the count of all logs +# """ +# with Session(engine) as db: +# count = ( +# db.query(WebhookLog) +# .with_entities(func.count()) +# .where(WebhookLog.timestamp < date_to_delete_before) +# .scalar() +# ) +# return count + + +# @celery_app.task +# def _delete_old_logs_job(): +# # with logfire.span('Started to delete old logs'): +# with Session(engine) as db: +# # Get all logs older than 15 days +# date_to_delete_before = datetime.now(UTC) - timedelta(days=15) +# count = get_count(date_to_delete_before) +# delete_limit = 4999 +# while count > 0: +# app_logger.info(f'Deleting {count} logs') +# logs_to_delete = db.exec( +# select(WebhookLog.id).where(WebhookLog.timestamp < date_to_delete_before).limit(delete_limit) +# ).all() +# delete_statement = delete(WebhookLog).where(WebhookLog.id.in_(log_id for log_id in logs_to_delete)) +# db.exec(delete_statement) +# db.commit() +# count -= delete_limit +# +# del logs_to_delete +# del delete_statement +# gc.collect() +# +# cache.delete(DELETE_JOBS_KEY) + + +# Partition Management Jobs +# These jobs replace the delete_old_logs_job for partitioned webhook_log table + +CREATE_PARTITIONS_KEY = 'create_future_partitions_job' +DROP_PARTITIONS_KEY = 'drop_old_partitions_job' + + +@scheduler.scheduled_job('cron', hour=1, minute=0) +async def create_future_partitions_job(): """ - We run cron job at midnight every day that wipes all WebhookLogs older than 15 days + Creates future partitions for the webhook_log table. + Runs daily at 1:00 AM to create partitions for the next 30 days. """ - if cache.get(DELETE_JOBS_KEY): + if cache.get(CREATE_PARTITIONS_KEY): return else: - cache.set(DELETE_JOBS_KEY, 'True', ex=1200) - _delete_old_logs_job.delay() + cache.set(CREATE_PARTITIONS_KEY, 'True', ex=1800) # 30-minute lock + _create_future_partitions_job.delay() -def get_count(date_to_delete_before: datetime) -> int: +@celery_app.task +def _create_future_partitions_job(): + """ + Celery task to create future webhook_log partitions. + Calls the PostgreSQL function created during migration. """ - Get the count of all logs + try: + with logfire.span('Creating future webhook_log partitions'): + with Session(engine) as db: + # Call the SQL function to create next 30 days of partitions + from sqlalchemy import text + + sql = text("SELECT create_future_webhook_log_partitions(30)") + db.exec(sql) + db.commit() + + app_logger.info('Successfully created future webhook_log partitions') + except Exception as e: + app_logger.error(f'Error creating future partitions: {e}') + raise + finally: + cache.delete(CREATE_PARTITIONS_KEY) + + +@scheduler.scheduled_job('cron', hour=2, minute=0) +async def drop_old_partitions_job(): """ - with Session(engine) as db: - count = ( - db.query(WebhookLog) - .with_entities(func.count()) - .where(WebhookLog.timestamp < date_to_delete_before) - .scalar() - ) - return count + Drops old partitions from the webhook_log table. + Runs daily at 2:00 AM to drop partitions older than 15 days. 
+ """ + if cache.get(DROP_PARTITIONS_KEY): + return + else: + cache.set(DROP_PARTITIONS_KEY, 'True', ex=1800) # 30-minute lock + _drop_old_partitions_job.delay() @celery_app.task -def _delete_old_logs_job(): - # with logfire.span('Started to delete old logs'): - with Session(engine) as db: - # Get all logs older than 15 days - date_to_delete_before = datetime.now(UTC) - timedelta(days=15) - count = get_count(date_to_delete_before) - delete_limit = 4999 - while count > 0: - app_logger.info(f'Deleting {count} logs') - logs_to_delete = db.exec( - select(WebhookLog.id).where(WebhookLog.timestamp < date_to_delete_before).limit(delete_limit) - ).all() - delete_statement = delete(WebhookLog).where(WebhookLog.id.in_(log_id for log_id in logs_to_delete)) - db.exec(delete_statement) - db.commit() - count -= delete_limit - - del logs_to_delete - del delete_statement - gc.collect() - - cache.delete(DELETE_JOBS_KEY) +def _drop_old_partitions_job(): + """ + Celery task to drop old webhook_log partitions. + Calls the PostgreSQL function created during migration. + Drops partitions older than 15 days. + """ + try: + with logfire.span('Dropping old webhook_log partitions'): + with Session(engine) as db: + # Call the SQL function to drop partitions older than 15 days + from sqlalchemy import text + + sql = text("SELECT drop_old_webhook_log_partitions(15)") + db.exec(sql) + db.commit() + + app_logger.info('Successfully dropped old webhook_log partitions') + except Exception as e: + app_logger.error(f'Error dropping old partitions: {e}') + raise + finally: + cache.delete(DROP_PARTITIONS_KEY)
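
As a companion to the scheduled jobs above and the guide's Troubleshooting/Monitoring sections, the sketch below shows how the two SQL functions created by the migration could be exercised by hand, for example after a missed job run or while smoke-testing Step 5. It is illustrative only and not part of this diff: it assumes `chronos.db.engine` and the function names exactly as they appear in `migrate_to_partitioned_webhook_log.py`, and it only works once the migration has created those functions. The broad `LIKE` pattern is used because partitions created by the Python script are named `webhook_log_y...` while the SQL helpers use `webhooklog_y...`, so it matches either spelling.

```python
"""Illustrative helper (not part of this diff): run the partition maintenance
functions created by chronos/scripts/migrate_to_partitioned_webhook_log.py by hand."""
from sqlalchemy import text
from sqlmodel import Session

from chronos.db import engine  # assumed available, as imported by the migration script


def list_partitions(session: Session) -> list[str]:
    # Matches both partition-name spellings used in this diff:
    # webhook_log_yYYYYmMMdDD (Python script) and webhooklog_yYYYYmMMdDD (SQL functions).
    rows = session.execute(
        text("SELECT tablename FROM pg_tables WHERE tablename LIKE 'webhook%log_y%' ORDER BY tablename")
    )
    return [row[0] for row in rows]


def run_partition_maintenance(days_ahead: int = 30, retention_days: int = 15) -> None:
    """Create missing future partitions, then drop expired ones, and report the result."""
    with Session(engine) as session:
        before = list_partitions(session)
        # The same calls the Celery tasks in worker.py issue, invoked directly.
        session.execute(text("SELECT create_future_webhook_log_partitions(:d)"), {"d": days_ahead})
        session.execute(text("SELECT drop_old_webhook_log_partitions(:r)"), {"r": retention_days})
        session.commit()
        after = list_partitions(session)
        print(f"Partitions before: {len(before)}, after: {len(after)}")
        if after:
            print(f"Oldest: {after[0]}, newest: {after[-1]}")


if __name__ == "__main__":
    run_partition_maintenance()
```

If saved under a hypothetical path such as `chronos/scripts/partition_maintenance.py`, it can be run with `python -m chronos.scripts.partition_maintenance` and its output compared against the partition-count query in the guide's Monitoring Commands section.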