@@ -168,7 +168,7 @@ def format_local_time_filter(utc_dt, format_str='%Y-%m-%d %H:%M'):
 
 def schedule_all_repositories():
     """Schedule all active repositories on startup"""
-    from datetime import datetime  # Import to ensure availability
+    from datetime import datetime, timedelta  # Import to ensure availability
 
     try:
         # Clean up any stuck 'running' jobs from previous sessions
@@ -182,13 +182,47 @@ def schedule_all_repositories():
                 logger.info(f"Marked stuck job as failed: {stuck_job.id} (repository {stuck_job.repository_id})")
             db.session.commit()
 
+        # Auto-cleanup: Remove duplicate backup jobs created within last hour
+        cutoff = datetime.utcnow() - timedelta(hours=1)
+        recent_jobs = BackupJob.query.filter(BackupJob.created_at > cutoff).all()
+
+        # Group by repository and find duplicates
+        repo_jobs = {}
+        for job in recent_jobs:
+            repo_id = job.repository_id
+            if repo_id not in repo_jobs:
+                repo_jobs[repo_id] = []
+            repo_jobs[repo_id].append(job)
+
+        duplicates_cleaned = 0
+        for repo_id, jobs in repo_jobs.items():
+            if len(jobs) > 1:
+                # Sort by creation time, keep the first one, mark others as failed
+                jobs.sort(key=lambda j: j.created_at)
+                for duplicate_job in jobs[1:]:
+                    if duplicate_job.status in ['pending', 'running']:
+                        duplicate_job.status = 'failed'
+                        duplicate_job.error_message = 'Duplicate job automatically cleaned up'
+                        duplicate_job.completed_at = datetime.utcnow()
+                        duplicates_cleaned += 1
+                        logger.info(f"Auto-cleaned duplicate job {duplicate_job.id} for repository {repo_id}")
+
+        if duplicates_cleaned > 0:
+            db.session.commit()
+            logger.info(f"Auto-cleaned {duplicates_cleaned} duplicate backup jobs")
+
         # First, clear any existing jobs to prevent duplicates
         existing_jobs = scheduler.get_jobs()
         for job in existing_jobs:
             if job.id.startswith('backup_'):
                 scheduler.remove_job(job.id)
                 logger.info(f"Removed existing job on startup: {job.id}")
 
+        # Clear our tracking as well
+        with _job_tracking_lock:
+            _scheduled_jobs.clear()
+            logger.info("Cleared job tracking set")
+
         repositories = Repository.query.filter_by(is_active=True).all()
         scheduled_count = 0
         for repository in repositories:
@@ -197,6 +231,65 @@ def schedule_all_repositories():
                 scheduled_count += 1
                 logger.info(f"Scheduled backup job for repository: {repository.name} ({repository.schedule_type})")
         logger.info(f"Scheduled {scheduled_count} repository backup jobs on startup")
+
+        # Schedule a periodic health check job to monitor for duplicates
+        def scheduler_health_check():
+            from datetime import datetime, timedelta
+            with app.app_context():
+                try:
+                    # Check for duplicate jobs in scheduler
+                    all_jobs = scheduler.get_jobs()
+                    backup_jobs = [job for job in all_jobs if job.id.startswith('backup_')]
+                    job_ids = [job.id for job in backup_jobs]
+
+                    # Check for duplicate job IDs
+                    if len(job_ids) != len(set(job_ids)):
+                        logger.error("Duplicate scheduler job IDs detected! Cleaning up...")
+                        # Remove all backup jobs and reschedule
+                        for job in backup_jobs:
+                            scheduler.remove_job(job.id)
+
+                        # Clear tracking and reschedule
+                        with _job_tracking_lock:
+                            _scheduled_jobs.clear()
+
+                        # Reschedule active repositories
+                        repositories = Repository.query.filter_by(is_active=True).all()
+                        for repo in repositories:
+                            if repo.schedule_type != 'manual':
+                                schedule_backup_job(repo)
+
+                        logger.info("Scheduler health check: cleaned up and rescheduled jobs")
+
+                    # Auto-cleanup old failed jobs (older than 7 days)
+                    old_cutoff = datetime.utcnow() - timedelta(days=7)
+                    old_jobs = BackupJob.query.filter(
+                        BackupJob.status == 'failed',
+                        BackupJob.created_at < old_cutoff
+                    ).all()
+
+                    if old_jobs:
+                        for old_job in old_jobs:
+                            db.session.delete(old_job)
+                        db.session.commit()
+                        logger.info(f"Auto-cleaned {len(old_jobs)} old failed jobs")
+
+                except Exception as e:
+                    logger.error(f"Scheduler health check failed: {e}")
+
+        # Schedule health check to run every 6 hours
+        scheduler.add_job(
+            func=scheduler_health_check,
+            trigger=CronTrigger(hour='*/6', timezone=LOCAL_TZ),
+            id='scheduler_health_check',
+            name='Scheduler Health Check',
+            replace_existing=True,
+            misfire_grace_time=300,
+            coalesce=True,
+            max_instances=1
+        )
+        logger.info("Scheduled periodic scheduler health check")
+
     except Exception as e:
         logger.error(f"Error scheduling repositories on startup: {e}")
 
@@ -205,6 +298,10 @@ def schedule_all_repositories():
 _scheduler_lock = threading.Lock()
 _scheduler_initialized = False
 
+# Global tracking of scheduled jobs to prevent duplicates
+_scheduled_jobs = set()
+_job_tracking_lock = threading.Lock()
+
 def ensure_scheduler_initialized():
     """Ensure scheduler is initialized with existing repositories (thread-safe)"""
     global _scheduler_initialized
@@ -622,11 +719,23 @@ def favicon():
 
 def schedule_backup_job(repository):
     """Schedule a backup job for a repository"""
+    global _scheduled_jobs
+
     if not repository.is_active:
         logger.info(f"Repository {repository.name} is not active, skipping scheduling")
         return
 
     job_id = f'backup_{repository.id}'
+
+    # Thread-safe check to prevent duplicate scheduling
+    with _job_tracking_lock:
+        if job_id in _scheduled_jobs:
+            logger.warning(f"Job {job_id} is already scheduled, skipping duplicate")
+            return
+
+        # Mark this job as being scheduled
+        _scheduled_jobs.add(job_id)
+
     logger.info(f"Attempting to schedule job {job_id} for repository {repository.name}")
 
     # Remove existing job if it exists - try multiple ways to ensure it's gone
@@ -635,6 +744,9 @@ def schedule_backup_job(repository):
         if existing_job:
             scheduler.remove_job(job_id)
             logger.info(f"Removed existing scheduled job: {job_id}")
+            # Also remove from our tracking
+            with _job_tracking_lock:
+                _scheduled_jobs.discard(job_id)
         else:
             logger.info(f"No existing job found for {job_id}")
     except Exception as e:
@@ -643,6 +755,8 @@ def schedule_backup_job(repository):
     # Double-check that job is really gone
     if scheduler.get_job(job_id):
         logger.error(f"Job {job_id} still exists after removal attempt, aborting scheduling")
+        with _job_tracking_lock:
+            _scheduled_jobs.discard(job_id)
         return
 
     # Create a wrapper function that includes Flask app context
@@ -657,7 +771,26 @@ def backup_with_context():
                     logger.warning(f"Repository {repository.id} not found or inactive, skipping scheduled backup")
                     return
 
-                # Check if there's already a running backup for this repository
+                # Multiple layers of duplicate prevention
+
+                # 0. Auto-cleanup: Mark any long-running jobs as failed
+                stuck_cutoff = datetime.utcnow() - timedelta(hours=2)
+                stuck_jobs = BackupJob.query.filter_by(
+                    repository_id=repository.id,
+                    status='running'
+                ).filter(
+                    BackupJob.started_at < stuck_cutoff
+                ).all()
+
+                if stuck_jobs:
+                    logger.warning(f"Found {len(stuck_jobs)} stuck jobs for repository {repo.name}, marking as failed")
+                    for stuck in stuck_jobs:
+                        stuck.status = 'failed'
+                        stuck.error_message = 'Job stuck for over 2 hours, automatically failed'
+                        stuck.completed_at = datetime.utcnow()
+                    db.session.commit()
+
+                # 1. Check if there's already a running backup for this repository
                 running_job = BackupJob.query.filter_by(
                     repository_id=repository.id,
                     status='running'
@@ -667,20 +800,45 @@ def backup_with_context():
                     logger.warning(f"Backup already running for repository {repo.name} (job {running_job.id}), skipping")
                     return
 
-                # Additional check: ensure no backup started in the last 30 seconds to prevent rapid duplicates
-                recent_cutoff = datetime.utcnow() - timedelta(seconds=30)
+                # 2. Check for very recent backups (within last 2 minutes) to prevent rapid duplicates
+                recent_cutoff = datetime.utcnow() - timedelta(minutes=2)
                 recent_backup = BackupJob.query.filter_by(
                     repository_id=repository.id
                 ).filter(
                     BackupJob.started_at > recent_cutoff
                 ).first()
 
                 if recent_backup:
-                    logger.warning(f"Recent backup found for repository {repo.name} at {recent_backup.started_at}, skipping")
+                    logger.warning(f"Recent backup found for repository {repo.name} at {recent_backup.started_at}, skipping to prevent duplicates")
                     return
 
-                logger.info(f"Starting scheduled backup for repository: {repo.name}")
-                backup_service.backup_repository(repo)
+                # 3. Use a file-based lock to prevent concurrent executions
+                import fcntl
+                import tempfile
+                import os
+
+                lock_file_path = os.path.join(tempfile.gettempdir(), f"backup_lock_{repository.id}")
+
+                try:
+                    lock_file = open(lock_file_path, 'w')
+                    fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+                    logger.info(f"Acquired file lock for repository {repo.name}")
+
+                    try:
+                        logger.info(f"Starting scheduled backup for repository: {repo.name}")
+                        backup_service.backup_repository(repo)
+                        logger.info(f"Completed scheduled backup for repository: {repo.name}")
+                    finally:
+                        fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
+                        lock_file.close()
+                        try:
+                            os.unlink(lock_file_path)
+                        except OSError:
+                            pass
+
+                except (IOError, OSError) as lock_error:
+                    logger.warning(f"Could not acquire lock for repository {repo.name}: {lock_error}")
+                    return
 
             except Exception as e:
                 logger.error(f"Error in scheduled backup for repository {repository.id}: {e}", exc_info=True)
@@ -771,6 +929,9 @@ def backup_with_context():
         logger.info(f"Job {job_id} scheduled successfully, next run: {added_job.next_run_time}")
     else:
         logger.error(f"Failed to schedule job {job_id}")
+        # Remove from tracking if scheduling failed
+        with _job_tracking_lock:
+            _scheduled_jobs.discard(job_id)
 
 # Initialize scheduler with existing repositories at startup
 # This runs after all functions are defined
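
Side note on the locking step above: the guard added in backup_with_context boils down to a non-blocking fcntl.flock on a per-repository temp file. Below is a minimal standalone sketch of that pattern; the helper name run_exclusive and the lock path are illustrative and not part of this commit.

    import fcntl
    import os
    import tempfile

    def run_exclusive(resource_id, work):
        """Run work() only if no other process holds the per-resource lock.

        Sketch of the fcntl.flock(LOCK_EX | LOCK_NB) pattern used above;
        returns True if work ran, False if the lock was already held.
        """
        lock_path = os.path.join(tempfile.gettempdir(), f"backup_lock_{resource_id}")
        lock_file = open(lock_path, "w")
        try:
            # Non-blocking exclusive lock: raises OSError if another process holds it
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            lock_file.close()
            return False
        try:
            work()
            return True
        finally:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
            lock_file.close()
            try:
                os.unlink(lock_path)
            except OSError:
                pass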