Skip to content

Commit 381c044

Browse files
committed
feat(scrapy): add Scrapy cache using Apify KV store
This code was originally developed for the https://github.com/juniorguru/plucker/ project, which is licensed under AGPL-3.0-only. I am the sole author of that code, and I hereby grant the https://github.com/apify/apify-sdk-python/ project the right to use it under an Apache-2.0 license, without the copyleft taking effect. My intention is to contribute this code upstream, remove it from my project, and then import the component from the apify package as a dependency, as I believe it could be useful to other users of the apify package.
1 parent fd7650a commit 381c044

File tree

2 files changed

+203
-0
lines changed

2 files changed

+203
-0
lines changed

src/apify/scrapy/cache.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
from __future__ import annotations
2+
3+
import gzip
4+
import io
5+
import pickle
6+
import struct
7+
from logging import getLogger
8+
from time import time
9+
from typing import TYPE_CHECKING
10+
11+
from scrapy.http.headers import Headers
12+
from scrapy.responsetypes import responsetypes
13+
14+
from ._async_thread import AsyncThread
15+
from apify import Configuration
16+
from apify.apify_storage_client import ApifyStorageClient
17+
from apify.storages import KeyValueStore
18+
19+
if TYPE_CHECKING:
20+
from scrapy import Request, Spider
21+
from scrapy.http.response import Response
22+
from scrapy.settings import BaseSettings
23+
from scrapy.utils.request import RequestFingerprinterProtocol
24+
25+
logger = getLogger(__name__)
26+
27+
28+
class ApifyCacheStorage:
    """A Scrapy cache storage that uses the Apify `KeyValueStore` to store responses.

    Responses are pickled, gzip-compressed, and stored under the request's
    fingerprint. Expiration is derived from the gzip header's MTIME field,
    so no separate metadata record is needed.

    This cache storage requires the asyncio Twisted reactor to be installed.
    """

    def __init__(self, settings: BaseSettings) -> None:
        # Upper bound on how many cache items the close-time cleanup inspects,
        # so closing a spider over a huge cache stays bounded.
        self.expiration_max_items = 100
        # 0 disables expiration (items are then kept forever).
        self.expiration_secs: int = settings.getint("HTTPCACHE_EXPIRATION_SECS")
        self.spider: Spider | None = None
        self._kv: KeyValueStore | None = None
        self._fingerprinter: RequestFingerprinterProtocol | None = None
        # Background thread running an asyncio event loop, used to bridge the
        # async Apify storage API into Scrapy's synchronous cache interface.
        self._async_thread: AsyncThread | None = None

    def open_spider(self, spider: Spider) -> None:
        """Initialize the cache: start the event-loop thread and open the key-value store."""
        logger.debug("Using Apify key value cache storage", extra={"spider": spider})
        self.spider = spider
        self._fingerprinter = spider.crawler.request_fingerprinter
        kv_name = f"httpcache-{spider.name}"

        async def open_kv() -> KeyValueStore:
            config = Configuration.get_global_configuration()
            # On the Apify platform, use the platform storage client explicitly;
            # locally, fall back to the default storage backend.
            if config.is_at_home:
                storage_client = ApifyStorageClient.from_config(config)
                return await KeyValueStore.open(name=kv_name, storage_client=storage_client)
            return await KeyValueStore.open(name=kv_name)

        logger.debug("Starting background thread for cache storage's event loop")
        self._async_thread = AsyncThread()
        logger.debug(f"Opening cache storage's {kv_name!r} key value store")
        self._kv = self._async_thread.run_coro(open_kv())

    def close_spider(self, spider: Spider, current_time: int | None = None) -> None:
        """Expire old cache items (best effort, bounded) and shut down the event loop.

        Args:
            spider: The spider being closed (unused; part of the Scrapy interface).
            current_time: Override of "now" in epoch seconds, for tests.
        """
        assert self._async_thread is not None, "Async thread not initialized"

        logger.info(f"Cleaning up cache items (max {self.expiration_max_items})")
        if 0 < self.expiration_secs:
            if current_time is None:
                current_time = int(time())

            async def expire_kv() -> None:
                assert self._kv is not None, "Key value store not initialized"
                inspected = 0
                async for item in self._kv.iterate_keys():
                    # Bug fix: check the bound *before* processing, so exactly
                    # `expiration_max_items` items are inspected. The original
                    # post-increment check (`if i == max: break` after the item
                    # was handled) processed one extra item.
                    if inspected >= self.expiration_max_items:
                        break
                    value = await self._kv.get_value(item.key)
                    try:
                        gzip_time = read_gzip_time(value)
                    except Exception as e:
                        # Unreadable gzip header: drop the record. Setting the
                        # value to None presumably deletes it — TODO confirm
                        # against the Apify KeyValueStore semantics.
                        logger.warning(f"Malformed cache item {item.key}: {e}")
                        await self._kv.set_value(item.key, None)
                    else:
                        if self.expiration_secs < current_time - gzip_time:
                            logger.debug(f"Expired cache item {item.key}")
                            await self._kv.set_value(item.key, None)
                        else:
                            logger.debug(f"Valid cache item {item.key}")
                    inspected += 1

            self._async_thread.run_coro(expire_kv())

        logger.debug("Closing cache storage")
        try:
            self._async_thread.close()
        except KeyboardInterrupt:
            logger.warning("Shutdown interrupted by KeyboardInterrupt!")
        except Exception:
            logger.exception("Exception occurred while shutting down cache storage")
        finally:
            logger.debug("Cache storage closed")

    def retrieve_response(
        self, spider: Spider, request: Request, current_time: int | None = None
    ) -> Response | None:
        """Return a cached response for `request`, or None on a miss or expiry.

        Args:
            spider: The spider issuing the request (unused; Scrapy interface).
            request: The request to look up, keyed by its fingerprint.
            current_time: Override of "now" in epoch seconds, for tests.
        """
        assert self._async_thread is not None, "Async thread not initialized"
        assert self._kv is not None, "Key value store not initialized"
        assert self._fingerprinter is not None, "Request fingerprinter not initialized"

        key = self._fingerprinter.fingerprint(request).hex()
        value = self._async_thread.run_coro(self._kv.get_value(key))

        if value is None:
            logger.debug("Cache miss", extra={"request": request})
            return None

        if current_time is None:
            current_time = int(time())
        # Expired items are only skipped here; physical removal happens in the
        # bounded cleanup performed by `close_spider`.
        if 0 < self.expiration_secs < current_time - read_gzip_time(value):
            logger.debug("Cache expired", extra={"request": request})
            return None

        data = from_gzip(value)
        url = data["url"]
        status = data["status"]
        headers = Headers(data["headers"])
        body = data["body"]
        # Let Scrapy pick the most specific Response subclass for the payload.
        respcls = responsetypes.from_args(headers=headers, url=url, body=body)

        logger.debug("Cache hit", extra={"request": request})
        return respcls(url=url, headers=headers, status=status, body=body)

    def store_response(self, spider: Spider, request: Request, response: Response) -> None:
        """Store `response` in the cache under the fingerprint of `request`."""
        assert self._async_thread is not None, "Async thread not initialized"
        assert self._kv is not None, "Key value store not initialized"
        assert self._fingerprinter is not None, "Request fingerprinter not initialized"

        key = self._fingerprinter.fingerprint(request).hex()
        data = {
            "status": response.status,
            "url": response.url,
            "headers": dict(response.headers),
            "body": response.body,
        }
        value = to_gzip(data)
        self._async_thread.run_coro(self._kv.set_value(key, value))
