Skip to content

Commit 1df1fe2

Browse files
committed
add async thread helper class
1 parent 4fb9f87 commit 1df1fe2

File tree

2 files changed

+137
-103
lines changed

2 files changed

+137
-103
lines changed

src/apify/scrapy/_async_thread.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import threading
5+
from concurrent import futures
6+
from datetime import timedelta
7+
from logging import getLogger
8+
from typing import TYPE_CHECKING, Any
9+
10+
if TYPE_CHECKING:
11+
from collections.abc import Coroutine
12+
13+
logger = getLogger(__name__)
14+
15+
16+
class AsyncThread:
17+
"""Class for running an asyncio event loop in a separate thread.
18+
19+
This allows running asynchronous coroutines from synchronous code by executingthem on an event loop
20+
that runs in its own dedicated thread.
21+
"""
22+
23+
def __init__(self) -> None:
24+
self._eventloop = asyncio.new_event_loop()
25+
26+
# Start the event loop in a dedicated daemon thread.
27+
self._thread = threading.Thread(
28+
target=self._start_event_loop,
29+
daemon=True,
30+
)
31+
self._thread.start()
32+
33+
def run_coro(
34+
self,
35+
coro: Coroutine,
36+
timeout: timedelta = timedelta(seconds=60),
37+
) -> Any:
38+
"""Run a coroutine on an event loop running in a separate thread.
39+
40+
This method schedules the coroutine to run on the event loop and blocks until the coroutine completes
41+
or the specified timeout is reached.
42+
43+
Args:
44+
coro: The coroutine to run.
45+
timeout: The maximum number of seconds to wait for the coroutine to finish.
46+
47+
Returns:
48+
The result returned by the coroutine.
49+
50+
Raises:
51+
TimeoutError: If the coroutine does not complete within the timeout.
52+
Exception: Any exception raised during coroutine execution.
53+
"""
54+
if not self._eventloop.is_running():
55+
logger.warning('Event loop is not running! Ignoring coroutine execution.')
56+
return None
57+
58+
# Submit the coroutine to the event loop running in the other thread.
59+
future = asyncio.run_coroutine_threadsafe(coro, self._eventloop)
60+
try:
61+
# Wait for the coroutine's result until the specified timeout.
62+
return future.result(timeout=timeout.total_seconds())
63+
except futures.TimeoutError as exc:
64+
logger.exception('Coroutine execution timed out.', exc_info=exc)
65+
raise
66+
except Exception as exc:
67+
logger.exception('Coroutine execution raised an exception.', exc_info=exc)
68+
raise
69+
70+
def close(self, timeout: timedelta = timedelta(seconds=60)) -> None:
71+
"""Close the event loop and its thread gracefully.
72+
73+
This method cancels all pending tasks, stops the event loop, and waits for the thread to exit.
74+
If the thread does not exit within the given timeout, a forced shutdown is attempted.
75+
76+
Args:
77+
timeout: The maximum number of seconds to wait for the event loop thread to exit.
78+
"""
79+
if self._eventloop.is_running():
80+
# Cancel all pending tasks in the event loop.
81+
self.run_coro(self._shutdown_tasks())
82+
83+
# Schedule the event loop to stop.
84+
self._eventloop.call_soon_threadsafe(self._eventloop.stop)
85+
86+
# Wait for the event loop thread to finish execution.
87+
self._thread.join(timeout=timeout.total_seconds())
88+
89+
# If the thread is still running after the timeout, force a shutdown.
90+
if self._thread.is_alive():
91+
logger.warning('Event loop thread did not exit cleanly! Forcing shutdown...')
92+
self._force_exit_event_loop()
93+
94+
def _start_event_loop(self) -> None:
95+
"""Set up and run the asyncio event loop in the dedicated thread."""
96+
asyncio.set_event_loop(self._eventloop)
97+
try:
98+
self._eventloop.run_forever()
99+
finally:
100+
self._eventloop.close()
101+
logger.debug('Asyncio event loop has been closed.')
102+
103+
async def _shutdown_tasks(self) -> None:
104+
"""Cancel all pending tasks in the event loop."""
105+
# Retrieve all tasks for the event loop, excluding the current task.
106+
tasks = [task for task in asyncio.all_tasks(self._eventloop) if task is not asyncio.current_task()]
107+
if not tasks:
108+
return
109+
110+
# Cancel each pending task.
111+
for task in tasks:
112+
task.cancel()
113+
114+
# Wait until all tasks have been cancelled or finished.
115+
await asyncio.gather(*tasks, return_exceptions=True)
116+
117+
def _force_exit_event_loop(self) -> None:
118+
"""Forcefully shut down the event loop and its thread."""
119+
try:
120+
logger.info('Forced shutdown of the event loop and its thread...')
121+
self._eventloop.call_soon_threadsafe(self._eventloop.stop)
122+
self._thread.join(timeout=5)
123+
except Exception as exc:
124+
logger.exception('Exception occurred during forced event loop shutdown.', exc_info=exc)

