@@ -1,16 +1,18 @@
 import asyncio
 import json
 import logging
+from itertools import chain
+from pathlib import Path
 
-from more_itertools import flatten
 from typing_extensions import Self, override
 
 from crawlee._consts import METADATA_FILENAME
+from crawlee._utils.file import atomic_write, infer_mime_type, json_dumps
 from crawlee.configuration import Configuration as CrawleeConfiguration
 from crawlee.storage_clients._file_system import FileSystemKeyValueStoreClient
-from crawlee.storage_clients.models import KeyValueStoreRecord
+from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata
 
-from apify._configuration import Configuration
+from apify._configuration import Configuration as ApifyConfiguration
 
 logger = logging.getLogger(__name__)
 
@@ -22,6 +24,18 @@ class ApifyFileSystemKeyValueStoreClient(FileSystemKeyValueStoreClient):
     directory, except for the metadata file and the `INPUT.json` file.
     """
 
+    def __init__(
+        self,
+        *,
+        metadata: KeyValueStoreMetadata,
+        path_to_kvs: Path,
+        lock: asyncio.Lock,
+    ) -> None:
+        super().__init__(metadata=metadata, path_to_kvs=path_to_kvs, lock=lock)
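+        # Default to the configured input key; open() may replace the filename with the one actually found on disk.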
+        global_configuration = ApifyConfiguration.get_global_configuration()
+        self._input_key = global_configuration.input_key
+        self._input_key_filename = global_configuration.input_key
+
     @override
     @classmethod
     async def open(
@@ -34,7 +48,18 @@ async def open(
     ) -> Self:
         client = await super().open(id=id, name=name, alias=alias, configuration=configuration)
 
-        await client._sanitize_input_json_files()  # noqa: SLF001 - it's okay, this is a factory method
+        if isinstance(configuration, ApifyConfiguration):
+            client._input_key = configuration.input_key  # noqa: SLF001 - it's okay, this is a factory method
+            input_key_filename = cls._get_input_key_file_name(
+                path_to_kvs=client.path_to_kvs, configuration=configuration
+            )
+            client._input_key_filename = input_key_filename  # noqa: SLF001 - it's okay, this is a factory method
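+            # An input file may have been placed in the store directory without a metadata file
+            # (e.g. a hand-written local input); create the missing metadata for it below.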
+            input_file_path = client.path_to_kvs / input_key_filename
+            input_file_metadata_path = client.path_to_kvs / f'{input_key_filename}.{METADATA_FILENAME}'
+            if input_file_path.exists() and not input_file_metadata_path.exists():
+                await cls._create_missing_metadata_for_input_file(
+                    key=configuration.input_key, record_path=input_file_path
+                )
 
         return client
 
@@ -43,14 +68,10 @@ async def purge(self) -> None:
         """Purges the key-value store by deleting all its contents.
 
         It deletes all files in the key-value store directory, except for the metadata file and
-        the `INPUT.json` file. It also updates the metadata to reflect that the store has been purged.
+        the input-related file and its metadata.
         """
-        configuration = Configuration.get_global_configuration()
-
         async with self._lock:
-            files_to_keep = set(
-                flatten([key, f'{key}.{METADATA_FILENAME}'] for key in configuration.input_key_candidates)
-            )
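+            # Keep the input file (under its actual on-disk name) and its metadata file.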
+            files_to_keep = {self._input_key_filename, f'{self._input_key_filename}.{METADATA_FILENAME}'}
             files_to_keep.add(METADATA_FILENAME)
 
             for file_path in self.path_to_kvs.glob('*'):
@@ -64,40 +85,61 @@ async def purge(self) -> None:
                 update_modified_at=True,
             )
 
-    async def _sanitize_input_json_files(self) -> None:
-        """Handle missing metadata for input files."""
-        configuration = Configuration.get_global_configuration()
-        alternative_keys = configuration.input_key_candidates - {configuration.canonical_input_key}
-
-        if (self.path_to_kvs / configuration.canonical_input_key).exists():
-            # Refresh metadata to prevent inconsistencies
-            input_data = await asyncio.to_thread(
-                lambda: json.loads((self.path_to_kvs / configuration.canonical_input_key).read_text())
-            )
-            await self.set_value(key=configuration.canonical_input_key, value=input_data)
+    @override
+    async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
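+        """Get a record, redirecting the configured input key to the actual input file name on disk."""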
+        if key == self._input_key:
+            # Potentially point to custom input file name instead
+            key = self._input_key_filename
+        return await super().get_value(key=key)
 
-            for alternative_key in alternative_keys:
-                if (alternative_input_file := self.path_to_kvs / alternative_key).exists():
-                    logger.warning(f'Redundant input file found: {alternative_input_file}')
+    @staticmethod
+    async def _create_missing_metadata_for_input_file(key: str, record_path: Path) -> None:
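+        """Create the metadata file for an input file that exists in the store directory without one."""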
+        # Read the actual value
+        try:
+            content = await asyncio.to_thread(record_path.read_bytes)
+        except FileNotFoundError:
+            logger.warning(f'Input file disappeared from path: "{record_path}"')
+            return
+
+        # Figure out the metadata from the file content
+        size = len(content)
+        if record_path.suffix == '.json':
+            value = json.loads(content.decode('utf-8'))
+        elif record_path.suffix == '.txt':
+            value = content.decode('utf-8')
+        elif record_path.suffix == '':
+            try:
+                value = json.loads(content.decode('utf-8'))
+            except json.JSONDecodeError:
+                value = content
         else:
-            for alternative_key in alternative_keys:
-                alternative_input_file = self.path_to_kvs / alternative_key
+            value = content
 
-                # Only process files that actually exist
-                if alternative_input_file.exists():
-                    # Refresh metadata to prevent inconsistencies
-                    with alternative_input_file.open() as f:
-                        input_data = await asyncio.to_thread(lambda: json.load(f))
-                    await self.set_value(key=alternative_key, value=input_data)
+        content_type = infer_mime_type(value)
 
-    @override
-    async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
-        configuration = Configuration.get_global_configuration()
+        record_metadata = KeyValueStoreRecordMetadata(key=key, content_type=content_type, size=size)
+        record_metadata_filepath = record_path.with_name(f'{record_path.name}.{METADATA_FILENAME}')
+        record_metadata_content = await json_dumps(record_metadata.model_dump())
 
-        if key in configuration.input_key_candidates:
-            for candidate in configuration.input_key_candidates:
-                value = await super().get_value(key=candidate)
-                if value is not None:
-                    return value
+        # Write the record metadata to the file.
+        await atomic_write(record_metadata_filepath, record_metadata_content)
 
-        return await super().get_value(key=key)
+    @staticmethod
+    def _get_input_key_file_name(path_to_kvs: Path, configuration: ApifyConfiguration) -> str:
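+        """Find the name of the input file present in the store directory, falling back to the configured input key."""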
+        found_input_files = set()
+        for file_path in chain(
+            path_to_kvs.glob(f'{configuration.input_key}.*'), path_to_kvs.glob(f'{configuration.input_key}')
+        ):
+            if file_path.name.endswith(f'.{METADATA_FILENAME}'):
+                # Ignore metadata files
+                continue
+            found_input_files.add(file_path.name)
+
+        if len(found_input_files) > 1:
+            raise RuntimeError(f'Only one input file is allowed, but multiple were found: {found_input_files}')
+
+        if len(found_input_files) == 1:
+            return found_input_files.pop()
+
+        # No custom input file found; return the default input key.
+        return configuration.input_key