1212
1313import boto3
1414import dask .bag as dask_bag
15- import requests
16- from aind_codeocean_api .codeocean import CodeOceanClient
1715from aind_data_schema .core .metadata import ExternalPlatforms
16+ from codeocean import CodeOcean
17+ from codeocean .data_asset import DataAssetSearchOrigin , DataAssetSearchParams
1818from mypy_boto3_s3 import S3Client
1919from pymongo import MongoClient
2020from pymongo .operations import UpdateOne
21- from requests . exceptions import ReadTimeout
21+ from urllib3 . util import Retry
2222
2323from aind_data_asset_indexer .models import CodeOceanIndexBucketJobSettings
2424from aind_data_asset_indexer .utils import (
@@ -52,30 +52,51 @@ def __init__(self, job_settings: CodeOceanIndexBucketJobSettings):
5252 """Class constructor."""
5353 self .job_settings = job_settings
5454
@staticmethod
def _get_external_data_asset_records(
    co_client: CodeOcean,
) -> Optional[List[dict]]:
    """
    Retrieves list of code ocean ids and locations for external data
    assets by searching Code Ocean for non-archived assets whose origin
    is external.

    Parameters
    ----------
    co_client : CodeOcean
      Client used to run the data asset search.

    Returns
    -------
    List[dict] | None
      List items have shape {"id": str, "location": str}. Assets that are
      missing a source bucket, bucket name, or prefix are left out. If
      error occurs, the exception is logged and None is returned.

    """
    try:
        # NOTE(review): limit is presumably the page size used by the
        # iterator rather than a cap on total results -- confirm against
        # the Code Ocean SDK.
        search_params = DataAssetSearchParams(
            archived=False,
            origin=DataAssetSearchOrigin.External,
            limit=1000,
        )
        data_assets = co_client.data_assets.search_data_assets_iterator(
            search_params=search_params
        )
        external_records = []
        for data_asset in data_assets:
            source = data_asset.source_bucket
            # A location can only be built when both the bucket and the
            # prefix are known; skip the asset otherwise.
            if (
                source is None
                or source.bucket is None
                or source.prefix is None
            ):
                continue
            location = f"s3://{source.bucket}/{source.prefix}"
            external_records.append(
                {"id": data_asset.id, "location": location}
            )
        return external_records
    except Exception as e:
        # Best-effort: callers treat None as "no external records
        # available", so log the failure instead of propagating it.
        logging.exception(e)
        return None
80101
81102 @staticmethod
@@ -97,7 +118,7 @@ def _map_external_list_to_dict(external_recs: List[dict]) -> dict:
97118 """
98119 new_records = dict ()
99120 for r in external_recs :
100- location = r .get ("source " )
121+ location = r .get ("location " )
101122 rec_id = r ["id" ]
102123 if location is not None and new_records .get (location ) is not None :
103124 old_id_set = new_records .get (location )
@@ -140,7 +161,7 @@ def _get_co_links_from_record(
140161 return external_links
141162
142163 def _update_external_links_in_docdb (
143- self , docdb_client : MongoClient
164+ self , docdb_client : MongoClient , co_client : CodeOcean
144165 ) -> None :
145166 """
146167 This method will:
@@ -159,7 +180,9 @@ def _update_external_links_in_docdb(
159180
160181 """
161182 # Should return a list like [{"id": co_id, "location": "s3://..."},]
162- list_of_co_ids_and_locations = self ._get_external_data_asset_records ()
183+ list_of_co_ids_and_locations = self ._get_external_data_asset_records (
184+ co_client = co_client
185+ )
163186 db = docdb_client [self .job_settings .doc_db_db_name ]
164187 collection = db [self .job_settings .doc_db_collection_name ]
165188 if list_of_co_ids_and_locations is not None :
@@ -394,9 +417,16 @@ def _delete_records_from_docdb(self, record_list: List[str]):
394417 def run_job (self ):
395418 """Main method to run."""
396419 logging .info ("Starting to scan through CodeOcean." )
397- co_client = CodeOceanClient (
420+ retry = Retry (
421+ total = 5 ,
422+ backoff_factor = 1 ,
423+ status_forcelist = [429 , 500 , 502 , 503 , 504 ],
424+ allowed_methods = ["GET" , "POST" ],
425+ )
426+ co_client = CodeOcean (
398427 domain = self .job_settings .codeocean_domain ,
399428 token = self .job_settings .codeocean_token .get_secret_value (),
429+ retries = retry ,
400430 )
401431 code_ocean_records = get_all_processed_codeocean_asset_records (
402432 co_client = co_client ,
@@ -416,7 +446,7 @@ def run_job(self):
416446 # Use existing client to add external links to fields
417447 logging .info ("Adding links to records." )
418448 self ._update_external_links_in_docdb (
419- docdb_client = iterator_docdb_client
449+ docdb_client = iterator_docdb_client , co_client = co_client
420450 )
421451 logging .info ("Finished adding links to records" )
422452 all_docdb_records = dict ()
0 commit comments