src/apify/scrapy/scheduler.py

Lines changed: 13 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -1,100 +1,25 @@
11
from __future__ import annotations
22

3-
import asyncio
4-
import threading
53
import traceback
6-
from concurrent import futures
74
from logging import getLogger
8-
from typing import TYPE_CHECKING, Any
5+
from typing import TYPE_CHECKING
96

107
from scrapy import Spider
118
from scrapy.core.scheduler import BaseScheduler
129
from scrapy.utils.reactor import is_asyncio_reactor_installed
1310

11+
from ._async_thread import AsyncThread
12+
from .requests import to_apify_request, to_scrapy_request
1413
from apify import Configuration
1514
from apify.apify_storage_client import ApifyStorageClient
16-
from apify.scrapy.requests import to_apify_request, to_scrapy_request
1715
from apify.storages import RequestQueue
1816

1917
if TYPE_CHECKING:
20-
from collections.abc import Coroutine
21-
2218
from scrapy.http.request import Request
2319
from twisted.internet.defer import Deferred
2420

2521
logger = getLogger(__name__)
2622

27-
_TIMEOUT = 60
28-
"""The timeout for waiting on asyncio coroutines to finish."""
29-
30-
31-
def _start_event_loop(eventloop: asyncio.AbstractEventLoop) -> None:
32-
"""Set and run the event loop until it is stopped.
33-
34-
Args:
35-
eventloop: The asyncio event loop to run.
36-
"""
37-
asyncio.set_event_loop(eventloop)
38-
try:
39-
eventloop.run_forever()
40-
finally:
41-
eventloop.close()
42-
logger.debug('Asyncio event loop has been closed.')
43-
44-
45-
def _run_async_coro(eventloop: asyncio.AbstractEventLoop, coro: Coroutine) -> Any:
46-
"""Run a coroutine on the given loop in our separate thread, waiting for its result.
47-
48-
Args:
49-
eventloop: The asyncio event loop to run the coroutine on.
50-
coro: The coroutine to run.
51-
52-
Returns:
53-
The result of the coroutine.
54-
"""
55-
if not eventloop.is_running():
56-
logger.warning('Event loop is not running! Ignoring coroutine execution.')
57-
return None
58-
59-
future = asyncio.run_coroutine_threadsafe(coro, eventloop)
60-
try:
61-
return future.result(timeout=_TIMEOUT)
62-
except futures.TimeoutError as exc:
63-
logger.exception('Coroutine execution timed out.', exc_info=exc)
64-
raise
65-
except Exception as exc:
66-
logger.exception('Coroutine execution raised an exception.', exc_info=exc)
67-
raise
68-
69-
70-
async def _shutdown_async_tasks(eventloop: asyncio.AbstractEventLoop) -> None:
71-
"""Cancel and wait for all pending tasks on the current event loop.
72-
73-
Args:
74-
eventloop: The asyncio event loop to cancel tasks on.
75-
"""
76-
tasks = [task for task in asyncio.all_tasks(eventloop) if task is not asyncio.current_task()]
77-
if not tasks:
78-
return
79-
for task in tasks:
80-
task.cancel()
81-
await asyncio.gather(*tasks, return_exceptions=True)
82-
83-
84-
def _force_exit_event_loop(eventloop: asyncio.AbstractEventLoop, thread: threading.Thread) -> None:
85-
"""Forcefully shut down the event loop and its thread.
86-
87-
Args:
88-
eventloop: The asyncio event loop to stop.
89-
thread: The thread running the event loop.
90-
"""
91-
try:
92-
logger.info('Forced shutdown of the event loop and its thread...')
93-
eventloop.call_soon_threadsafe(eventloop.stop)
94-
thread.join(timeout=5)
95-
except Exception as exc:
96-
logger.exception('Exception occurred during forced event loop shutdown.', exc_info=exc)
97-
9823

