|
| 1 | +""" |
| 2 | +Collection metadata retrieval module. |
| 3 | +
|
| 4 | +This module provides a standalone interface for retrieving collection metadata |
| 5 | +from NASA's Common Metadata Repository (CMR) in UMM-C format, which is then |
| 6 | +parsed into a structured dataclass for use throughout the metgen pipeline. |
| 7 | +""" |
| 8 | + |
| 9 | +import logging |
| 10 | +from typing import Optional, Union |
| 11 | + |
| 12 | +import earthaccess |
| 13 | + |
| 14 | +from nsidc.metgen import constants |
| 15 | +from nsidc.metgen.models import CollectionMetadata |
| 16 | + |
| 17 | + |
class CollectionMetadataReader:
    """
    Reader class for retrieving and parsing collection metadata.

    An instance is bound to a single environment; that choice selects both
    the CMR provider and the Earthdata Login system used when querying.
    """

    def __init__(self, environment: str = "uat"):
        """
        Initialize the collection reader.

        Args:
            environment: Environment to query ("uat" or "prod"). The value is
                lower-cased; anything other than "prod" selects the UAT
                provider and login system.
        """
        self.environment = environment.lower()
        self.provider = self._get_provider()
        self.logger = logging.getLogger(constants.ROOT_LOGGER)

    def _get_provider(self) -> str:
        """Get the appropriate CMR provider based on environment."""
        return (
            constants.CMR_PROD_PROVIDER
            if self.environment == "prod"
            else constants.CMR_UAT_PROVIDER
        )

    def _get_earthaccess_system(self):
        """Get the Earthdata Login system object."""
        return earthaccess.PROD if self.environment == "prod" else earthaccess.UAT

    @staticmethod
    def _login_succeeded(auth) -> bool:
        """
        Decide whether the value returned by ``earthaccess.login`` means success.

        ``earthaccess.login`` is documented to return an ``Auth`` object, not
        a boolean, so the object's own truthiness says nothing about whether
        authentication worked. Prefer its ``authenticated`` attribute when
        present; otherwise fall back to plain truthiness so bool-like return
        values are still interpreted as before.

        Args:
            auth: Whatever ``earthaccess.login`` returned.

        Returns:
            True if the login should be treated as successful.
        """
        return bool(getattr(auth, "authenticated", auth))

    def get_collection_metadata(
        self, short_name: str, version: Union[str, int]
    ) -> "CollectionMetadata":
        """
        Retrieve collection metadata from CMR.

        Args:
            short_name: Collection short name (e.g., "SNEX23_SSADUCk")
            version: Collection version (e.g., "1" or 1); converted to a
                string before it is used in the query and in messages.

        Returns:
            CollectionMetadata object containing parsed collection metadata

        Raises:
            Exception: If Earthdata login fails.
            ValueError: If the CMR query returns an empty, ambiguous or
                malformed response (see ``_validate_cmr_response``).
        """
        version_str = str(version)

        # Attempt Earthdata login using credentials from the environment.
        auth = earthaccess.login(
            strategy="environment", system=self._get_earthaccess_system()
        )
        if not self._login_succeeded(auth):
            raise Exception(
                f"Earthdata login failed, cannot retrieve UMM-C metadata for "
                f"{short_name}.{version_str}"
            )

        self.logger.info("Earthdata login succeeded.")

        # Search for collection in CMR
        cmr_response = earthaccess.search_datasets(
            short_name=short_name,
            version=version_str,
            has_granules=None,  # Find collections with or without granules
            provider=self.provider,
        )

        # Validate and parse response
        ummc = self._validate_cmr_response(cmr_response, short_name, version_str)

        return self._parse_ummc_metadata(ummc, short_name, version_str)

    def _validate_cmr_response(
        self, response: list, short_name: str, version: str
    ) -> dict:
        """
        Validate the CMR response and extract the UMM-C record.

        Exactly one result is accepted. It must be a dict containing a "umm"
        key whose value is itself a dict.

        Args:
            response: Raw response from earthaccess
            short_name: Collection short name for error messages
            version: Collection version for error messages

        Returns:
            Validated UMM-C dictionary

        Raises:
            ValueError: If response is invalid
        """
        if not response:
            raise ValueError(
                f"Empty UMM-C response from CMR for {short_name}.{version}"
            )

        if len(response) > 1:
            raise ValueError(
                f"Multiple UMM-C records returned from CMR for {short_name}.{version}, "
                "none will be used."
            )

        record = response[0]

        # Check that the response item is a dict before extracting
        if not isinstance(record, dict) or "umm" not in record:
            raise ValueError(
                f"No UMM-C content in CMR response for {short_name}.{version}"
            )

        # "umm" is known to be present here, so a direct lookup is sufficient.
        ummc = record["umm"]

        if not isinstance(ummc, dict):
            raise ValueError(
                f"Invalid UMM-C format in CMR response for {short_name}.{version}"
            )

        return ummc

    def _parse_ummc_metadata(
        self, ummc: dict, short_name: str, version: str
    ) -> "CollectionMetadata":
        """
        Parse UMM-C record into structured metadata.

        Args:
            ummc: UMM-C dictionary from CMR
            short_name: Collection short name
            version: Collection version

        Returns:
            Populated CollectionMetadata object. The entry title falls back to
            "<short_name>.<version>" when the record has no "EntryTitle".
        """
        # Extract temporal extent and check for errors
        temporal_extent, temporal_error = self._parse_temporal_extent(ummc)

        # Build the metadata object
        return CollectionMetadata(
            short_name=short_name,
            version=version,
            entry_title=ummc.get("EntryTitle", f"{short_name}.{version}"),
            granule_spatial_representation=self._extract_nested_value(
                ummc, constants.GRANULE_SPATIAL_REP_PATH
            ),
            spatial_extent=self._extract_nested_value(
                ummc, constants.SPATIAL_EXTENT_PATH
            ),
            temporal_extent=temporal_extent,
            temporal_extent_error=temporal_error,
        )

    def _parse_temporal_extent(
        self, ummc: dict
    ) -> tuple[Optional[list], Optional[str]]:
        """
        Parse temporal extent from UMM-C, checking for validity.

        Problems are reported through the returned error message rather than
        by raising, so the caller decides whether they matter.

        Returns:
            Tuple of (temporal_extent, error_message). Both are None when the
            record has no temporal extent; error_message is None when exactly
            one usable temporal value/range was found.
        """
        temporal_extent = self._extract_nested_value(
            ummc, constants.TEMPORAL_EXTENT_PATH
        )

        if not temporal_extent:
            return None, None

        # Check if there are multiple temporal extents
        # NOTE(review): assumes the extracted value is a list — confirm
        # against the UMM-C schema / constants.TEMPORAL_EXTENT_PATH.
        if len(temporal_extent) > 1:
            return (
                temporal_extent,
                "Collection metadata must only contain one temporal extent when "
                "collection_temporal_override is set.",
            )

        # Extract temporal details from the first extent
        temporal_details = self._get_temporal_details(temporal_extent[0])

        if temporal_details and len(temporal_details) > 1:
            return (
                temporal_details,
                "Collection metadata must only contain one temporal range or a single "
                "temporal value when collection_temporal_override is set.",
            )

        return temporal_details, None

    def _get_temporal_details(self, temporal_extent: dict) -> Optional[list]:
        """
        Extract temporal range or single date from temporal extent.

        Single date-times take precedence; range date-times are only
        consulted when no (non-empty) single date-times are present.
        """
        # Check for single date times first
        single_dates = self._extract_nested_value(
            temporal_extent, constants.TEMPORAL_SINGLE_PATH
        )
        if single_dates:
            return single_dates

        # Otherwise check for range date times
        return self._extract_nested_value(
            temporal_extent, constants.TEMPORAL_RANGE_PATH
        )

    def _extract_nested_value(
        self, data: dict, keys: list[str]
    ) -> Optional[Union[str, list, dict]]:
        """
        Extract a value from nested dictionary using a list of keys.

        Args:
            data: Dictionary to search
            keys: List of keys representing the path to the value

        Returns:
            The value if found, None otherwise. A missing key, or a non-dict
            encountered part-way along the path, is logged at debug level.
        """
        if data is None:
            return None

        current = data

        for key in keys:
            if not isinstance(current, dict) or key not in current:
                self.logger.debug(
                    "Key path %s not found in UMM-C record", " -> ".join(keys)
                )
                return None
            current = current[key]

        return current
| 243 | + |
| 244 | + |
def get_collection_metadata(
    environment: str, short_name: str, version: Union[str, int]
) -> CollectionMetadata:
    """
    Retrieve collection metadata for the specified collection.

    Convenience wrapper for the rest of the application: it builds a
    CollectionMetadataReader for ``environment`` and delegates to that
    reader's ``get_collection_metadata`` method.

    Args:
        environment: Environment to query ("uat" or "prod")
        short_name: Collection short name
        version: Collection version

    Returns:
        CollectionMetadata object
    """
    return CollectionMetadataReader(environment).get_collection_metadata(
        short_name, version
    )
0 commit comments