Skip to content

Commit b05b916

Browse files
authored
feat: adds method to attach links to external data assets (#87)
1 parent 94cebb1 commit b05b916

File tree

5 files changed

+455
-7
lines changed

5 files changed

+455
-7
lines changed

src/aind_data_asset_indexer/aind_bucket_indexer.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -266,12 +266,14 @@ def _resolve_schema_information(
266266
object_key = create_object_key(
267267
prefix=prefix, filename=core_schema_file_name
268268
)
269-
common_kwargs["core_schema_info_in_root"] = (
270-
get_dict_of_file_info(
271-
s3_client=s3_client,
272-
bucket=self.job_settings.s3_bucket,
273-
keys=[object_key],
274-
).get(object_key)
269+
common_kwargs[
270+
"core_schema_info_in_root"
271+
] = get_dict_of_file_info(
272+
s3_client=s3_client,
273+
bucket=self.job_settings.s3_bucket,
274+
keys=[object_key],
275+
).get(
276+
object_key
275277
)
276278
self._copy_file_from_root_to_subdir(**common_kwargs)
277279
# If field is null, a file exists in the root folder, and

src/aind_data_asset_indexer/codeocean_bucket_indexer.py

Lines changed: 190 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,18 @@
66
import os
77
import sys
88
import warnings
9-
from typing import List
9+
from datetime import datetime
10+
from typing import List, Optional, Union
1011

1112
import boto3
1213
import dask.bag as dask_bag
14+
import requests
1315
from aind_codeocean_api.codeocean import CodeOceanClient
16+
from aind_data_schema.core.metadata import ExternalPlatforms
1417
from mypy_boto3_s3 import S3Client
1518
from pymongo import MongoClient
19+
from pymongo.operations import UpdateOne
20+
from requests.exceptions import ReadTimeout
1621

1722
from aind_data_asset_indexer.models import CodeOceanIndexBucketJobSettings
1823
from aind_data_asset_indexer.utils import (
@@ -44,6 +49,184 @@ def __init__(self, job_settings: CodeOceanIndexBucketJobSettings):
4449
"""Class constructor."""
4550
self.job_settings = job_settings
4651

52+
def _get_external_data_asset_records(self) -> Optional[List[dict]]:
53+
"""
54+
Retrieves list of code ocean ids and locations for external data
55+
assets. The timeout is set to 600 seconds.
56+
Returns
57+
-------
58+
List[dict] | None
59+
List items have shape {"id": str, "location": str}. If error occurs,
60+
return None.
61+
"""
62+
try:
63+
response = requests.get(
64+
self.job_settings.temp_codeocean_endpoint,
65+
timeout=600,
66+
)
67+
if response.status_code == 200:
68+
return response.json()
69+
else:
70+
return None
71+
except ReadTimeout:
72+
logging.error(
73+
f"Read timed out at "
74+
f"{self.job_settings.temp_codeocean_endpoint}"
75+
)
76+
return None
77+
78+
@staticmethod
79+
def _map_external_list_to_dict(external_recs: List[dict]) -> dict:
80+
"""
81+
Maps the response received from Code Ocean into a dict. For example,
82+
[{"id": "abc", "location": "s3://bucket/prefix},
83+
{"id": "def", "location": "s3://bucket/prefix"}]
84+
will be mapped to {"s3://bucket/prefix": ["abc", "def"]}
85+
Parameters
86+
----------
87+
external_recs : List[dict]
88+
89+
Returns
90+
-------
91+
dict
92+
93+
"""
94+
new_records = dict()
95+
for r in external_recs:
96+
location = r["location"]
97+
rec_id = r["id"]
98+
if new_records.get(location) is not None:
99+
old_id_set = new_records.get(location)
100+
old_id_set.add(rec_id)
101+
new_records[location] = old_id_set
102+
else:
103+
new_records[location] = {rec_id}
104+
return new_records
105+
106+
@staticmethod
107+
def _get_co_links_from_record(
108+
docdb_record: Union[dict, list]
109+
) -> List[str]:
110+
"""
111+
Small utility to parse the external_links field of the docdb record.
112+
Supports the legacy type.
113+
Parameters
114+
----------
115+
docdb_record : dict | list
116+
The legacy type was a list, while the current version is a dict.
117+
118+
Returns
119+
-------
120+
List[str]
121+
122+
"""
123+
external_links = docdb_record.get("external_links", [])
124+
125+
# Hopefully, ExternalPlatforms.CODEOCEAN doesn't change
126+
if isinstance(external_links, dict):
127+
external_links = external_links.get(
128+
ExternalPlatforms.CODEOCEAN.value, []
129+
)
130+
else:
131+
external_links = [
132+
r.get(ExternalPlatforms.CODEOCEAN.value)
133+
for r in external_links
134+
]
135+
return external_links
136+
137+
def _update_external_links_in_docdb(
138+
self, docdb_client: MongoClient
139+
) -> None:
140+
"""
141+
This method will:
142+
1) Retrieve a list of codeocean data asset ids and locations from CO
143+
2) Paginate through the docdb records where the location doesn't match
144+
the internal co bucket.
145+
3) Add or remove the external_links from the docdb record if needed.
146+
Parameters
147+
----------
148+
docdb_client : MongoClient
149+
150+
Returns
151+
-------
152+
None
153+
154+
"""
155+
# Should return a list like [{"id": co_id, "location": "s3://..."},]
156+
list_of_co_ids_and_locations = self._get_external_data_asset_records()
157+
db = docdb_client[self.job_settings.doc_db_db_name]
158+
collection = db[self.job_settings.doc_db_collection_name]
159+
if list_of_co_ids_and_locations is not None:
160+
co_loc_to_id_map = self._map_external_list_to_dict(
161+
list_of_co_ids_and_locations
162+
)
163+
pages = paginate_docdb(
164+
docdb_client=docdb_client,
165+
db_name=self.job_settings.doc_db_db_name,
166+
collection_name=self.job_settings.doc_db_collection_name,
167+
filter_query={
168+
"location": {
169+
"$not": {
170+
"$regex": f"^s3://{self.job_settings.s3_bucket}.*"
171+
}
172+
}
173+
},
174+
projection={"_id": 1, "location": 1, "external_links": 1},
175+
page_size=500,
176+
)
177+
for page in pages:
178+
records_to_update = []
179+
for record in page:
180+
location = record.get("location")
181+
external_links = self._get_co_links_from_record(record)
182+
code_ocean_ids = (
183+
None
184+
if location is None
185+
else co_loc_to_id_map.get(location)
186+
)
187+
docdb_rec_id = record["_id"]
188+
if (
189+
external_links is not None
190+
and code_ocean_ids is not None
191+
and code_ocean_ids != set(external_links)
192+
):
193+
new_external_links = code_ocean_ids
194+
elif external_links is not None and not code_ocean_ids:
195+
logging.info(
196+
f"No code ocean data asset ids found for "
197+
f"{location}. Removing external links from record."
198+
)
199+
new_external_links = dict()
200+
else:
201+
new_external_links = None
202+
if new_external_links is not None:
203+
record_links = {
204+
ExternalPlatforms.CODEOCEAN.value: sorted(
205+
list(new_external_links)
206+
)
207+
}
208+
last_modified = datetime.utcnow().isoformat()
209+
records_to_update.append(
210+
UpdateOne(
211+
filter={"_id": docdb_rec_id},
212+
update={
213+
"$set": {
214+
"external_links": record_links,
215+
"last_modified": last_modified,
216+
}
217+
},
218+
upsert=False,
219+
)
220+
)
221+
if len(records_to_update) > 0:
222+
logging.info(f"Updating {len(records_to_update)} records")
223+
write_response = collection.bulk_write(
224+
requests=records_to_update
225+
)
226+
logging.debug(write_response)
227+
else:
228+
logging.error("There was an error retrieving external links!")
229+
47230
def _process_codeocean_record(
48231
self,
49232
codeocean_record: dict,
@@ -220,6 +403,12 @@ def run_job(self):
220403
password=self.job_settings.doc_db_password.get_secret_value(),
221404
authSource="admin",
222405
)
406+
# Use existing client to add external links to fields
407+
logging.info("Adding links to records.")
408+
self._update_external_links_in_docdb(
409+
docdb_client=iterator_docdb_client
410+
)
411+
logging.info("Finished adding links to records")
223412
all_docdb_records = dict()
224413
docdb_pages = paginate_docdb(
225414
db_name=self.job_settings.doc_db_db_name,

src/aind_data_asset_indexer/models.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,12 @@ class CodeOceanIndexBucketJobSettings(IndexJobSettings):
122122
doc_db_collection_name: str
123123
codeocean_domain: str
124124
codeocean_token: SecretStr
125+
temp_codeocean_endpoint: str = Field(
126+
description=(
127+
"Temp proxy to access code ocean information from their analytics "
128+
"databases."
129+
)
130+
)
125131

126132
@classmethod
127133
def from_param_store(cls, param_store_name: str):

0 commit comments

Comments
 (0)