9924
class ApifyScheduler(BaseScheduler):
10025
"""A Scrapy scheduler that uses the Apify request queue to manage requests.
@@ -112,10 +37,8 @@ def __init__(self) -> None:
11237
self._rq: RequestQueue | None = None
11338
self.spider: Spider | None = None
11439

115-
# Create a new event loop and run it in a separate thread.
116-
self._eventloop = asyncio.new_event_loop()
117-
self._thread = threading.Thread(target=lambda: _start_event_loop(self._eventloop), daemon=True)
118-
self._thread.start()
40+
# A thread with the asyncio event loop to run coroutines on.
41+
self._async_thread = AsyncThread()
11942

12043
def open(self, spider: Spider) -> Deferred[None] | None:
12144
"""Open the scheduler.
@@ -133,7 +56,7 @@ async def open_rq() -> RequestQueue:
13356
return await RequestQueue.open()
13457

13558
try:
136-
self._rq = _run_async_coro(self._eventloop, open_rq())
59+
self._rq = self._async_thread.run_coro(open_rq())
13760
except Exception:
13861
traceback.print_exc()
13962
raise
@@ -150,20 +73,7 @@ def close(self, reason: str) -> None:
15073
"""
15174
logger.debug(f'Closing {self.__class__.__name__} due to {reason}...')
15275
try:
153-
if self._eventloop.is_running():
154-
# Cancel all pending tasks in the event loop.
155-
_run_async_coro(self._eventloop, _shutdown_async_tasks(self._eventloop))
156-
157-
# Stop the event loop.
158-
self._eventloop.call_soon_threadsafe(self._eventloop.stop)
159-
160-
# Wait for the event loop thread to exit.
161-
self._thread.join(timeout=_TIMEOUT)
162-
163-
# If the thread is still alive, execute a forced shutdown.
164-
if self._thread.is_alive():
165-
logger.warning('Event loop thread did not exit cleanly! Forcing shutdown...')
166-
_force_exit_event_loop(self._eventloop, self._thread)
76+
self._async_thread.close()
16777

16878
except KeyboardInterrupt:
16979
logger.warning('Shutdown interrupted by KeyboardInterrupt!')
@@ -184,7 +94,7 @@ def has_pending_requests(self) -> bool:
18494
raise TypeError('self._rq must be an instance of the RequestQueue class')
18595

18696
try:
187-
is_finished = _run_async_coro(self._eventloop, self._rq.is_finished())
97+
is_finished = self._async_thread.run_coro(self._rq.is_finished())
18898
except Exception:
18999
traceback.print_exc()
190100
raise
@@ -217,7 +127,7 @@ def enqueue_request(self, request: Request) -> bool:
217127
raise TypeError('self._rq must be an instance of the RequestQueue class')
218128

219129
try:
220-
result = _run_async_coro(self._eventloop, self._rq.add_request(apify_request))
130+
result = self._async_thread.run_coro(self._rq.add_request(apify_request))
221131
except Exception:
222132
traceback.print_exc()
223133
raise
@@ -236,7 +146,7 @@ def next_request(self) -> Request | None:
236146
raise TypeError('self._rq must be an instance of the RequestQueue class')
237147

238148
try:
239-
apify_request = _run_async_coro(self._eventloop, self._rq.fetch_next_request())
149+
apify_request = self._async_thread.run_coro(self._rq.fetch_next_request())
240150
except Exception:
241151
traceback.print_exc()
242152
raise
@@ -248,10 +158,10 @@ def next_request(self) -> Request | None:
248158
if not isinstance(self.spider, Spider):
249159
raise TypeError('self.spider must be an instance of the Spider class')
250160

251-
# Let the request queue know that the request is being handled. Every request should be marked as handled,
252-
# retrying is handled by the Scrapy's RetryMiddleware.
161+
# Let the request queue know that the request is being handled. Every request should
162+
# be marked as handled, retrying is handled by the Scrapy's RetryMiddleware.
253163
try:
254-
_run_async_coro(self._eventloop, self._rq.mark_request_as_handled(apify_request))
164+
self._async_thread.run_coro(self._rq.mark_request_as_handled(apify_request))
255165
except Exception:
256166
traceback.print_exc()
257167
raise

0 commit comments

Comments
 (0)