Skip to content

Commit 4ecf8a6

Browse files
committed
fix(spiders): Fix for when the pausing system has a large queue
1 parent cf0de7e commit 4ecf8a6

File tree

1 file changed

+33
-6
lines changed

1 file changed

+33
-6
lines changed

scrapling/spiders/engine.py

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def __init__(
5050
self._checkpoint_manager = CheckpointManager(crawldir or "", interval)
5151
self._last_checkpoint_time: float = 0.0
5252
self._pause_requested: bool = False
53+
self._force_stop: bool = False
5354
self.paused: bool = False
5455

5556
def _is_domain_allowed(self, request: Request) -> bool:
@@ -144,10 +145,23 @@ async def _task_wrapper(self, request: Request) -> None:
144145
self._active_tasks -= 1
145146

146147
def request_pause(self) -> None:
147-
"""Request a graceful pause of the crawl."""
148-
if not self._pause_requested:
148+
"""Request a graceful pause of the crawl.
149+
150+
First call: requests graceful pause (waits for active tasks).
151+
Second call: forces immediate stop.
152+
"""
153+
if self._force_stop:
154+
return # Already forcing stop
155+
156+
if self._pause_requested:
157+
# Second Ctrl+C - force stop
158+
self._force_stop = True
159+
log.warning("Force stop requested, cancelling immediately...")
160+
else:
149161
self._pause_requested = True
150-
log.info("Pause requested, waiting for in-flight requests to complete...")
162+
log.info(
163+
"Pause requested, waiting for in-flight requests to complete (press Ctrl+C again to force stop)..."
164+
)
151165

152166
async def _save_checkpoint(self) -> None:
153167
"""Save current state to checkpoint files."""
@@ -215,15 +229,22 @@ async def crawl(self) -> CrawlStats:
215229
# Process queue
216230
async with create_task_group() as tg:
217231
while self._running:
218-
# Check for pause request
232+
# Check for pause/stop request
219233
if self._checkpoint_system_enabled:
220234
if self._pause_requested:
221235
# Wait for active tasks to complete
222-
if self._active_tasks == 0:
236+
if self._active_tasks == 0 or self._force_stop:
237+
if self._force_stop:
238+
log.warning(f"Force stopping with {self._active_tasks} active tasks")
239+
223240
await self._save_checkpoint()
224241
self.paused = True
225242
self._running = False
226-
log.info("Spider paused, checkpoint saved")
243+
244+
if not self._force_stop:
245+
log.info("Spider paused, checkpoint saved")
246+
else:
247+
tg.cancel_scope.cancel()
227248
break
228249
# Wait briefly and check again
229250
await anyio.sleep(0.05)
@@ -243,6 +264,12 @@ async def crawl(self) -> CrawlStats:
243264
await anyio.sleep(0.05)
244265
continue
245266

267+
# Only spawn tasks up to concurrent_requests limit
268+
# This prevents spawning thousands of waiting tasks
269+
if self._active_tasks >= self.spider.concurrent_requests:
270+
await anyio.sleep(0.01)
271+
continue
272+
246273
request = await self.scheduler.dequeue()
247274
self._active_tasks += 1
248275
tg.start_soon(self._task_wrapper, request)

0 commit comments

Comments (0)