diff --git a/app.py b/app.py
index 681c7a7..0dbb620 100644
--- a/app.py
+++ b/app.py
@@ -168,7 +168,7 @@ def format_local_time_filter(utc_dt, format_str='%Y-%m-%d %H:%M'):
 def schedule_all_repositories():
     """Schedule all active repositories on startup"""
-    from datetime import datetime  # Import to ensure availability
+    from datetime import datetime, timedelta  # Import to ensure availability

     try:
         # Clean up any stuck 'running' jobs from previous sessions
@@ -182,6 +182,35 @@ def schedule_all_repositories():
             logger.info(f"Marked stuck job as failed: {stuck_job.id} for repository {stuck_job.repository_id}")
         db.session.commit()

+        # Auto-cleanup: Remove duplicate backup jobs created within last hour
+        cutoff = datetime.utcnow() - timedelta(hours=1)
+        recent_jobs = BackupJob.query.filter(BackupJob.created_at > cutoff).all()
+
+        # Group by repository and find duplicates
+        repo_jobs = {}
+        for job in recent_jobs:
+            repo_id = job.repository_id
+            if repo_id not in repo_jobs:
+                repo_jobs[repo_id] = []
+            repo_jobs[repo_id].append(job)
+
+        duplicates_cleaned = 0
+        for repo_id, jobs in repo_jobs.items():
+            if len(jobs) > 1:
+                # Sort by creation time, keep the first one, mark others as failed
+                jobs.sort(key=lambda j: j.created_at)
+                for duplicate_job in jobs[1:]:
+                    if duplicate_job.status in ['pending', 'running']:
+                        duplicate_job.status = 'failed'
+                        duplicate_job.error_message = 'Duplicate job automatically cleaned up'
+                        duplicate_job.completed_at = datetime.utcnow()
+                        duplicates_cleaned += 1
+                        logger.info(f"Auto-cleaned duplicate job {duplicate_job.id} for repository {repo_id}")
+
+        if duplicates_cleaned > 0:
+            db.session.commit()
+            logger.info(f"Auto-cleaned {duplicates_cleaned} duplicate backup jobs")
+
         # First, clear any existing jobs to prevent duplicates
         existing_jobs = scheduler.get_jobs()
         for job in existing_jobs:
@@ -189,6 +218,11 @@ def schedule_all_repositories():
                 scheduler.remove_job(job.id)
                 logger.info(f"Removed existing job on startup: {job.id}")

+        # Clear our tracking as well
+        with _job_tracking_lock:
+            _scheduled_jobs.clear()
+            logger.info("Cleared job tracking set")
+
         repositories = Repository.query.filter_by(is_active=True).all()
         scheduled_count = 0
         for repository in repositories:
@@ -197,6 +231,65 @@ def schedule_all_repositories():
                 scheduled_count += 1
                 logger.info(f"Scheduled backup job for repository: {repository.name} ({repository.schedule_type})")
         logger.info(f"Scheduled {scheduled_count} backup jobs on startup")
+
+        # Schedule a periodic health check job to monitor for duplicates
+        def scheduler_health_check():
+            from datetime import datetime, timedelta
+            with app.app_context():
+                try:
+                    # Check for duplicate jobs in scheduler
+                    all_jobs = scheduler.get_jobs()
+                    backup_jobs = [job for job in all_jobs if job.id.startswith('backup_')]
+                    job_ids = [job.id for job in backup_jobs]
+
+                    # Check for duplicate job IDs
+                    if len(job_ids) != len(set(job_ids)):
+                        logger.error("Duplicate scheduler job IDs detected! Cleaning up...")
+                        # Remove all backup jobs and reschedule
+                        for job in backup_jobs:
+                            scheduler.remove_job(job.id)
+
+                        # Clear tracking and reschedule
+                        with _job_tracking_lock:
+                            _scheduled_jobs.clear()
+
+                        # Reschedule active repositories
+                        repositories = Repository.query.filter_by(is_active=True).all()
+                        for repo in repositories:
+                            if repo.schedule_type != 'manual':
+                                schedule_backup_job(repo)
+
+                        logger.info("Scheduler health check: cleaned up and rescheduled jobs")
+
+                    # Auto-cleanup old failed jobs (older than 7 days)
+                    old_cutoff = datetime.utcnow() - timedelta(days=7)
+                    old_jobs = BackupJob.query.filter(
+                        BackupJob.status == 'failed',
+                        BackupJob.created_at < old_cutoff
+                    ).all()
+
+                    if old_jobs:
+                        for old_job in old_jobs:
+                            db.session.delete(old_job)
+                        db.session.commit()
+                        logger.info(f"Auto-cleaned {len(old_jobs)} old failed backup jobs")
+
+                except Exception as e:
+                    logger.error(f"Scheduler health check failed: {e}")
+
+        # Schedule health check to run every 6 hours
+        scheduler.add_job(
+            func=scheduler_health_check,
+            trigger=CronTrigger(hour='*/6', timezone=LOCAL_TZ),
+            id='scheduler_health_check',
+            name='Scheduler Health Check',
+            replace_existing=True,
+            misfire_grace_time=300,
+            coalesce=True,
+            max_instances=1
+        )
+        logger.info("Scheduled periodic scheduler health check")
+
     except Exception as e:
         logger.error(f"Error scheduling repositories on startup: {e}")
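The startup cleanup above groups the last hour's BackupJob rows by repository and marks everything after the earliest entry as failed. A minimal, dependency-free sketch of that grouping rule is shown below; FakeJob and mark_duplicates_failed are hypothetical stand-ins for illustration only, since the real code operates on SQLAlchemy model instances.

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class FakeJob:
    # Hypothetical stand-in for the BackupJob model used in the diff.
    id: int
    repository_id: int
    created_at: datetime
    status: str = 'pending'
    error_message: Optional[str] = None
    completed_at: Optional[datetime] = None


def mark_duplicates_failed(jobs, now=None):
    """Keep the earliest job per repository; mark later pending/running ones as failed."""
    now = now or datetime.utcnow()
    by_repo = {}
    for job in jobs:
        by_repo.setdefault(job.repository_id, []).append(job)

    cleaned = 0
    for repo_jobs in by_repo.values():
        repo_jobs.sort(key=lambda j: j.created_at)
        for dup in repo_jobs[1:]:
            if dup.status in ('pending', 'running'):
                dup.status = 'failed'
                dup.error_message = 'Duplicate job automatically cleaned up'
                dup.completed_at = now
                cleaned += 1
    return cleaned


if __name__ == '__main__':
    t0 = datetime(2024, 1, 1, 12, 0)
    jobs = [FakeJob(1, 7, t0), FakeJob(2, 7, t0 + timedelta(minutes=5)), FakeJob(3, 9, t0)]
    assert mark_duplicates_failed(jobs) == 1   # only the later job for repository 7 is marked
    assert jobs[0].status == 'pending' and jobs[1].status == 'failed'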
@@ -205,6 +298,10 @@ def schedule_all_repositories():
 _scheduler_lock = threading.Lock()
 _scheduler_initialized = False

+# Global tracking of scheduled jobs to prevent duplicates
+_scheduled_jobs = set()
+_job_tracking_lock = threading.Lock()
+
 def ensure_scheduler_initialized():
     """Ensure scheduler is initialized with existing repositories (thread-safe)"""
     global _scheduler_initialized
@@ -622,11 +719,23 @@ def favicon():
 def schedule_backup_job(repository):
     """Schedule a backup job for a repository"""
+    global _scheduled_jobs
+
     if not repository.is_active:
         logger.info(f"Repository {repository.name} is inactive, not scheduling")
         return

     job_id = f'backup_{repository.id}'
+
+    # Thread-safe check to prevent duplicate scheduling
+    with _job_tracking_lock:
+        if job_id in _scheduled_jobs:
+            logger.warning(f"Job {job_id} already being scheduled, skipping duplicate")
+            return
+
+        # Mark this job as being scheduled
+        _scheduled_jobs.add(job_id)
+
     logger.info(f"Attempting to schedule job {job_id} for repository {repository.name}")

     # Remove existing job if it exists - try multiple ways to ensure it's gone
@@ -635,6 +744,9 @@ def schedule_backup_job(repository):
         if existing_job:
             scheduler.remove_job(job_id)
             logger.info(f"Removed existing scheduled job: {job_id}")
+            # Also remove from our tracking
+            with _job_tracking_lock:
+                _scheduled_jobs.discard(job_id)
         else:
             logger.info(f"No existing job found for {job_id}")
     except Exception as e:
@@ -643,6 +755,8 @@ def schedule_backup_job(repository):
     # Double-check that job is really gone
     if scheduler.get_job(job_id):
         logger.error(f"Job {job_id} still exists after removal attempt, aborting schedule")
+        with _job_tracking_lock:
+            _scheduled_jobs.discard(job_id)
         return

     # Create a wrapper function that includes Flask app context
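The module-level set and lock added above implement a simple claim-and-release pattern: the first caller to add a job_id wins, later callers bail out, and the failure paths discard the id again. A small self-contained sketch of that pattern, using the hypothetical names JobTracker, claim, and release, looks like this:

import threading


class JobTracker:
    """Sketch of the _scheduled_jobs / _job_tracking_lock pattern from the diff."""

    def __init__(self):
        self._lock = threading.Lock()
        self._ids = set()

    def claim(self, job_id):
        # Return True only for the first caller to claim this id.
        with self._lock:
            if job_id in self._ids:
                return False
            self._ids.add(job_id)
            return True

    def release(self, job_id):
        # Mirror of the discard-on-failure paths in schedule_backup_job.
        with self._lock:
            self._ids.discard(job_id)

    def clear(self):
        with self._lock:
            self._ids.clear()


if __name__ == '__main__':
    tracker = JobTracker()
    assert tracker.claim('backup_1') is True
    assert tracker.claim('backup_1') is False   # duplicate attempt is refused
    tracker.release('backup_1')
    assert tracker.claim('backup_1') is True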
@@ -657,7 +771,26 @@ def backup_with_context():
                     logger.warning(f"Repository {repository.id} not found or inactive, skipping backup")
                     return

-                # Check if there's already a running backup for this repository
+                # Multiple layers of duplicate prevention
+
+                # 0. Auto-cleanup: Mark any long-running jobs as failed
+                stuck_cutoff = datetime.utcnow() - timedelta(hours=2)
+                stuck_jobs = BackupJob.query.filter_by(
+                    repository_id=repository.id,
+                    status='running'
+                ).filter(
+                    BackupJob.started_at < stuck_cutoff
+                ).all()
+
+                if stuck_jobs:
+                    logger.warning(f"Found {len(stuck_jobs)} stuck running jobs for repository {repo.name}, cleaning up")
+                    for stuck in stuck_jobs:
+                        stuck.status = 'failed'
+                        stuck.error_message = 'Job stuck for over 2 hours, automatically failed'
+                        stuck.completed_at = datetime.utcnow()
+                    db.session.commit()
+
+                # 1. Check if there's already a running backup for this repository
                 running_job = BackupJob.query.filter_by(
                     repository_id=repository.id,
                     status='running'
@@ -667,8 +800,8 @@ def backup_with_context():
                 ).first()
                 if running_job:
                     logger.warning(f"Backup already running for repository {repo.name} (job {running_job.id}), skipping")
                     return
-                # Additional check: ensure no backup started in the last 30 seconds to prevent rapid duplicates
-                recent_cutoff = datetime.utcnow() - timedelta(seconds=30)
+                # 2. Check for very recent backups (within last 2 minutes) to prevent rapid duplicates
+                recent_cutoff = datetime.utcnow() - timedelta(minutes=2)
                 recent_backup = BackupJob.query.filter_by(
                     repository_id=repository.id
                 ).filter(
@@ -676,11 +809,36 @@ def backup_with_context():
                 ).first()
                 if recent_backup:
-                    logger.warning(f"Recent backup found for repository {repo.name} (started at {recent_backup.started_at}), skipping")
+                    logger.warning(f"Recent backup found for repository {repo.name} (started at {recent_backup.started_at}), skipping to prevent duplicates")
                     return

-                logger.info(f"Starting scheduled backup for repository: {repo.name}")
-                backup_service.backup_repository(repo)
+                # 3. Use a file-based lock to prevent concurrent executions
+                import fcntl
+                import tempfile
+                import os
+
+                lock_file_path = os.path.join(tempfile.gettempdir(), f"backup_lock_{repository.id}")
+
+                try:
+                    lock_file = open(lock_file_path, 'w')
+                    fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+                    logger.info(f"Acquired file lock for repository {repo.name}")
+
+                    try:
+                        logger.info(f"Starting scheduled backup for repository: {repo.name}")
+                        backup_service.backup_repository(repo)
+                        logger.info(f"Completed scheduled backup for repository: {repo.name}")
+                    finally:
+                        fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
+                        lock_file.close()
+                        try:
+                            os.unlink(lock_file_path)
+                        except:
+                            pass
+
+                except (IOError, OSError) as lock_error:
+                    logger.warning(f"Could not acquire lock for repository {repo.name}, another backup may be running: {lock_error}")
+                    return

             except Exception as e:
                 logger.error(f"Error in scheduled backup for repository {repository.id}: {e}", exc_info=True)
@@ -771,6 +929,9 @@ def backup_with_context():
         logger.info(f"Job {job_id} successfully scheduled, next run: {added_job.next_run_time}")
     else:
         logger.error(f"Failed to schedule job {job_id} - job not found after creation")
+        # Remove from tracking if scheduling failed
+        with _job_tracking_lock:
+            _scheduled_jobs.discard(job_id)

 # Initialize scheduler with existing repositories at startup
 # This runs after all functions are defined
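The file lock added in backup_with_context relies on fcntl.flock with LOCK_EX | LOCK_NB, which raises an OSError immediately if another process already holds the lock. A standalone sketch of the same pattern as a context manager is shown below; fcntl is POSIX-only, and repository_lock is a hypothetical name, not a helper from the diff. Unlike the diff, this sketch leaves the lock file in place after release rather than unlinking it, which avoids the classic race where a freshly recreated lock file and the just-deleted one can both appear "locked" at the same path.

import fcntl
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def repository_lock(repository_id):
    """Yield True if the per-repository lock was acquired, False if it is held elsewhere."""
    path = os.path.join(tempfile.gettempdir(), f"backup_lock_{repository_id}")
    lock_file = open(path, 'w')
    try:
        try:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            # Somebody else holds the lock; report failure instead of raising.
            yield False
            return
        try:
            yield True
        finally:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
    finally:
        lock_file.close()


if __name__ == '__main__':
    with repository_lock(42) as acquired:
        print("acquired" if acquired else "another backup appears to be running")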
diff --git a/backup_service.py b/backup_service.py
index fa2f581..4a1cbf0 100644
--- a/backup_service.py
+++ b/backup_service.py
@@ -43,6 +43,12 @@ def backup_repository(self, repository):
             logger.warning(f"Very recent backup found for repository {repository.name} (started at {recent_job.started_at}), skipping to prevent duplicates")
             return

+        # Auto-cleanup: Check for and clean up any orphaned temp directories
+        user_backup_dir = self.backup_base_dir / f"user_{repository.user_id}"
+        repo_backup_dir = user_backup_dir / repository.name
+        if repo_backup_dir.exists():
+            self._cleanup_temp_directories(repo_backup_dir)
+
         # Create backup job record
         backup_job = BackupJob(
             user_id=repository.user_id,
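The backup_service.py hunk calls self._cleanup_temp_directories, whose implementation is not part of this diff. A minimal sketch of what such a helper might look like follows, assuming orphaned temp directories can be recognized by a name prefix; the "tmp_" prefix here is an assumption for illustration, not the project's actual convention.

import shutil
from pathlib import Path


def cleanup_temp_directories(repo_backup_dir: Path, prefix: str = "tmp_") -> int:
    """Remove leftover temp directories under repo_backup_dir; return how many were removed."""
    removed = 0
    if not repo_backup_dir.is_dir():
        return removed
    for entry in repo_backup_dir.iterdir():
        if entry.is_dir() and entry.name.startswith(prefix):
            shutil.rmtree(entry, ignore_errors=True)
            removed += 1
    return removed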