Commit 5bae238

feat: Various fixes and improvements (#41)
Fixes:
- Removed a bunch of TODOs that were either validated (e.g. against crawlee) or dropped because they are not really relevant.
- The env var for the local storage directory is now properly handled; also added a test for it.
- Validation of the request queue's request dict via a budget version of ow.
- LRUCache: changed `yield from` to `return` and narrowed the return type.
- Memory storage tests: removed `write_metadata=True` since it's not the default, and reworked the tests to work without metadata.

Improvements:
- Added some more cases in various tests.
- Added an e2e test that simulates two local runs. The first run uses local storage to create files, the second run reads from them. Basically checks that re-running the actor locally works as intended.
1 parent b89bbcf commit 5bae238

18 files changed: +207 −56 lines

src/apify/_utils.py

Lines changed: 40 additions & 6 deletions
@@ -19,7 +19,7 @@
 from enum import Enum
 from typing import Any, Callable, Dict, Generic, ItemsView, Iterator, NoReturn, Optional
 from typing import OrderedDict as OrderedDictType
-from typing import Type, TypeVar, Union, ValuesView, cast, overload
+from typing import Tuple, Type, TypeVar, Union, ValuesView, cast, overload

 import aioshutil
 import psutil
@@ -285,12 +285,12 @@ def _is_file_or_bytes(value: Any) -> bool:
     return isinstance(value, (bytes, bytearray, io.IOBase))


-def _maybe_parse_body(body: bytes, content_type: str) -> Any:  # TODO: Improve return type
+def _maybe_parse_body(body: bytes, content_type: str) -> Any:
    try:
        if _is_content_type_json(content_type):
            return json.loads(body)  # Returns any
        elif _is_content_type_xml(content_type) or _is_content_type_text(content_type):
-            return body.decode('utf-8')  # TODO: Check if utf-8 can be assumed
+            return body.decode('utf-8')
    except ValueError as err:
        print('_maybe_parse_body error', err)
    return body
@@ -361,12 +361,11 @@ def __setitem__(self, key: str, value: T) -> None:

     def __delitem__(self, key: str) -> None:
         """Remove an item from the cache."""
-        # TODO: maybe do? self._cache.__delitem__(key)
         del self._cache[key]

-    def __iter__(self) -> Iterator:
+    def __iter__(self) -> Iterator[str]:
         """Iterate over the keys of the cache in order of insertion."""
-        yield from self._cache.__iter__()
+        return self._cache.__iter__()

     def __len__(self) -> int:
         """Get the number of items in the cache."""
@@ -383,3 +382,38 @@ def items(self) -> ItemsView[str, T]:  # Needed so we don't mutate the cache by

 def _is_running_in_ipython() -> bool:
     return getattr(builtins, '__IPYTHON__', False)
+
+
+@overload
+def _budget_ow(value: Union[str, int, float, bool], predicate: Tuple[Type, bool], value_name: str) -> None:  # noqa: U100
+    ...
+
+
+@overload
+def _budget_ow(value: Dict, predicate: Dict[str, Tuple[Type, bool]]) -> None:  # noqa: U100
+    ...
+
+
+def _budget_ow(
+    value: Union[Dict, str, int, float, bool],
+    predicate: Union[Dict[str, Tuple[Type, bool]], Tuple[Type, bool]],
+    value_name: Optional[str] = None,
+) -> None:
+    """Budget version of ow."""
+    def validate_single(field_value: Any, expected_type: Type, required: bool, name: str) -> None:
+        if field_value is None and required:
+            raise ValueError(f'"{name}" is required!')
+        if (field_value is not None or required) and not isinstance(field_value, expected_type):
+            raise ValueError(f'"{name}" must be of type "{expected_type.__name__}" but it is "{type(field_value).__name__}"!')
+
+    # Validate object
+    if isinstance(value, dict) and isinstance(predicate, dict):
+        for key, (field_type, required) in predicate.items():
+            field_value = value.get(key)
+            validate_single(field_value, field_type, required, key)
+    # Validate "primitive"
+    elif isinstance(value, (int, str, float, bool)) and isinstance(predicate, tuple) and value_name is not None:
+        field_type, required = predicate
+        validate_single(value, field_type, required, value_name)
+    else:
+        raise ValueError('Wrong input!')
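
For context, a minimal sketch of how the new `_budget_ow` helper is meant to be called (the sample values below are made up; the two calling conventions follow the overloads added above):

from apify._utils import _budget_ow

# Dict form: each key maps to an (expected_type, required) tuple.
_budget_ow({'url': 'https://example.com'}, {'url': (str, True)})  # passes silently

# "Primitive" form: the value, its (expected_type, required) tuple, and a name used in error messages.
_budget_ow('abcdef', (str, True), 'request_id')  # passes silently

# A missing required field raises ValueError.
try:
    _budget_ow({'id': None}, {'id': (str, True)})
except ValueError as err:
    print(err)  # "id" is required!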

src/apify/actor.py

Lines changed: 1 addition & 1 deletion
@@ -570,7 +570,7 @@ async def _push_data_internal(self, data: Any) -> None:
         if not data:
             return

-        if not isinstance(data, list):  # TODO: Memory storage does this on its own...
+        if not isinstance(data, list):
             data = [data]

         dataset = await self.open_dataset()

src/apify/memory_storage/memory_storage.py

Lines changed: 5 additions & 4 deletions
@@ -6,6 +6,8 @@
 from aiofiles import ospath
 from aiofiles.os import rename, scandir

+from .._utils import _maybe_parse_bool
+from ..consts import ApifyEnvVars
 from .resource_clients.dataset import DatasetClient
 from .resource_clients.dataset_collection import DatasetCollectionClient
 from .resource_clients.key_value_store import KeyValueStoreClient
@@ -38,7 +40,7 @@ class MemoryStorage:
     """Indicates whether a purge was already performed on this instance"""

     def __init__(
-        self, *, local_data_directory: str = './storage', write_metadata: Optional[bool] = None, persist_storage: Optional[bool] = None,
+        self, *, local_data_directory: Optional[str] = None, write_metadata: Optional[bool] = None, persist_storage: Optional[bool] = None,
     ) -> None:
         """Initialize the MemoryStorage.

@@ -47,13 +49,12 @@ def __init__(
             persist_storage (bool, optional): Whether to persist the data to the `local_data_directory` or just keep them in memory
             write_metadata (bool, optional): Whether to persist metadata of the storages as well
         """
-        self._local_data_directory = local_data_directory  # TODO: Make this work with `APIFY_LOCAL_STORAGE_DIR`
+        self._local_data_directory = local_data_directory or os.getenv(ApifyEnvVars.LOCAL_STORAGE_DIR) or './storage'
         self._datasets_directory = os.path.join(self._local_data_directory, 'datasets')
         self._key_value_stores_directory = os.path.join(self._local_data_directory, 'key_value_stores')
         self._request_queues_directory = os.path.join(self._local_data_directory, 'request_queues')
         self._write_metadata = write_metadata if write_metadata is not None else '*' in os.getenv('DEBUG', '')
-        self._persist_storage = persist_storage if persist_storage is not None else not any(
-            os.getenv('APIFY_PERSIST_STORAGE', 'true') == s for s in ['false', '0', ''])
+        self._persist_storage = persist_storage if persist_storage is not None else _maybe_parse_bool(os.getenv(ApifyEnvVars.PERSIST_STORAGE, 'true'))
         self._datasets_handled = []
         self._key_value_stores_handled = []
         self._request_queues_handled = []
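
A minimal sketch of the new resolution order for the storage directory (explicit argument first, then the `APIFY_LOCAL_STORAGE_DIR` environment variable, then the `./storage` default); the directory paths used here are made up:

import os

from apify.memory_storage import MemoryStorage

# An explicit argument always wins.
assert MemoryStorage(local_data_directory='./my-storage')._local_data_directory == './my-storage'

# Without an argument, APIFY_LOCAL_STORAGE_DIR is consulted...
os.environ['APIFY_LOCAL_STORAGE_DIR'] = '/tmp/apify-storage'
assert MemoryStorage()._local_data_directory == '/tmp/apify-storage'

# ...and './storage' is the final fallback.
del os.environ['APIFY_LOCAL_STORAGE_DIR']
assert MemoryStorage()._local_data_directory == './storage'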

src/apify/memory_storage/resource_clients/request_queue.py

Lines changed: 0 additions & 3 deletions
@@ -154,7 +154,6 @@ async def add_request(self, request: Dict, *, forefront: Optional[bool] = None)
         Returns:
             dict: The added request.
         """
-        # TODO: Throw if uniqueKey or url missing from request dict, also do for update_request...
         existing_queue_by_id = _find_or_cache_request_queue_by_possible_id(self._client, self._name or self._id)

         if existing_queue_by_id is None:
@@ -175,7 +174,6 @@ async def add_request(self, request: Dict, *, forefront: Optional[bool] = None)
         }

         existing_queue_by_id._requests[request_model['id']] = request_model
-        # TODO: Validate the next line logic, seems wrong in crawlee
         existing_queue_by_id._pending_request_count += 0 if request_model['orderNo'] is None else 1
         await existing_queue_by_id._update_timestamps(True)
         await _update_request_queue_item(
@@ -248,7 +246,6 @@ async def update_request(self, request: Dict, *, forefront: Optional[bool] = Non
         request_was_handled_before_update = existing_request['orderNo'] is None

         # We add 1 pending request if previous state was handled
-        # TODO: Validate the next 2 lines logic, seems wrong in crawlee
         if is_request_handled_state_changing:
             pending_count_adjustment = 1 if request_was_handled_before_update else -1
src/apify/storages/dataset.py

Lines changed: 1 addition & 1 deletion
@@ -242,7 +242,7 @@ async def export_to(

         if content_type == 'text/csv':
             output = io.StringIO()
-            writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL)  # TODO: Compare quoting behavior with TS impl
+            writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL)
             writer.writerows([items[0].keys(), *[item.values() for item in items]])
             value = output.getvalue()
             return await key_value_store.set_value(key, value, content_type)

src/apify/storages/key_value_store.py

Lines changed: 0 additions & 2 deletions
@@ -123,12 +123,10 @@ async def iterate_keys(self, exclusive_start_key: Optional[str] = None) -> Async
             where `key` is the record key, and `info` is an object that contains a single property `size`
             indicating size of the record in bytes.
         """
-        index = 0
         while True:
             list_keys = await self._client.list_keys(exclusive_start_key=exclusive_start_key)
             for item in list_keys['items']:
                 yield IterateKeysTuple(item['key'], {'size': item['size']})
-                index += 1

             if not list_keys['isTruncated']:
                 break

src/apify/storages/request_queue.py

Lines changed: 21 additions & 5 deletions
@@ -10,7 +10,7 @@
 from apify_client import ApifyClientAsync
 from apify_client.clients import RequestQueueClientAsync

-from .._utils import LRUCache, _crypto_random_object_id, _unique_key_to_request_id
+from .._utils import LRUCache, _budget_ow, _crypto_random_object_id, _unique_key_to_request_id
 from ..config import Configuration
 from ..consts import REQUEST_QUEUE_HEAD_MAX_LIMIT
 from ..memory_storage import MemoryStorage
@@ -126,7 +126,7 @@ async def _create_instance(cls, request_queue_id_or_name: str, client: Union[Api
     def _get_default_name(cls, config: Configuration) -> str:
         return config.default_request_queue_id

-    async def add_request(self, request: Dict, *, forefront: bool = False) -> Dict:  # TODO: Validate request with pydantic
+    async def add_request(self, request: Dict, *, forefront: bool = False) -> Dict:
         """Add a request to the queue.

         Args:
@@ -136,8 +136,14 @@ async def add_request(self, request: Dict, *, forefront: bool = False) -> Dict:
         Returns:
             dict: Information about the queue operation with keys `requestId`, `uniqueKey`, `wasAlreadyPresent`, `wasAlreadyHandled`.
         """
+        _budget_ow(request, {
+            'url': (str, True),
+        })
         self._last_activity = datetime.utcnow()

+        if request.get('uniqueKey') is None:
+            request['uniqueKey'] = request['url']  # TODO: Check Request class in crawlee and replicate uniqueKey generation logic...
+
         cache_key = _unique_key_to_request_id(request['uniqueKey'])
         cached_info = self._requests_cache.get(cache_key)

@@ -174,7 +180,8 @@ async def get_request(self, request_id: str) -> Optional[Dict]:
         Returns:
             dict, optional: The retrieved request, or `None`, if it does not exist.
         """
-        return await self._client.get_request(request_id)  # TODO: Maybe create a Request class?
+        _budget_ow(request_id, (str, True), 'request_id')
+        return await self._client.get_request(request_id)  # TODO: Maybe create a Request dataclass?

     async def fetch_next_request(self) -> Optional[Dict]:
         """Return the next request in the queue to be processed.
@@ -241,7 +248,7 @@ async def fetch_next_request(self) -> Optional[Dict]:

         return request

-    async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]:  # TODO: Validate request with pydantic
+    async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]:
         """Mark a request as handled after successful processing.

         Handled requests will never again be returned by the `RequestQueue.fetch_next_request` method.
@@ -253,6 +260,11 @@ async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]:  # TOD
             dict, optional: Information about the queue operation with keys `requestId`, `uniqueKey`, `wasAlreadyPresent`, `wasAlreadyHandled`.
             `None` if the given request was not in progress.
         """
+        _budget_ow(request, {
+            'id': (str, True),
+            'uniqueKey': (str, True),
+            'handledAt': (datetime, False),
+        })
         self._last_activity = datetime.utcnow()
         if request['id'] not in self._in_progress:
             logging.debug(f'Cannot mark request {request["id"]} as handled, because it is not in progress!')
@@ -272,7 +284,7 @@ async def mark_request_as_handled(self, request: Dict) -> Optional[Dict]:  # TOD

         return queue_operation_info

-    async def reclaim_request(self, request: Dict, forefront: bool = False) -> Optional[Dict]:  # TODO: Validate request with pydantic
+    async def reclaim_request(self, request: Dict, forefront: bool = False) -> Optional[Dict]:
         """Reclaim a failed request back to the queue.

         The request will be returned for processing later again
@@ -285,6 +297,10 @@ async def reclaim_request(self, request: Dict, forefront: bool = False) -> Optio
             dict, optional: Information about the queue operation with keys `requestId`, `uniqueKey`, `wasAlreadyPresent`, `wasAlreadyHandled`.
             `None` if the given request was not in progress.
         """
+        _budget_ow(request, {
+            'id': (str, True),
+            'uniqueKey': (str, True),
+        })
         self._last_activity = datetime.utcnow()

         if request['id'] not in self._in_progress:
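
A minimal sketch of what the new validation means for callers, assuming a plain local actor run (the URL below is made up): a request dict without a string `url` is rejected immediately, and a missing `uniqueKey` is filled in from the `url`, which also shows up in the returned queue operation info documented above.

import asyncio

from apify import Actor

async def main() -> None:
    async with Actor:
        queue = await Actor.open_request_queue()

        # Missing 'url' now fails fast with a ValueError from _budget_ow.
        try:
            await queue.add_request({'method': 'GET'})
        except ValueError as err:
            print(err)  # "url" is required!

        # Without an explicit uniqueKey, the url is reused as the uniqueKey.
        info = await queue.add_request({'url': 'https://example.com'})
        print(info['uniqueKey'])  # https://example.com

if __name__ == '__main__':
    asyncio.run(main())
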
Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
+import pytest
+
+from apify import Actor
+from apify.config import Configuration
+from apify.consts import ApifyEnvVars
+from apify.memory_storage import MemoryStorage
+from apify.storage_client_manager import StorageClientManager
+from apify.storages import StorageManager
+
+
+@pytest.mark.parametrize('purge_on_start', [True, False])
+async def test_actor_memory_storage_e2e(monkeypatch: pytest.MonkeyPatch, tmp_path: str, purge_on_start: bool) -> None:
+    """This test simulates two clean runs using memory storage.
+    The second run attempts to access data created by the first one.
+    We run 2 configurations with different `purge_on_start`."""
+    # Configure purging env var
+    monkeypatch.setenv(ApifyEnvVars.PURGE_ON_START, 'true' if purge_on_start else 'false')
+    # Store old storage client so we have the object reference for comparison
+    old_client = StorageClientManager.get_storage_client()
+    async with Actor:
+        old_default_kvs = await Actor.open_key_value_store()
+        old_non_default_kvs = await Actor.open_key_value_store('non-default')
+        # Create data in default and non-default key-value store
+        await old_default_kvs.set_value('test', 'default value')
+        await old_non_default_kvs.set_value('test', 'non-default value')
+
+    # Clean up singletons and mock a new memory storage
+    monkeypatch.setattr(Actor, '_default_instance', None)
+    monkeypatch.setattr(Configuration, '_default_instance', None)
+    monkeypatch.setattr(StorageManager, '_default_instance', None)
+    monkeypatch.setattr(StorageClientManager, '_default_instance', None)
+
+    new_patched_memory_storage = MemoryStorage(local_data_directory=tmp_path)
+
+    def get_storage_client() -> 'MemoryStorage':
+        return new_patched_memory_storage
+    monkeypatch.setattr(StorageClientManager, 'get_storage_client', get_storage_client)
+
+    # We simulate another clean run, we expect the memory storage to read from the local data directory
+    # Default storages are purged based on purge_on_start parameter.
+    async with Actor:
+        # Check if we're using a different memory storage instance
+        assert old_client is not StorageClientManager.get_storage_client()
+        default_kvs = await Actor.open_key_value_store()
+        assert default_kvs is not old_default_kvs
+        non_default_kvs = await Actor.open_key_value_store('non-default')
+        assert non_default_kvs is not old_non_default_kvs
+        default_value = await default_kvs.get_value('test')
+        non_default_value = await non_default_kvs.get_value('test')
+        if purge_on_start:
+            assert default_value is None
+        else:
+            assert default_value == 'default value'
+        assert non_default_value == 'non-default value'

tests/unit/conftest.py

Lines changed: 3 additions & 5 deletions
@@ -1,7 +1,7 @@
 import asyncio
 import inspect
 from collections import defaultdict
-from typing import Any, AsyncIterator, Callable, Dict, List, Optional, Tuple, get_type_hints
+from typing import Any, Callable, Dict, List, Optional, Tuple, get_type_hints

 import pytest

@@ -119,7 +119,5 @@ def apify_client_async_patcher(monkeypatch: pytest.MonkeyPatch) -> ApifyClientAs


 @pytest.fixture()
-async def memory_storage(tmp_path: str) -> AsyncIterator[MemoryStorage]:
-    ms = MemoryStorage(local_data_directory=tmp_path, write_metadata=True)  # TODO: Remove write_metadata=True as it's not the default setting...
-    yield ms
-    await ms.purge()  # TODO: Do we want this here? there are unit tests specifically for purge
+def memory_storage(tmp_path: str) -> MemoryStorage:
+    return MemoryStorage(local_data_directory=tmp_path)

tests/unit/memory_storage/resource_clients/test_dataset.py

Lines changed: 8 additions & 6 deletions
@@ -38,15 +38,16 @@ async def test_get(dataset_client: DatasetClient) -> None:

 async def test_update(dataset_client: DatasetClient) -> None:
     new_dataset_name = 'test-update'
+    await dataset_client.push_items({'abc': 123})
     old_dataset_info = await dataset_client.get()
     assert old_dataset_info is not None
     old_dataset_directory = os.path.join(dataset_client._client._datasets_directory, old_dataset_info['name'])
     new_dataset_directory = os.path.join(dataset_client._client._datasets_directory, new_dataset_name)
-    assert os.path.exists(os.path.join(old_dataset_directory, '__metadata__.json')) is True
-    assert os.path.exists(os.path.join(new_dataset_directory, '__metadata__.json')) is False
+    assert os.path.exists(os.path.join(old_dataset_directory, '000000001.json')) is True
+    assert os.path.exists(os.path.join(new_dataset_directory, '000000001.json')) is False
     updated_dataset_info = await dataset_client.update(name=new_dataset_name)
-    assert os.path.exists(os.path.join(old_dataset_directory, '__metadata__.json')) is False
-    assert os.path.exists(os.path.join(new_dataset_directory, '__metadata__.json')) is True
+    assert os.path.exists(os.path.join(old_dataset_directory, '000000001.json')) is False
+    assert os.path.exists(os.path.join(new_dataset_directory, '000000001.json')) is True
     # Only modifiedAt and accessedAt should be different
     assert old_dataset_info['createdAt'] == updated_dataset_info['createdAt']
     assert old_dataset_info['modifiedAt'] != updated_dataset_info['modifiedAt']
@@ -57,12 +58,13 @@ async def test_update(dataset_client: DatasetClient) -> None:


 async def test_delete(dataset_client: DatasetClient) -> None:
+    await dataset_client.push_items({'abc': 123})
     dataset_info = await dataset_client.get()
     assert dataset_info is not None
     dataset_directory = os.path.join(dataset_client._client._datasets_directory, dataset_info['name'])
-    assert os.path.exists(os.path.join(dataset_directory, '__metadata__.json')) is True
+    assert os.path.exists(os.path.join(dataset_directory, '000000001.json')) is True
     await dataset_client.delete()
-    assert os.path.exists(os.path.join(dataset_directory, '__metadata__.json')) is False
+    assert os.path.exists(os.path.join(dataset_directory, '000000001.json')) is False
     # Does not crash when called again
     await dataset_client.delete()
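
The reworked assertions above check for data files instead of `__metadata__.json`, because metadata is no longer written by default and data files only exist once something has been pushed. A minimal sketch of where a pushed item lands on disk (the dataset name and storage directory are made up, and the memory-storage client API is assumed to mirror the Apify API client):

import asyncio
import os

from apify.memory_storage import MemoryStorage

async def main() -> None:
    storage = MemoryStorage(local_data_directory='./storage')
    dataset_info = await storage.datasets().get_or_create(name='my-dataset')
    await storage.dataset(dataset_info['id']).push_items({'abc': 123})
    # With persistence on (the default), the first pushed item is stored as 000000001.json.
    print(os.path.exists('./storage/datasets/my-dataset/000000001.json'))  # True

if __name__ == '__main__':
    asyncio.run(main())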