|
6 | 6 | import os |
7 | 7 | import sys |
8 | 8 | import warnings |
9 | | -from typing import List |
| 9 | +from datetime import datetime |
| 10 | +from typing import List, Optional, Union |
10 | 11 |
|
11 | 12 | import boto3 |
12 | 13 | import dask.bag as dask_bag |
| 14 | +import requests |
13 | 15 | from aind_codeocean_api.codeocean import CodeOceanClient |
| 16 | +from aind_data_schema.core.metadata import ExternalPlatforms |
14 | 17 | from mypy_boto3_s3 import S3Client |
15 | 18 | from pymongo import MongoClient |
| 19 | +from pymongo.operations import UpdateOne |
| 20 | +from requests.exceptions import ReadTimeout |
16 | 21 |
|
17 | 22 | from aind_data_asset_indexer.models import CodeOceanIndexBucketJobSettings |
18 | 23 | from aind_data_asset_indexer.utils import ( |
@@ -44,6 +49,184 @@ def __init__(self, job_settings: CodeOceanIndexBucketJobSettings): |
44 | 49 | """Class constructor.""" |
45 | 50 | self.job_settings = job_settings |
46 | 51 |
|
| 52 | + def _get_external_data_asset_records(self) -> Optional[List[dict]]: |
| 53 | + """ |
| 54 | + Retrieves list of code ocean ids and locations for external data |
| 55 | + assets. The timeout is set to 600 seconds. |
| 56 | + Returns |
| 57 | + ------- |
| 58 | + List[dict] | None |
| 59 | + List items have shape {"id": str, "location": str}. If error occurs, |
| 60 | + return None. |
| 61 | + """ |
| 62 | + try: |
| 63 | + response = requests.get( |
| 64 | + self.job_settings.temp_codeocean_endpoint, |
| 65 | + timeout=600, |
| 66 | + ) |
| 67 | + if response.status_code == 200: |
| 68 | + return response.json() |
| 69 | + else: |
| 70 | + return None |
| 71 | + except ReadTimeout: |
| 72 | + logging.error( |
| 73 | + f"Read timed out at " |
| 74 | + f"{self.job_settings.temp_codeocean_endpoint}" |
| 75 | + ) |
| 76 | + return None |
| 77 | + |
| 78 | + @staticmethod |
| 79 | + def _map_external_list_to_dict(external_recs: List[dict]) -> dict: |
| 80 | + """ |
| 81 | + Maps the response received from Code Ocean into a dict. For example, |
| 82 | + [{"id": "abc", "location": "s3://bucket/prefix}, |
| 83 | + {"id": "def", "location": "s3://bucket/prefix"}] |
| 84 | + will be mapped to {"s3://bucket/prefix": ["abc", "def"]} |
| 85 | + Parameters |
| 86 | + ---------- |
| 87 | + external_recs : List[dict] |
| 88 | +
|
| 89 | + Returns |
| 90 | + ------- |
| 91 | + dict |
| 92 | +
|
| 93 | + """ |
| 94 | + new_records = dict() |
| 95 | + for r in external_recs: |
| 96 | + location = r.get("source") |
| 97 | + rec_id = r["id"] |
| 98 | + if location is not None and new_records.get(location) is not None: |
| 99 | + old_id_set = new_records.get(location) |
| 100 | + old_id_set.add(rec_id) |
| 101 | + new_records[location] = old_id_set |
| 102 | + else: |
| 103 | + new_records[location] = {rec_id} |
| 104 | + return new_records |
| 105 | + |
| 106 | + @staticmethod |
| 107 | + def _get_co_links_from_record( |
| 108 | + docdb_record: Union[dict, list] |
| 109 | + ) -> List[str]: |
| 110 | + """ |
| 111 | + Small utility to parse the external_links field of the docdb record. |
| 112 | + Supports the legacy type. |
| 113 | + Parameters |
| 114 | + ---------- |
| 115 | + docdb_record : dict | list |
| 116 | + The legacy type was a list, while the current version is a dict. |
| 117 | +
|
| 118 | + Returns |
| 119 | + ------- |
| 120 | + List[str] |
| 121 | +
|
| 122 | + """ |
| 123 | + external_links = docdb_record.get("external_links", []) |
| 124 | + |
| 125 | + # Hopefully, ExternalPlatforms.CODEOCEAN doesn't change |
| 126 | + if isinstance(external_links, dict): |
| 127 | + external_links = external_links.get( |
| 128 | + ExternalPlatforms.CODEOCEAN.value, [] |
| 129 | + ) |
| 130 | + else: |
| 131 | + external_links = [ |
| 132 | + r.get(ExternalPlatforms.CODEOCEAN.value) |
| 133 | + for r in external_links |
| 134 | + ] |
| 135 | + return external_links |
| 136 | + |
| 137 | + def _update_external_links_in_docdb( |
| 138 | + self, docdb_client: MongoClient |
| 139 | + ) -> None: |
| 140 | + """ |
| 141 | + This method will: |
| 142 | + 1) Retrieve a list of codeocean data asset ids and locations from CO |
| 143 | + 2) Paginate through the docdb records where the location doesn't match |
| 144 | + the internal co bucket. |
| 145 | + 3) Add or remove the external_links from the docdb record if needed. |
| 146 | + Parameters |
| 147 | + ---------- |
| 148 | + docdb_client : MongoClient |
| 149 | +
|
| 150 | + Returns |
| 151 | + ------- |
| 152 | + None |
| 153 | +
|
| 154 | + """ |
| 155 | + # Should return a list like [{"id": co_id, "location": "s3://..."},] |
| 156 | + list_of_co_ids_and_locations = self._get_external_data_asset_records() |
| 157 | + db = docdb_client[self.job_settings.doc_db_db_name] |
| 158 | + collection = db[self.job_settings.doc_db_collection_name] |
| 159 | + if list_of_co_ids_and_locations is not None: |
| 160 | + co_loc_to_id_map = self._map_external_list_to_dict( |
| 161 | + list_of_co_ids_and_locations |
| 162 | + ) |
| 163 | + pages = paginate_docdb( |
| 164 | + docdb_client=docdb_client, |
| 165 | + db_name=self.job_settings.doc_db_db_name, |
| 166 | + collection_name=self.job_settings.doc_db_collection_name, |
| 167 | + filter_query={ |
| 168 | + "location": { |
| 169 | + "$not": { |
| 170 | + "$regex": f"^s3://{self.job_settings.s3_bucket}.*" |
| 171 | + } |
| 172 | + } |
| 173 | + }, |
| 174 | + projection={"_id": 1, "location": 1, "external_links": 1}, |
| 175 | + page_size=500, |
| 176 | + ) |
| 177 | + for page in pages: |
| 178 | + records_to_update = [] |
| 179 | + for record in page: |
| 180 | + location = record.get("location") |
| 181 | + external_links = self._get_co_links_from_record(record) |
| 182 | + code_ocean_ids = ( |
| 183 | + None |
| 184 | + if location is None |
| 185 | + else co_loc_to_id_map.get(location) |
| 186 | + ) |
| 187 | + docdb_rec_id = record["_id"] |
| 188 | + if ( |
| 189 | + external_links is not None |
| 190 | + and code_ocean_ids is not None |
| 191 | + and code_ocean_ids != set(external_links) |
| 192 | + ): |
| 193 | + new_external_links = code_ocean_ids |
| 194 | + elif external_links is not None and not code_ocean_ids: |
| 195 | + logging.info( |
| 196 | + f"No code ocean data asset ids found for " |
| 197 | + f"{location}. Removing external links from record." |
| 198 | + ) |
| 199 | + new_external_links = dict() |
| 200 | + else: |
| 201 | + new_external_links = None |
| 202 | + if new_external_links is not None: |
| 203 | + record_links = { |
| 204 | + ExternalPlatforms.CODEOCEAN.value: sorted( |
| 205 | + list(new_external_links) |
| 206 | + ) |
| 207 | + } |
| 208 | + last_modified = datetime.utcnow().isoformat() |
| 209 | + records_to_update.append( |
| 210 | + UpdateOne( |
| 211 | + filter={"_id": docdb_rec_id}, |
| 212 | + update={ |
| 213 | + "$set": { |
| 214 | + "external_links": record_links, |
| 215 | + "last_modified": last_modified, |
| 216 | + } |
| 217 | + }, |
| 218 | + upsert=False, |
| 219 | + ) |
| 220 | + ) |
| 221 | + if len(records_to_update) > 0: |
| 222 | + logging.info(f"Updating {len(records_to_update)} records") |
| 223 | + write_response = collection.bulk_write( |
| 224 | + requests=records_to_update |
| 225 | + ) |
| 226 | + logging.debug(write_response) |
| 227 | + else: |
| 228 | + logging.error("There was an error retrieving external links!") |
| 229 | + |
47 | 230 | def _process_codeocean_record( |
48 | 231 | self, |
49 | 232 | codeocean_record: dict, |
@@ -220,6 +403,12 @@ def run_job(self): |
220 | 403 | password=self.job_settings.doc_db_password.get_secret_value(), |
221 | 404 | authSource="admin", |
222 | 405 | ) |
| 406 | + # Use existing client to add external links to fields |
| 407 | + logging.info("Adding links to records.") |
| 408 | + self._update_external_links_in_docdb( |
| 409 | + docdb_client=iterator_docdb_client |
| 410 | + ) |
| 411 | + logging.info("Finished adding links to records") |
223 | 412 | all_docdb_records = dict() |
224 | 413 | docdb_pages = paginate_docdb( |
225 | 414 | db_name=self.job_settings.doc_db_db_name, |
|
0 commit comments