@@ -25,8 +25,30 @@ def _normalize(text, form='NFC'):
2525 return unicodedata .normalize (form , str (text ))
2626
2727
28- def _download_from_s3 (bucket_name , prefix , local_dir ):
def _is_write_local_or_both():
    """Return True when output is configured to go to the local file system.

    Reads aws.s3_and_or_local_file_system from the global config; 'local'
    and 'both' both mean the local file system is (also) a write target.
    """
    # .get('aws', {}) so a config without an 'aws' section returns False
    # instead of raising AttributeError on None.
    mode = globals.config.get('aws', {}).get('s3_and_or_local_file_system')
    logger.debug(f"is_write_local_or_both: {mode}")
    # membership test handles the None case implicitly
    return mode in ('local', 'both')
32+
33+
def _download_multiple_files_from_local(prefix, local_dir):
    """Copy the directory tree stored under *prefix* at the local write path
    into *local_dir* (local-file-system counterpart of the S3 download)."""
    source_tree = _get_local_write_path(prefix)
    shutil.copytree(source_tree, local_dir, dirs_exist_ok=True)
37+
def _get_local_write_path(dir_or_file: str = None) -> str:
    """Build a path under the configured local file-system root.

    Returns aws.local_file_system_path + '/' when *dir_or_file* is None,
    otherwise that root followed by '/' + *dir_or_file*.
    """
    root = globals.config['aws']['local_file_system_path'] + '/'
    local_write_path = root if dir_or_file is None else root + dir_or_file
    logger.debug(f"local_write_path: {local_write_path}")
    return local_write_path
45+
def download_multiple_files_from_s3(bucket_name, prefix, local_dir):
    """Download every object under *prefix* in *bucket_name* into *local_dir*.

    When the config routes writes to the local file system ('local' or
    'both'), the files are copied from the local write path instead of S3.
    The S3 key hierarchy below *prefix* is recreated inside *local_dir*.
    Errors are logged and swallowed (best-effort download).
    """
    if _is_write_local_or_both():
        return _download_multiple_files_from_local(prefix, local_dir)

    logger.info(f"download_multiple_files_from_s3, bucket_name={bucket_name}, prefix={prefix}, local_dir={local_dir}")
    s3_client = boto3.client('s3')

    # Ensure the local directory exists
    os.makedirs(local_dir, exist_ok=True)

    try:
        # NOTE(review): the original also called list_objects_v2 here and
        # discarded the response; list_s3_files already does the listing,
        # so that dead call is removed.
        key_list = list_s3_files(bucket_name, prefix, suffix=None)
        for file_key in key_list:
            logger.debug(f"file_key={file_key}, prefix={prefix}")
            # Skip "directory" placeholder keys before creating anything.
            if file_key.endswith('/'):
                logger.info(f"skipping file_key={file_key}")
                continue

            # Key relative to the prefix; strip a leading '/' if present
            # rather than assuming the first character is always one.
            local_file_key = file_key.replace(prefix, "").lstrip('/')
            parent_dir_in_s3 = os.path.dirname(local_file_key)
            logger.debug(f"local_file_key={local_file_key}, parent_dir_in_s3={parent_dir_in_s3}")
            local_dir_to_create = os.path.join(local_dir, parent_dir_in_s3)
            os.makedirs(local_dir_to_create, exist_ok=True)

            local_file_path = os.path.join(local_dir_to_create, os.path.basename(local_file_key))
            logger.debug(f"bucket_name={bucket_name}, file_key={file_key}, local_file_path={local_file_path}")
            s3_client.download_file(bucket_name, file_key, local_file_path)
            logger.debug(f"download_multiple_files_from_s3, Downloaded: {local_file_path}")
    except Exception as e:
        logger.error(f"An error occurred while downloading from S3: {e}")
