1
1
from __future__ import annotations
2
2
3
- import asyncio
4
- import threading
5
3
import traceback
6
- from concurrent import futures
7
4
from logging import getLogger
8
- from typing import TYPE_CHECKING , Any
5
+ from typing import TYPE_CHECKING
9
6
10
7
from scrapy import Spider
11
8
from scrapy .core .scheduler import BaseScheduler
12
9
from scrapy .utils .reactor import is_asyncio_reactor_installed
13
10
11
+ from ._async_thread import AsyncThread
12
+ from .requests import to_apify_request , to_scrapy_request
14
13
from apify import Configuration
15
14
from apify .apify_storage_client import ApifyStorageClient
16
- from apify .scrapy .requests import to_apify_request , to_scrapy_request
17
15
from apify .storages import RequestQueue
18
16
19
17
if TYPE_CHECKING :
20
- from collections .abc import Coroutine
21
-
22
18
from scrapy .http .request import Request
23
19
from twisted .internet .defer import Deferred
24
20
25
21
logger = getLogger (__name__ )
26
22
27
- _TIMEOUT = 60
28
- """The timeout for waiting on asyncio coroutines to finish."""
29
-
30
-
31
- def _start_event_loop (eventloop : asyncio .AbstractEventLoop ) -> None :
32
- """Set and run the event loop until it is stopped.
33
-
34
- Args:
35
- eventloop: The asyncio event loop to run.
36
- """
37
- asyncio .set_event_loop (eventloop )
38
- try :
39
- eventloop .run_forever ()
40
- finally :
41
- eventloop .close ()
42
- logger .debug ('Asyncio event loop has been closed.' )
43
-
44
-
45
- def _run_async_coro (eventloop : asyncio .AbstractEventLoop , coro : Coroutine ) -> Any :
46
- """Run a coroutine on the given loop in our separate thread, waiting for its result.
47
-
48
- Args:
49
- eventloop: The asyncio event loop to run the coroutine on.
50
- coro: The coroutine to run.
51
-
52
- Returns:
53
- The result of the coroutine.
54
- """
55
- if not eventloop .is_running ():
56
- logger .warning ('Event loop is not running! Ignoring coroutine execution.' )
57
- return None
58
-
59
- future = asyncio .run_coroutine_threadsafe (coro , eventloop )
60
- try :
61
- return future .result (timeout = _TIMEOUT )
62
- except futures .TimeoutError as exc :
63
- logger .exception ('Coroutine execution timed out.' , exc_info = exc )
64
- raise
65
- except Exception as exc :
66
- logger .exception ('Coroutine execution raised an exception.' , exc_info = exc )
67
- raise
68
-
69
-
70
- async def _shutdown_async_tasks (eventloop : asyncio .AbstractEventLoop ) -> None :
71
- """Cancel and wait for all pending tasks on the current event loop.
72
-
73
- Args:
74
- eventloop: The asyncio event loop to cancel tasks on.
75
- """
76
- tasks = [task for task in asyncio .all_tasks (eventloop ) if task is not asyncio .current_task ()]
77
- if not tasks :
78
- return
79
- for task in tasks :
80
- task .cancel ()
81
- await asyncio .gather (* tasks , return_exceptions = True )
82
-
83
-
84
- def _force_exit_event_loop (eventloop : asyncio .AbstractEventLoop , thread : threading .Thread ) -> None :
85
- """Forcefully shut down the event loop and its thread.
86
-
87
- Args:
88
- eventloop: The asyncio event loop to stop.
89
- thread: The thread running the event loop.
90
- """
91
- try :
92
- logger .info ('Forced shutdown of the event loop and its thread...' )
93
- eventloop .call_soon_threadsafe (eventloop .stop )
94
- thread .join (timeout = 5 )
95
- except Exception as exc :
96
- logger .exception ('Exception occurred during forced event loop shutdown.' , exc_info = exc )
97
-
98
23
99
24
class ApifyScheduler (BaseScheduler ):
100
25
"""A Scrapy scheduler that uses the Apify request queue to manage requests.
@@ -112,10 +37,8 @@ def __init__(self) -> None:
112
37
self ._rq : RequestQueue | None = None
113
38
self .spider : Spider | None = None
114
39
115
- # Create a new event loop and run it in a separate thread.
116
- self ._eventloop = asyncio .new_event_loop ()
117
- self ._thread = threading .Thread (target = lambda : _start_event_loop (self ._eventloop ), daemon = True )
118
- self ._thread .start ()
40
+ # A thread with the asyncio event loop to run coroutines on.
41
+ self ._async_thread = AsyncThread ()
119
42
120
43
def open (self , spider : Spider ) -> Deferred [None ] | None :
121
44
"""Open the scheduler.
@@ -133,7 +56,7 @@ async def open_rq() -> RequestQueue:
133
56
return await RequestQueue .open ()
134
57
135
58
try :
136
- self ._rq = _run_async_coro ( self ._eventloop , open_rq ())
59
+ self ._rq = self ._async_thread . run_coro ( open_rq ())
137
60
except Exception :
138
61
traceback .print_exc ()
139
62
raise
@@ -150,20 +73,7 @@ def close(self, reason: str) -> None:
150
73
"""
151
74
logger .debug (f'Closing { self .__class__ .__name__ } due to { reason } ...' )
152
75
try :
153
- if self ._eventloop .is_running ():
154
- # Cancel all pending tasks in the event loop.
155
- _run_async_coro (self ._eventloop , _shutdown_async_tasks (self ._eventloop ))
156
-
157
- # Stop the event loop.
158
- self ._eventloop .call_soon_threadsafe (self ._eventloop .stop )
159
-
160
- # Wait for the event loop thread to exit.
161
- self ._thread .join (timeout = _TIMEOUT )
162
-
163
- # If the thread is still alive, execute a forced shutdown.
164
- if self ._thread .is_alive ():
165
- logger .warning ('Event loop thread did not exit cleanly! Forcing shutdown...' )
166
- _force_exit_event_loop (self ._eventloop , self ._thread )
76
+ self ._async_thread .close ()
167
77
168
78
except KeyboardInterrupt :
169
79
logger .warning ('Shutdown interrupted by KeyboardInterrupt!' )
@@ -184,7 +94,7 @@ def has_pending_requests(self) -> bool:
184
94
raise TypeError ('self._rq must be an instance of the RequestQueue class' )
185
95
186
96
try :
187
- is_finished = _run_async_coro ( self ._eventloop , self ._rq .is_finished ())
97
+ is_finished = self ._async_thread . run_coro ( self ._rq .is_finished ())
188
98
except Exception :
189
99
traceback .print_exc ()
190
100
raise
@@ -217,7 +127,7 @@ def enqueue_request(self, request: Request) -> bool:
217
127
raise TypeError ('self._rq must be an instance of the RequestQueue class' )
218
128
219
129
try :
220
- result = _run_async_coro ( self ._eventloop , self ._rq .add_request (apify_request ))
130
+ result = self ._async_thread . run_coro ( self ._rq .add_request (apify_request ))
221
131
except Exception :
222
132
traceback .print_exc ()
223
133
raise
@@ -236,7 +146,7 @@ def next_request(self) -> Request | None:
236
146
raise TypeError ('self._rq must be an instance of the RequestQueue class' )
237
147
238
148
try :
239
- apify_request = _run_async_coro ( self ._eventloop , self ._rq .fetch_next_request ())
149
+ apify_request = self ._async_thread . run_coro ( self ._rq .fetch_next_request ())
240
150
except Exception :
241
151
traceback .print_exc ()
242
152
raise
@@ -248,10 +158,10 @@ def next_request(self) -> Request | None:
248
158
if not isinstance (self .spider , Spider ):
249
159
raise TypeError ('self.spider must be an instance of the Spider class' )
250
160
251
- # Let the request queue know that the request is being handled. Every request should be marked as handled,
252
- # retrying is handled by the Scrapy's RetryMiddleware.
161
+ # Let the request queue know that the request is being handled. Every request should
162
+ # be marked as handled, retrying is handled by the Scrapy's RetryMiddleware.
253
163
try :
254
- _run_async_coro ( self ._eventloop , self ._rq .mark_request_as_handled (apify_request ))
164
+ self ._async_thread . run_coro ( self ._rq .mark_request_as_handled (apify_request ))
255
165
except Exception :
256
166
traceback .print_exc ()
257
167
raise
0 commit comments