diff --git a/.gitignore b/.gitignore index b7faf40..a2f76ca 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,14 @@ +# configurations +.env.override +.prod.env +.test.env + +# Certificates +certs/ + +# IDE +.idea/ + # Byte-compiled / optimized / DLL files __pycache__/ *.py[codz] @@ -204,4 +215,4 @@ cython_debug/ # Marimo marimo/_static/ marimo/_lsp/ -__marimo__/ +__marimo__/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..a7b8d96 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,27 @@ +FROM python:3.13.7-trixie + +# Install CA infrastructure +RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* + +# Add AIoD cert and build custom bundle +RUN mkdir -p /certs +COPY certs/aiod-insight-centre.crt /certs/aiod-insight-centre.crt +RUN cat /etc/ssl/certs/ca-certificates.crt /certs/aiod-insight-centre.crt > /certs/custom-ca-bundle.crt + +# Make Python requests use the custom CA bundle +ENV REQUESTS_CA_BUNDLE=/certs/custom-ca-bundle.crt + +RUN useradd -m appuser +USER appuser +WORKDIR /home/appuser +RUN python -m venv .venv +RUN .venv/bin/python -m pip install uv + +COPY pyproject.toml /app/pyproject.toml +RUN .venv/bin/uv pip install -r /app/pyproject.toml + +COPY . /app +RUN .venv/bin/uv pip install file:///app + +ENTRYPOINT [".venv/bin/python", "/app/src/connector.py"] +CMD ["--mode", "all"] diff --git a/README.md b/README.md index 6d0c016..c5f8dfa 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,52 @@ -# zenodo-connector -The connector indexing metadata from Zenodo and registering it with AI-on-Demand +# AI-on-Demand Zenodo connector +Collects dataset metadata from [Zenodo](https://zenodo.org) and uploads it to AI-on-Demand. + +This package is not intended to be used directly by others, but may serve as an example of how to build a connector for the AI-on-Demand platform. +For more information on how to test this connector locally as a springboard for developing your own connector, reference the [Development](#Development) section below. + +### TODO +This package is work in progress. + +- [ ] Automatically publish to DockerHub on release +- [ ] Add tests + +## Installation +You can use the image directly from Docker Hub (TODO) or build it locally. + +From Docker Hub: `docker pull aiondemand/zenodo-connector`. + +To build a local image: + + - Clone the repository: `git clone https://github.com/aiondemand/zenodo-connector && cd zenodo-connector` + - Build the image: `docker build -t aiondemand/zenodo-connector -f Dockerfile .` + +### Configuring Client Credentials +You will need to configure what server the connector should connect to, as well as the credentials for the client that allow you to upload data. +The connector requires a `config.toml` file with a valid [aiondemand configuration](https://aiondemand.github.io/aiondemand/api/configuration/), +the default configuration can be found in the [`/script/config.prod.toml`](/script/config.prod.toml) file. +You will also need to have the 'Client Secret' for the client, which can be obtained from the keycloak administrator. +The client secret must be provided to the Docker container as an environment variable or in a dotenv file *similar to* [`script/.local.env`](script/.local.env) but named `script/.prod.env`. + +Please contact the Keycloak service maintainer to obtain said credentials you need if you are in charge of deploying this Zenodo connector. + +## Running the Connector +You will need to mount the aiondemand configuration to `/home/appuser/.aiod/config.toml` and provide environment variables directly with `-e` or through mounting the dotfile in `/home/appuser/.aiod/zenodo/.env`. The [`script/run.sh`](script/run.sh) script provides a convenience that automatically does this. +It takes one positional argument that has to be `local`, `test`, or `prod` to use the respective files in the `script` folder for configuration. +Any following arguments are interpreted as arguments to the main script. +For the latest commandline arguments, use `docker run aiondemand/zenodo-connector --help`. +Some example invocations that use the `script/run.sh` script: + + - `script/run.sh local --mode id --value 12345 --app-log-level debug` syncs one specific Zenodo record, and produces debug logs for the connector only. + - `script/run.sh test --mode since --value 2025-09-30T00:00 --root-log-level debug` syncs all records created or modified after 2025-09-30T00:00 (in reverse chronological order). + - `script/run.sh prod --mode all --root-log-level info` indexes all records on Zenodo, producing info logs for the connector and all its dependencies (this is the default). + +## Development +You can test the connector when running the [metadata catalogue](https://github.com/aiondemand/aiod-rest-api) locally. +The default configurations for this setup can be found in the [`.local.env`](script/.local.env) and [`config.local.toml`](script/config.local.toml) files. + +When connecting to the AI-on-Demand test or production server, you will need to a dedicated client registered in the keycloak instance which is connected to the REST API you want to upload data to. +See [this form]() to apply for a client. The client will need to have a `platform_X` role attached, where `X` is the name of the platform from which you register assets. +When a client is created, you will need its 'Client ID' and 'Client Secret' and update the relevant configuration and environment files accordingly. + +## Disclaimer +This project is not affiliated with Zenodo in any way. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..c9755db --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,20 @@ +[project] +name="aiod-zenodo-connector" +dependencies = [ + "aiondemand", + "python-dotenv", +] +dynamic=["version"] + +[build-system] +requires = ["hatchling >= 1.26"] +build-backend = "hatchling.build" + +[tool.hatch.version] +path = "src/__version__.py" + +[tool.hatch.metadata] +allow-direct-references = true + +[tool.hatch.build.targets.wheel] +packages = ["src/"] diff --git a/script/.local.env b/script/.local.env new file mode 100644 index 0000000..f796d68 --- /dev/null +++ b/script/.local.env @@ -0,0 +1,4 @@ +CLIENT_SECRET=S2zo0zW6QMy8ffcqCozYbHkj0JajcWtQ +PLATFORM_NAME=example +STOP_ON_UNEXPECTED_ERROR=true +PER_DATASET_DELAY= diff --git a/script/config.local.toml b/script/config.local.toml new file mode 100644 index 0000000..78e9328 --- /dev/null +++ b/script/config.local.toml @@ -0,0 +1,3 @@ +api_server = 'http://localhost/' +auth_server = 'http://localhost/aiod-auth/' +client_id = 'sdk-service' diff --git a/script/config.prod.toml b/script/config.prod.toml new file mode 100644 index 0000000..6b9978d --- /dev/null +++ b/script/config.prod.toml @@ -0,0 +1,3 @@ +api_server = 'https://api.aiod.eu/' +auth_server = 'https://auth.aiod.eu/aiod-auth/' +client_id = 'platform_openml' diff --git a/script/config.test.toml b/script/config.test.toml new file mode 100644 index 0000000..877cb0e --- /dev/null +++ b/script/config.test.toml @@ -0,0 +1,3 @@ +api_server = 'https://aiod.insight-centre.org/' +auth_server = 'https://ai4europe.test.fedcloud.eu/aiod-auth/' +client_id = 'platform_zenodo' \ No newline at end of file diff --git a/script/run.sh b/script/run.sh new file mode 100755 index 0000000..10f3005 --- /dev/null +++ b/script/run.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +WHICH="${1:-prod}" +shift + +ABSOLUTE_SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ENV_FILE="${ABSOLUTE_SCRIPT_DIR}/.${WHICH}.env" +CONFIG_FILE="${ABSOLUTE_SCRIPT_DIR}/config.${WHICH}.toml" + +if [[ ! -f "$ENV_FILE" ]]; then + echo "Error: expected env file not found: $ENV_FILE" >&2 + exit 1 +fi + +if [[ ! -f "$CONFIG_FILE" ]]; then + echo "Error: expected config file not found: $CONFIG_FILE" >&2 + exit 1 +fi + +docker run \ + -v "${ENV_FILE}":/home/appuser/.aiod/zenodo/.env \ + -v "${CONFIG_FILE}":/home/appuser/.aiod/config.toml \ + --network=host \ + -it aiondemand/zenodo-connector "$@" diff --git a/src/__version__.py b/src/__version__.py new file mode 100644 index 0000000..bab3b25 --- /dev/null +++ b/src/__version__.py @@ -0,0 +1 @@ +version="0.0.0" \ No newline at end of file diff --git a/src/connector.py b/src/connector.py new file mode 100644 index 0000000..daf27ff --- /dev/null +++ b/src/connector.py @@ -0,0 +1,519 @@ +from __future__ import annotations + +import logging +import os +import time +from argparse import ArgumentParser +from http import HTTPStatus +from pathlib import Path +from typing import Iterator, Optional + +import requests +from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError, RequestException + +import aiod +from aiod.authentication import set_token, Token +from dotenv import load_dotenv + +logger = logging.getLogger(__name__) + +REQUEST_TIMEOUT = 10 +MAX_DESCRIPTION_LENGTH = 1800 + +PLATFORM_NAME = "zenodo" + +STOP_ON_UNEXPECTED_ERROR: bool = False +PER_DATASET_DELAY: Optional[float] = None +ZENODO_PAGE_SIZE: int = 25 + + +class ParsingError(Exception): + pass + + +class ServerError(Exception): + pass + + +def _paginate_zenodo_records(page_size: Optional[int] = None) -> Iterator[dict]: + base_url = "https://zenodo.org/api/records" + page = 1 + + if page_size is None: + page_size = ZENODO_PAGE_SIZE + + while True: + params = { + "page": page, + "size": page_size, + "all_versions": 1, + "sort": "mostrecent", + } + logger.debug("Requesting Zenodo records: %s params=%s", base_url, params) + + try: + response = requests.get(base_url, params=params, timeout=REQUEST_TIMEOUT) + except RequestException as e: + logger.warning( + "Request error while fetching %s (page %s): %s; skipping this page and continuing.", + base_url, + page, + e, + ) + page += 1 + continue + + if not response.ok: + try: + content = response.json() + except Exception: + content = response.text + logger.warning( + "Non-OK response while fetching %s (page %s): (%s) %s; skipping this page and continuing.", + response.url, + page, + response.status_code, + content, + ) + page += 1 + continue + + try: + data = response.json() + hits = data.get("hits", {}).get("hits", []) + except Exception as e: + logger.exception("Error parsing Zenodo response") + raise ParsingError( + f"Could not parse Zenodo response ({response.status_code}): " + f"{response.content}" + ) from e + + if not hits: + logger.debug("No more Zenodo records returned, stopping pagination.") + break + + logger.debug("Fetched %d Zenodo records (page %d)", len(hits), page) + for record in hits: + yield record + + page += 1 + + +def list_records(from_id: Optional[int] = None) -> Iterator[dict]: + from_id = from_id or 0 + for record in _paginate_zenodo_records(): + try: + identifier = int(record["id"]) + except Exception: + logger.error("Zenodo record without integer 'id': %s", record, exc_info=True) + continue + + if identifier < from_id: + continue + + try: + full_record = fetch_zenodo_record(identifier) + except (ServerError, ParsingError, RequestException) as e: + logger.warning( + "Skipping Zenodo record %s due to fetch error: %s", identifier, e + ) + continue + + yield full_record + + +def fetch_zenodo_record(identifier: int) -> dict: + url = f"https://zenodo.org/api/records/{identifier}" + logger.debug("Fetching Zenodo record %s", url) + + try: + response = requests.get(url, timeout=REQUEST_TIMEOUT) + except RequestException as e: + logger.warning("Request error while fetching %s: %s", url, e) + raise ServerError( + f"Error while fetching {url} from Zenodo: request error {e}" + ) from e + + if not response.ok: + try: + msg = response.json() + except Exception: + msg = response.text + raise ServerError( + f"Error while fetching {url} from Zenodo: " + f"({response.status_code}) {msg}" + ) + + try: + return response.json() + except Exception as e: + logger.exception("Error parsing JSON for Zenodo record %s", identifier) + raise ParsingError( + f"Error parsing JSON of Zenodo record {identifier}: {response.content}" + ) from e + + +def _convert_record_to_aiod(record: dict) -> dict: + numeric_id = int(record["id"]) + identifier = str(numeric_id) + metadata = record.get("metadata", {}) or {} + links = record.get("links", {}) or {} + + title = metadata.get("title") or f"Zenodo record {identifier}" + if not isinstance(title, str): + title = str(title) + if len(title) > 256: + text_break = " [...]" + title = title[: 256 - len(text_break)] + text_break + + description = metadata.get("description") or "" + if not isinstance(description, str): + logger.warning( + "Unexpected description type for record %s: %r", identifier, description + ) + description = str(description) + + if len(description) > MAX_DESCRIPTION_LENGTH: + text_break = " [...]" + description = description[: MAX_DESCRIPTION_LENGTH - len(text_break)] + text_break + + date_published = record.get("created") + + license_value = None + license_meta = metadata.get("license") + if isinstance(license_meta, dict): + license_value = ( + license_meta.get("id") + or license_meta.get("title") + or license_meta.get("url") + ) + elif isinstance(license_meta, str): + license_value = license_meta + + keywords = metadata.get("keywords") or [] + if isinstance(keywords, str): + keywords = [keywords] + + distributions = [] + + html_link = links.get("html") or links.get("latest_html") + if html_link: + distributions.append( + { + "content_url": html_link, + "encoding_format": "text/html", + } + ) + + files = record.get("files") or [] + for f in files: + f_links = f.get("links") or {} + file_url = f_links.get("download") or f_links.get("self") + if not file_url: + continue + encoding_format = f.get("type") or "application/octet-stream" + distributions.append( + { + "content_url": file_url, + "encoding_format": encoding_format, + } + ) + + platform_identifier = f"zenodo.org:{identifier}" + + return { + "platform": PLATFORM_NAME, + "platform_resource_identifier": platform_identifier, + "name": title, + "version": metadata.get("version"), + "same_as": html_link or links.get("self"), + "description": {"plain": description}, + "date_published": date_published, + "license": license_value, + "distribution": distributions, + "is_accessible_for_free": True, + "keyword": keywords, + "size": None, + } + + +def upsert_dataset(record: dict) -> int: + identifier = str(record["id"]) + + try: + local_dataset = _convert_record_to_aiod(record) + platform_identifier = local_dataset["platform_resource_identifier"] + + try: + aiod_dataset = aiod.datasets.get_asset_from_platform( + platform=PLATFORM_NAME, + platform_identifier=platform_identifier, + data_format="json", + ) + except KeyError: + aiod_dataset = None + except RequestsJSONDecodeError as e: + logger.warning( + "Non-JSON response when checking existing Zenodo asset %s in AIoD: %s. " + "Treating as not found and attempting registration.", + platform_identifier, + e, + ) + aiod_dataset = None + + if aiod_dataset is None: + response = aiod.datasets.register(metadata=local_dataset) + if isinstance(response, str): + logger.debug("Indexed Zenodo record %s: %s", identifier, response) + return HTTPStatus.OK + elif isinstance(response, requests.Response): + logger.warning( + "Error uploading Zenodo record %s " + "(%s): %s", + identifier, + response.status_code, + response.content, + ) + return response.status_code + raise RuntimeError( + f"Unexpected response type from aiod.datasets.register for {identifier}: " + f"{type(response)}" + ) + + if "identifier" not in aiod_dataset: + raise RuntimeError( + "Unexpected server response retrieving Zenodo dataset " + f"{identifier} from AI-on-Demand: {aiod_dataset}" + ) + + response = aiod.datasets.replace( + identifier=aiod_dataset["identifier"], metadata=local_dataset + ) + if response.status_code == HTTPStatus.OK: + logger.debug( + "Updated Zenodo record %s as AIoD dataset %s", + identifier, + aiod_dataset["identifier"], + ) + else: + logger.warning( + "Could not update AIoD dataset %s for Zenodo record %s " + "(%s): %s", + aiod_dataset["identifier"], + identifier, + response.status_code, + response.content, + ) + return response.status_code + + except Exception as e: + logger.exception( + "Exception encountered when upserting Zenodo record %s.", identifier, exc_info=e + ) + if STOP_ON_UNEXPECTED_ERROR: + raise + return -1 + + +def parse_args(): + parser = ArgumentParser() + + parser.add_argument( + "--mode", + choices=["all", "since", "id"], + required=True, + help=( + "'id': index a single Zenodo record by numeric id. " + "'since': index all records with id >= VALUE (or 'auto'). " + "'all': index all records." + ), + ) + parser.add_argument( + "--value", + default=None, + required=False, + type=str, + help=( + "For mode 'id' this must be a Zenodo numeric identifier. " + "For mode 'since' this must be a numeric id, or 'auto' to start " + "from the newest record already indexed in AIoD. " + "Must not be set for mode 'all'." + ), + ) + + log_levels = [level.lower() for level in logging.getLevelNamesMapping()] + parser.add_argument( + "--app-log-level", + choices=log_levels, + default="info", + help="Emit log messages of at least this level for the connector itself.", + ) + parser.add_argument( + "--root-log-level", + choices=log_levels, + default="error", + help="Emit log messages of at least this level for dependencies.", + ) + + args = parser.parse_args() + + if args.mode == "all" and args.value: + logger.error("Cannot run mode 'all' when a value is supplied.") + raise SystemExit(1) + + return args + + +def configure_connector(): + global PLATFORM_NAME, STOP_ON_UNEXPECTED_ERROR, PER_DATASET_DELAY, ZENODO_PAGE_SIZE + + dot_file = Path("~/.aiod/zenodo/.env").expanduser() + if dot_file.exists() and load_dotenv(dot_file): + logger.info("Loaded variables from %s", dot_file) + else: + logger.info("No environment variables loaded from %s.", dot_file) + + PLATFORM_NAME = os.getenv("PLATFORM_NAME", PLATFORM_NAME) + + delay = os.getenv("PER_DATASET_DELAY") + PER_DATASET_DELAY = float(delay) if delay else None + + STOP_ON_UNEXPECTED_ERROR = ( + str(os.getenv("STOP_ON_UNEXPECTED_ERROR", str(STOP_ON_UNEXPECTED_ERROR))).lower() + == "true" + ) + + page_size_env = os.getenv("ZENODO_PAGE_SIZE") + if page_size_env: + try: + ZENODO_PAGE_SIZE = int(page_size_env) + except ValueError: + logger.warning( + "Invalid ZENODO_PAGE_SIZE=%r; falling back to %d", + page_size_env, + ZENODO_PAGE_SIZE, + ) + + token = os.getenv("CLIENT_SECRET") + assert token, "CLIENT_SECRET environment variable not set" + + masked_token = "*" * max(4, (len(token) - 4)) + token[-4:] + + logger.info("%-25s %s", "aiondemand version:", aiod.version) + logger.info("%-25s %s", "STOP_ON_UNEXPECTED_ERROR:", STOP_ON_UNEXPECTED_ERROR) + logger.info("%-25s %s", "PER_DATASET_DELAY:", PER_DATASET_DELAY) + logger.info("%-25s %s", "AI-on-Demand API server:", aiod.config.api_server) + logger.info("%-25s %s", "Platform Name:", PLATFORM_NAME) + logger.info("%-25s %s", "Authentication server:", aiod.config.auth_server) + logger.info("%-25s %s", "Client ID:", aiod.config.client_id) + logger.info("%-25s %s", "Using secret:", masked_token) + + set_token(Token(client_secret=token)) + + logger.info( + "Configured AI-on-Demand client token for Zenodo connector " + "(skipping authorization_test)." + ) + + +def get_newest_indexed_record() -> str: + logger.info("Finding last uploaded Zenodo record on AI-on-Demand") + last_id = 0 + batch_size = 100 + + for offset in range(0, 1_000_000, batch_size): + datasets = aiod.datasets.get_list( + platform=PLATFORM_NAME, + data_format="json", + offset=offset, + limit=batch_size, + ) + if not datasets: + break + + for d in datasets: + pri = d.get("platform_resource_identifier") + if not isinstance(pri, str): + continue + numeric_part = "".join(ch for ch in pri.split(":")[-1] if ch.isdigit()) + if not numeric_part: + continue + try: + last_id = max(last_id, int(numeric_part)) + except ValueError: + continue + + logger.info("Current highest Zenodo id seen: %s", last_id) + + logger.info("Newest indexed Zenodo id in AIoD: %s", last_id) + return str(last_id) + + +def main(): + args = parse_args() + + logging.basicConfig(level=args.root_log_level.upper()) + logger.setLevel(args.app_log_level.upper()) + + configure_connector() + + errors: list[Optional[Exception]] = [] + + mode = args.mode + value = args.value + + if mode == "id": + if not value or not value.isdigit(): + logger.error("For mode 'id', --value must be an integer Zenodo id, got %r", value) + raise SystemExit(1) + + record = fetch_zenodo_record(int(value)) + upsert_dataset(record) + + elif mode == "since": + if value == "auto": + value = get_newest_indexed_record() + + if not value or not value.isdigit(): + logger.error( + "For mode 'since', --value must be an integer Zenodo id or 'auto', got %r", + value, + ) + raise SystemExit(1) + + start_id = int(value) + logger.info("Indexing Zenodo records with id >= %s", start_id) + + for record in list_records(from_id=start_id): + try: + upsert_dataset(record) + errors.append(None) + except Exception as e: + logger.error("Unrecoverable error upserting Zenodo record %s", record.get("id")) + logger.exception(e) + errors.append(e) + + if len(errors) > 10: + errors.pop(0) + if sum(e is not None for e in errors) > 5: + logger.error( + "Stopping because too many errors were encountered when " + "indexing Zenodo records." + ) + raise SystemExit(1) + + if PER_DATASET_DELAY: + time.sleep(PER_DATASET_DELAY) + + elif mode == "all": + logger.info("Indexing ALL Zenodo records (this may take a very long time)") + for record in list_records(): + upsert_dataset(record) + if PER_DATASET_DELAY: + time.sleep(PER_DATASET_DELAY) + else: + raise NotImplementedError(f"Unexpected mode: {mode!r}") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/verify.py b/verify.py new file mode 100644 index 0000000..d5b8ded --- /dev/null +++ b/verify.py @@ -0,0 +1,61 @@ +""" Verifies data uploaded by this connector is similar to the data already on production. + +It's not super effective but should provide confidence. +Expecting exact equality is kind of hard since datasets on Hugging Face may have been updated +whereas the indexed data on AI-on-Demand is not. This means that a recently synchronized local +database likely has different, more up-to-date, metadata. + +The difference for the specific default repositories here confirm this. Awesome-gpt-prompts has +had its keywords and distributions updated. +""" +from http import HTTPStatus +from typing import Iterable + +import requests + +repositories = list(range(1000, 1009)) +LOCAL_PLATFORM = 'openml' +LOCAL_API = "http://localhost/" +LOCAL_API = "https://test.openml.org/aiod/" +PRODUCTION_API = "https://api.aiod.eu/" + + +def diff(local: dict, prod: dict, ignore: Iterable | None = None): + if only_local := set(local) - set(prod): + print("Local-only attributes:") + for key in only_local: + print(f"{key:10} {local[key]}") + + if only_prod := set(prod) - set(local): + print("Prod-only attributes:") + for key in only_prod: + print(f"{key:10} {prod[key]}") + + if ignore is None: + ignore = ["identifier", "platform", "ai_resource_identifier", "ai_asset_identifier", "aiod_entry"] + for key in set(local) & set(prod): + if key in ignore: + continue + local_value, prod_value = local[key], prod[key] + if local_value != prod_value: + try: + if isinstance(local_value, list) and isinstance(prod_value, list): + assert set(local_value) == set(prod_value) + continue + except: + pass + print(f"{key} in both instances, but differs:") + print(f"local {local_value}") + print(f"prod {prod_value}") + + + +for repo in repositories: + local = requests.get(f"{LOCAL_API}platforms/{LOCAL_PLATFORM}/datasets/{repo}") + assert local.status_code == HTTPStatus.OK, (local.reason, local.content, repo) + prod = requests.get(f"{PRODUCTION_API}platforms/openml/datasets/{repo}") + assert prod.status_code == HTTPStatus.OK, (prod.reason, prod.content, repo) + print(f"Comparing {repo}") + diff(local.json(), prod.json()) + +