5182
52-
5383class CustomTokenizer :
5484 """A custom tokenizer class"""
5585 TOKENS : int = 1000
5686 WORDS : int = 750
5787
5888 def __init__ (self , bucket , prefix , local_dir ):
59- logger . info (f"CustomTokenizer, based on HF transformers, { bucket } "
60- f"prefix: { prefix } local_dir: { local_dir } " )
89+ print (f"CustomTokenizer, based on HF transformers, { bucket } "
90+ f"prefix: { prefix } local_dir: { local_dir } " )
6191 # Check if the tokenizer files exist in s3 and if not, use the autotokenizer
62- _download_from_s3 (bucket , prefix , local_dir )
92+ download_multiple_files_from_s3 (bucket , prefix , local_dir )
6393 # Load the tokenizer from the local directory
6494 dir_not_empty = any (Path (local_dir ).iterdir ())
6595 if dir_not_empty is True :
66- logger . info ("loading the provided tokenizer from local_dir={local_dir}" )
96+ print ("loading the provided tokenizer from local_dir={local_dir}" )
6797 self .tokenizer = AutoTokenizer .from_pretrained (local_dir )
6898 else :
69- logger . error (f"no tokenizer provided, the { local_dir } is empty, "
70- f"using default tokenizer i.e. { self .WORDS } words = { self .TOKENS } tokens" )
99+ print (f"no tokenizer provided, the { local_dir } is empty, "
100+ f"using default tokenizer i.e. { self .WORDS } words = { self .TOKENS } tokens" )
71101 self .tokenizer = None
72102
73103 def count_tokens (self , text ):
@@ -90,8 +120,8 @@ def load_config(config_file) -> Dict:
90120 region_name = session .region_name
91121 if region_name is None :
92122 print (f"boto3.session.Session().region_name is { region_name } , "
93- f"going to use an s3 client to determine region name" )
94- region_name = boto3 . client ( 's3' ). meta . region_name
123+ f"going to use an metadata api to determine region name" )
124+ region_name = requests . get ( "http://169.254.169.254/latest/ meta-data/placement/availability-zone" ). text [: - 1 ]
95125 print (f"region_name={ region_name } , also setting the AWS_DEFAULT_REGION env var" )
96126 os .environ ["AWS_DEFAULT_REGION" ] = region_name
97127 print (f"region_name={ region_name } " )
@@ -210,19 +240,6 @@ def _is_write_local_only():
210240 logger .debug (f"is_write_local_only: { is_write_local_only } " )
211241 return is_write_local_only is not None and is_write_local_only == 'local'
212242
213- def _is_write_local_or_both ():
214- is_write_local_or_both = globals .config .get ('aws' ).get ('s3_and_or_local_file_system' )
215- logger .debug (f"is_write_local_or_both: { is_write_local_or_both } " )
216- return is_write_local_or_both is not None and (is_write_local_or_both == 'local' or is_write_local_or_both == 'both' )
217-
218- def _get_local_write_path (dir_or_file : str = None ) -> str :
219- if dir_or_file is not None :
220- local_write_path = globals .config ['aws' ]['local_file_system_path' ] + '/' + dir_or_file
221- else :
222- local_write_path = globals .config ['aws' ]['local_file_system_path' ] + '/'
223- logger .debug (f"local_write_path: { local_write_path } " )
224- return local_write_path
225-
226243def _upload_file_to_local (local_path : str , s3_path : str ) -> None :
227244 dest = _get_local_write_path (s3_path )
228245 shutil .copy (local_path , dest )
@@ -372,43 +389,4 @@ def list_s3_files(bucket, prefix, suffix='.json'):
372389 logger .info (f"there are total of { len (return_list )} items in bucket={ bucket } , prefix={ prefix } , suffix={ suffix } " )
373390 return return_list
374391
375- def _download_multiple_files_from_local (prefix , local_dir ):
376- src = _get_local_write_path (prefix )
377- shutil .copytree (src , local_dir , dirs_exist_ok = True )
378-
379- def download_multiple_files_from_s3 (bucket_name , prefix , local_dir ):
380- if _is_write_local_or_both ():
381- return _download_multiple_files_from_local (prefix , local_dir )
382-
383- """Downloads files from an S3 bucket and a specified prefix to a local directory."""
384- logger .info (f"download_multiple_files_from_s3, bucket_name={ bucket_name } , prefix={ prefix } , local_dir={ local_dir } " )
385- s3_client = boto3 .client ('s3' )
386-
387- # Ensure the local directory exists
388- if not os .path .exists (local_dir ):
389- os .makedirs (local_dir )
390-
391- # List and download files
392- try :
393- response = s3_client .list_objects_v2 (Bucket = bucket_name , Prefix = prefix )
394- key_list = list_s3_files (bucket_name , prefix , suffix = None )
395- for file_key in key_list :
396- logger .debug (f"file_key={ file_key } , prefix={ prefix } " )
397- local_file_key = file_key .replace (prefix , "" )
398- parent_dir_in_s3 = os .path .dirname (local_file_key )
399- logger .debug (f"local_file_key={ local_file_key } , parent_dir_in_s3={ parent_dir_in_s3 } " )
400- # the first char for parent_dir_in_s3 would always be a '/' so skip that
401- local_dir_to_create = os .path .join (local_dir , parent_dir_in_s3 [1 :])
402- os .makedirs (local_dir_to_create , exist_ok = True )
403- logger .debug (f"local_dir_to_create={ local_dir_to_create } , local_file_key={ local_file_key } " )
404- local_file_to_create = os .path .basename (local_file_key )
405- if file_key .endswith ('/' ):
406- logger .info (f"skipping file_key={ file_key } " )
407- continue
408392
409- local_file_path = os .path .join (local_dir_to_create , local_file_to_create )
410- logger .debug (f"bucket_name={ bucket_name } , file_key={ file_key } , local_file_path={ local_file_path } " )
411- s3_client .download_file (bucket_name , file_key , local_file_path )
412- logger .debug (f"download_multiple_files_from_s3, Downloaded: { local_file_path } " )
413- except Exception as e :
414- logger .error (f"An error occurred while downloading from S3: { e } " )
0 commit comments