@@ -167,7 +167,9 @@ def sign_download_url(self,
167167 """Signs a download URL for a remote file."""
168168 raise NotImplementedError
169169
def sign_upload_urls(self,
                     remote_paths: List[str],
                     minutes=SIGNED_URL_EXPIRATION_MINUTES) -> List[str]:
  """Signs an upload URL for each remote file in |remote_paths|.

  This implementation always raises NotImplementedError."""
  raise NotImplementedError
173175
@@ -407,11 +409,11 @@ def sign_download_url(self,
407409 """Signs a download URL for a remote file."""
408410 return _sign_url (remote_path , method = 'GET' , minutes = minutes )
409411
def sign_upload_urls(self,
                     remote_paths: List[str],
                     minutes=SIGNED_URL_EXPIRATION_MINUTES) -> List[str]:
  """Signs an upload (PUT) URL for each remote file in |remote_paths|.

  The signed URLs are returned in the same order as |remote_paths|."""
  return _sign_urls(remote_paths, method='PUT', minutes=minutes)
415417
416418 def download_signed_url (self , signed_url ):
417419 """Downloads |signed_url|."""
@@ -438,21 +440,33 @@ def _sign_url(remote_path: str,
438440 minutes = SIGNED_URL_EXPIRATION_MINUTES ,
439441 method = 'GET' ) -> str :
440442 """Returns a signed URL for |remote_path| with |method|."""
443+ return _sign_urls ([remote_path ], minutes , method )[0 ]
444+
445+
def _sign_urls(remote_paths: List[str],
               minutes=SIGNED_URL_EXPIRATION_MINUTES,
               method='GET') -> List[str]:
  """Returns a signed URL for each of |remote_paths| with |method|.

  The URLs are returned in the same order as |remote_paths|. |minutes| is
  converted to a timedelta and passed as the expiration of every URL.
  """
  if _integration_test_env_doesnt_support_signed_urls():
    # Nothing can be signed in this environment; hand the paths back as-is.
    return remote_paths
  if not remote_paths:
    # Nothing to sign, so skip fetching credentials and creating a client.
    return []
  expiration = datetime.timedelta(minutes=minutes)

  # The credentials and client are obtained once and reused for every URL
  # signed in this call.
  signing_creds, access_token = _signing_creds()
  client = _storage_client()
  urls = []
  for remote_path in remote_paths:
    bucket_name, object_path = get_bucket_name_and_path(remote_path)
    blob = client.bucket(bucket_name).blob(object_path)
    urls.append(
        blob.generate_signed_url(
            version='v4',
            expiration=expiration,
            method=method,
            credentials=signing_creds,
            access_token=access_token,
            service_account_email=signing_creds.service_account_email))
  return urls
456470
457471
458472class FileSystemProvider (StorageProvider ):
@@ -670,11 +684,13 @@ def sign_download_url(self,
670684 del minutes
671685 return remote_path
672686
def sign_upload_urls(self,
                     remote_paths: List[str],
                     minutes=SIGNED_URL_EXPIRATION_MINUTES) -> List[str]:
  """Returns |remote_paths| unchanged since we are pretending to sign URLs
  for upload."""
  del minutes  # Unused: nothing is actually signed here.
  return remote_paths
678694
679695 def download_signed_url (self , signed_url ):
680696 """Downloads |signed_url|."""
@@ -1237,8 +1253,7 @@ def download_signed_url_to_file(url, filepath):
def get_signed_upload_url(remote_path, minutes=SIGNED_URL_EXPIRATION_MINUTES):
  """Returns a signed upload URL for |remote_path|. Does not download the
  contents."""
  # Single-path convenience wrapper: sign a one-element batch and unwrap it.
  return get_signed_upload_urls([remote_path], minutes=minutes)[0]
12421257
12431258
12441259def get_signed_download_url (remote_path , minutes = SIGNED_URL_EXPIRATION_MINUTES ):
@@ -1340,29 +1355,28 @@ def delete_signed_urls(urls):
13401355 logs .info ('Done deleting URLs.' )
13411356
13421357
def _sign_urls_for_existing_files(
    urls_and_include_delete_urls: Tuple[List[str], bool],
    minutes: int = SIGNED_URL_EXPIRATION_MINUTES) -> List[Tuple[str, str]]:
  """Signs download (and optionally delete) URLs for a batch of files.

  |urls_and_include_delete_urls| is a single (urls, include_delete_urls)
  tuple so the function can be mapped over a sequence of such tuples.

  Returns one (download_url, delete_url) pair per input URL, in input order.
  delete_url is '' when include_delete_urls is false.
  """
  corpus_element_urls, include_delete_urls = urls_and_include_delete_urls
  signed_urls = []
  for corpus_element_url in corpus_element_urls:
    download_url = get_signed_download_url(corpus_element_url, minutes)
    if include_delete_urls:
      delete_url = sign_delete_url(corpus_element_url, minutes)
    else:
      delete_url = ''
    # list.append takes exactly one argument, so the pair is added as a tuple.
    signed_urls.append((download_url, delete_url))
  return signed_urls
13581371
13591372
def sign_urls_for_existing_files(urls,
                                 include_delete_urls) -> List[Tuple[str, str]]:
  """Signs download (and optionally delete) URLs for each of |urls|.

  The URLs are split into batches that are signed through a pool. Returns a
  flat list with one (download_url, delete_url) pair per signed URL.
  """
  logs.info('Signing URLs for existing files.')
  url_batches = utils.batched(urls, 2)
  args = ((url_batch, include_delete_urls) for url_batch in url_batches)
  with concurrency.make_pool(cpu_bound=True, max_pool_size=2) as pool:
    # Each mapped call returns the list of pairs for one batch. Flatten them
    # (while the pool is still open) so callers get one pair per URL rather
    # than one list per batch.
    result = [
        signed_pair
        for batch_result in pool.map(_sign_urls_for_existing_files, args)
        for signed_pair in batch_result
    ]
  logs.info('Done signing URLs for existing files.')
  return result
13681382
@@ -1372,6 +1386,14 @@ def get_arbitrary_signed_upload_url(remote_directory):
13721386 get_arbitrary_signed_upload_urls (remote_directory , num_uploads = 1 ))[0 ]
13731387
13741388
def get_signed_upload_urls(remote_paths: List[str],
                           minutes=SIGNED_URL_EXPIRATION_MINUTES) -> List[str]:
  """Returns a signed upload URL for each path in |remote_paths|.

  Signing is delegated to the current storage provider's sign_upload_urls."""
  provider = _provider()
  return provider.sign_upload_urls(remote_paths, minutes=minutes)
1395+
1396+
13751397def get_arbitrary_signed_upload_urls (remote_directory : str ,
13761398 num_uploads : int ) -> List [str ]:
13771399 """Returns |num_uploads| number of signed upload URLs to upload files with
@@ -1392,6 +1414,7 @@ def get_arbitrary_signed_upload_urls(remote_directory: str,
13921414 logs .info ('Signing URLs for arbitrary uploads.' )
13931415 with concurrency .make_pool (
13941416 _POOL_SIZE , cpu_bound = True , max_pool_size = 2 ) as pool :
1395- result = list (pool .map (get_signed_upload_url , urls ))
1417+ url_batches = utils .batched (urls , 2 )
1418+ result = list (pool .map (get_signed_upload_urls , url_batches ))
13961419 logs .info ('Done signing URLs for arbitrary uploads.' )
13971420 return result
0 commit comments