175 changes: 168 additions & 7 deletions app.py
@@ -168,7 +168,7 @@ def format_local_time_filter(utc_dt, format_str='%Y-%m-%d %H:%M'):

def schedule_all_repositories():
"""Schedule all active repositories on startup"""
from datetime import datetime # Import to ensure availability
from datetime import datetime, timedelta # Import to ensure availability

try:
# Clean up any stuck 'running' jobs from previous sessions
@@ -182,13 +182,47 @@ def schedule_all_repositories():
logger.info(f"Marked stuck job as failed: {stuck_job.id} for repository {stuck_job.repository_id}")
db.session.commit()

# Auto-cleanup: Remove duplicate backup jobs created within last hour
cutoff = datetime.utcnow() - timedelta(hours=1)
recent_jobs = BackupJob.query.filter(BackupJob.created_at > cutoff).all()

# Group by repository and find duplicates
repo_jobs = {}
for job in recent_jobs:
repo_id = job.repository_id
if repo_id not in repo_jobs:
repo_jobs[repo_id] = []
repo_jobs[repo_id].append(job)

duplicates_cleaned = 0
for repo_id, jobs in repo_jobs.items():
if len(jobs) > 1:
# Sort by creation time, keep the first one, mark others as failed
jobs.sort(key=lambda j: j.created_at)
for duplicate_job in jobs[1:]:
if duplicate_job.status in ['pending', 'running']:
duplicate_job.status = 'failed'
duplicate_job.error_message = 'Duplicate job automatically cleaned up'
duplicate_job.completed_at = datetime.utcnow()
duplicates_cleaned += 1
logger.info(f"Auto-cleaned duplicate job {duplicate_job.id} for repository {repo_id}")

if duplicates_cleaned > 0:
db.session.commit()
logger.info(f"Auto-cleaned {duplicates_cleaned} duplicate backup jobs")

# First, clear any existing jobs to prevent duplicates
existing_jobs = scheduler.get_jobs()
for job in existing_jobs:
if job.id.startswith('backup_'):
scheduler.remove_job(job.id)
logger.info(f"Removed existing job on startup: {job.id}")

# Clear our tracking as well
with _job_tracking_lock:
_scheduled_jobs.clear()
logger.info("Cleared job tracking set")

repositories = Repository.query.filter_by(is_active=True).all()
scheduled_count = 0
for repository in repositories:
@@ -197,6 +231,65 @@ def schedule_all_repositories():
scheduled_count += 1
logger.info(f"Scheduled backup job for repository: {repository.name} ({repository.schedule_type})")
logger.info(f"Scheduled {scheduled_count} backup jobs on startup")

# Schedule a periodic health check job to monitor for duplicates
def scheduler_health_check():
from datetime import datetime, timedelta
with app.app_context():
try:
# Check for duplicate jobs in scheduler
all_jobs = scheduler.get_jobs()
backup_jobs = [job for job in all_jobs if job.id.startswith('backup_')]
job_ids = [job.id for job in backup_jobs]

# Check for duplicate job IDs
if len(job_ids) != len(set(job_ids)):
logger.error("Duplicate scheduler job IDs detected! Cleaning up...")
# Remove all backup jobs and reschedule
for job in backup_jobs:
scheduler.remove_job(job.id)

# Clear tracking and reschedule
with _job_tracking_lock:
_scheduled_jobs.clear()

# Reschedule active repositories
repositories = Repository.query.filter_by(is_active=True).all()
for repo in repositories:
if repo.schedule_type != 'manual':
schedule_backup_job(repo)

logger.info("Scheduler health check: cleaned up and rescheduled jobs")

# Auto-cleanup old failed jobs (older than 7 days)
old_cutoff = datetime.utcnow() - timedelta(days=7)
old_jobs = BackupJob.query.filter(
BackupJob.status == 'failed',
BackupJob.created_at < old_cutoff
).all()

if old_jobs:
for old_job in old_jobs:
db.session.delete(old_job)
db.session.commit()
logger.info(f"Auto-cleaned {len(old_jobs)} old failed backup jobs")

except Exception as e:
logger.error(f"Scheduler health check failed: {e}")

# Schedule health check to run every 6 hours
scheduler.add_job(
func=scheduler_health_check,
trigger=CronTrigger(hour='*/6', timezone=LOCAL_TZ),
id='scheduler_health_check',
name='Scheduler Health Check',
replace_existing=True,
misfire_grace_time=300,
coalesce=True,
max_instances=1
)
logger.info("Scheduled periodic scheduler health check")

except Exception as e:
logger.error(f"Error scheduling repositories on startup: {e}")

@@ -205,6 +298,10 @@ def schedule_all_repositories():
_scheduler_lock = threading.Lock()
_scheduler_initialized = False

# Global tracking of scheduled jobs to prevent duplicates
_scheduled_jobs = set()
_job_tracking_lock = threading.Lock()

def ensure_scheduler_initialized():
"""Ensure scheduler is initialized with existing repositories (thread-safe)"""
global _scheduler_initialized
@@ -622,11 +719,23 @@ def favicon():

def schedule_backup_job(repository):
"""Schedule a backup job for a repository"""
global _scheduled_jobs

if not repository.is_active:
logger.info(f"Repository {repository.name} is inactive, not scheduling")
return

job_id = f'backup_{repository.id}'

# Thread-safe check to prevent duplicate scheduling
with _job_tracking_lock:
if job_id in _scheduled_jobs:
logger.warning(f"Job {job_id} already being scheduled, skipping duplicate")
return

# Mark this job as being scheduled
_scheduled_jobs.add(job_id)

logger.info(f"Attempting to schedule job {job_id} for repository {repository.name}")

# Remove existing job if it exists - try multiple ways to ensure it's gone
@@ -635,6 +744,9 @@ def schedule_backup_job(repository):
if existing_job:
scheduler.remove_job(job_id)
logger.info(f"Removed existing scheduled job: {job_id}")
# Also remove from our tracking
with _job_tracking_lock:
_scheduled_jobs.discard(job_id)
else:
logger.info(f"No existing job found for {job_id}")
except Exception as e:
@@ -643,6 +755,8 @@ def schedule_backup_job(repository):
# Double-check that job is really gone
if scheduler.get_job(job_id):
logger.error(f"Job {job_id} still exists after removal attempt, aborting schedule")
with _job_tracking_lock:
_scheduled_jobs.discard(job_id)
return

# Create a wrapper function that includes Flask app context
@@ -657,7 +771,26 @@ def backup_with_context():
logger.warning(f"Repository {repository.id} not found or inactive, skipping backup")
return

# Check if there's already a running backup for this repository
# Multiple layers of duplicate prevention

# 0. Auto-cleanup: Mark any long-running jobs as failed
stuck_cutoff = datetime.utcnow() - timedelta(hours=2)
stuck_jobs = BackupJob.query.filter_by(
repository_id=repository.id,
status='running'
).filter(
BackupJob.started_at < stuck_cutoff
).all()

if stuck_jobs:
logger.warning(f"Found {len(stuck_jobs)} stuck running jobs for repository {repo.name}, cleaning up")
for stuck in stuck_jobs:
stuck.status = 'failed'
stuck.error_message = 'Job stuck for over 2 hours, automatically failed'
stuck.completed_at = datetime.utcnow()
db.session.commit()

# 1. Check if there's already a running backup for this repository
running_job = BackupJob.query.filter_by(
repository_id=repository.id,
status='running'
@@ -667,20 +800,45 @@ def backup_with_context():
logger.warning(f"Backup already running for repository {repo.name} (job {running_job.id}), skipping")
return

# Additional check: ensure no backup started in the last 30 seconds to prevent rapid duplicates
recent_cutoff = datetime.utcnow() - timedelta(seconds=30)
# 2. Check for very recent backups (within last 2 minutes) to prevent rapid duplicates
recent_cutoff = datetime.utcnow() - timedelta(minutes=2)

Copilot AI Sep 12, 2025

[nitpick] The time window for preventing duplicates has changed from 30 seconds to 2 minutes without explanation. This magic number should be made configurable, or it should at least be documented why 2 minutes was chosen.

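One possible response to this nitpick, sketched here purely as an illustration: read the window from configuration with a documented default. The DUPLICATE_WINDOW_MINUTES name and the environment-variable approach are assumptions, not something this PR introduces.

import os
from datetime import datetime, timedelta

# Hypothetical setting (not part of this PR): window, in minutes, during which
# a second backup for the same repository is treated as a duplicate. The
# default of 2 only mirrors the value hard-coded in the diff above.
DUPLICATE_WINDOW_MINUTES = int(os.environ.get("DUPLICATE_WINDOW_MINUTES", "2"))

def recent_duplicate_cutoff(now=None):
    """Return the earliest start time that still counts as a duplicate run."""
    now = now or datetime.utcnow()
    return now - timedelta(minutes=DUPLICATE_WINDOW_MINUTES)
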
recent_backup = BackupJob.query.filter_by(
repository_id=repository.id
).filter(
BackupJob.started_at > recent_cutoff
).first()

if recent_backup:
logger.warning(f"Recent backup found for repository {repo.name} (started at {recent_backup.started_at}), skipping")
logger.warning(f"Recent backup found for repository {repo.name} (started at {recent_backup.started_at}), skipping to prevent duplicates")
return

logger.info(f"Starting scheduled backup for repository: {repo.name}")
backup_service.backup_repository(repo)
# 3. Use a file-based lock to prevent concurrent executions
import fcntl
import tempfile
import os

lock_file_path = os.path.join(tempfile.gettempdir(), f"backup_lock_{repository.id}")

try:
lock_file = open(lock_file_path, 'w')
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
logger.info(f"Acquired file lock for repository {repo.name}")

try:
logger.info(f"Starting scheduled backup for repository: {repo.name}")
backup_service.backup_repository(repo)
logger.info(f"Completed scheduled backup for repository: {repo.name}")
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
lock_file.close()
try:
os.unlink(lock_file_path)
except:
pass
Comment on lines +834 to +837

Copilot AI Sep 12, 2025

Using a bare except: clause suppresses all exceptions, including system exits and keyboard interrupts. Use a specific exception type such as OSError or FileNotFoundError instead.

Suggested change
try:
os.unlink(lock_file_path)
except:
pass
try:
os.unlink(lock_file_path)
except OSError:
pass


except (IOError, OSError) as lock_error:
Comment on lines +822 to +839

Copilot AI Sep 12, 2025

The file handle should be managed with a context manager (with statement) to ensure proper cleanup even if an exception occurs before the explicit close() call.

Suggested change
try:
lock_file = open(lock_file_path, 'w')
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
logger.info(f"Acquired file lock for repository {repo.name}")
try:
logger.info(f"Starting scheduled backup for repository: {repo.name}")
backup_service.backup_repository(repo)
logger.info(f"Completed scheduled backup for repository: {repo.name}")
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
lock_file.close()
try:
os.unlink(lock_file_path)
except:
pass
except (IOError, OSError) as lock_error:
try:
with open(lock_file_path, 'w') as lock_file:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
logger.info(f"Acquired file lock for repository {repo.name}")
try:
logger.info(f"Starting scheduled backup for repository: {repo.name}")
backup_service.backup_repository(repo)
logger.info(f"Completed scheduled backup for repository: {repo.name}")
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
try:
os.unlink(lock_file_path)
except:
pass
except (IOError, OSError) as lock_error:

logger.warning(f"Could not acquire lock for repository {repo.name}, another backup may be running: {lock_error}")
return

except Exception as e:
logger.error(f"Error in scheduled backup for repository {repository.id}: {e}", exc_info=True)
@@ -771,6 +929,9 @@ def backup_with_context():
logger.info(f"Job {job_id} successfully scheduled, next run: {added_job.next_run_time}")
else:
logger.error(f"Failed to schedule job {job_id} - job not found after creation")
# Remove from tracking if scheduling failed
with _job_tracking_lock:
_scheduled_jobs.discard(job_id)

# Initialize scheduler with existing repositories at startup
# This runs after all functions are defined
6 changes: 6 additions & 0 deletions backup_service.py
@@ -43,6 +43,12 @@ def backup_repository(self, repository):
logger.warning(f"Very recent backup found for repository {repository.name} (started at {recent_job.started_at}), skipping to prevent duplicates")
return

# Auto-cleanup: Check for and clean up any orphaned temp directories
user_backup_dir = self.backup_base_dir / f"user_{repository.user_id}"
repo_backup_dir = user_backup_dir / repository.name
if repo_backup_dir.exists():
self._cleanup_temp_directories(repo_backup_dir)
Comment on lines +46 to +50

Copilot AI Sep 12, 2025

The method _cleanup_temp_directories is called but not defined in the shown code. This will result in an AttributeError when the backup service attempts to clean up temp directories.

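A minimal sketch of what such a helper could look like, assuming temp directories are created with a recognizable prefix; the prefix names and the body below are illustrative guesses, since the project's actual convention is not shown in this diff.

import logging
import shutil
from pathlib import Path

logger = logging.getLogger(__name__)

class BackupService:
    def _cleanup_temp_directories(self, repo_backup_dir: Path) -> None:
        """Remove leftover temp directories from interrupted backups."""
        # Assumption: temp directories use a "tmp_" or ".tmp" prefix; the real
        # naming scheme used by backup_service.py is not visible in the diff.
        for entry in repo_backup_dir.iterdir():
            if entry.is_dir() and entry.name.startswith(("tmp_", ".tmp")):
                try:
                    shutil.rmtree(entry)
                    logger.info(f"Removed orphaned temp directory: {entry}")
                except OSError as exc:
                    logger.warning(f"Could not remove temp directory {entry}: {exc}")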

# Create backup job record
backup_job = BackupJob(
user_id=repository.user_id,