148+
149+
150+
def to_gzip(data: dict, mtime: int | None = None) -> bytes:
    """Serialize *data* with pickle and return it as gzip-compressed bytes.

    Passing an explicit *mtime* makes the output fully deterministic, which is
    useful for tests; with the default None, gzip records the current time.
    """
    payload = pickle.dumps(data, protocol=4)
    stream = io.BytesIO()
    with gzip.GzipFile(fileobj=stream, mode="wb", mtime=mtime) as archive:
        archive.write(payload)
    return stream.getvalue()
156+
157+
158+
def from_gzip(gzip_bytes: bytes) -> dict:
    """Restore a dictionary previously serialized by `to_gzip`."""
    buffer = io.BytesIO(gzip_bytes)
    with gzip.GzipFile(fileobj=buffer, mode="rb") as archive:
        payload = pickle.load(archive)
    return payload
162+
163+
164+
def read_gzip_time(gzip_bytes: bytes) -> int:
    """Read the modification time from a gzip-compressed byte stream without decompressing the data."""
    # Fixed gzip header layout: magic (2B), CM (1B), FLG (1B), MTIME (4B,
    # little-endian), XFL (1B), OS (1B). Unpacking all 10 bytes (rather than
    # slicing MTIME directly) keeps struct.error behavior on short input.
    _magic, _cm, _flg, mtime, _xfl, _os = struct.unpack("<HBBI2B", gzip_bytes[:10])
    return mtime

tests/unit/scrapy/test_cache.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from time import time
2+
3+
from apify.scrapy.cache import from_gzip, read_gzip_time, to_gzip
4+
5+
# Gzip-compressed pickle of {"name": "Alice"} produced with mtime=0, so the
# bytes are deterministic (see test_to_gzip and test_read_gzip_time below).
FIXTURE_BYTES = (
    b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xffk`\x99*\xcc\x00\x01\xb5SzX\xf2\x12s"
    b"S\xa7\xf4\xb0:\xe6d&\xa7N)\xd6\x03\x00\x1c\xe8U\x9c\x1e\x00\x00\x00"
)
9+
10+
11+
def test_gzip() -> None:
    """Round trip: decompressing a freshly compressed dict yields the original."""
    original = {"name": "Alice"}
    restored = from_gzip(to_gzip(original))
    assert restored == original
13+
14+
15+
def test_to_gzip() -> None:
    """Compressing with a fixed mtime produces the known deterministic bytes."""
    assert to_gzip({"name": "Alice"}, mtime=0) == FIXTURE_BYTES
19+
20+
21+
def test_from_gzip() -> None:
    """The fixture bytes decompress back into the original dictionary."""
    expected = {"name": "Alice"}
    assert from_gzip(FIXTURE_BYTES) == expected
25+
26+
27+
def test_read_gzip_time() -> None:
    """The fixture was written with mtime=0, and that is what gets read back."""
    assert read_gzip_time(FIXTURE_BYTES) == 0
29+
30+
31+
def test_read_gzip_time_non_zero() -> None:
    """A non-zero mtime survives the compress / read-header round trip."""
    now = int(time())
    compressed = to_gzip({"name": "Alice"}, mtime=now)
    assert read_gzip_time(compressed) == now

0 commit comments

Comments
 (0)