1
1
import asyncio
2
2
import json
3
3
import logging
4
- from pathlib import Path
5
4
6
5
from more_itertools import flatten
7
6
from typing_extensions import override
@@ -31,10 +30,7 @@ async def purge(self) -> None:
31
30
"""
32
31
configuration = Configuration .get_global_configuration ()
33
32
34
- # First try to find the alternative format of the input file and process it if it exists.
35
- for file_path in self .path_to_kvs .glob ('*' ):
36
- if file_path .name in configuration .input_key_candidates :
37
- await self ._sanitize_input_json (file_path )
33
+ await self ._sanitize_input_json_files ()
38
34
39
35
async with self ._lock :
40
36
files_to_keep = set (
@@ -53,29 +49,31 @@ async def purge(self) -> None:
53
49
update_modified_at = True ,
54
50
)
55
51
56
async def _sanitize_input_json_files(self) -> None:
    """Create the missing record for an input file found in the key-value store directory.

    Input files are looked up directly under `self.path_to_kvs` by key name. At most one of them is
    stored under the canonical input key:

    - If a file named after the canonical input key exists, it is the input. When `record_exists` reports
      no record for it, its JSON content is stored via `set_value`.
    - Otherwise the alternative input keys are tried in sorted order, and the first file that exists and
      has no record of its own is stored under the canonical input key.

    Every other existing alternative input file encountered after the input has been determined is
    reported with a warning and left untouched.

    Raises:
        json.JSONDecodeError: If the selected input file does not contain valid JSON.
    """
    configuration = Configuration.get_global_configuration()
    canonical_key = configuration.canonical_input_key
    # Sorted so that the choice between several alternative files does not depend on set iteration order.
    alternative_keys = sorted(configuration.input_key_candidates - {canonical_key})

    canonical_input_file = self.path_to_kvs / canonical_key
    input_resolved = False

    if canonical_input_file.exists():
        input_resolved = True
        # The file is present but has no record yet: store its content under the canonical key.
        if not await self.record_exists(key=canonical_key):
            # Read off the event loop; JSON text is decoded as UTF-8 rather than the locale default.
            raw_input = await asyncio.to_thread(canonical_input_file.read_text, encoding='utf-8')
            await self.set_value(key=canonical_key, value=json.loads(raw_input))

    for alternative_key in alternative_keys:
        alternative_input_file = self.path_to_kvs / alternative_key
        if not alternative_input_file.exists():
            continue

        if input_resolved:
            # An input has already been chosen; never overwrite it with another candidate.
            logger.warning(f'Redundant input file found: {alternative_input_file}')
            continue

        if await self.record_exists(key=alternative_key):
            # This file already has a record of its own, so it is not adopted as the input.
            continue

        raw_input = await asyncio.to_thread(alternative_input_file.read_text, encoding='utf-8')
        await self.set_value(key=canonical_key, value=json.loads(raw_input))
        input_resolved = True
79
77
80
78
@override
81
79
async def get_value (self , * , key : str ) -> KeyValueStoreRecord | None :
0 commit comments