diff --git a/.github/workflows/run_code_checks.yaml b/.github/workflows/run_code_checks.yaml index dd9b4d23..b424a17b 100644 --- a/.github/workflows/run_code_checks.yaml +++ b/.github/workflows/run_code_checks.yaml @@ -26,5 +26,5 @@ jobs: integration_tests: name: Integration tests needs: [lint_check, type_check, unit_tests] - uses: apify/workflows/.github/workflows/python_integration_tests.yaml@main + uses: apify/workflows/.github/workflows/python_integration_tests.yaml@fix-integration-tests-from-forks secrets: inherit diff --git a/docs/02_guides/05_scrapy.mdx b/docs/02_guides/05_scrapy.mdx index 35b6bb5e..523a2423 100644 --- a/docs/02_guides/05_scrapy.mdx +++ b/docs/02_guides/05_scrapy.mdx @@ -40,6 +40,7 @@ The Apify SDK provides several custom components to support integration with the - [`apify.scrapy.ApifyScheduler`](https://docs.apify.com/sdk/python/reference/class/ApifyScheduler) - Replaces Scrapy's default [scheduler](https://docs.scrapy.org/en/latest/topics/scheduler.html) with one that uses Apify's [request queue](https://docs.apify.com/platform/storage/request-queue) for storing requests. It manages enqueuing, dequeuing, and maintaining the state and priority of requests. - [`apify.scrapy.ActorDatasetPushPipeline`](https://docs.apify.com/sdk/python/reference/class/ActorDatasetPushPipeline) - A Scrapy [item pipeline](https://docs.scrapy.org/en/latest/topics/item-pipeline.html) that pushes scraped items to Apify's [dataset](https://docs.apify.com/platform/storage/dataset). When enabled, every item produced by the spider is sent to the dataset. - [`apify.scrapy.ApifyHttpProxyMiddleware`](https://docs.apify.com/sdk/python/reference/class/ApifyHttpProxyMiddleware) - A Scrapy [middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html) that manages proxy configurations. This middleware replaces Scrapy's default `HttpProxyMiddleware` to facilitate the use of Apify's proxy service. +- [`apify.scrapy.extensions.ApifyCacheStorage`](https://docs.apify.com/sdk/python/reference/class/ApifyCacheStorage) - A storage backend for Scrapy's built-in [HTTP cache middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#module-scrapy.downloadermiddlewares.httpcache). This backend uses Apify's [key-value store](https://docs.apify.com/platform/storage/key-value-store). Make sure to set `HTTPCACHE_ENABLED` and `HTTPCACHE_EXPIRATION_SECS` in your settings, or caching won't work. Additional helper functions in the [`apify.scrapy`](https://github.com/apify/apify-sdk-python/tree/master/src/apify/scrapy) subpackage include: @@ -94,6 +95,12 @@ The following example demonstrates a Scrapy Actor that scrapes page titles and e +## Dealing with ‘imminent migration to another host’ + +Under some circumstances, the platform may decide to [migrate your Actor](https://docs.apify.com/academy/expert-scraping-with-apify/migrations-maintaining-state) from one piece of infrastructure to another while it's in progress. While [Crawlee](https://crawlee.dev/python)-based projects can pause and resume their work after a restart, achieving the same with a Scrapy-based project can be challenging. + +As a workaround for this issue (tracked as [apify/actor-templates#303](https://github.com/apify/actor-templates/issues/303)), turn on caching with `HTTPCACHE_ENABLED` and set `HTTPCACHE_EXPIRATION_SECS` to at least a few minutes—the exact value depends on your use case. If your Actor gets migrated and restarted, the subsequent run will hit the cache, making it fast and avoiding unnecessary resource consumption. + ## Conclusion In this guide you learned how to use Scrapy in Apify Actors. You can now start building your own web scraping projects using Scrapy, the Apify SDK and host them on the Apify platform. See the [Actor templates](https://apify.com/templates/categories/python) to get started with your own scraping tasks. If you have questions or need assistance, feel free to reach out on our [GitHub](https://github.com/apify/apify-sdk-python) or join our [Discord community](https://discord.com/invite/jyEM2PRvMU). Happy scraping! diff --git a/docs/02_guides/code/scrapy_project/src/settings.py b/docs/02_guides/code/scrapy_project/src/settings.py index 62e11bfb..5c0e56e3 100644 --- a/docs/02_guides/code/scrapy_project/src/settings.py +++ b/docs/02_guides/code/scrapy_project/src/settings.py @@ -7,3 +7,5 @@ TELNETCONSOLE_ENABLED = False # Do not change the Twisted reactor unless you really know what you are doing. TWISTED_REACTOR = 'twisted.internet.asyncioreactor.AsyncioSelectorReactor' +HTTPCACHE_ENABLED = True +HTTPCACHE_EXPIRATION_SECS = 7200 diff --git a/src/apify/scrapy/extensions/__init__.py b/src/apify/scrapy/extensions/__init__.py new file mode 100644 index 00000000..e9bccd1f --- /dev/null +++ b/src/apify/scrapy/extensions/__init__.py @@ -0,0 +1,3 @@ +from apify.scrapy.extensions._httpcache import ApifyCacheStorage + +__all__ = ['ApifyCacheStorage'] diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py new file mode 100644 index 00000000..509c4d8a --- /dev/null +++ b/src/apify/scrapy/extensions/_httpcache.py @@ -0,0 +1,212 @@ +from __future__ import annotations + +import gzip +import io +import pickle +import re +import struct +from logging import getLogger +from time import time +from typing import TYPE_CHECKING + +from scrapy.http.headers import Headers +from scrapy.responsetypes import responsetypes + +from apify import Configuration +from apify.apify_storage_client import ApifyStorageClient +from apify.scrapy._async_thread import AsyncThread +from apify.storages import KeyValueStore + +if TYPE_CHECKING: + from scrapy import Request, Spider + from scrapy.http.response import Response + from scrapy.settings import BaseSettings + from scrapy.utils.request import RequestFingerprinterProtocol + +logger = getLogger(__name__) + + +class ApifyCacheStorage: + """A Scrapy cache storage that uses the Apify `KeyValueStore` to store responses. + + It can be set as a storage for Scrapy's built-in `HttpCacheMiddleware`, which caches + responses to requests. See HTTPCache middleware settings (prefixed with `HTTPCACHE_`) + in the Scrapy documentation for more information. Requires the asyncio Twisted reactor + to be installed. + """ + + def __init__(self, settings: BaseSettings) -> None: + self._expiration_max_items = 100 + self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS') + self._spider: Spider | None = None + self._kvs: KeyValueStore | None = None + self._fingerprinter: RequestFingerprinterProtocol | None = None + self._async_thread: AsyncThread | None = None + + def open_spider(self, spider: Spider) -> None: + """Open the cache storage for a spider.""" + logger.debug('Using Apify key value cache storage', extra={'spider': spider}) + self._spider = spider + self._fingerprinter = spider.crawler.request_fingerprinter + kvs_name = get_kvs_name(spider.name) + + async def open_kvs() -> KeyValueStore: + config = Configuration.get_global_configuration() + if config.is_at_home: + storage_client = ApifyStorageClient.from_config(config) + return await KeyValueStore.open(name=kvs_name, storage_client=storage_client) + return await KeyValueStore.open(name=kvs_name) + + logger.debug("Starting background thread for cache storage's event loop") + self._async_thread = AsyncThread() + logger.debug(f"Opening cache storage's {kvs_name!r} key value store") + self._kvs = self._async_thread.run_coro(open_kvs()) + + def close_spider(self, _: Spider, current_time: int | None = None) -> None: + """Close the cache storage for a spider.""" + if self._async_thread is None: + raise ValueError('Async thread not initialized') + + logger.info(f'Cleaning up cache items (max {self._expiration_max_items})') + if self._expiration_secs > 0: + if current_time is None: + current_time = int(time()) + + async def expire_kvs() -> None: + if self._kvs is None: + raise ValueError('Key value store not initialized') + i = 0 + async for item in self._kvs.iterate_keys(): + value = await self._kvs.get_value(item.key) + try: + gzip_time = read_gzip_time(value) + except Exception as e: + logger.warning(f'Malformed cache item {item.key}: {e}') + await self._kvs.set_value(item.key, None) + else: + if self._expiration_secs < current_time - gzip_time: + logger.debug(f'Expired cache item {item.key}') + await self._kvs.set_value(item.key, None) + else: + logger.debug(f'Valid cache item {item.key}') + if i == self._expiration_max_items: + break + i += 1 + + self._async_thread.run_coro(expire_kvs()) + + logger.debug('Closing cache storage') + try: + self._async_thread.close() + except KeyboardInterrupt: + logger.warning('Shutdown interrupted by KeyboardInterrupt!') + except Exception: + logger.exception('Exception occurred while shutting down cache storage') + finally: + logger.debug('Cache storage closed') + + def retrieve_response(self, _: Spider, request: Request, current_time: int | None = None) -> Response | None: + """Retrieve a response from the cache storage.""" + if self._async_thread is None: + raise ValueError('Async thread not initialized') + if self._kvs is None: + raise ValueError('Key value store not initialized') + if self._fingerprinter is None: + raise ValueError('Request fingerprinter not initialized') + + key = self._fingerprinter.fingerprint(request).hex() + value = self._async_thread.run_coro(self._kvs.get_value(key)) + + if value is None: + logger.debug('Cache miss', extra={'request': request}) + return None + + if current_time is None: + current_time = int(time()) + if 0 < self._expiration_secs < current_time - read_gzip_time(value): + logger.debug('Cache expired', extra={'request': request}) + return None + + data = from_gzip(value) + url = data['url'] + status = data['status'] + headers = Headers(data['headers']) + body = data['body'] + respcls = responsetypes.from_args(headers=headers, url=url, body=body) + + logger.debug('Cache hit', extra={'request': request}) + return respcls(url=url, headers=headers, status=status, body=body) + + def store_response(self, _: Spider, request: Request, response: Response) -> None: + """Store a response in the cache storage.""" + if self._async_thread is None: + raise ValueError('Async thread not initialized') + if self._kvs is None: + raise ValueError('Key value store not initialized') + if self._fingerprinter is None: + raise ValueError('Request fingerprinter not initialized') + + key = self._fingerprinter.fingerprint(request).hex() + data = { + 'status': response.status, + 'url': response.url, + 'headers': dict(response.headers), + 'body': response.body, + } + value = to_gzip(data) + self._async_thread.run_coro(self._kvs.set_value(key, value)) + + +def to_gzip(data: dict, mtime: int | None = None) -> bytes: + """Dump a dictionary to a gzip-compressed byte stream.""" + with io.BytesIO() as byte_stream: + with gzip.GzipFile(fileobj=byte_stream, mode='wb', mtime=mtime) as gzip_file: + pickle.dump(data, gzip_file, protocol=4) + return byte_stream.getvalue() + + +def from_gzip(gzip_bytes: bytes) -> dict: + """Load a dictionary from a gzip-compressed byte stream.""" + with io.BytesIO(gzip_bytes) as byte_stream, gzip.GzipFile(fileobj=byte_stream, mode='rb') as gzip_file: + data: dict = pickle.load(gzip_file) + return data + + +def read_gzip_time(gzip_bytes: bytes) -> int: + """Read the modification time from a gzip-compressed byte stream without decompressing the data.""" + header = gzip_bytes[:10] + header_components = struct.unpack(' str: + """Get the key value store name for a spider. + + The key value store name is derived from the spider name by replacing all special characters + with hyphens and trimming leading and trailing hyphens. The resulting name is prefixed with + 'httpcache-' and truncated to the maximum length. + + The documentation + [about storages](https://docs.apify.com/platform/storage/usage#named-and-unnamed-storages) + mentions that names can be up to 63 characters long, so the default max length is set to 60. + + Such naming isn't unique per spider, but should be sufficiently unique for most use cases. + The name of the key value store should indicate to which spider it belongs, e.g. in + the listing in the Apify's console. + + Args: + spider_name: Value of the Spider instance's name attribute. + max_length: Maximum length of the key value store name. + + Returns: Key value store name. + + Raises: + ValueError: If the spider name contains only special characters. + """ + slug = re.sub(r'[^a-zA-Z0-9-]', '-', spider_name) + slug = re.sub(r'-+', '-', slug) + slug = slug.strip('-') + if not slug: + raise ValueError(f'Unsupported spider name: {spider_name!r} (slug: {slug!r})') + return f'httpcache-{slug}'[:max_length] diff --git a/src/apify/scrapy/utils.py b/src/apify/scrapy/utils.py index 860c1c33..98af8d87 100644 --- a/src/apify/scrapy/utils.py +++ b/src/apify/scrapy/utils.py @@ -44,6 +44,9 @@ def apply_apify_settings(*, settings: Settings | None = None, proxy_config: dict settings['DOWNLOADER_MIDDLEWARES']['scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware'] = None settings['DOWNLOADER_MIDDLEWARES']['apify.scrapy.middlewares.ApifyHttpProxyMiddleware'] = 750 + # Set the default HTTPCache middleware storage backend to ApifyCacheStorage + settings['HTTPCACHE_STORAGE'] = 'apify.scrapy.extensions.ApifyCacheStorage' + # Store the proxy configuration settings['APIFY_PROXY_SETTINGS'] = proxy_config diff --git a/tests/unit/scrapy/extensions/__init__.py b/tests/unit/scrapy/extensions/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/scrapy/extensions/test_httpcache.py b/tests/unit/scrapy/extensions/test_httpcache.py new file mode 100644 index 00000000..51adbd63 --- /dev/null +++ b/tests/unit/scrapy/extensions/test_httpcache.py @@ -0,0 +1,71 @@ +from time import time + +import pytest + +from apify.scrapy.extensions._httpcache import from_gzip, get_kvs_name, read_gzip_time, to_gzip + +FIXTURE_DICT = {'name': 'Alice'} + +FIXTURE_BYTES = ( + b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xffk`\x99*\xcc\x00\x01\xb5SzX\xf2\x12s' + b'S\xa7\xf4\xb0:\xe6d&\xa7N)\xd6\x03\x00\x1c\xe8U\x9c\x1e\x00\x00\x00' +) + + +def test_gzip() -> None: + assert from_gzip(to_gzip(FIXTURE_DICT)) == FIXTURE_DICT + + +def test_to_gzip() -> None: + data_bytes = to_gzip(FIXTURE_DICT, mtime=0) + + assert data_bytes == FIXTURE_BYTES + + +def test_from_gzip() -> None: + data_dict = from_gzip(FIXTURE_BYTES) + + assert data_dict == FIXTURE_DICT + + +def test_read_gzip_time() -> None: + assert read_gzip_time(FIXTURE_BYTES) == 0 + + +def test_read_gzip_time_non_zero() -> None: + current_time = int(time()) + data_bytes = to_gzip(FIXTURE_DICT, mtime=current_time) + + assert read_gzip_time(data_bytes) == current_time + + +@pytest.mark.parametrize( + ('spider_name', 'expected'), + [ + ('test', 'httpcache-test'), + ('123', 'httpcache-123'), + ('test-spider', 'httpcache-test-spider'), + ('test_spider', 'httpcache-test-spider'), + ('test spider', 'httpcache-test-spider'), + ('test👻spider', 'httpcache-test-spider'), + ('test@spider', 'httpcache-test-spider'), + (' test spider ', 'httpcache-test-spider'), + ('testspider.com', 'httpcache-testspider-com'), + ('t' * 100, 'httpcache-tttttttttttttttttttttttttttttttttttttttttttttttttt'), + ], +) +def test_get_kvs_name(spider_name: str, expected: str) -> None: + assert get_kvs_name(spider_name) == expected + + +@pytest.mark.parametrize( + ('spider_name'), + [ + '', + '-', + '-@-/-', + ], +) +def test_get_kvs_name_raises(spider_name: str) -> None: + with pytest.raises(ValueError, match='Unsupported spider name'): + assert get_kvs_name(spider_name)