@@ -33,25 +33,23 @@ class ApifyCacheStorage:
33
33
34
34
def __init__(self, settings: BaseSettings) -> None:
    """Set up the cache storage from the crawler settings.

    Args:
        settings: Settings object; ``HTTPCACHE_EXPIRATION_SECS`` is read from it
            with ``getint``.
    """
    # Upper bound on how many stored items one cleanup pass looks at.
    self.expiration_max_items = 100
    self.expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS')

    # The attributes below stay None until the spider is opened.
    self.spider: Spider | None = None
    self._kv: KeyValueStore | None = None
    self._fingerprinter: RequestFingerprinterProtocol | None = None
    self._async_thread: AsyncThread | None = None
41
41
42
42
def open_spider (self , spider : Spider ) -> None :
43
- logger .debug (" Using Apify key value cache storage" , extra = {" spider" : spider })
43
+ logger .debug (' Using Apify key value cache storage' , extra = {' spider' : spider })
44
44
self .spider = spider
45
45
self ._fingerprinter = spider .crawler .request_fingerprinter
46
- kv_name = f" httpcache-{ spider .name } "
46
+ kv_name = f' httpcache-{ spider .name } '
47
47
48
48
async def open_kv () -> KeyValueStore :
49
49
config = Configuration .get_global_configuration ()
50
50
if config .is_at_home :
51
51
storage_client = ApifyStorageClient .from_config (config )
52
- return await KeyValueStore .open (
53
- name = kv_name , storage_client = storage_client
54
- )
52
+ return await KeyValueStore .open (name = kv_name , storage_client = storage_client )
55
53
return await KeyValueStore .open (name = kv_name )
56
54
57
55
logger .debug ("Starting background thread for cache storage's event loop" )
@@ -60,88 +58,84 @@ async def open_kv() -> KeyValueStore:
60
58
self ._kv = self ._async_thread .run_coro (open_kv ())
61
59
62
60
def close_spider(self, spider: Spider, current_time: int | None = None) -> None:
    """Clean up old cache items, then close the background async thread.

    When ``expiration_secs`` is positive, up to ``expiration_max_items`` keys of the
    key-value store are inspected. An item is overwritten with ``None`` when its gzip
    header timestamp is older than ``expiration_secs`` relative to ``current_time``,
    or when the timestamp cannot be read at all.

    Args:
        spider: The spider being closed (not used here).
        current_time: Reference time in seconds for the age check; defaults to
            ``int(time())``.
    """
    assert self._async_thread is not None, 'Async thread not initialized'

    logger.info(f'Cleaning up cache items (max {self.expiration_max_items})')
    if 0 < self.expiration_secs:
        if current_time is None:
            current_time = int(time())

        async def expire_kv() -> None:
            assert self._kv is not None, 'Key value store not initialized'
            processed = 0
            async for item in self._kv.iterate_keys():
                value = await self._kv.get_value(item.key)
                try:
                    gzip_time = read_gzip_time(value)
                except Exception as e:
                    # Unreadable header: drop the item rather than keep serving it.
                    logger.warning(f'Malformed cache item {item.key}: {e}')
                    await self._kv.set_value(item.key, None)
                else:
                    if self.expiration_secs < current_time - gzip_time:
                        logger.debug(f'Expired cache item {item.key}')
                        await self._kv.set_value(item.key, None)
                    else:
                        logger.debug(f'Valid cache item {item.key}')

                # Count first, then compare, so the pass stops after exactly
                # ``expiration_max_items`` items instead of one more than that.
                processed += 1
                if processed >= self.expiration_max_items:
                    break

        self._async_thread.run_coro(expire_kv())

    logger.debug('Closing cache storage')
    try:
        self._async_thread.close()
    except KeyboardInterrupt:
        logger.warning('Shutdown interrupted by KeyboardInterrupt!')
    except Exception:
        logger.exception('Exception occurred while shutting down cache storage')
    finally:
        logger.debug('Cache storage closed')
