@@ -515,6 +515,7 @@ def __init__(self, manager: PopularCratesManager):
515515 self .stats = {"success" : 0 , "failed" : 0 , "skipped" : 0 , "total" : 0 }
516516 self ._workers : list [asyncio .Task ] = []
517517 self ._monitor_task : asyncio .Task | None = None
518+ self ._runner_task : asyncio .Task | None = None
518519 self ._start_time : datetime | None = None
519520 self ._processed_crates : set [str ] = set () # For duplicate detection
520521 self ._memory_monitor_task : asyncio .Task | None = None
@@ -533,14 +534,20 @@ def __init__(self, manager: PopularCratesManager):
533534 self ._task_refs = (
534535 set ()
535536 ) # Strong references to prevent GC # Currently processing crate name
537+ self ._semaphore_lock = asyncio .Lock ()
536538
537539 async def start (self ):
538540 """Start pre-ingestion in background (non-blocking)."""
539541 logger .info ("Starting background pre-ingestion of popular crates" )
540542 self ._start_time = datetime .now ()
541543
542544 # Create background task for the main runner
543- asyncio .create_task (self ._run ())
545+ if self ._runner_task and not self ._runner_task .done ():
546+ logger .warning ("Pre-ingestion runner task already running." )
547+ return
548+ self ._runner_task = asyncio .create_task (self ._run ())
549+ self ._task_refs .add (self ._runner_task )
550+ self ._runner_task .add_done_callback (self ._task_refs .discard )
544551
545552 async def pause (self ) -> bool :
546553 """Pause the worker."""
@@ -560,6 +567,19 @@ async def resume(self) -> bool:
560567 return True
561568 return False
562569
570+ def _spawn_workers (self ):
571+ """Spawn worker tasks up to the current adaptive concurrency."""
572+ # Clear finished worker tasks
573+ self ._workers = [w for w in self ._workers if not w .done ()]
574+
575+ # Spawn new workers if needed
576+ for i in range (len (self ._workers ), self ._adaptive_concurrency ):
577+ worker = asyncio .create_task (self ._ingest_worker (i ))
578+ self ._workers .append (worker )
579+ # Store strong reference to prevent GC
580+ self ._task_refs .add (worker )
581+ worker .add_done_callback (self ._task_refs .discard )
582+
563583 async def stop (self ) -> bool :
564584 """Stop the worker gracefully."""
565585 if self ._state in [WorkerState .RUNNING , WorkerState .PAUSED ]:
@@ -610,12 +630,7 @@ async def _run(self):
610630 await self .queue .put ((priority , crate .name ))
611631
612632 # Start worker tasks with adaptive concurrency
613- for i in range (self ._adaptive_concurrency ):
614- worker = asyncio .create_task (self ._ingest_worker (i ))
615- self ._workers .append (worker )
616- # Store strong reference to prevent GC
617- self ._task_refs .add (worker )
618- worker .add_done_callback (self ._task_refs .discard )
633+ self ._spawn_workers ()
619634
620635 # Start progress monitor
621636 self ._monitor_task = asyncio .create_task (self ._monitor_progress ())
@@ -663,28 +678,35 @@ async def _ingest_worker(self, worker_id: int):
663678
664679 # Get next crate from priority queue (with timeout for graceful shutdown)
665680 try :
666- priority , crate_name = await asyncio .wait_for (
667- self .queue .get (), timeout = 1.0
681+ # Use a short timeout so STOPPING state is honored promptly
682+ priority , crate_spec = await asyncio .wait_for (
683+ self .queue .get (), timeout = 0.1
668684 )
669685 except asyncio .TimeoutError :
670- # Check if queue is empty and we should exit
671- if self .queue .empty ():
686+ # Do not exit if queue is temporarily empty. The scheduler or callers
687+ # might add more items; only exit on STOPPING.
688+ if self ._state == WorkerState .STOPPING :
672689 break
673690 continue
674691
675692 # Skip if already processed (double-check for duplicates)
676- if crate_name in self ._processed_crates :
677- logger .debug (f"Worker { worker_id } : skipping duplicate { crate_name } " )
693+ name_for_dupe = parse_crate_spec (crate_spec )[0 ] if "@" in crate_spec else crate_spec
694+ if name_for_dupe in self ._processed_crates :
695+ logger .debug (f"Worker { worker_id } : skipping duplicate { name_for_dupe } " )
678696 self .stats ["skipped" ] += 1
679697 self .queue .task_done ()
680698 continue
681699
682700 # Mark as processed to prevent duplicates
683- self ._processed_crates .add (crate_name )
701+ self ._processed_crates .add (name_for_dupe )
684702
685703 # Process the crate with semaphore control
686- async with self .semaphore :
687- await self ._ingest_single_crate (crate_name )
704+ # Safely snapshot semaphore under lock to avoid races on reassignment
705+ async with self ._semaphore_lock :
706+ semaphore = self .semaphore
707+
708+ async with semaphore :
709+ await self ._ingest_single_crate (crate_spec )
688710
689711 # Mark task as done
690712 self .queue .task_done ()
@@ -693,12 +715,20 @@ async def _ingest_worker(self, worker_id: int):
693715 break
694716 except Exception as e :
695717 logger .error (f"Worker { worker_id } error: { e } " )
696- # Still mark as done to avoid hanging
697- self .queue .task_done ()
718+ # Still mark as done to avoid hanging when we had dequeued
719+ if 'crate_spec' in locals ():
720+ self .queue .task_done ()
698721
699- async def _ingest_single_crate (self , crate_name : str ):
722+ async def _ingest_single_crate (self , crate_spec : str ):
700723 """Ingest a single crate with error handling and progress tracking."""
701724 start_time = time .time ()
725+ try :
726+ crate_name , version_spec = parse_crate_spec (crate_spec )
727+ except Exception as e :
728+ self .stats ["failed" ] += 1
729+ logger .debug (f"Invalid crate spec '{ crate_spec } ': { e } " )
730+ return
731+
702732 self .current_crate = crate_name
703733
704734 # Initialize progress tracking for this crate
@@ -709,8 +739,10 @@ async def _ingest_single_crate(self, crate_name: str):
709739 }
710740
711741 try :
712- # Get the latest stable version (0-10% progress)
713- version = await fetch_current_stable_version (crate_name )
742+ # Get the version (0-10% progress)
743+ version = version_spec if version_spec and version_spec != "latest" else None
744+ if not version :
745+ version = await fetch_current_stable_version (crate_name )
714746 if not version :
715747 logger .debug (f"No stable version found for { crate_name } , skipping" )
716748 self .stats ["skipped" ] += 1
@@ -842,28 +874,28 @@ async def _monitor_memory(self):
842874
843875 # Reduce concurrency if needed
844876 if self ._adaptive_concurrency > 1 :
845- self ._adaptive_concurrency = max (
846- 1 , self ._adaptive_concurrency - 1
847- )
848- logger .info (
849- f"Reduced concurrency to { self ._adaptive_concurrency } "
850- )
851-
852- # Update semaphore
853- self .semaphore = asyncio .Semaphore (self ._adaptive_concurrency )
877+ new_cc = max (1 , self ._adaptive_concurrency - 1 )
878+ else :
879+ new_cc = self ._adaptive_concurrency
854880
855881 elif (
856882 memory_mb < 600
857883 and self ._adaptive_concurrency < config .PRE_INGEST_CONCURRENCY
858884 ):
859885 # Increase concurrency if memory allows
860- self . _adaptive_concurrency = min (
886+ new_cc = min (
861887 config .PRE_INGEST_CONCURRENCY , self ._adaptive_concurrency + 1
862888 )
863- logger .debug (
864- f"Increased concurrency to { self ._adaptive_concurrency } "
889+ else :
890+ new_cc = self ._adaptive_concurrency
891+
892+ if new_cc != self ._adaptive_concurrency :
893+ logger .info (
894+ f"Adjusting concurrency from { self ._adaptive_concurrency } to { new_cc } "
865895 )
866- self .semaphore = asyncio .Semaphore (self ._adaptive_concurrency )
896+ async with self ._semaphore_lock :
897+ self ._adaptive_concurrency = new_cc
898+ self .semaphore = asyncio .Semaphore (self ._adaptive_concurrency )
867899
868900 except asyncio .CancelledError :
869901 break
@@ -1060,6 +1092,12 @@ async def _schedule_ingestion(self):
10601092 self ._last_run = datetime .now ()
10611093
10621094 try :
1095+ # Ensure workers are running for the scheduled job
1096+ if not self .worker ._workers or all (t .done () for t in self .worker ._workers ):
1097+ logger .info (
1098+ "Workers are not running. Spawning workers for scheduled run."
1099+ )
1100+ self .worker ._spawn_workers ()
10631101 # Check memory before scheduling
10641102 if not await self ._should_schedule ():
10651103 logger .warning (
@@ -1088,7 +1126,10 @@ async def _schedule_ingestion(self):
10881126 await self .worker .queue .put ((priority , crate .name ))
10891127
10901128 # Process the queue (reuse existing worker logic)
1091- await self .worker .queue .join ()
1129+ if self .worker .queue .empty ():
1130+ logger .info ("Queue is empty, nothing to process for this scheduled run." )
1131+ else :
1132+ await self .worker .queue .join ()
10921133
10931134 self ._runs_completed += 1
10941135 logger .info (
0 commit comments