1
1
import asyncio
2
2
import json
3
+ import logging
3
4
from pathlib import Path
4
5
5
6
from more_itertools import flatten
11
12
12
13
from apify ._configuration import Configuration
13
14
15
+ logger = logging .getLogger (__name__ )
16
+
14
17
15
18
class ApifyFileSystemKeyValueStoreClient (FileSystemKeyValueStoreClient ):
16
19
"""Apify-specific implementation of the `FileSystemKeyValueStoreClient`.
@@ -31,7 +34,7 @@ async def purge(self) -> None:
31
34
# First try to find the alternative format of the input file and process it if it exists.
32
35
for file_path in self .path_to_kvs .glob ('*' ):
33
36
if file_path .name in configuration .input_key_candidates :
34
- await self ._process_input_json (file_path )
37
+ await self ._sanitize_input_json (file_path )
35
38
36
39
async with self ._lock :
37
40
files_to_keep = set (
@@ -50,18 +53,29 @@ async def purge(self) -> None:
50
53
update_modified_at = True ,
51
54
)
52
55
53
- async def _process_input_json (self , path : Path ) -> None :
54
- """Process simple input json file to format expected by the FileSystemKeyValueStoreClient.
56
+ async def _sanitize_input_json (self , path : Path ) -> None :
57
+ """Transform an input json file to match the naming convention expected by the FileSystemKeyValueStoreClient.
55
58
56
59
For example: INPUT.json -> INPUT, INPUT.json.metadata
57
60
"""
61
+ configuration = Configuration .get_global_configuration ()
62
+
63
+ f = None
58
64
try :
59
65
f = await asyncio .to_thread (path .open )
60
66
input_data = json .load (f )
61
67
finally :
62
- f .close ()
68
+ if f is not None :
69
+ f .close ()
70
+
71
+ if await self .record_exists (key = configuration .canonical_input_key ):
72
+ logger .warning (f'Redundant input file found: { path } ' )
73
+ return
74
+
75
+ logger .info (f'Renaming input file: { path .name } -> { configuration .canonical_input_key } ' )
76
+
63
77
await asyncio .to_thread (path .unlink , missing_ok = True )
64
- await self .set_value (key = path . stem , value = input_data )
78
+ await self .set_value (key = configuration . canonical_input_key , value = input_data )
65
79
66
80
@override
67
81
async def get_value (self , * , key : str ) -> KeyValueStoreRecord | None :
0 commit comments