Commit c21b3ce

feat: upgrade aind data schema (#106)
* feat: upgrade to aind-data-schema v1.2.0
* refactor: use is_dict_corrupt from aind-data-schema
* feat: create_metadata_json from aind-data-schema
* fix: check is_dict_corrupt for existing .nd.json
* refactor: use core schemas list from aind-data-schema
* build: update pydantic lower bound
1 parent f2bbfe0 commit c21b3ce
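As orientation for the diffs below (a sketch, not part of the commit): the indexer now derives its core-schema mapping from aind-data-schema's CORE_FILES constant instead of maintaining its own list, and several hunks iterate over that mapping as (field_name, file_name) pairs. Assuming CORE_FILES lists field names such as "subject", "procedures", and "processing":

# Sketch only: mirrors the core_schema_file_names mapping built in utils.py.
from aind_data_schema.core.metadata import CORE_FILES as CORE_SCHEMAS

core_schema_file_names = {
    field_name: f"{field_name}.json" for field_name in CORE_SCHEMAS
}
# e.g. {"subject": "subject.json", "procedures": "procedures.json", ...}

for field_name, file_name in core_schema_file_names.items():
    # field_name keys the DocDB record; file_name names the core json in S3.
    ...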

11 files changed (+415, -361 lines)

pyproject.toml

Lines changed: 2 additions & 2 deletions
@@ -20,10 +20,10 @@ dependencies = [
     "boto3",
     "boto3-stubs[s3]",
     "pydantic-settings>=2.0",
-    "pydantic>=2.7,<2.9",
+    "pydantic>=2.10",
     "pymongo==4.3.3",
     "dask==2023.5.0",
-    "aind-data-schema==1.0.0",
+    "aind-data-schema==1.2.0",
     "aind-codeocean-api==0.5.0",
 ]

src/aind_data_asset_indexer/aind_bucket_indexer.py

Lines changed: 13 additions & 8 deletions
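For context on the guard introduced in the hunks below (a hedged sketch, not the repository's exact code): is_dict_corrupt now comes from aind_data_schema.base and returns True when any key, including nested keys, contains '$' or '.', characters that are problematic as DocumentDB field names. Roughly how such a check can gate an insert, with collection, location, and json_contents as stand-ins for this example:

# Sketch only: illustrates the corrupt-record check added in _process_prefix.
import logging

from aind_data_schema.base import is_dict_corrupt  # per the import in this diff


def add_record_if_clean(collection, location: str, json_contents: dict) -> None:
    """Insert a metadata record only if it has an _id and no corrupt keys."""
    if "_id" in json_contents and not is_dict_corrupt(json_contents):
        logging.info(f"Adding record to docdb for: {location}")
        collection.insert_one(json_contents)  # pymongo collection, assumed here
    elif "_id" in json_contents:
        # Keys containing '$' or '.' would be rejected; warn and skip instead.
        logging.warning(f"Metadata record for {location} is corrupt!")
    else:
        logging.warning(f"Metadata record for {location} has no _id field.")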
@@ -11,6 +11,7 @@
 
 import boto3
 import dask.bag as dask_bag
+from aind_data_schema.base import is_dict_corrupt
 from mypy_boto3_s3 import S3Client
 from mypy_boto3_s3.type_defs import CopySourceTypeDef
 from pymongo import MongoClient
@@ -30,7 +31,6 @@
     get_dict_of_file_info,
     get_s3_bucket_and_prefix,
     get_s3_location,
-    is_dict_corrupt,
     is_prefix_valid,
     is_record_location_valid,
     iterate_through_top_level,
@@ -215,15 +215,15 @@ def _resolve_schema_information(
         The fields in the DocDb record that will require updating.
         """
         docdb_record_fields_to_update = dict()
-        for core_schema_file_name in core_schema_file_names:
-            field_name = core_schema_file_name.replace(".json", "")
+        for (
+            field_name,
+            core_schema_file_name,
+        ) in core_schema_file_names.items():
             is_in_record = docdb_record.get(field_name) is not None
             is_in_root = (
                 core_schema_info_in_root.get(core_schema_file_name) is not None
             )
-            is_in_copy_subdir = (
-                core_schema_file_name in list_of_schemas_in_copy_subdir
-            )
+            is_in_copy_subdir = field_name in list_of_schemas_in_copy_subdir
             # To avoid copying and pasting the same arguments, we'll keep it
             # them in a dict
             common_kwargs = {
@@ -580,8 +580,9 @@ def _process_prefix(
                     collection = db[
                         self.job_settings.doc_db_collection_name
                     ]
-                    if "_id" in json_contents:
-                        # TODO: check is_dict_corrupt(json_contents)
+                    if "_id" in json_contents and not is_dict_corrupt(
+                        json_contents
+                    ):
                         logging.info(
                             f"Adding record to docdb for: {location}"
                         )
@@ -602,6 +603,10 @@ def _process_prefix(
                                 self.job_settings.copy_original_md_subdir
                             ),
                         )
+                    elif "_id" in json_contents:
+                        logging.warning(
+                            f"Metadata record for {location} is corrupt!"
+                        )
                     else:
                         logging.warning(
                             f"Metadata record for {location} "

src/aind_data_asset_indexer/utils.py

Lines changed: 47 additions & 95 deletions
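To tie the utils.py changes below together (a minimal sketch under stated assumptions, not the indexer's actual call site): metadata assembly is now delegated to create_metadata_json from aind-data-schema v1.2.0, called with the record name, its S3 location, the core schema jsons keyed by field name, and the optional created/external_links values, after which the indexer serializes the resulting dict. The names and values here are made up for illustration:

# Sketch only: the delegation pattern used by build_metadata_record_from_prefix.
import json

from aind_data_schema.core.metadata import create_metadata_json

core_jsons = {
    "subject": {"subject_id": "123456"},      # hypothetical contents of subject.json
    "procedures": {"subject_id": "123456"},   # hypothetical contents of procedures.json
}
metadata_dict = create_metadata_json(
    name="ecephys_123456_2020-10-10_01-00-24",  # hypothetical asset name
    location="s3://example-bucket/ecephys_123456_2020-10-10_01-00-24",
    core_jsons=core_jsons,
    optional_created=None,           # defaults handled by aind-data-schema
    optional_external_links=None,
)
metadata_str = json.dumps(metadata_dict)  # serialized as in the diff below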
@@ -11,8 +11,12 @@
 
 from aind_codeocean_api.codeocean import CodeOceanClient
 from aind_data_schema.core.data_description import DataLevel, DataRegex
-from aind_data_schema.core.metadata import ExternalPlatforms, Metadata
-from aind_data_schema.utils.json_writer import SchemaWriter
+from aind_data_schema.core.metadata import CORE_FILES as CORE_SCHEMAS
+from aind_data_schema.core.metadata import (
+    ExternalPlatforms,
+    Metadata,
+    create_metadata_json,
+)
 from botocore.exceptions import ClientError
 from mypy_boto3_s3 import S3Client
 from mypy_boto3_s3.type_defs import (
@@ -23,12 +27,9 @@
 
 metadata_filename = Metadata.default_filename()
 
-# TODO: This would be better if it was available in aind-data-schema
-core_schema_file_names = [
-    s.default_filename()
-    for s in SchemaWriter.get_schemas()
-    if s.default_filename() != metadata_filename
-]
+core_schema_file_names = {
+    field_name: f"{field_name}.json" for field_name in CORE_SCHEMAS
+}
 
 
 def create_object_key(prefix: str, filename: str) -> str:
@@ -328,11 +329,10 @@ def does_s3_metadata_copy_exist(
         Bucket=bucket, Prefix=copy_prefix, Delimiter="/"
     )
     if "Contents" in response:
-        core_schemas = [s.replace(".json", "") for s in core_schema_file_names]
         pattern = re.escape(copy_prefix) + r"([a-zA-Z0-9_]+)\.\d{8}\.json$"
         for obj in response["Contents"]:
             m = re.match(pattern, obj["Key"])
-            if m is not None and m.group(1) in core_schemas:
+            if m is not None and m.group(1) in CORE_SCHEMAS:
                 return True
     return False
 
@@ -359,8 +359,8 @@ def list_metadata_copies(
     Returns
     -------
     List[str]
-        A list of the core schemas in the copy_subdir without timestamp, e..g,
-        ["subject.json", "procedures.json", "processing.json"]
+        A list of the core schemas in the copy_subdir without timestamp, e.g,
+        ["subject", "procedures", "processing"]
     """
     # Use trailing slash and delimiter to get top-level objects in copy_subdir
     copy_prefix = create_object_key(prefix, copy_subdir.strip("/") + "/")
@@ -369,12 +369,11 @@ def list_metadata_copies(
     )
     files = []
     if "Contents" in response:
-        core_schemas = [s.replace(".json", "") for s in core_schema_file_names]
         pattern = re.escape(copy_prefix) + r"([a-zA-Z0-9_]+)\.\d{8}\.json$"
         for obj in response["Contents"]:
             m = re.match(pattern, obj["Key"])
-            if m is not None and m.group(1) in core_schemas:
-                files.append(f"{m.group(1)}.json")
+            if m is not None and m.group(1) in CORE_SCHEMAS:
+                files.append(m.group(1))
     return files
 
 
@@ -440,12 +439,10 @@ def get_dict_of_core_schema_file_info(
         ...
     }
     """
-    key_map = dict(
-        [
-            (create_object_key(prefix=prefix, filename=s), s)
-            for s in core_schema_file_names
-        ]
-    )
+    key_map = {
+        create_object_key(prefix=prefix, filename=file_name): file_name
+        for file_name in core_schema_file_names.values()
+    }
     file_info = get_dict_of_file_info(
         s3_client=s3_client, bucket=bucket, keys=list(key_map.keys())
     )
@@ -491,32 +488,6 @@ def iterate_through_top_level(
     ]
 
 
-def is_dict_corrupt(input_dict: dict) -> bool:
-    """
-    Checks that all the keys, included nested keys, don't contain '$' or '.'
-
-    Parameters
-    ----------
-    input_dict : dict
-
-    Returns
-    -------
-    bool
-        True if input_dict is not a dict, or if nested dictionary keys contain
-        forbidden characters. False otherwise.
-
-    """
-    if not isinstance(input_dict, dict):
-        return True
-    for key, value in input_dict.items():
-        if "$" in key or "." in key:
-            return True
-        elif isinstance(value, dict):
-            if is_dict_corrupt(value):
-                return True
-    return False
-
-
 def download_json_file_from_s3(
     s3_client: S3Client, bucket: str, object_key: str
 ) -> Optional[dict]:
@@ -578,44 +549,34 @@ def build_metadata_record_from_prefix(
         there are issues with Metadata construction.
 
     """
-    file_keys = [
-        create_object_key(prefix=prefix, filename=file_name)
-        for file_name in core_schema_file_names
-    ]
-    s3_file_responses = get_dict_of_file_info(
-        s3_client=s3_client, bucket=bucket, keys=file_keys
+    core_files_infos = get_dict_of_core_schema_file_info(
+        s3_client=s3_client, bucket=bucket, prefix=prefix
    )
    record_name = prefix.strip("/") if optional_name is None else optional_name
    try:
-        metadata_dict = {
-            "name": record_name,
-            "location": get_s3_location(bucket=bucket, prefix=prefix),
-        }
-        if optional_created is not None:
-            metadata_dict["created"] = optional_created
-        if optional_external_links is not None:
-            metadata_dict["external_links"] = optional_external_links
-        for object_key, response_data in s3_file_responses.items():
+        core_jsons = dict()
+        for field_name, file_name in core_schema_file_names.items():
+            response_data = core_files_infos.get(file_name)
             if response_data is not None:
-                field_name = object_key.split("/")[-1].replace(".json", "")
+                object_key = create_object_key(prefix, file_name)
                 json_contents = download_json_file_from_s3(
                     s3_client=s3_client, bucket=bucket, object_key=object_key
                 )
                 if json_contents is not None:
-                    is_corrupt = is_dict_corrupt(input_dict=json_contents)
-                    if not is_corrupt:
-                        metadata_dict[field_name] = json_contents
-        # TODO: We should handle constructing the Metadata file in a better way
-        # in aind-data-schema. By using model_validate, a lot of info from the
-        # original files get removed. For now, we can use model_construct
-        # until a better method is implemented in aind-data-schema. This will
-        # mark all the initial files as metadata_status=Unknown
-        metadata_dict = Metadata.model_construct(
-            **metadata_dict
-        ).model_dump_json(warnings=False, by_alias=True)
+                    core_jsons[field_name] = json_contents
+        # Construct Metadata file using core schema jsons
+        # Validation and de/serialization are handled in aind-data-schema
+        metadata_dict = create_metadata_json(
+            name=record_name,
+            location=get_s3_location(bucket=bucket, prefix=prefix),
+            core_jsons=core_jsons,
+            optional_created=optional_created,
+            optional_external_links=optional_external_links,
+        )
+        metadata_str = json.dumps(metadata_dict)
    except Exception:
-        metadata_dict = None
-    return metadata_dict
+        metadata_str = None
+    return metadata_str
 
 
 def cond_copy_then_sync_core_json_files(
@@ -705,17 +666,13 @@ def copy_core_json_files(
     """
     tgt_copy_subdir = copy_original_md_subdir.strip("/")
     tgt_copy_prefix = create_object_key(prefix, tgt_copy_subdir)
-    core_files_keys = [
-        create_object_key(prefix=prefix, filename=s)
-        for s in core_schema_file_names
-    ]
-    core_files_infos = get_dict_of_file_info(
-        s3_client=s3_client, bucket=bucket, keys=core_files_keys
+    core_files_infos = get_dict_of_core_schema_file_info(
+        s3_client=s3_client, bucket=bucket, prefix=prefix
    )
-    for file_name in core_schema_file_names:
+    for file_name in core_schema_file_names.values():
         source = create_object_key(prefix, file_name)
         source_location = get_s3_location(bucket=bucket, prefix=source)
-        source_file_info = core_files_infos[source]
+        source_file_info = core_files_infos[file_name]
         if source_file_info is not None:
             date_stamp = source_file_info["last_modified"].strftime("%Y%m%d")
             target = create_object_key(
@@ -766,16 +723,11 @@ def sync_core_json_files(
     None
     """
     md_record_json = json.loads(metadata_json)
-    core_files_keys = [
-        create_object_key(prefix=prefix, filename=s)
-        for s in core_schema_file_names
-    ]
-    core_files_infos = get_dict_of_file_info(
-        s3_client=s3_client, bucket=bucket, keys=core_files_keys
+    core_files_infos = get_dict_of_core_schema_file_info(
+        s3_client=s3_client, bucket=bucket, prefix=prefix
    )
-    for file_name in core_schema_file_names:
+    for field_name, file_name in core_schema_file_names.items():
         object_key = create_object_key(prefix, file_name)
-        field_name = file_name.replace(".json", "")
         location = get_s3_location(bucket=bucket, prefix=object_key)
         if (
             field_name in md_record_json
@@ -785,7 +737,7 @@ def sync_core_json_files(
             field_contents_str = json.dumps(field_contents)
             # Core schema jsons are created if they don't already exist.
             # Otherwise, they are only updated if their contents are outdated.
-            if core_files_infos[object_key] is None:
+            if core_files_infos[file_name] is None:
                 logging.info(f"Uploading new {field_name} to {location}")
                 response = upload_json_str_to_s3(
                     bucket=bucket,
@@ -795,7 +747,7 @@ def sync_core_json_files(
                 )
                 logging.debug(response)
             else:
-                s3_object_hash = core_files_infos[object_key]["e_tag"].strip(
+                s3_object_hash = core_files_infos[file_name]["e_tag"].strip(
                     '"'
                 )
                 core_field_md5_hash = compute_md5_hash(field_contents_str)
@@ -818,7 +770,7 @@ def sync_core_json_files(
         else:
             # If a core field is None but the core json exists,
             # delete the core json.
-            if core_files_infos[object_key] is not None:
+            if core_files_infos[file_name] is not None:
                 logging.info(
                     f"{field_name} not found in metadata.nd.json for "
                     f"{prefix} but {location} exists! Deleting."

0 commit comments