101
99
102
def retrieve_response(self, spider: Spider, request: Request, current_time: int | None = None) -> Response | None:
    """Look up a cached response for ``request``.

    The cache key is the hex form of the request fingerprint. The stored value is a
    gzip blob whose header timestamp is compared against ``expiration_secs``.

    Args:
        spider: The running spider (not used here).
        request: Request whose cached response is wanted.
        current_time: Reference time in seconds for the expiry check; defaults to
            ``int(time())``.

    Returns:
        A response rebuilt from the cached url, status, headers and body, or
        ``None`` when nothing is stored for the request or the entry has expired.
    """
    assert self._async_thread is not None, 'Async thread not initialized'
    assert self._kv is not None, 'Key value store not initialized'
    assert self._fingerprinter is not None, 'Request fingerprinter not initialized'

    key = self._fingerprinter.fingerprint(request).hex()
    value = self._async_thread.run_coro(self._kv.get_value(key))

    if value is None:
        logger.debug('Cache miss', extra={'request': request})
        return None

    if current_time is None:
        current_time = int(time())
    # A non-positive ``expiration_secs`` makes the chained comparison false,
    # so entries are never treated as expired in that case.
    if 0 < self.expiration_secs < current_time - read_gzip_time(value):
        logger.debug('Cache expired', extra={'request': request})
        return None

    data = from_gzip(value)
    url = data['url']
    status = data['status']
    headers = Headers(data['headers'])
    body = data['body']
    respcls = responsetypes.from_args(headers=headers, url=url, body=body)

    logger.debug('Cache hit', extra={'request': request})
    return respcls(url=url, headers=headers, status=status, body=body)
131
127
132
def store_response(self, spider: Spider, request: Request, response: Response) -> None:
    """Store ``response`` in the key-value store under the request fingerprint.

    The response's status, url, headers and body are packed into a dictionary,
    serialized with ``to_gzip`` and written under the hex form of the request
    fingerprint.

    Args:
        spider: The running spider (not used here).
        request: Request that produced the response; determines the cache key.
        response: Response to cache.
    """
    assert self._async_thread is not None, 'Async thread not initialized'
    assert self._kv is not None, 'Key value store not initialized'
    assert self._fingerprinter is not None, 'Request fingerprinter not initialized'

    key = self._fingerprinter.fingerprint(request).hex()
    data = {
        'status': response.status,
        'url': response.url,
        'headers': dict(response.headers),
        'body': response.body,
    }
    value = to_gzip(data)
    self._async_thread.run_coro(self._kv.set_value(key, value))
@@ -150,19 +144,19 @@ def store_response(
150
144
def to_gzip (data : dict , mtime : int | None = None ) -> bytes :
151
145
"""Dump a dictionary to a gzip-compressed byte stream."""
152
146
with io .BytesIO () as byte_stream :
153
- with gzip .GzipFile (fileobj = byte_stream , mode = "wb" , mtime = mtime ) as gzip_file :
147
+ with gzip .GzipFile (fileobj = byte_stream , mode = 'wb' , mtime = mtime ) as gzip_file :
154
148
pickle .dump (data , gzip_file , protocol = 4 )
155
149
return byte_stream .getvalue ()
156
150
157
151
158
152
def from_gzip(gzip_bytes: bytes) -> dict:
    """Load a dictionary from a gzip-compressed byte stream.

    Args:
        gzip_bytes: A gzip stream containing a pickled object.

    Returns:
        The unpickled object.
    """
    # NOTE(review): pickle.load executes whatever the payload describes, so this
    # is only safe while the stored bytes come from a trusted writer.
    with io.BytesIO(gzip_bytes) as byte_stream, gzip.GzipFile(fileobj=byte_stream, mode='rb') as gzip_file:
        return pickle.load(gzip_file)
162
156
163
157
164
158
def read_gzip_time(gzip_bytes: bytes) -> int:
    """Read the modification time from a gzip-compressed byte stream without decompressing the data.

    Args:
        gzip_bytes: A gzip stream; only its first 10 bytes are looked at.

    Returns:
        The 32-bit modification time stored in the gzip header.

    Raises:
        struct.error: If fewer than 10 bytes are supplied.
    """
    header = gzip_bytes[:10]
    # Little-endian fixed header: magic (H), method (B), flags (B), mtime (I),
    # then two single-byte fields. Index 3 of the unpacked tuple is the mtime.
    header_components = struct.unpack('<HBBI2B', header)
    return header_components[3]
0 commit comments