def generate_datetime(date_value: str) -> str:
    """
    Helper function to derive RFC3339 date from MCF date type

    Accepted inputs:

    - `str` of the form YYYY, YYYY-MM, YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS
      (missing components default to the start of the period; a date-time
      without timezone is taken to be UTC)
    - `int` four digit year
    - `date` / `datetime` objects (timezone-aware values are converted to
      UTC, naive values are taken to be UTC)
    - `None` or the string 'None', which yield the current UTC time

    :param date_value: `str` of date value

    :returns: `str` of date-time value

    :raises RuntimeError: if the value is of an unsupported type or length
    :raises ValueError: if a string of a supported length cannot be parsed
    """

    # function-scope import: `timezone` is not among the names this module
    # is known to import at file level
    from datetime import timezone

    rfc3339 = '%Y-%m-%dT%H:%M:%SZ'

    # supported string layouts, keyed by string length
    formats = {
        4: '%Y',                  # YYYY
        7: '%Y-%m',               # YYYY-MM
        10: '%Y-%m-%d',           # YYYY-MM-DD
        19: '%Y-%m-%dT%H:%M:%S'   # YYYY-MM-DDTHH:MM:SS
    }

    if date_value in [None, 'None']:
        # the 'Z' suffix asserts UTC, so the clock must be read in UTC
        # rather than in local time
        return datetime.now(timezone.utc).strftime(rfc3339)

    if isinstance(date_value, (date, datetime)):
        if isinstance(date_value, datetime) and date_value.tzinfo is not None:
            # shift aware values to UTC so the 'Z' suffix is truthful
            date_value = date_value.astimezone(timezone.utc)
        return date_value.strftime(rfc3339)

    if isinstance(date_value, int) and len(str(date_value)) == 4:
        date_value = str(date_value)

    if isinstance(date_value, str) and len(date_value) in formats:
        if len(date_value) == 19:
            msg = 'YYYY-MM-DDTHH:MM:SS with no timezone; converting to UTC'
            LOGGER.debug(msg)

        LOGGER.debug('date type found; expanding to date-time')
        format_ = formats[len(date_value)]
        return datetime.strptime(date_value, format_).strftime(rfc3339)

    # unsupported type, or a string whose length matches no known layout
    msg = f'Unknown date string: {date_value}'
    raise RuntimeError(msg)
-from pygeometa.helpers import json_dumps +from pygeometa.helpers import generate_datetime, json_dumps from pygeometa.schemas.base import BaseOutputSchema THISDIR = os.path.dirname(os.path.realpath(__file__)) @@ -158,12 +157,11 @@ def write(self, mcf: dict, stringify: str = True) -> Union[dict, str]: LOGGER.debug('Checking for dates') - if 'dates' in mcf['identification']: - if 'creation' in mcf['identification']['dates']: - record['properties']['created'] = self.generate_date(mcf['identification']['dates']['creation']) # noqa - - if 'revision' in mcf['identification']['dates']: - record['properties']['updated'] = self.generate_date(mcf['identification']['dates']['revision']) # noqa + for key, value in mcf['identification']['dates'].items(): + if key == 'creation': + record['properties']['created'] = generate_datetime(value) + elif key == 'revision': + record['properties']['updated'] = generate_datetime(value) rights = get_charstring(mcf['identification'].get('rights'), self.lang1, self.lang2) @@ -417,47 +415,3 @@ def generate_link(self, distribution: dict) -> dict: link['channel'] = distribution['channel'] return link - - def generate_date(self, date_value: str) -> str: - """ - Helper function to derive RFC3339 date from MCF date type - - :param date_value: `str` of date value - - :returns: `str` of date-time value - """ - - value = None - - if isinstance(date_value, str) and date_value != 'None': - if len(date_value) == 10: # YYYY-MM-DD - format_ = '%Y-%m-%d' - elif len(date_value) == 7: # YYYY-MM - format_ = '%Y-%m' - elif len(date_value) == 4: # YYYY - format_ = '%Y' - elif len(date_value) == 19: # YYYY-MM-DDTHH:MM:SS - msg = 'YYYY-MM-DDTHH:MM:SS with no timezone; converting to UTC' - LOGGER.debug(msg) - format_ = '%Y-%m-%dT%H:%M:%S' - - LOGGER.debug('date type found; expanding to date-time') - value = datetime.strptime(date_value, format_).strftime('%Y-%m-%dT%H:%M:%SZ') # noqa - - elif isinstance(date_value, int) and len(str(date_value)) == 4: - date_value2 = 
str(date_value) - LOGGER.debug('date type found; expanding to date-time') - format_ = '%Y' - value = datetime.strptime(date_value2, format_).strftime('%Y-%m-%dT%H:%M:%SZ') # noqa - - elif isinstance(date_value, (date, datetime)): - value = date_value.strftime('%Y-%m-%dT%H:%M:%SZ') - - elif date_value in [None, 'None']: - value = datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ') - - else: - msg = f'Unknown date string: {date_value}' - raise RuntimeError(msg) - - return value diff --git a/pygeometa/schemas/schema_org/__init__.py b/pygeometa/schemas/schema_org/__init__.py new file mode 100644 index 0000000..a3c27dd --- /dev/null +++ b/pygeometa/schemas/schema_org/__init__.py @@ -0,0 +1,562 @@ +# ================================================================= +# +# Terms and Conditions of Use +# +# Unless otherwise noted, computer program source code of this +# distribution # is covered under Crown Copyright, Government of +# Canada, and is distributed under the MIT License. +# +# The Canada wordmark and related graphics associated with this +# distribution are protected under trademark law and copyright law. +# No permission is granted to use them outside the parameters of +# the Government of Canada's corporate identity program. For +# more information, see +# http://www.tbs-sct.gc.ca/fip-pcim/index-eng.asp +# +# Copyright title to all 3rd party software distributed with this +# software is held by the respective copyright holders as noted in +# those files. Users are asked to read the 3rd Party Licenses +# referenced with those assets. 
+# +# Copyright (c) 2025 Tom Kralidis +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. 
THISDIR = os.path.dirname(os.path.realpath(__file__))

