@@ -2,7 +2,7 @@
 import json
 import logging
 from collections import OrderedDict
-from datetime import datetime
+from datetime import datetime, timezone
 from typing import Coroutine, Dict, Optional
 from typing import OrderedDict as OrderedDictType
 from typing import Set, Union
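
This commit replaces every naive `datetime.utcnow()` with the timezone-aware `datetime.now(timezone.utc)`, hence the extended import above. A minimal standalone sketch of the difference (not part of the commit): naive and aware datetimes cannot be mixed in arithmetic, so timestamps that are compared against aware values must carry a timezone themselves.

    from datetime import datetime, timezone

    naive = datetime.utcnow()           # tzinfo is None
    aware = datetime.now(timezone.utc)  # tzinfo is timezone.utc

    try:
        _ = aware - naive
    except TypeError as err:
        # "can't subtract offset-naive and offset-aware datetimes"
        print(err)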
@@ -110,7 +110,7 @@ def __init__(self, id: str, name: Optional[str], client: Union[ApifyClientAsync,
         self._queue_head_dict = OrderedDict()
         self._query_queue_head_promise = None
         self._in_progress = set()
-        self._last_activity = datetime.utcnow()
+        self._last_activity = datetime.now(timezone.utc)
         self._recently_handled = LRUCache[bool](max_length=RECENTLY_HANDLED_CACHE_SIZE)
         self._requests_cache = LRUCache(max_length=MAX_CACHED_REQUESTS)
 
@@ -140,7 +140,7 @@ async def add_request(self, request: Dict, *, forefront: bool = False) -> Dict:
         _budget_ow(request, {
             'url': (str, True),
         })
-        self._last_activity = datetime.utcnow()
+        self._last_activity = datetime.now(timezone.utc)
 
         if request.get('uniqueKey') is None:
             request['uniqueKey'] = request['url']  # TODO: Check Request class in crawlee and replicate uniqueKey generation logic...
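
`_budget_ow` is called before each mutation with a mapping of field names to `(type, required)` pairs. Its definition is outside this diff; a hypothetical reimplementation under that reading, purely for illustration:

    from typing import Any, Dict, Tuple, Type

    def _budget_ow_sketch(record: Dict[str, Any], fields: Dict[str, Tuple[Type, bool]]) -> None:
        # Check each declared field: required fields must exist,
        # and present fields must match the declared type.
        for name, (expected_type, required) in fields.items():
            if name not in record:
                if required:
                    raise ValueError(f'Required field "{name}" is missing')
                continue
            if not isinstance(record[name], expected_type):
                raise TypeError(f'Field "{name}" must be of type {expected_type.__name__}')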
@@ -215,7 +215,7 @@ async def fetch_next_request(self) -> Optional[Dict]:
             })}""")
             return None
         self._in_progress.add(next_request_id)
-        self._last_activity = datetime.utcnow()
+        self._last_activity = datetime.now(timezone.utc)
 
         try:
             request = await self.get_request(next_request_id)
@@ -266,12 +266,12 @@ async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]:
             'uniqueKey': (str, True),
             'handledAt': (datetime, False),
         })
-        self._last_activity = datetime.utcnow()
+        self._last_activity = datetime.now(timezone.utc)
         if request['id'] not in self._in_progress:
             logging.debug(f'Cannot mark request {request["id"]} as handled, because it is not in progress!')
             return None
 
-        request['handledAt'] = request.get('handledAt', datetime.utcnow())
+        request['handledAt'] = request.get('handledAt', datetime.now(timezone.utc))
         queue_operation_info = await self._client.update_request({**request})
         queue_operation_info['uniqueKey'] = request['uniqueKey']
 
@@ -302,7 +302,7 @@ async def reclaim_request(self, request: Dict, forefront: bool = False) -> Optio
             'id': (str, True),
             'uniqueKey': (str, True),
         })
-        self._last_activity = datetime.utcnow()
+        self._last_activity = datetime.now(timezone.utc)
 
         if request['id'] not in self._in_progress:
             logging.debug(f'Cannot reclaim request {request["id"]}, because it is not in progress!')
@@ -352,7 +352,8 @@ async def is_finished(self) -> bool:
         Returns:
             bool: `True` if all requests were already handled and there are no more left. `False` otherwise.
         """
-        if self._in_progress_count() > 0 and (datetime.utcnow() - self._last_activity).seconds > self._internal_timeout_seconds:
+        seconds_since_last_activity = (datetime.now(timezone.utc) - self._last_activity).seconds
+        if self._in_progress_count() > 0 and seconds_since_last_activity > self._internal_timeout_seconds:
             message = f'The request queue seems to be stuck for {self._internal_timeout_seconds}s, resetting internal state.'
             logging.warning(message)
             self._reset()
@@ -371,7 +372,7 @@ def _reset(self) -> None:
         self._assumed_total_count = 0
         self._assumed_handled_count = 0
         self._requests_cache.clear()
-        self._last_activity = datetime.utcnow()
+        self._last_activity = datetime.now(timezone.utc)
 
     def _cache_request(self, cache_key: str, queue_operation_info: Dict) -> None:
         self._requests_cache[cache_key] = {
@@ -382,7 +383,7 @@ def _cache_request(self, cache_key: str, queue_operation_info: Dict) -> None:
         }
 
     async def _queue_query_head(self, limit: int) -> Dict:
-        query_started_at = datetime.utcnow()
+        query_started_at = datetime.now(timezone.utc)
 
         list_head = await self._client.list_head(limit=limit)
         for request in list_head['items']:
@@ -391,10 +392,10 @@ async def _queue_query_head(self, limit: int) -> Dict:
                 continue
             self._queue_head_dict[request['id']] = request['id']
             self._cache_request(_unique_key_to_request_id(request['uniqueKey']), {
-                'request_id': request['id'],
-                'was_already_handled': False,
-                'was_already_present': True,
-                'unique_key': request['uniqueKey'],
+                'requestId': request['id'],
+                'wasAlreadyHandled': False,
+                'wasAlreadyPresent': True,
+                'uniqueKey': request['uniqueKey'],
             })
 
         # This is needed so that the next call to _ensureHeadIsNonEmpty() will fetch the queue head again.
@@ -440,7 +441,7 @@ async def _ensure_head_is_non_empty(self, ensure_consistency: bool = False, limi
         # If ensureConsistency=true then we must ensure that either:
         # - queueModifiedAt is older than queryStartedAt by at least API_PROCESSED_REQUESTS_DELAY_MILLIS
         # - hadMultipleClients=false and this.assumedTotalCount<=this.assumedHandledCount
-        is_database_consistent = (queue_head['queryStartedAt'] - queue_head['queueModifiedAt']
+        is_database_consistent = (queue_head['queryStartedAt'] - queue_head['queueModifiedAt'].replace(tzinfo=timezone.utc)
                                   ).seconds >= (API_PROCESSED_REQUESTS_DELAY_MILLIS // 1000)
         is_locally_consistent = not queue_head['hadMultipleClients'] and self._assumed_total_count <= self._assumed_handled_count
         # Consistent information from one source is enough to consider request queue finished.
@@ -459,7 +460,8 @@ async def _ensure_head_is_non_empty(self, ensure_consistency: bool = False, limi
 
         # If we are repeating for consistency then wait required time.
         if should_repeat_for_consistency:
-            delay_seconds = (API_PROCESSED_REQUESTS_DELAY_MILLIS // 1000) - (datetime.utcnow() - queue_head['queueModifiedAt']).seconds
+            delay_seconds = (API_PROCESSED_REQUESTS_DELAY_MILLIS // 1000) - \
+                (datetime.now(timezone.utc) - queue_head['queueModifiedAt']).seconds
             logging.info(f'Waiting for {delay_seconds}s before considering the queue as finished to ensure that the data is consistent.')
             await asyncio.sleep(delay_seconds)
 
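
The elapsed-time checks in `is_finished` and in the consistency delay above read `timedelta.seconds`. For reference, that attribute is the seconds component left after whole days are stripped, not the total span; `total_seconds()` returns the full duration:

    from datetime import timedelta

    d = timedelta(days=1, seconds=5)
    print(d.seconds)          # 5 -- only the component after whole days
    print(d.total_seconds())  # 86405.0 -- the entire duration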