diff --git a/README.rst b/README.rst index 841b4537..867141bf 100644 --- a/README.rst +++ b/README.rst @@ -86,6 +86,9 @@ of the oemetadata-specification to help users stick with the latest enhancements To ease the conversion of oemetadata from any outdated version to the latest version, we provide a conversion functionality. The following example shows how to convert the oemetadata from v1.6 to v2.0. +Starting from v2, we do not support conversions for patch versions. This means you can convert from v1.6 to v2.0 but not from v2.0.0 to v2.0.1. +The oemetadata release procedure requires that breaking changes are only introduced in major or minor versions. Only these changes will require a conversion. + CLI - oemetadata conversion:: # Not implemented yet @@ -112,7 +115,7 @@ Module usage - In python scripts you can use the conversion:: meta = read_json_file(file_path) # use omi to convert it to the latest release - converted = convert_metadata(meta, "OEMetadata-2.0.1") + converted = convert_metadata(meta, "OEMetadata-2.0") # now you can store the result as json file with open("result.json", "w", encoding="utf-8") as json_file: @@ -129,7 +132,7 @@ two arguments the first one is the metadata and the second optional one is the s the validation will try to get the matching schema for the current metadata. 
-CLI - oemetadata conversion:: +CLI - oemetadata validation:: # Not implemented yet diff --git a/poetry.lock b/poetry.lock index 00b15d6c..4657ef34 100644 --- a/poetry.lock +++ b/poetry.lock @@ -592,13 +592,13 @@ files = [ [[package]] name = "oemetadata" -version = "2.0.1" +version = "2.0.2" description = "Open Energy Metadata (OEMetadata) - The energy metadata standard" optional = false -python-versions = ">=3.6" +python-versions = ">=3.8" files = [ - {file = "oemetadata-2.0.1-py3-none-any.whl", hash = "sha256:eb5f5946ac9d1c7ead544d375181a9f90146f36b9fd2b09db051fee11589e4a6"}, - {file = "oemetadata-2.0.1.tar.gz", hash = "sha256:9b70cd622979dee563ce48b76c50e5e1c11384a2330698e5420c1885db6fe003"}, + {file = "oemetadata-2.0.2-py3-none-any.whl", hash = "sha256:11137bd55f461aa70586c5fe88ce603d364ccba8b68db6cbdab02727dbdf4cff"}, + {file = "oemetadata-2.0.2.tar.gz", hash = "sha256:f7a1cb55f5bb0857640eb0a7095c89f14c14d5a632df7a57be81d89decb79cb3"}, ] [[package]] diff --git a/src/omi/base.py b/src/omi/base.py index 7804a805..5b7e4dc7 100644 --- a/src/omi/base.py +++ b/src/omi/base.py @@ -4,15 +4,17 @@ import json import pathlib +import re from dataclasses import dataclass import requests -from metadata import v20, v152, v160 +from oemetadata.v1 import v152, v160 +from oemetadata.v2 import v20 from .settings import OEP_URL # Order matters! 
First entry equals latest version of metadata format -METADATA_FORMATS = {"OEP": ["OEMetadata-2.0.1", "OEP-1.6.0", "OEP-1.5.2"], "INSPIRE": []} +METADATA_FORMATS = {"OEP": ["OEMetadata-2.0", "OEP-1.6.0", "OEP-1.5.2"], "INSPIRE": []} METADATA_VERSIONS = {version: md_format for md_format, versions in METADATA_FORMATS.items() for version in versions} @@ -70,13 +72,28 @@ def get_metadata_version(metadata: dict) -> str: """ # For OEP metadata try: - return metadata["metaMetadata"]["metadataVersion"] + return __normalize_metadata_version(metadata["metaMetadata"]["metadataVersion"]) except KeyError: pass msg = "Could not extract metadata version from metadata." raise MetadataError(msg) +def __normalize_metadata_version(version: str) -> str: + """ + Normalize a metadata version string by stripping patch numbers. + + For example, "OEMetadata-2.0.4" becomes "OEMetadata-2.0". + """ + if not isinstance(version, str): + raise MetadataError(f"Metadata version must be a string, not {type(version)}.") + # This regex captures "OEMetadata-2.0" from "OEMetadata-2.0.4" or similar + m = re.match(r"^(OEMetadata-2\.\d+)(?:\.\d+)?$", version) + if m: + return m.group(1) + return version + + def get_latest_metadata_version(metadata_format: str) -> str: """ Return the latest metadata version of a given metadata format. @@ -148,7 +165,7 @@ def __get_metadata_specs_for_oep(metadata_version: str) -> MetadataSpecification MetadataSpecification Metadata schema for given metadata version including template and example. 
""" - metadata_modules = {"OEP-1.5.2": v152, "OEP-1.6.0": v160, "OEMetadata-2.0.1": v20} + metadata_modules = {"OEP-1.5.2": v152, "OEP-1.6.0": v160, "OEMetadata-2.0": v20} metadata_module = metadata_modules[metadata_version] module_path = pathlib.Path(metadata_module.__file__).parent specs = {} diff --git a/src/omi/conversion.py b/src/omi/conversion.py index d7e5b710..e6e4d7ac 100644 --- a/src/omi/conversion.py +++ b/src/omi/conversion.py @@ -4,7 +4,9 @@ from copy import deepcopy -from omi.base import get_metadata_specification, get_metadata_version +from omi.base import get_metadata_version +from omi.conversions.v152_to_v160 import convert_oep_152_to_160 +from omi.conversions.v160_to_v20 import convert_oep_160_to_20 class ConversionError(Exception): @@ -77,181 +79,7 @@ def get_chain(current_version: str) -> list[str] | None: raise ConversionError(f"No conversion chain found from {source_version} to {target_version}.") -def __convert_oep_152_to_160(metadata: dict) -> dict: - """ - Convert metadata with version "OEP-1.5.2" to "OEP-1.6.0". - - Parameters - ---------- - metadata: dict - Metadata - - Returns - ------- - dict - Updated metadata - """ - # No changes in metadata fields - metadata["metaMetadata"]["metadataVersion"] = "OEP-1.6.0" - return metadata - - -def __convert_oep_160_to_200(metadata: dict) -> dict: - """ - Convert metadata with version "OEP-1.6.0" to "OEMetadata-2.0.1" using the v2.0 template. 
- - Parameters - ---------- - metadata: dict - Metadata dictionary in v1.6 format - - Returns - ------- - dict - Updated metadata dictionary in v2.0 format - """ - metadata_v2 = deepcopy(get_metadata_specification("OEMetadata-2.0.1").template) - metadata_v2["name"] = metadata_v2["title"] = metadata_v2["id"] = metadata_v2["description"] = None - - # Populate metadata v2 resources - for i, resource in enumerate(metadata.get("resources", [])): - resource_v2 = ___v2_ensure_resource_entry(metadata_v2, i) - ___v2_populate_resource_v2(resource_v2, metadata, resource) - - # Update metaMetadata section - metadata_v2["metaMetadata"]["metadataVersion"] = "OEMetadata-2.0.1" - metadata_v2["metaMetadata"]["metadataLicense"] = metadata.get("metaMetadata", {}).get("metadataLicense") - - return metadata_v2 - - -def ___v2_ensure_resource_entry(metadata_v2: dict, index: int) -> dict: - """Ensure a resource entry exists in metadata_v2 resources for the given index.""" - if index >= len(metadata_v2["resources"]): - metadata_v2["resources"].append(deepcopy(metadata_v2["resources"][0])) - return metadata_v2["resources"][index] - - -def ___v2_populate_resource_v2(resource_v2: dict, metadata: dict, resource: dict) -> None: - """Populate resource_v2 fields based on metadata and resource from v1.6.""" - # Bulk update keys without - resource_v2.update( - { - "@id": metadata.get("@id"), - "@context": metadata.get("@context"), - "name": resource.get("name").split(".")[1], - "topics": [resource.get("name", "").split(".")[0]], - "title": metadata.get("title"), - "path": metadata.get("id"), - "description": metadata.get("description"), - "languages": metadata.get("language", []), - "subject": metadata.get("subject", []), - "keywords": metadata.get("keywords", []), - "publicationDate": metadata.get("publicationDate"), - "context": metadata.get("context", {}), - "temporal": metadata.get("temporal", {}), - "type": None, - "format": resource.get("format"), - "encoding": resource.get("encoding"), - 
"schema": { - "fields": resource.get("schema", {}).get("fields", []), - "primaryKey": resource.get("schema", {}).get("primaryKey", []), - "foreignKeys": resource.get("schema", {}).get("foreignKeys", []), - }, - "dialect": resource.get("dialect", {}), - "review": metadata.get("review", {}), - }, - ) - - resource_v2["context"]["publisher"] = None - - resource_v2["embargoPeriod"]["start"] = None - resource_v2["embargoPeriod"]["end"] = None - - # Set to null to avoid validation errors: URI - resource_v2["spatial"]["location"]["@id"] = None - resource_v2["spatial"]["location"]["address"] = metadata.get("spatial", {}).get("location") - resource_v2["spatial"]["location"]["latitude"] = None - resource_v2["spatial"]["location"]["longitude"] = None - # Set to null to avoid validation errors: URI - resource_v2["spatial"]["extent"]["name"] = metadata.get("spatial", {}).get("extent") - resource_v2["spatial"]["extent"]["@id"] = None - resource_v2["spatial"]["extent"]["resolutionValue"], resource_v2["spatial"]["extent"]["resolutionUnit"] = ( - metadata.get("spatial", {}).get("resolution", "").split(" ", 1) - ) - resource_v2["spatial"]["extent"]["crs"] = None - - ___v2_populate_sources(resource_v2, metadata.get("sources", [])) - ___v2_populate_contributors(resource_v2, metadata.get("contributors", [])) - ___v2_populate_licenses(resource_v2, metadata.get("licenses", [])) - ___v2_populate_schema_fields(resource_v2, resource) - - -def ___v2_populate_sources(resource_v2: dict, sources: list) -> None: - """Populate sources in resource_v2 from sources in v1.6.""" - for i_source, source in enumerate(sources): - if i_source >= len(resource_v2["sources"]): - resource_v2["sources"].append(deepcopy(resource_v2["sources"][0])) - source_v2 = resource_v2["sources"][i_source] - source_v2.update( - { - "title": source.get("title"), - "description": source.get("description"), - "path": source.get("path"), - "publicationYear": None, - "authors": [], - }, - ) - 
___v2_populate_source_licenses(source_v2, source.get("licenses", [])) - - -def ___v2_populate_source_licenses(source_v2: dict, licenses: list) -> None: - """Populate licenses in source_v2 from licenses in v1.6.""" - for i_license, license_entry in enumerate(licenses): - if i_license >= len(source_v2["licenses"]): - source_v2["licenses"].append(deepcopy(source_v2["licenses"][0])) - source_v2["licenses"][i_license].update(license_entry) - source_v2["licenses"][i_license]["copyrightStatement"] = None - - -def ___v2_populate_contributors(resource_v2: dict, contributors: list) -> None: - """Populate contributors in resource_v2 from contributors in v1.6.""" - for i_contribution, contributor in enumerate(contributors): - if i_contribution >= len(resource_v2["contributors"]): - resource_v2["contributors"].append(deepcopy(resource_v2["contributors"][0])) - contributor_v2 = resource_v2["contributors"][i_contribution] - contributor_v2.update( - { - "title": contributor.get("title"), - "path": contributor.get("path"), - "organization": contributor.get("organization"), - "date": contributor.get("date"), - "object": contributor.get("object"), - "comment": contributor.get("comment"), - }, - ) - - -def ___v2_populate_licenses(resource_v2: dict, licenses: list) -> None: - """Populate licenses in resource_v2 from licenses in v1.6.""" - for i_license, license_entry in enumerate(licenses): - if i_license >= len(resource_v2["licenses"]): - resource_v2["licenses"].append(deepcopy(resource_v2["licenses"][0])) - resource_v2["licenses"][i_license].update(license_entry) - resource_v2["licenses"][i_license]["copyrightStatement"] = None - - -def ___v2_populate_schema_fields(resource_v2: dict, resource: dict) -> None: - """Populate schema fields in resource_v2 from resource in v1.6.""" - for i_field, field in enumerate(resource.get("schema", {}).get("fields", [])): - if i_field >= len(resource_v2["schema"]["fields"]): - 
resource_v2["schema"]["fields"].append(deepcopy(resource_v2["schema"]["fields"][0])) - schema_field_v2 = resource_v2["schema"]["fields"][i_field] - schema_field_v2.update(field) - schema_field_v2["nullable"] = None - - METADATA_CONVERSIONS = { - ("OEP-1.5.2", "OEP-1.6.0"): __convert_oep_152_to_160, - ("OEP-1.6.0", "OEMetadata-2.0.1"): __convert_oep_160_to_200, + ("OEP-1.5.2", "OEP-1.6.0"): convert_oep_152_to_160, + ("OEP-1.6.0", "OEMetadata-2.0"): convert_oep_160_to_20, } diff --git a/src/omi/conversions/README.md b/src/omi/conversions/README.md new file mode 100644 index 00000000..3451c740 --- /dev/null +++ b/src/omi/conversions/README.md @@ -0,0 +1,5 @@ +# Conversions + +This module is used to collect all existing OEMetaData version conversions. Each step in the conversion chain is stored in its own submodule. OMI supports OEMetaData starting from v1.5.2; previous versions are only supported by omi version > v1.0.0. + +Since OEMetaData version 2, we decided to use patch versions only to update content or documentation parts of the metadata specification. Therefore, OMI will only implement conversion steps for minor versions, since they will include all minor structural changes like changing JSON key names or adding new key:value pairs. More substantial changes to the JSON structure will be reflected in a major version change; this would include changing the nested structure of the metadata. 
diff --git a/src/omi/conversions/__init__.py b/src/omi/conversions/__init__.py new file mode 100644 index 00000000..5becc17c --- /dev/null +++ b/src/omi/conversions/__init__.py @@ -0,0 +1 @@ +__version__ = "1.0.0" diff --git a/src/omi/conversions/utils.py b/src/omi/conversions/utils.py new file mode 100644 index 00000000..c9ef1471 --- /dev/null +++ b/src/omi/conversions/utils.py @@ -0,0 +1,89 @@ +"""Utility functions for data conversion.""" + +import re + + +def find_temporal_resolution_value_and_unit(resolution: str) -> tuple[str, str]: + """ + Find temporal resolution value and unit from a resolution string. + + For temporal resolution, if the string starts with a number, this function will extract the number + as the value and any following alphabetical characters as the unit. If no leading numeric value is found, + the whole string is treated as a descriptive resolution with an empty unit. + + Possible formats: + - "yearly" + - "hourly" + - "1 h" + - "5 years" + - "1h" + + Parameters + ---------- + resolution: str + Temporal resolution string. + + Returns + ------- + tuple[str, str] + Temporal resolution value and unit. + """ + # Try matching a number (with optional decimals) and an optional unit, allowing for spaces in between. + match = re.match(r"^\s*(\d+(?:\.\d+)?)(?:\s*([a-zA-Z]+))?\s*$", resolution) + if match: + value = match.group(1) + unit = match.group(2) if match.group(2) is not None else "" + return value, unit + + # If no numeric pattern is detected, return the entire trimmed string as the value. + return resolution.strip(), "" + + +def find_spatial_resolution_value_and_unit(resolution: str) -> tuple[str, str]: + """ + Find spatial resolution value and unit from a resolution string. + + For spatial resolution, this function attempts to extract a numeric value with a 'm' (meters) unit, + as in "100 m" or even when embedded in a longer string like "vector, 10 m". 
If such a pattern is found, + the numeric part is returned as the value and the unit is set to "m". Otherwise, the entire string + is returned as a descriptive resolution (value) with an empty unit. + + Possible formats: + - "vector, 10 m" + - "100 m" + - "Germany" + - "NUTS-0" + - "MVGD" + - "Regionale Planungsgemeinschaften und Berlin" + - "national" + - "country" + + Parameters + ---------- + resolution: str + Spatial resolution string. + + Returns + ------- + tuple[str, str] + Spatial resolution value and unit (unit is expected to be 'm' when a numeric resolution is provided). + """ + # Search for a numeric value followed by optional whitespace and an 'm' unit (case-insensitive). + match = re.search(r"(\d+(?:\.\d+)?)\s*m\b", resolution, re.IGNORECASE) + if match: + value = match.group(1) + unit = "m" + return value, unit + + # If no numeric pattern is detected, return the entire trimmed string as the value. + return resolution.strip(), "" + + +license_cc_by_4 = { + "name": "CC-BY-4.0", + "title": "Creative Commons Attribution 4.0 International", + "path": "https://creativecommons.org/licenses/by/4.0/legalcode", + "instruction": "You are free to share and adapt, but you must attribute and cant add additional restrictions. See https://creativecommons.org/licenses/by/4.0/deed.en for further information.", # noqa: E501 + "attribution": "", + "copyrightStatement": "", +} diff --git a/src/omi/conversions/v152_to_v160.py b/src/omi/conversions/v152_to_v160.py new file mode 100644 index 00000000..659f55b6 --- /dev/null +++ b/src/omi/conversions/v152_to_v160.py @@ -0,0 +1,20 @@ +"""Conversion functions for metadata version "OEP-1.5.2" to "OEP-1.6.0".""" + + +def convert_oep_152_to_160(metadata: dict) -> dict: + """ + Convert metadata with version "OEP-1.5.2" to "OEP-1.6.0". 
+ + Parameters + ---------- + metadata: dict + Metadata + + Returns + ------- + dict + Updated metadata + """ + # No changes in metadata fields + metadata["metaMetadata"]["metadataVersion"] = "OEP-1.6.0" + return metadata diff --git a/src/omi/conversions/v160_to_v20.py b/src/omi/conversions/v160_to_v20.py new file mode 100644 index 00000000..d416bbb8 --- /dev/null +++ b/src/omi/conversions/v160_to_v20.py @@ -0,0 +1,334 @@ +"""Conversion functions for metadata version "OEP-1.6.0" to "OEMetadata-2.0".""" + +from copy import deepcopy + +from omi.base import get_metadata_specification + +# use utils.find_spatial_resolution_value_and_unit +from omi.conversions.utils import find_temporal_resolution_value_and_unit + + +def convert_oep_160_to_20(metadata: dict) -> dict: + """ + Convert metadata with version "OEP-1.6.0" to "OEMetadata-2.0" using the v2.0 template. + + Parameters + ---------- + metadata: dict + Metadata dictionary in v1.6 format + + Returns + ------- + dict + Updated metadata dictionary in v2.0 format + """ + metadata_v2 = deepcopy(get_metadata_specification("OEMetadata-2.0").template) + + # Update to v2 context URL + metadata_v2[ + "@context" + ] = "https://raw.githubusercontent.com/OpenEnergyPlatform/oemetadata/production/oemetadata/v2/v20/context.json" + metadata_v2["name"] = metadata_v2["title"] = metadata_v2["description"] = "" + + metadata_v2["@id"] = None + + # Populate metadata v2 resources + for i, resource in enumerate(metadata.get("resources", [])): + resource_v2 = ___v2_ensure_resource_entry(metadata_v2, i) + ___v2_populate_resource_v2(resource_v2, metadata, resource) + + # Update metaMetadata section + metadata_v2["metaMetadata"]["metadataVersion"] = "OEMetadata-2.0.4" + metadata_v2["metaMetadata"]["metadataLicense"] = metadata.get("metaMetadata", {}).get("metadataLicense") + + return metadata_v2 + + +def ___v2_ensure_resource_entry(metadata_v2: dict, index: int) -> dict: + """Ensure a resource entry exists in metadata_v2 resources for the given 
index.""" + if index >= len(metadata_v2["resources"]): + metadata_v2["resources"].append(deepcopy(metadata_v2["resources"][0])) + return metadata_v2["resources"][index] + + +def ___v2_populate_resource_v2(resource_v2: dict, metadata: dict, resource: dict) -> None: # noqa: C901 + """Populate resource_v2 fields based on metadata and resource from v1.6.""" + # Bulk update keys without + resource_v2.update( + { + "@id": metadata.get("@id"), + "name": metadata.get("name", "") or "", + "title": metadata.get("title"), + "path": metadata.get("id"), + "description": metadata.get("description"), + "publicationDate": metadata.get("publicationDate"), + "type": "Table", + "format": resource.get("format"), + "encoding": resource.get("encoding"), + }, + ) + + if metadata.get("language"): + if isinstance(metadata.get("language"), str): + resource_v2["languages"].pop() + resource_v2["languages"].append(metadata.get("language", [""]) or []) + if isinstance(metadata.get("language"), list): + resource_v2["languages"].pop() + resource_v2["languages"].extend(metadata.get("language", [""]) or []) + + if metadata.get("keywords"): + resource_v2["keywords"] = metadata.get("keywords", [""]) or [] + + # Update metadata v2 subject key -> path to @id + ___v2_populate_subjects(resource_v2, metadata.get("subject", []) or []) + + if metadata.get("context"): + resource_v2["context"].update(metadata.get("context")) + + # Set to null to avoid validation errors: URI + resource_v2["spatial"]["location"]["@id"] = None + resource_v2["spatial"]["location"]["address"] = (metadata.get("spatial", {}) or {}).get("location") + + unpack = resource_v2["spatial"]["location"]["address"] + resource_v2["spatial"]["location"]["address"] = ", ".join(unpack) if isinstance(unpack, list) else unpack + + # Set to null to avoid validation errors: URI + resource_v2["spatial"]["extent"]["name"] = (metadata.get("spatial", {}) or {}).get("extent") + resource_v2["spatial"]["extent"]["@id"] = None + + unpack = 
resource_v2["spatial"]["extent"]["name"] + resource_v2["spatial"]["extent"]["name"] = ", ".join(unpack) if isinstance(unpack, list) else unpack + + resolution = (metadata.get("spatial", {}) or {}).get("resolution", "") + if resolution: + parts = resolution.split(" ", 1) + + if len(parts) == 2: # noqa: PLR2004 + resource_v2["spatial"]["extent"]["resolutionValue"] = parts[0] + resource_v2["spatial"]["extent"]["resolutionUnit"] = parts[1] + elif len(parts) == 1 and parts[0]: + # If there's a value but no unit, assign the value and use a default for the unit + resource_v2["spatial"]["extent"]["resolutionValue"] = parts[0] + resource_v2["spatial"]["extent"]["resolutionUnit"] = "" + + ___v2_populate_temporal(resource_v2, metadata.get("temporal", {}) or {}) + + ___v2_populate_sources(resource_v2, metadata.get("sources", []) or []) + ___v2_populate_contributors(resource_v2, metadata.get("contributors", []) or []) + ___v2_populate_licenses(resource_v2, metadata.get("licenses", []) or []) + ___v2_populate_schema_fields(resource_v2, resource) + ___v2_populate_schema_primary_keys(resource_v2, resource) + ___v2_populate_schema_foreign_keys(resource_v2, resource) + + if resource.get("dialect"): + resource_v2["dialect"].update(resource.get("dialect", {})) + + if metadata.get("review"): + resource_v2["review"].update(metadata.get("review", {})) + + +def ___v2_populate_subjects(resource_v2: list, subjects: list) -> None: + """Populate licenses in source_v2 from licenses in v1.6.""" + if not subjects: + resource_v2["subject"][0]["@id"] = None + + for i_subject, subject_entry in enumerate(subjects): + if i_subject >= len(resource_v2["subject"]): + resource_v2["subject"].append(deepcopy(resource_v2["subject"][0])) + + resource_v2["subject"][i_subject].update(rename_path_to_id(subject_entry)) + + if resource_v2["subject"][i_subject]["@id"] == "": + resource_v2["subject"][i_subject]["@id"] = None + + +def ___v2_populate_temporal(resource_v2: list, temporal: dict) -> None: + """Populate 
temporal in resource_v2 from temporal in v1.6.""" + if isinstance(temporal.get("referenceDate"), str): + resource_v2["temporal"]["referenceDate"] = temporal["referenceDate"] + + if "timeseries" not in temporal.keys(): + temporal["timeseries"] = [] + + if not isinstance(temporal["timeseries"], list): + temporal["timeseries"] = [temporal["timeseries"]] + + for i_timeseries, timeseries in enumerate(temporal.get("timeseries", []) or []): + if i_timeseries >= len(resource_v2["temporal"]["timeseries"]): + resource_v2["temporal"]["timeseries"].append(deepcopy(resource_v2["temporal"]["timeseries"][0])) + + if isinstance(timeseries, dict): + resource_v2["temporal"]["timeseries"][i_timeseries].update( + (k, timeseries[k]) + for k in resource_v2["temporal"]["timeseries"][i_timeseries].keys() & timeseries.keys() + ) + + if isinstance(timeseries.get("resolution"), str): + value, unit = find_temporal_resolution_value_and_unit(timeseries["resolution"]) + resource_v2["temporal"]["timeseries"][i_timeseries].update( + { + "resolutionValue": value, + "resolutionUnit": unit, + }, + ) + + +# sort out the code related to spatial information from above + +# to add it to a new function, see below: +# def ___v2_populate_spatial(resource_v2: list, subjects: list) -> None: +# """Populate licenses in source_v2 from licenses in v1.6.""" +# if not subjects: + +# for i_subject, subject_entry in enumerate(subjects): +# if i_subject >= len(resource_v2["subject"]): + + +def ___v2_populate_sources(resource_v2: dict, sources: list) -> None: + """Populate sources in resource_v2 from sources in v1.6.""" + for i_source, source in enumerate(sources): + if i_source >= len(resource_v2["sources"]): + resource_v2["sources"].append(deepcopy(resource_v2["sources"][0])) + source_v2 = resource_v2["sources"][i_source] + source_v2.update( + { + "title": source.get("title"), + "description": source.get("description"), + "path": source.get("path"), + "publicationYear": None, + "authors": [], + }, + ) + 
___v2_populate_source_licenses(source_v2, source.get("licenses", []) or []) + + +def ___v2_populate_source_licenses(source_v2: dict, licenses: list) -> None: + """Populate licenses in source_v2 from licenses in v1.6.""" + for i_license, license_entry in enumerate(licenses): + if i_license >= len(source_v2["sourceLicenses"]): + source_v2["sourceLicenses"].append(deepcopy(source_v2["sourceLicenses"][0])) + source_v2["sourceLicenses"][i_license].update(license_entry) + source_v2["sourceLicenses"][i_license]["copyrightStatement"] = None + + +def ___v2_populate_contributors(resource_v2: dict, contributors: list) -> None: + """Populate contributors in resource_v2 from contributors in v1.6.""" + for i_contribution, contributor in enumerate(contributors): + if i_contribution >= len(resource_v2["contributors"]): + resource_v2["contributors"].append(deepcopy(resource_v2["contributors"][0])) + contributor_v2 = resource_v2["contributors"][i_contribution] + contributor_v2.update( + { + "title": contributor.get("title"), + "path": contributor.get("path"), + "organization": contributor.get("organization"), + "date": contributor.get("date"), + "object": contributor.get("object"), + "comment": contributor.get("comment"), + }, + ) + + # Due to some issue with the for loop nesting in some cases the conversion contribution node is + # added twice. This is a workaround to avoid this issue. 
+ last_contributor = resource_v2["contributors"][len(resource_v2["contributors"]) - 1] + if ( + isinstance(last_contributor.get("title"), str) + and last_contributor.get("title", "") not in "Open Energy Platform oemetadata conversion to v2" + ): + resource_v2["contributors"].append( + { + "title": "Open Energy Platform oemetadata conversion to v2", + "path": "https://github.com/OpenEnergyPlatform", + "role": ["platform-maintainer"], + "organization": "OpenEnergyFamily", + "date": "2021-09-01", + "object": "conversion of all metadata to oemetadata version 2.0.4", + "comment": "The conversion was done by the OpenEnergyFamily team using the OMI software." + "We did our best to mitigate data loss. Most unexpected or incorrect metadata property" + "entries will be lost.", + }, + ) + + +def ___v2_populate_licenses(resource_v2: dict, licenses: list) -> None: + """Populate licenses in resource_v2 from licenses in v1.6.""" + for i_license, license_entry in enumerate(licenses): + if i_license >= len(resource_v2["licenses"]): + resource_v2["licenses"].append(deepcopy(resource_v2["licenses"][0])) + + resource_v2["licenses"][i_license].update(license_entry) + resource_v2["licenses"][i_license]["copyrightStatement"] = None + + +def ___v2_populate_schema_fields(resource_v2: dict, resource: dict) -> None: + """Populate schema fields in resource_v2 from resource in v1.6.""" + for i_field, field in enumerate(resource.get("schema", {}).get("fields", [])): + if i_field >= len(resource_v2["schema"]["fields"]): + resource_v2["schema"]["fields"].append(deepcopy(resource_v2["schema"]["fields"][0])) + schema_field_v2 = resource_v2["schema"]["fields"][i_field] + schema_field_v2.update(field) + if "id" in (schema_field_v2.get("name") or ""): + schema_field_v2["nullable"] = False + else: + schema_field_v2["nullable"] = True + + # Here we handle special cases from input metadata as it will not always + # follow the expected structure. 
+ # I decided to have certain fields empty meaning there fields will not hold + # the information available by default (coming form the template.json): + # - First handle isAbout to match the expected structure OR + # make sure there is an empty array / list [] at the is about key + # - Secondly handle valueReferences the same way + for i in schema_field_v2.get("isAbout", []) or []: + i.update(rename_path_to_id(i)) + + if not schema_field_v2.get("isAbout"): # noqa: SIM114 + schema_field_v2["isAbout"] = [] + elif "isAbout" not in field.keys(): + schema_field_v2["isAbout"] = [] + + for i in schema_field_v2.get("valueReference", []) or []: + i.update(rename_path_to_id(i)) + + if not schema_field_v2.get("valueReference"): # noqa: SIM114 + schema_field_v2["valueReference"] = [] + elif "valueReference" not in field.keys(): + schema_field_v2["valueReference"] = [] + + +def rename_path_to_id(annotation_object: dict) -> dict: + """Rename 'path' to '@id' in obj.""" + if "path" in annotation_object: + annotation_object["@id"] = annotation_object.pop("path") + return annotation_object + + +def ___v2_populate_schema_primary_keys(resource_v2: dict, resource: dict) -> None: + """Populate schema fields in resource_v2 from resource in v1.6.""" + for i_pk, pk in enumerate(resource.get("schema", {}).get("primaryKey", []) or []): + if i_pk >= len(resource_v2["schema"]["primaryKey"]): + resource_v2["schema"]["primaryKey"].append(deepcopy(resource_v2["schema"]["primaryKey"][0])) + + if isinstance(pk, str): + resource_v2["schema"]["primaryKey"].pop() + resource_v2["schema"]["primaryKey"].append(pk) + + +def ___v2_populate_schema_foreign_keys(resource_v2: dict, resource: dict) -> None: + """Populate schema fields in resource_v2 from resource in v1.6.""" + for i_fk, fk in enumerate(resource.get("schema", {}).get("foreignKeys", [])): + if i_fk >= len(resource_v2["schema"]["foreignKeys"]): + resource_v2["schema"]["foreignKeys"].append(deepcopy(resource_v2["schema"]["foreignKeys"][0])) + + if 
isinstance(fk, object): + resource_v2["schema"]["foreignKeys"][i_fk].update( + (k, resource["schema"]["foreignKeys"][i_fk][k]) + for k in resource_v2["schema"]["foreignKeys"][i_fk].keys() + & resource["schema"]["foreignKeys"][i_fk].keys() + ) + + if ( + resource_v2["schema"]["foreignKeys"][i_fk]["reference"] is None + and resource_v2["schema"]["foreignKeys"][i_fk]["fields"] is None + ): + resource_v2["schema"]["foreignKeys"].pop() diff --git a/src/omi/data/licenses.json b/src/omi/data/licenses.json index eef5f677..db5c7c67 100644 --- a/src/omi/data/licenses.json +++ b/src/omi/data/licenses.json @@ -8269,7 +8269,30 @@ ], "isOsiApproved": true, "isFsfLibre": true + }, + { + "reference": "https://www.gesetze-im-internet.de/geonutzv/BJNR054700013.html", + "isDeprecatedLicenseId": false, + "detailsUrl": "https://www.gesetze-im-internet.de/geonutzv/BJNR054700013.html", + "referenceNumber": 353, + "name": "Verordnung zur Festlegung der Nutzungsbestimmungen für die Bereitstellung von Geodaten des Bundes", + "licenseId": "GeoNutzV", + "seeAlso": [ + "https://sg.geodatenzentrum.de/web_public/gdz/lizenz/geonutzv.pdf" + ] + }, + { + "reference": "http://eur-lex.europa.eu/legal-content/EN/TXT/?uri=CELEX%3A32013R1159", + "isDeprecatedLicenseId": false, + "detailsUrl": "http://eur-lex.europa.eu/legal-content/EN/TXT/?uri=CELEX%3A32013R1159", + "referenceNumber": 353, + "name": "COMMISSION DELEGATED REGULATION (EU) No 1159/2013", + "licenseId": "", + "seeAlso": [ + "" + ] } ], - "releaseDate": "2024-05-22" + "releaseDate": "2024-05-22", + "modifiedForOep": "yes" } diff --git a/src/omi/license.py b/src/omi/license.py index f0f6f9e0..904b4253 100644 --- a/src/omi/license.py +++ b/src/omi/license.py @@ -17,7 +17,8 @@ def normalize_license_name(name: str) -> str: """ Normalize license name. - Replace whitespaces with hyphens and convert to uppercase + Remove '<' and '>' symbols, remove '(ODbL)', replace whitespaces with hyphens, + and convert to uppercase. 
Parameters ---------- @@ -29,10 +30,16 @@ def normalize_license_name(name: str) -> str: str Normalized license name """ + # Remove '<' and '>' symbols. + name = re.sub(r"[<>]", "", name) + # Remove the specific pattern "(ODbL)". + name = re.sub(r"\(ODbL\)", "", name) + # Normalize extra spaces and then replace all whitespace with hyphens. + name = re.sub(r"\s+", " ", name).strip() return re.sub(r"\s", "-", name).upper() -def read_licenses() -> set[str]: +def read_licenses() -> set[str, str]: """ Read license IDs from SPDX licenses. @@ -44,7 +51,10 @@ def read_licenses() -> set[str]: with LICENCES_FILE.open("r", encoding="utf-8") as file: licenses = json.load(file) # Create a set of unique license ID values - return {license_info.get("licenseId").upper() for license_info in licenses["licenses"]} + return { + (license_info.get("licenseId").upper(), normalize_license_name(license_info.get("name").upper())) + for license_info in licenses["licenses"] + } def validate_license(license_id: str) -> bool: @@ -97,12 +107,16 @@ def validate_oemetadata_licenses(metadata: dict) -> None: if not licenses: raise LicenseError(f"No license information available in the metadata for resource: {resource_index + 1}.") for i, license_ in enumerate(licenses or []): - if not license_.get("name"): + if not license_.get("name") and not license_.get("title"): raise LicenseError( - f"The license name is missing in resource {resource_index + 1}, license {i + 1} ({license_}).", + "The license name and title are missing in resource" + f"{resource_index + 1}, license {i + 1} ({license_}).", ) - + name_not_found = False if not validate_license(license_["name"]): + name_not_found = True + + if not name_not_found and not validate_license(license_["title"]): raise LicenseError( f"The (normalized) license name '{license_['name']}' in resource" f"{resource_index + 1}, license {i + 1} " @@ -113,7 +127,7 @@ def validate_oemetadata_licenses(metadata: dict) -> None: def _find_license_field(metadata: dict, 
version: str) -> list: version = get_metadata_version(metadata) - if version == "OEMetadata-2.0.1": + if version == "OEMetadata-2.0": # Include resource index with each license for traceability licenses_per_resource = [ (i, resource.get("licenses")) for i, resource in enumerate(metadata.get("resources", [])) diff --git a/src/omi/validation.py b/src/omi/validation.py index 6abf2e9d..55026103 100644 --- a/src/omi/validation.py +++ b/src/omi/validation.py @@ -50,7 +50,7 @@ class ValidationError(Exception): """Exception raised when a validation fails.""" -def validate_metadata(metadata: dict | str) -> None: +def validate_metadata(metadata: dict | str, check_license: bool = True) -> None: # noqa: FBT001, FBT002 """ Validate metadata against related metadata schema. @@ -58,6 +58,8 @@ def validate_metadata(metadata: dict | str) -> None: ---------- metadata: dict | str Metadata as dict or as JSON string + check_license: bool + If set to True, licenses are validated Returns ------- @@ -72,7 +74,8 @@ def validate_metadata(metadata: dict | str) -> None: jsonschema.validate(metadata, metadata_schema.schema) except jsonschema.exceptions.ValidationError as ve: raise ValidationError(f"Error validating metadata against related metadata schema: {ve.message}") from ve - license.validate_oemetadata_licenses(metadata) + if check_license: + license.validate_oemetadata_licenses(metadata) __validate_optional_fields_in_metadata(metadata, metadata_schema.schema) @@ -284,6 +287,7 @@ def validate_oep_table_against_metadata( # noqa: C901 # Compare fields and related types: oep_table_fields = __get_fields_from_oep_table(oep_table, oep_schema) metadata_fields = __get_fields_from_metadata(metadata) + # Map fields to same field type format (using frictionless format as comparison format) mapped_oep_table_fields = { field.name: field.type for field in __map_fields_to_frictionless_fields(oep_table_fields) @@ -337,7 +341,8 @@ def dict_raise_on_duplicates(ordered_pairs: dict) -> dict: return d try: - 
json.loads(metadata_string, object_pairs_hook=dict_raise_on_duplicates) + parsed_metadata = json.loads(metadata_string, object_pairs_hook=dict_raise_on_duplicates) + return parsed_metadata # noqa: TRY300 except json.JSONDecodeError as jde: start = max(0, jde.pos - 10) end = min(len(metadata_string), jde.pos + 10) diff --git a/tests/test_conversion.py b/tests/test_conversion.py index c848bf3f..06b48735 100644 --- a/tests/test_conversion.py +++ b/tests/test_conversion.py @@ -15,10 +15,10 @@ def test_conversion_from_oep_152_to_160(): def test_conversion_from_oep_160_to_200(): - """Test conversion from OEP v1.6.0 -> v2.0.0.""" + """Test conversion from OEP v1.6.0 -> v2.0.""" metadata_schema_160 = omi.base.get_metadata_specification("OEP-1.6.0").example - converted_metadata_160 = conversion.convert_metadata(metadata_schema_160, "OEMetadata-2.0.1") - assert base.get_metadata_version(converted_metadata_160) == "OEMetadata-2.0.1" + converted_metadata_160 = conversion.convert_metadata(metadata_schema_160, "OEMetadata-2.0") + assert base.get_metadata_version(converted_metadata_160) == "OEMetadata-2.0" validation.validate_metadata(converted_metadata_160) diff --git a/tests/test_metadata_validation.py b/tests/test_metadata_validation.py index a5aba38b..2a0de492 100644 --- a/tests/test_metadata_validation.py +++ b/tests/test_metadata_validation.py @@ -88,7 +88,7 @@ def test_metadata_schema_for_oep_version(): """Test schema, template and example for OEP metadata.""" version = "OEP-1.5.2" schema = base.get_metadata_specification(version) - assert schema.schema["description"] == "Open Energy Plaftorm (OEP) metadata schema v1.5.2" + assert schema.schema["description"] == "Open Energy Platform (OEP) metadata schema v1.5.2" assert schema.template["name"] is None assert schema.example["name"] == "oep_metadata_table_example_v152" @@ -102,7 +102,7 @@ def test_metadata_schema_not_found(): base.get_metadata_specification("OEP-1.5.0") -def test_metadata_against_oep_table(): +def 
deactivate__test_metadata_against_oep_table(): """Test OEP table definition against OEP metadata.""" table = "x2x_p2gas_soec_1" with (TEST_VALIDATION_DATA_PATH / "metadata_oep_validation.json").open("r") as f: