2 changes: 1 addition & 1 deletion .github/workflows/run_code_checks.yaml
@@ -26,5 +26,5 @@ jobs:
integration_tests:
name: Integration tests
needs: [lint_check, type_check, unit_tests]
uses: apify/workflows/.github/workflows/python_integration_tests.yaml@main
uses: apify/workflows/.github/workflows/python_integration_tests.yaml@fix-integration-tests-from-forks
secrets: inherit
7 changes: 7 additions & 0 deletions docs/02_guides/05_scrapy.mdx
@@ -40,6 +40,7 @@ The Apify SDK provides several custom components to support integration with the
- [`apify.scrapy.ApifyScheduler`](https://docs.apify.com/sdk/python/reference/class/ApifyScheduler) - Replaces Scrapy's default [scheduler](https://docs.scrapy.org/en/latest/topics/scheduler.html) with one that uses Apify's [request queue](https://docs.apify.com/platform/storage/request-queue) for storing requests. It manages enqueuing, dequeuing, and maintaining the state and priority of requests.
- [`apify.scrapy.ActorDatasetPushPipeline`](https://docs.apify.com/sdk/python/reference/class/ActorDatasetPushPipeline) - A Scrapy [item pipeline](https://docs.scrapy.org/en/latest/topics/item-pipeline.html) that pushes scraped items to Apify's [dataset](https://docs.apify.com/platform/storage/dataset). When enabled, every item produced by the spider is sent to the dataset.
- [`apify.scrapy.ApifyHttpProxyMiddleware`](https://docs.apify.com/sdk/python/reference/class/ApifyHttpProxyMiddleware) - A Scrapy [middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html) that manages proxy configurations. This middleware replaces Scrapy's default `HttpProxyMiddleware` to facilitate the use of Apify's proxy service.
- [`apify.scrapy.extensions.ApifyCacheStorage`](https://docs.apify.com/sdk/python/reference/class/ApifyCacheStorage) - A storage backend for Scrapy's built-in [HTTP cache middleware](https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#module-scrapy.downloadermiddlewares.httpcache). This backend uses Apify's [key-value store](https://docs.apify.com/platform/storage/key-value-store). Make sure to set `HTTPCACHE_ENABLED` and `HTTPCACHE_EXPIRATION_SECS` in your settings, or caching won't work.
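
For instance, a minimal `settings.py` fragment that enables the cache backend could look like the following sketch. The expiration value is illustrative; when the settings are built with `apply_apify_settings` from `apify.scrapy.utils`, the `HTTPCACHE_STORAGE` setting is already pointed at `ApifyCacheStorage` for you.

```python
# settings.py - a minimal sketch, values are illustrative
HTTPCACHE_ENABLED = True  # turn on Scrapy's built-in HTTP cache middleware
HTTPCACHE_EXPIRATION_SECS = 7200  # drop cached responses older than two hours

# Only needed if you do not build your settings with apify.scrapy.utils.apply_apify_settings,
# which sets this backend automatically:
# HTTPCACHE_STORAGE = 'apify.scrapy.extensions.ApifyCacheStorage'
```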

Additional helper functions in the [`apify.scrapy`](https://github.com/apify/apify-sdk-python/tree/master/src/apify/scrapy) subpackage include:

@@ -94,6 +95,12 @@ The following example demonstrates a Scrapy Actor that scrapes page titles and e
</TabItem>
</Tabs>

## Dealing with ‘imminent migration to another host’

Under some circumstances, the platform may decide to [migrate your Actor](https://docs.apify.com/academy/expert-scraping-with-apify/migrations-maintaining-state) from one piece of infrastructure to another while a run is in progress. While [Crawlee](https://crawlee.dev/python)-based projects can pause and resume their work after such a restart, achieving the same with a Scrapy-based project can be challenging.

As a workaround for this issue (tracked as [apify/actor-templates#303](https://github.com/apify/actor-templates/issues/303)), turn on caching with `HTTPCACHE_ENABLED` and set `HTTPCACHE_EXPIRATION_SECS` to at least a few minutes; the exact value depends on your use case. If your Actor gets migrated and restarted, the subsequent run will hit the cache, making it fast and avoiding unnecessary resource consumption.
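
To confirm that the cache is actually being used, you can raise the log level and watch for the messages that `ApifyCacheStorage` emits. This is a minimal sketch, assuming Scrapy's standard `LOG_LEVEL` setting is honored by your logging setup:

```python
# settings.py - optional, useful while verifying the workaround locally
LOG_LEVEL = 'DEBUG'  # ApifyCacheStorage logs 'Cache hit', 'Cache miss', and 'Cache expired' at DEBUG level
```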

## Conclusion

In this guide you learned how to use Scrapy in Apify Actors. You can now start building your own web scraping projects using Scrapy and the Apify SDK, and host them on the Apify platform. See the [Actor templates](https://apify.com/templates/categories/python) to get started with your own scraping tasks. If you have questions or need assistance, feel free to reach out on our [GitHub](https://github.com/apify/apify-sdk-python) or join our [Discord community](https://discord.com/invite/jyEM2PRvMU). Happy scraping!
2 changes: 2 additions & 0 deletions docs/02_guides/code/scrapy_project/src/settings.py
@@ -7,3 +7,5 @@
TELNETCONSOLE_ENABLED = False
# Do not change the Twisted reactor unless you really know what you are doing.
TWISTED_REACTOR = 'twisted.internet.asyncioreactor.AsyncioSelectorReactor'
HTTPCACHE_ENABLED = True
HTTPCACHE_EXPIRATION_SECS = 7200
3 changes: 3 additions & 0 deletions src/apify/scrapy/extensions/__init__.py
@@ -0,0 +1,3 @@
from apify.scrapy.extensions._httpcache import ApifyCacheStorage

__all__ = ['ApifyCacheStorage']
190 changes: 190 additions & 0 deletions src/apify/scrapy/extensions/_httpcache.py
@@ -0,0 +1,190 @@
from __future__ import annotations

import gzip
import io
import pickle
import re
import struct
from logging import getLogger
from time import time
from typing import TYPE_CHECKING

from scrapy.http.headers import Headers
from scrapy.responsetypes import responsetypes

from apify import Configuration
from apify.apify_storage_client import ApifyStorageClient
from apify.scrapy._async_thread import AsyncThread
from apify.storages import KeyValueStore

if TYPE_CHECKING:
from scrapy import Request, Spider
from scrapy.http.response import Response
from scrapy.settings import BaseSettings
from scrapy.utils.request import RequestFingerprinterProtocol

logger = getLogger(__name__)


class ApifyCacheStorage:
"""A Scrapy cache storage that uses the Apify `KeyValueStore` to store responses.
It can be set as a storage for Scrapy's built-in `HttpCacheMiddleware`, which caches
responses to requests. See HTTPCache middleware settings (prefixed with `HTTPCACHE_`)
in the Scrapy documentation for more information. Requires the asyncio Twisted reactor
to be installed.
"""

def __init__(self, settings: BaseSettings) -> None:
self._expiration_max_items = 100
self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS')
self._spider: Spider | None = None
self._kvs: KeyValueStore | None = None
self._fingerprinter: RequestFingerprinterProtocol | None = None
self._async_thread: AsyncThread | None = None

def open_spider(self, spider: Spider) -> None:
"""Open the cache storage for a spider."""
logger.debug('Using Apify key value cache storage', extra={'spider': spider})
self._spider = spider
self._fingerprinter = spider.crawler.request_fingerprinter
kvs_name = get_kvs_name(spider.name)

async def open_kvs() -> KeyValueStore:
config = Configuration.get_global_configuration()
if config.is_at_home:
storage_client = ApifyStorageClient.from_config(config)
return await KeyValueStore.open(name=kvs_name, storage_client=storage_client)
return await KeyValueStore.open(name=kvs_name)

logger.debug("Starting background thread for cache storage's event loop")
self._async_thread = AsyncThread()
logger.debug(f"Opening cache storage's {kvs_name!r} key value store")
self._kvs = self._async_thread.run_coro(open_kvs())

def close_spider(self, _: Spider, current_time: int | None = None) -> None:
"""Close the cache storage for a spider."""
if self._async_thread is None:
raise ValueError('Async thread not initialized')

logger.info(f'Cleaning up cache items (max {self._expiration_max_items})')
if self._expiration_secs > 0:
if current_time is None:
current_time = int(time())

async def expire_kvs() -> None:
if self._kvs is None:
raise ValueError('Key value store not initialized')
i = 0
async for item in self._kvs.iterate_keys():
value = await self._kvs.get_value(item.key)
try:
gzip_time = read_gzip_time(value)
except Exception as e:
logger.warning(f'Malformed cache item {item.key}: {e}')
await self._kvs.set_value(item.key, None)
else:
if self._expiration_secs < current_time - gzip_time:
logger.debug(f'Expired cache item {item.key}')
await self._kvs.set_value(item.key, None)
else:
logger.debug(f'Valid cache item {item.key}')
if i == self._expiration_max_items:
break
i += 1

self._async_thread.run_coro(expire_kvs())

logger.debug('Closing cache storage')
try:
self._async_thread.close()
except KeyboardInterrupt:
logger.warning('Shutdown interrupted by KeyboardInterrupt!')
except Exception:
logger.exception('Exception occurred while shutting down cache storage')
finally:
logger.debug('Cache storage closed')

def retrieve_response(self, _: Spider, request: Request, current_time: int | None = None) -> Response | None:
"""Retrieve a response from the cache storage."""
if self._async_thread is None:
raise ValueError('Async thread not initialized')
if self._kvs is None:
raise ValueError('Key value store not initialized')
if self._fingerprinter is None:
raise ValueError('Request fingerprinter not initialized')

key = self._fingerprinter.fingerprint(request).hex()
value = self._async_thread.run_coro(self._kvs.get_value(key))

if value is None:
logger.debug('Cache miss', extra={'request': request})
return None

if current_time is None:
current_time = int(time())
if 0 < self._expiration_secs < current_time - read_gzip_time(value):
logger.debug('Cache expired', extra={'request': request})
return None

data = from_gzip(value)
url = data['url']
status = data['status']
headers = Headers(data['headers'])
body = data['body']
respcls = responsetypes.from_args(headers=headers, url=url, body=body)

logger.debug('Cache hit', extra={'request': request})
return respcls(url=url, headers=headers, status=status, body=body)

def store_response(self, _: Spider, request: Request, response: Response) -> None:
"""Store a response in the cache storage."""
if self._async_thread is None:
raise ValueError('Async thread not initialized')
if self._kvs is None:
raise ValueError('Key value store not initialized')
if self._fingerprinter is None:
raise ValueError('Request fingerprinter not initialized')

key = self._fingerprinter.fingerprint(request).hex()
data = {
'status': response.status,
'url': response.url,
'headers': dict(response.headers),
'body': response.body,
}
value = to_gzip(data)
self._async_thread.run_coro(self._kvs.set_value(key, value))


def to_gzip(data: dict, mtime: int | None = None) -> bytes:
"""Dump a dictionary to a gzip-compressed byte stream."""
with io.BytesIO() as byte_stream:
with gzip.GzipFile(fileobj=byte_stream, mode='wb', mtime=mtime) as gzip_file:
pickle.dump(data, gzip_file, protocol=4)
return byte_stream.getvalue()


def from_gzip(gzip_bytes: bytes) -> dict:
"""Load a dictionary from a gzip-compressed byte stream."""
with io.BytesIO(gzip_bytes) as byte_stream, gzip.GzipFile(fileobj=byte_stream, mode='rb') as gzip_file:
data: dict = pickle.load(gzip_file)
return data


def read_gzip_time(gzip_bytes: bytes) -> int:
"""Read the modification time from a gzip-compressed byte stream without decompressing the data."""
header = gzip_bytes[:10]
header_components = struct.unpack('<HBBI2B', header)
mtime: int = header_components[3]
return mtime


def get_kvs_name(spider_name: str) -> str:
"""Get the key value store name for a spider."""
slug = re.sub(r'[^a-zA-Z0-9-]', '-', spider_name)
slug = re.sub(r'-+', '-', slug)
slug = slug.strip('-')
if not slug:
raise ValueError(f'Unsupported spider name: {spider_name!r} (slug: {slug!r})')
return f'httpcache-{slug}'
3 changes: 3 additions & 0 deletions src/apify/scrapy/utils.py
@@ -44,6 +44,9 @@ def apply_apify_settings(*, settings: Settings | None = None, proxy_config: dict
settings['DOWNLOADER_MIDDLEWARES']['scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware'] = None
settings['DOWNLOADER_MIDDLEWARES']['apify.scrapy.middlewares.ApifyHttpProxyMiddleware'] = 750

# Set the default HTTPCache middleware storage backend to ApifyCacheStorage
settings['HTTPCACHE_STORAGE'] = 'apify.scrapy.extensions.ApifyCacheStorage'

# Store the proxy configuration
settings['APIFY_PROXY_SETTINGS'] = proxy_config

Empty file.
70 changes: 70 additions & 0 deletions tests/unit/scrapy/extensions/test_httpcache.py
@@ -0,0 +1,70 @@
from time import time

import pytest

from apify.scrapy.extensions._httpcache import from_gzip, get_kvs_name, read_gzip_time, to_gzip

FIXTURE_DICT = {'name': 'Alice'}

FIXTURE_BYTES = (
b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xffk`\x99*\xcc\x00\x01\xb5SzX\xf2\x12s'
b'S\xa7\xf4\xb0:\xe6d&\xa7N)\xd6\x03\x00\x1c\xe8U\x9c\x1e\x00\x00\x00'
)


def test_gzip() -> None:
assert from_gzip(to_gzip(FIXTURE_DICT)) == FIXTURE_DICT


def test_to_gzip() -> None:
data_bytes = to_gzip(FIXTURE_DICT, mtime=0)

assert data_bytes == FIXTURE_BYTES


def test_from_gzip() -> None:
data_dict = from_gzip(FIXTURE_BYTES)

assert data_dict == FIXTURE_DICT


def test_read_gzip_time() -> None:
assert read_gzip_time(FIXTURE_BYTES) == 0


def test_read_gzip_time_non_zero() -> None:
current_time = int(time())
data_bytes = to_gzip(FIXTURE_DICT, mtime=current_time)

assert read_gzip_time(data_bytes) == current_time


@pytest.mark.parametrize(
('spider_name', 'expected'),
[
('test', 'httpcache-test'),
('123', 'httpcache-123'),
('test-spider', 'httpcache-test-spider'),
('test_spider', 'httpcache-test-spider'),
('test spider', 'httpcache-test-spider'),
('test👻spider', 'httpcache-test-spider'),
('test@spider', 'httpcache-test-spider'),
(' test spider ', 'httpcache-test-spider'),
('testspider.com', 'httpcache-testspider-com'),
],
)
def test_get_kvs_name(spider_name: str, expected: str) -> None:
assert get_kvs_name(spider_name) == expected


@pytest.mark.parametrize(
('spider_name'),
[
'',
'-',
'-@-/-',
],
)
def test_get_kvs_name_raises(spider_name: str) -> None:
with pytest.raises(ValueError, match='Unsupported spider name'):
        get_kvs_name(spider_name)