LOGGER = logging.getLogger(__name__)

# Schema.org properties under which MCF contacts are published by write()
CONTACTS = [
    'accountablePerson',
    'author',
    'contributor',
    'copyrightHolder',
    'creator',
    'editor',
    'funder',
    'maintainer',
    'producer',
    'provider',
    'publisher',
    'sponsor'
]

# Schema.org type name -> MCF hierarchy level
TYPES = {
    'Series': 'series',
    'SoftwareApplication': 'software',
    'ProductModel': 'model',
    'Dataset': 'dataset',
    'WebAPI': 'service',
    'Property': 'attribute',
    'ListItem': 'feature'
}


class SchemaOrgOutputSchema(BaseOutputSchema):
    """Schema.org schema"""

    def __init__(self):
        """
        Initialize object

        :returns: pygeometa.schemas.base.BaseOutputSchema
        """

        description = 'Schema.org'

        super().__init__('schema-org', description, 'json', THISDIR)

    def import_(self, metadata: str) -> dict:
        """
        Import metadata into MCF

        :param metadata: string of metadata content (Schema.org JSON-LD)

        :returns: `dict` of MCF content

        :raises KeyError: if the document has no `identifier`
        :raises ValueError: if the content is not valid JSON
        """

        md = json.loads(metadata)

        mcf = {
            'mcf': {
                'version': '1.0',
            },
            'metadata': {},
            'identification': {
                'extents': {
                    'spatial': []
                },
                # write() reads dates and keywords from these containers,
                # so they always exist (possibly empty)
                'dates': {},
                'keywords': {}
            },
            'contact': {},
            'distribution': {}
        }

        # a missing identifier is the signal that this is not a document
        # this schema can import, so it is deliberately a hard failure
        mcf['metadata']['identifier'] = md['identifier']
        mcf['metadata']['charset'] = 'utf-8'
        # '@type' (optionally prefixed, e.g. 'schema:Dataset') is what
        # write() emits; unknown or missing types fall back to dataset
        hierarchylevel = TYPES.get(self._type_name(md), 'dataset')
        mcf['metadata']['type'] = hierarchylevel
        # 'hierarchylevel' is the key write() looks up
        mcf['metadata']['hierarchylevel'] = hierarchylevel
        mcf['metadata']['language'] = md.get('inLanguage', 'en')

        if 'spatialCoverage' in md or 'spatial' in md:
            crs = 4326
            mcf['spatial'] = mcf.get('spatial', {})
            place = self.get_first(md, 'spatial')
            if place is None:
                place = self.get_first(md, 'spatialCoverage')
            geo = self.get_first(place, 'geo')
            geo_type = self._type_name(geo)
            bbox = None
            if geo_type == 'GeoCoordinates':
                mcf['spatial']['datatype'] = 'vector'
                mcf['spatial']['geomtype'] = 'point'
                bbox = [geo['longitude'], geo['latitude'],
                        geo['longitude'], geo['latitude']]
            elif geo_type == 'GeoShape':
                mcf['spatial']['datatype'] = 'vector'
                mcf['spatial']['geomtype'] = 'polygon'
                bbox = self._parse_box(geo.get('box'))
            if bbox:
                mcf['identification']['extents']['spatial'].append({
                    'bbox': bbox,
                    'crs': crs
                })

        if 'temporalCoverage' in md:
            coverage = str(self.get_first(md, 'temporalCoverage', '/'))
            # partition() tolerates a single instant without '/'
            begin, _, end = coverage.partition('/')
            mcf['identification']['extents']['temporal'] = [{
                # '..' (open interval, as emitted by write()) and a
                # missing bound both become None
                'begin': None if begin in ['', '..'] else begin,
                'end': None if end in ['', '..'] else end
            }]

        mcf['identification']['language'] = mcf['metadata']['language']
        mcf['identification']['title'] = self.get_first(md, 'name')
        mcf['identification']['abstract'] = self.get_first(md, 'description')

        dates = {
            'creation': 'dateCreated',
            'publication': 'datePublished',
            'revision': 'dateModified'
        }
        for mcf_key, schema_key in dates.items():
            if schema_key in md:
                mcf['identification']['dates'][mcf_key] = self.get_first(md, schema_key)  # noqa

        if 'version' in md:
            mcf['identification']['edition'] = self.get_first(md, 'version')

        keywords = self._keyword_names(md.get('keywords'))
        if keywords:
            mcf['identification']['keywords']['default'] = {
                'keywords': keywords
            }

        # get_all() copes with a single object as well as a list
        for index, dist in enumerate(self.get_all(md, 'distribution')):
            if not isinstance(dist, dict):
                continue
            name = self.get_first(dist, 'name')
            key = name if name is not None else f'distribution-{index}'
            mcf['distribution'][key] = {
                'name': name,
                'type': self.get_first(dist, 'encodingFormat'),
                'url': self.get_first(dist, 'contentUrl'),
                'rel': 'download',
                'function': 'download'
            }

        for ct in ['author', 'publisher', 'creator', 'provider', 'funder']:
            # a role may hold a single party or a list; the first is used
            party = self.get_first(md, ct)
            if isinstance(party, dict):
                mcf['contact'][ct] = self._import_contact(party)

        return mcf

    def _type_name(self, obj, default=None):
        """
        Derive the bare Schema.org type name of a JSON-LD node

        :param obj: JSON-LD node (`dict`); anything else yields `default`
        :param default: value returned when no usable type is present

        :returns: type name without `schema:` prefix or namespace IRI
        """

        value = self.get_first(obj, '@type')
        if value is None:
            value = self.get_first(obj, 'type')
        if not isinstance(value, str):
            return default
        if value.startswith('schema:'):
            value = value[len('schema:'):]
        # also accept full IRIs such as http://schema.org/Dataset
        return value.rsplit('/', 1)[-1]

    @staticmethod
    def _parse_box(box):
        """
        Parse a Schema.org GeoShape box into an MCF bbox

        :param box: `str` of two latitude/longitude points (lower corner,
                    upper corner), space and/or comma delimited

        :returns: `list` of minx, miny, maxx, maxy or `None` if unparsable
        """

        if not isinstance(box, str):
            return None

        tokens = box.replace(',', ' ').split()
        if len(tokens) != 4:
            return None

        try:
            miny, minx, maxy, maxx = [float(token) for token in tokens]
        except ValueError:
            LOGGER.debug('Non-numeric box value; skipping extent')
            return None

        return [minx, miny, maxx, maxy]

    @staticmethod
    def _keyword_names(keywords) -> list:
        """
        Normalize Schema.org keywords into a flat list of strings

        :param keywords: comma delimited `str`, or `list` of `str` and/or
                         objects carrying a `name` (e.g. DefinedTerm)

        :returns: `list` of keyword strings
        """

        if isinstance(keywords, str):
            return [kw.strip() for kw in keywords.split(',') if kw.strip()]

        names = []
        if isinstance(keywords, list):
            for keyword in keywords:
                if isinstance(keyword, dict):
                    keyword = keyword.get('name')
                if keyword is not None:
                    names.append(keyword)

        return names

    def _import_contact(self, party: dict) -> dict:
        """
        Convert a Schema.org Person/Organization into an MCF contact

        :param party: `dict` of Schema.org party

        :returns: `dict` of MCF contact (only properties found are set)
        """

        contact = {}

        name = self.get_first(party, 'name')
        if self._type_name(party) == 'Organization':
            # write() encodes any contact with an individual name as a
            # Person, so organizations must not receive one
            contact['organization'] = name
        else:
            contact['individualname'] = name
            affiliation = self.get_first(party, 'affiliation')
            organization = self.get_first(affiliation, 'name')
            if organization is not None:
                contact['organization'] = organization

        position = self.get_first(party, 'jobTitle')
        if position is not None:
            contact['positionname'] = position

        url = self.get_first(party, 'url')
        if url is not None:
            contact['url'] = url

        address = self.get_first(party, 'address')
        if isinstance(address, dict):
            address_keys = {
                'address': 'streetAddress',
                'city': 'addressLocality',
                'administrativearea': 'addressRegion',
                'postalcode': 'postalCode',
                'country': 'addressCountry'
            }
            for mcf_key, schema_key in address_keys.items():
                value = self.get_first(address, schema_key)
                if value is not None:
                    contact[mcf_key] = value
        elif isinstance(address, str):
            contact['address'] = address

        # communication channels may sit on a contactPoint or directly on
        # the party (the latter is what write() emits)
        point = self.get_first(party, 'contactPoint')
        channels = {
            'email': ['email'],
            'phone': ['telephone'],
            'fax': ['faxNumber', 'fax']
        }
        for mcf_key, schema_keys in channels.items():
            candidates = [self.get_first(source, schema_key)
                          for source in (point, party)
                          for schema_key in schema_keys]
            value = next((c for c in candidates if c is not None), None)
            if value is not None:
                contact[mcf_key] = value

        return contact

    def write(self, mcf: dict, stringify: bool = True) -> Union[dict, str]:
        """
        Write outputschema to JSON string buffer

        :param mcf: dict of MCF content model
        :param stringify: whether to return a string representation (default)
                          else native (dict, etree)

        :returns: `dict` or `str` of MCF as Schema.org
        """

        self.lang1 = mcf['metadata'].get('language')
        self.lang2 = mcf['metadata'].get('language_alternate')

        identification = mcf['identification']

        title = get_charstring(identification.get('title'),
                               self.lang1, self.lang2)

        description = get_charstring(identification.get('abstract'),
                                     self.lang1, self.lang2)

        # invert TYPES: MCF hierarchy level -> Schema.org type name
        schema_types = {level: name for name, level in TYPES.items()}
        hierarchylevel = mcf['metadata'].get('hierarchylevel', 'dataset')

        LOGGER.debug('Generating baseline record')
        record = {
            'identifier': mcf['metadata']['identifier'],
            '@context': 'http://schema.org/',
            '@type': 'schema:' + schema_types.get(hierarchylevel, 'Dataset')
        }

        LOGGER.debug('Checking for spatial')
        try:
            minx, miny, maxx, maxy = (identification['extents']
                                      ['spatial'][0]['bbox'])
        except (IndexError, KeyError):
            LOGGER.debug('No spatial extent found')
        else:
            record['spatialCoverage'] = [{
                '@type': 'schema:Place',
                'geo': {
                    '@type': 'schema:GeoShape',
                    # Schema.org boxes are latitude first
                    'box': f'{miny},{minx} {maxy},{maxx}'
                }
            }]

        record['name'] = title[0]
        record['description'] = description[0]
        record['distribution'] = []

        if self.lang1 is not None:
            record['inLanguage'] = self.lang1

        LOGGER.debug('Checking for temporal')
        try:
            temporal = identification['extents']['temporal'][0]
        except (IndexError, KeyError):
            temporal = None

        if temporal is not None:
            begin = temporal.get('begin')
            end = temporal.get('end')

            # unknown or open-ended bounds are encoded as '..'
            begin = '..' if begin in ['now', 'None', None] else str(begin)
            end = '..' if end in ['now', 'None', None] else str(end)

            if [begin, end] != ['..', '..']:
                record['temporalCoverage'] = [f'{begin}/{end}']

        LOGGER.debug('Checking for dates')

        date_properties = {
            'creation': 'dateCreated',
            'revision': 'dateModified',
            'publication': 'datePublished'
        }
        for key, value in (identification.get('dates') or {}).items():
            if key in date_properties:
                record[date_properties[key]] = generate_datetime(value)

        LOGGER.debug('Checking for contacts')

        for ct in CONTACTS:
            contacts = self.generate_contacts(mcf.get('contact') or {}, ct)
            if contacts:
                record[ct] = contacts

        all_keywords = []

        LOGGER.debug('Checking for keywords')
        for value in (identification.get('keywords') or {}).values():
            scheme = None

            keywords = get_charstring(value.get('keywords'), self.lang1,
                                      self.lang2)
            terms = keywords[0] or []

            vocabulary = value.get('vocabulary') or {}
            if 'url' in vocabulary:
                scheme = vocabulary['url']
            elif 'name' in vocabulary:
                scheme = vocabulary['name']

            if scheme is None:
                LOGGER.debug('Keywords found without vocabulary')
                LOGGER.debug('Aggregating as bare keywords')
                all_keywords.extend(terms)
            else:
                # keep the vocabulary association rather than dropping
                # these keywords from the output
                LOGGER.debug('Adding as defined terms')
                for kw in terms:
                    all_keywords.append({
                        '@type': 'schema:DefinedTerm',
                        'name': kw,
                        'inDefinedTermSet': scheme
                    })

        if all_keywords:
            record['keywords'] = all_keywords

        LOGGER.debug('Checking for licensing')
        license_ = identification.get('license')
        if license_ is not None:
            if 'url' in license_:
                LOGGER.debug('Encoding license as link')
                record['license'] = license_['url']
            elif license_.get('name') is not None:
                LOGGER.debug('Encoding license as property')
                record['license'] = license_['name']

        LOGGER.debug('Checking for distribution')
        for value in (mcf.get('distribution') or {}).values():
            record['distribution'].append(self.generate_link(value))

        LOGGER.debug('Checking for content_info')
        content_info = mcf.get('content_info') or {}
        variables = []
        # attributes and dimensions are merged so that neither overwrites
        # the other
        for section in ['attributes', 'dimensions']:
            if content_info.get(section):
                variables.extend(
                    self.generate_variables(content_info[section]))
        if variables:
            record['variableMeasured'] = variables

        if stringify:
            return json_dumps(record)

        return record

    def generate_party(self, contact: dict,
                       lang1: str, lang2: str) -> dict:
        """
        generate party construct from MCF contact

        :param contact: dict of MCF contact
        :param lang1: primary language
        :param lang2: alternate language

        :returns: MCF contact as a Schema.org Person or Organization
        """

        organization_name = get_charstring(contact.get('organization'),
                                           lang1, lang2)

        individual_name = get_charstring(contact.get('individualname'),
                                         lang1, lang2)

        position_name = get_charstring(contact.get('positionname'),
                                       lang1, lang2)

        rp = {}

        if individual_name[0] is not None:
            rp['@type'] = 'schema:Person'
            rp['name'] = individual_name[0]
            if position_name[0] is not None:
                rp['jobTitle'] = position_name[0]
            if organization_name[0] is not None:
                rp['affiliation'] = {
                    '@type': 'schema:Organization',
                    'name': organization_name[0]
                }
        else:
            rp['@type'] = 'schema:Organization'
            rp['name'] = organization_name[0]

        address_properties = {
            'address': 'streetAddress',
            'city': 'addressLocality',
            'administrativearea': 'addressRegion',
            'postalcode': 'postalCode',
            'country': 'addressCountry'
        }
        address = {}
        for mcf_key, schema_key in address_properties.items():
            value = get_charstring(contact.get(mcf_key), lang1, lang2)
            if value[0] is not None:
                address[schema_key] = value[0]

        # emitted when any address component exists, not only the street
        if address:
            rp['address'] = {'@type': 'schema:PostalAddress'}
            rp['address'].update(address)

        if contact.get('phone') is not None:
            LOGGER.debug('Formatting phone number')
            phone = str(contact['phone'])
            phone = phone.replace('-', '').replace('(', '').replace(')', '')
            phone = phone.replace('+0', '+').replace(' ', '')
            rp['telephone'] = phone

        if contact.get('email') is not None:
            rp['email'] = contact.get('email')

        if 'url' in contact:
            rp['url'] = contact['url']

        return rp

    def generate_variables(self, dict_: dict) -> list:
        """
        Generates 1..n variables

        :param dict_: iterable of MCF attribute/dimension `dict` objects

        :returns: `list` of Schema.org PropertyValue objects
        """

        variables = []
        for d in dict_:
            variable = {
                '@type': 'schema:PropertyValue',
                'name': d.get('name', ''),
                'description': d.get('description', ''),
            }
            if d.get('max') is not None:
                variable['maxValue'] = d['max']
            if d.get('min') is not None:
                variable['minValue'] = d['min']
            if d.get('units') is not None:
                variable['unitCode'] = d['units']
            variables.append(variable)

        return variables

    def generate_contacts(self, contact: dict, role: str) -> list:
        """
        Generates the contacts to publish under a Schema.org role

        A contact matches when its MCF role (or, failing that, its key in
        the MCF contact section) equals the Schema.org role or one of the
        MCF roles mapped to it.

        :param contact: `dict` of contacts
        :param role: `str` of role

        :returns: `list` of contacts
        """

        role_mcf_schema_map = {
            'accountablePerson': [],
            'author': ['originator'],
            'contributor': ['user'],
            'copyrightHolder': ['owner'],
            'creator': [],
            'editor': [],
            'funder': [],
            'maintainer': ['processor', 'custodian'],
            'producer': ['distributor', 'principalInvestigator'],
            'provider': ['resourceProvider'],
            'publisher': ['pointOfContact'],
            'sponsor': []
        }

        mcf_roles = role_mcf_schema_map.get(role, [])

        contacts = []
        for key, value in contact.items():
            contact_role = value.get('role', key)
            if contact_role == role or contact_role in mcf_roles:
                contacts.append(
                    self.generate_party(value, self.lang1, self.lang2))

        return contacts

    def generate_link(self, distribution: dict) -> dict:
        """
        Generates Schema.org link object from MCF distribution object

        :param distribution: `dict` of MCF distribution

        :returns: Schema.org link object
        """

        name = get_charstring(distribution.get('name'),
                              self.lang1, self.lang2)

        desc = get_charstring(distribution.get('description'),
                              self.lang1, self.lang2)

        link = {
            '@type': 'schema:DataDownload',
            'contentUrl': distribution['url']
        }

        if distribution.get('type') is not None:
            link['encodingFormat'] = distribution['type']

        if name != [None, None]:
            link['name'] = name[0]

        if desc != [None, None]:
            link['description'] = desc[0]

        return link

    def get_first(self, obj, key, default=None):
        """
        returns first element of a list else return element

        :param obj: `dict` to look into; any other type yields `default`
        :param key: key to look up
        :param default: value returned when the key is missing or empty

        :returns: first element (str, num or dict)
        """

        if not isinstance(obj, dict) or not obj.get(key):
            return default

        value = obj[key]
        if isinstance(value, list):
            return value[0]

        return value

    def get_all(self, obj, key, default=None):
        """
        return list of elements

        :param obj: `dict` to look into; any other type yields `default`
        :param key: key to look up
        :param default: value returned when the key is missing or empty
                        (a new empty list when not given)

        :returns: list of elements
        """

        if not isinstance(obj, dict) or not obj.get(key):
            return [] if default is None else default

        value = obj[key]
        if isinstance(value, list):
            return value

        return [value]
of supported schemas') self.assertEqual(sorted(schemas), sorted(['dcat', 'iso19139', 'iso19139-2', - 'iso19139-hnap', 'oarec-record', + 'iso19139-hnap', 'oarec-record', 'schema-org', 'stac-item', 'wmo-cmp', 'wmo-wcmp2', 'wmo-wigos']), 'Expected exact list of supported schemas') schemas = get_supported_schemas(include_autodetect=True) - self.assertEqual(len(schemas), 10, + self.assertEqual(len(schemas), 11, 'Expected specific number of supported schemas') self.assertIn('autodetect', schemas, 'Expected autodetect in list')