From 08d738e3cbb2c98cc56edc94b2b5de08dcac3a15 Mon Sep 17 00:00:00 2001 From: Carmen Date: Wed, 7 Jan 2026 17:37:50 +0100 Subject: [PATCH 1/9] add job status info endpoint --- .vscode/launch.json | 24 +++ .vscode/tasks.json | 61 ++++++ README.md | 6 +- docker/Dockerfile | 5 +- docker/docker-compose.yml | 4 +- .../api/job_status_info.py | 110 +++++++++++ .../apidocs/job_status_info.py | 44 +++++ .../core/job_status_info.py | 174 ++++++++++++++++++ .../endpoints.py | 4 +- .../model/response_models.py | 72 +++++++- 10 files changed, 491 insertions(+), 13 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 .vscode/tasks.json create mode 100644 src/actinia_ogc_api_processes_plugin/api/job_status_info.py create mode 100644 src/actinia_ogc_api_processes_plugin/apidocs/job_status_info.py create mode 100644 src/actinia_ogc_api_processes_plugin/core/job_status_info.py diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..f257107 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,24 @@ +{ + "configurations": [ + { + "name": "Docker: Python - Flask", + "type": "docker", + "request": "launch", + "preLaunchTask": "docker-run: debug", + "python": { + "pathMappings": [ + { + "localRoot": "${workspaceFolder}", + "remoteRoot": "/src/actinia-ogc-api-processes-plugin" + } + ], + "projectType": "flask" + }, + "dockerServerReadyAction": { + "action": "openExternally", + "pattern": "Running on (https?://\\S+|[0-9]+)", + "uriFormat": "%s://localhost:%s/processes" + } + } + ] +} diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..3bdfac1 --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,61 @@ +{ + "version": "2.0.0", + "tasks": [ + { + "type": "docker-build", + "label": "docker-build", + "platform": "python", + "dockerBuild": { + "tag": "actinia-ogc-api-processes-plugin:latest", + "dockerfile": "${workspaceFolder}/docker/Dockerfile", + "context": "${workspaceFolder}" + } + }, + { + "type": "docker-run", + "label": "docker-run: debug", + "dependsOn": [ + "docker-build", + ], + "python": { + "module": "flask", + "args": [ + "-e", + "/src/.env", + "run", + "--no-debugger", + "--host", + "0.0.0.0", + "--port", + "4044" + ] + }, + "dockerRun": { + "containerName": "actinia-ogc-api-processes", + "remove": true, + "network": "actinia-docker_actinia-dev", + "ports": [ + { + "containerPort": 4044, + "hostPort": 4044 + } + ], + "env": { + "PYTHONUNBUFFERED": "1", + "PYTHONDONWRITEBYTECODE": "1", + "FLASK_APP": "actinia_ogc_api_processes_plugin.main", + "FLASK_DEBUG": "1", + "FLASK_ENV": "development" + }, + "customOptions": "--ip 172.18.0.12", + "volumes": [ + { + "localPath": "${workspaceFolder}", + "containerPath": "/src/actinia-ogc-api-processes-plugin", + "permissions": "rw" + } + ] + } + } + ] +} diff --git a/README.md b/README.md index 97b471a..17a8876 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ docker compose -f docker/docker-compose.yml up # -- only current plugin (Note: need to start actinia + valkey separately) docker compose -f docker/docker-compose.yml run --rm --service-ports --entrypoint sh actinia-ogc-api-processes # within docker -gunicorn -b 0.0.0.0:3003 -w 8 --access-logfile=- -k gthread actinia_ogc_api_processes_plugin.main:flask_app +gunicorn -b 0.0.0.0:4044 -w 8 --access-logfile=- -k gthread actinia_ogc_api_processes_plugin.main:flask_app ``` ### DEV setup @@ -20,7 +20,7 @@ gunicorn -b 0.0.0.0:3003 -w 8 --access-logfile=- -k gthread actinia_ogc_api_proc # Uncomment the volume mount of the ogc-api-processes-plugin and additional marked sections of actinia-ogc-api-processes service within docker/docker-compose.yml, # Note: might also need to set: # - within config/mount/sample.ini: processing_base_url = http://127.0.0.1:8088/api/v3 -# - within src/actinia_ogc_api_processes_plugin/main.py set port: flask_app.run(..., port=3003) +# - within src/actinia_ogc_api_processes_plugin/main.py set port: flask_app.run(..., port=4044) # then: docker compose -f docker/docker-compose.yml down docker compose -f docker/docker-compose.yml up --build @@ -29,7 +29,7 @@ docker compose -f docker/docker-compose.yml up --build docker attach $(docker ps | grep docker-actinia-ogc-api-processes | cut -d " " -f1) # In another terminal: example call of processes-endpoint: -curl -u actinia-gdi:actinia-gdi -X GET http://localhost:3003/processes +curl -u actinia-gdi:actinia-gdi -X GET http://localhost:4044/processes ``` ### Installation hints diff --git a/docker/Dockerfile b/docker/Dockerfile index 5bbd9ea..504b3cf 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -19,11 +19,8 @@ RUN pip3 install --no-cache-dir gunicorn && \ COPY . /src/actinia-ogc-api-processes-plugin/ RUN pip3 install --no-cache-dir -e /src/actinia-ogc-api-processes-plugin/ -# TODO -# For tests, when created: -# RUN chmod a+x /src/actinia-ogc-api-processes-plugin/tests.sh WORKDIR /src/actinia-ogc-api-processes-plugin # RUN make test -CMD ["gunicorn", "-b", "0.0.0.0:3003", "-w", "8", "--access-logfile=-", "-k", "gthread", "actinia_ogc_api_processes_plugin.main:flask_app"] +CMD ["gunicorn", "-b", "0.0.0.0:4044", "-w", "8", "--access-logfile=-", "-k", "gthread", "actinia_ogc_api_processes_plugin.main:flask_app"] diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 345beaa..aecb520 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -15,13 +15,13 @@ services: cap_add: - SYS_PTRACE ports: - - "3003:3003" + - "4044:4044" # -- For dev-setup/debugging uncomment following: # network_mode: "host" # stdin_open: true # tty: true # command: > - # sh -c "python -m actinia_ogc_api_processes_plugin.main --workers 1 --timeout 3600 --bind 0.0.0.0:3003" + # sh -c "python -m actinia_ogc_api_processes_plugin.main --workers 1 --timeout 3600 --bind 0.0.0.0:4044" actinia-core: diff --git a/src/actinia_ogc_api_processes_plugin/api/job_status_info.py b/src/actinia_ogc_api_processes_plugin/api/job_status_info.py new file mode 100644 index 0000000..1146b75 --- /dev/null +++ b/src/actinia_ogc_api_processes_plugin/api/job_status_info.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python +"""SPDX-FileCopyrightText: (c) 2026 by mundialis GmbH & Co. KG. + +SPDX-License-Identifier: GPL-3.0-or-later + +JobStatusInfo endpoint implementation. +""" + +__license__ = "GPL-3.0-or-later" +__author__ = "Carmen Tawalika" +__copyright__ = "Copyright 2026 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH & Co. KG" + +from flask import jsonify, make_response +from flask_restful_swagger_2 import Resource, swagger +from requests.exceptions import ConnectionError as req_ConnectionError + +from actinia_ogc_api_processes_plugin.apidocs import job_status_info +from actinia_ogc_api_processes_plugin.authentication import require_basic_auth +from actinia_ogc_api_processes_plugin.core.job_status_info import ( + get_job_status_info, +) +from actinia_ogc_api_processes_plugin.model.response_models import ( + SimpleStatusCodeResponseModel, + StatusInfoResponseModel, +) +from actinia_ogc_api_processes_plugin.resources.logging import log + + +class JobStatusInfo(Resource): + """JobStatusInfo handling.""" + + def __init__(self) -> None: + """Initialise.""" + self.msg = "Return job status information" + + @require_basic_auth() + @swagger.doc(job_status_info.describe_job_status_info_get_docs) + def get(self, job_id): + """Return status information for a given job id.""" + try: + status, status_info, resp = get_job_status_info(job_id) + if status == 200: + # build StatusInfoResponseModel from status_info dict + model_kwargs = {} + for k in ( + "processID", + "type", + "jobID", + "status", + "message", + "created", + "started", + "finished", + "updated", + "progress", + "links", + ): + if k in status_info: + model_kwargs[k] = status_info[k] + + res = jsonify(StatusInfoResponseModel(**model_kwargs)) + return make_response(res, 200) + elif status == 401: + log.error("ERROR: Unauthorized Access") + log.debug(f"actinia response: {resp.text}") + res = jsonify( + SimpleStatusCodeResponseModel( + status=401, + message="ERROR: Unauthorized Access", + ), + ) + return make_response(res, 401) + elif status == 404: + log.error("ERROR: No such job") + log.debug(f"actinia response: {resp.text}") + res = jsonify( + { + "type": ( + "http://www.opengis.net/def/exceptions/" + "ogcapi-processes-1/1.0/no-such-job" + ), + "title": "No Such Job", + "status": 404, + "detail": f"Job '{job_id}' not found", + }, + ) + return make_response(res, 404) + else: + log.error("ERROR: Internal Server Error") + code = getattr(resp, "status_code", status) + text = getattr(resp, "text", "") + log.debug(f"actinia status code: {code}") + log.debug(f"actinia response: {text}") + res = jsonify( + SimpleStatusCodeResponseModel( + status=500, + message="ERROR: Internal Server Error", + ), + ) + return make_response(res, 500) + except req_ConnectionError as e: + log.error(f"Connection ERROR: {e}") + res = jsonify( + SimpleStatusCodeResponseModel( + status=503, + message=f"Connection ERROR: {e}", + ), + ) + return make_response(res, 503) diff --git a/src/actinia_ogc_api_processes_plugin/apidocs/job_status_info.py b/src/actinia_ogc_api_processes_plugin/apidocs/job_status_info.py new file mode 100644 index 0000000..c36e30d --- /dev/null +++ b/src/actinia_ogc_api_processes_plugin/apidocs/job_status_info.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +"""SPDX-FileCopyrightText: (c) 2026 by mundialis GmbH & Co. KG. + +SPDX-License-Identifier: GPL-3.0-or-later + +API docs for JobStatusInfo endpoint. +""" + +__license__ = "GPL-3.0-or-later" +__author__ = "Carmen Tawalika" +__copyright__ = "Copyright 2026 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH & Co. KG" + +from actinia_ogc_api_processes_plugin.model.response_models import ( + SimpleStatusCodeResponseModel, + StatusInfoResponseModel, +) + +describe_job_status_info_get_docs = { + "tags": ["job_status_info"], + "description": "Retrieves the status information for a job.", + "responses": { + "200": { + "description": "This response returns the job status information.", + "schema": StatusInfoResponseModel, + }, + "401": { + "description": "Unauthorized Access", + "schema": SimpleStatusCodeResponseModel, + }, + "404": { + "description": "Job not found", + "schema": SimpleStatusCodeResponseModel, + }, + "500": { + "description": "Internal Server Error", + "schema": SimpleStatusCodeResponseModel, + }, + "503": { + "description": "Connection Error", + "schema": SimpleStatusCodeResponseModel, + }, + }, +} diff --git a/src/actinia_ogc_api_processes_plugin/core/job_status_info.py b/src/actinia_ogc_api_processes_plugin/core/job_status_info.py new file mode 100644 index 0000000..bc4801e --- /dev/null +++ b/src/actinia_ogc_api_processes_plugin/core/job_status_info.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python +"""SPDX-FileCopyrightText: (c) 2026 by mundialis GmbH & Co. KG. + +SPDX-License-Identifier: GPL-3.0-or-later + +Core helper to fetch job status from actinia processing API. +""" + +__license__ = "GPL-3.0-or-later" +__author__ = "Carmen Tawalika" +__copyright__ = "Copyright 2026 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH & Co. KG" + +from datetime import datetime, timedelta, timezone + +import requests +from flask import request +from requests.auth import HTTPBasicAuth + +from actinia_ogc_api_processes_plugin.resources.config import ACTINIA + + +def get_job_status(job_id): + """Retrieve job status from actinia processing service.""" + auth = request.authorization + kwargs = dict() + if auth: + kwargs["auth"] = HTTPBasicAuth(auth.username, auth.password) + + url_job = ( + f"{ACTINIA.processing_base_url}/resources/{auth.username}/" + f"resource_id-{job_id}" + ) + return requests.get(url_job, **kwargs) + + +def map_status(raw: object) -> str: + """Map actinia status values to OGC statusInfo values. + + Mapping: + - accepted -> accepted + - running -> running + - finished -> successful + - error -> failed + - terminated -> dismissed + + Default is 'accepted' when input is falsy or unknown. + """ + if not raw: + return "accepted" + s = str(raw).strip().lower() + mapping = { + "accepted": "accepted", + "running": "running", + "finished": "successful", + "error": "failed", + "terminated": "dismissed", + } + return mapping.get(s, "accepted") + + +def calculate_progress(data: dict): + """Return integer progress 0..100 from data or None. + + Supports nested object `progress: { num_of_steps, step }`. + Returns None on invalid input. + """ + progress = data.get("progress") + if not isinstance(progress, dict): + return None + + try: + raw_num = progress.get("num_of_steps") + raw_cur = progress.get("step") + num = int(raw_num) if isinstance(raw_num, int) else int(float(raw_num)) + cur = int(raw_cur) if isinstance(raw_cur, int) else int(float(raw_cur)) + except (TypeError, ValueError): + return None + + if num <= 0: + return None + + prog = round((cur / num) * 100) + return max(0, min(100, prog)) + + +def calculate_finished(data: dict): + """Return finished time as ISO string or None. + + Calculate `finished` from accept_timestamp + time_delta (seconds) + """ + start = data.get("accept_timestamp") + if start is None: + return None + + try: + start_dt = datetime.fromtimestamp(float(start), tz=timezone.utc) + td = float(data.get("time_delta")) + except (TypeError, ValueError): + return None + + finished_dt = start_dt + timedelta(seconds=td) + # format without microseconds + return finished_dt.replace(microsecond=0).isoformat() + + +def get_job_status_info(job_id): + """Return a tuple (status_code, status_info_dict_or_None, response). + + Maps the actinia job response into the OGC `statusInfo` structure when + possible. `response` is the original requests.Response for logging. + """ + resp = get_job_status(job_id) + status = resp.status_code + if status != 200: + return status, None, resp + + try: + data = resp.json() + except (ValueError, TypeError): + data = {} + + status_info = {} + status_info["jobID"] = job_id + status_info["status"] = map_status(data.get("status")) + status_info["type"] = data.get("type", "process") + status_info["message"] = data.get("message") + status_info["processID"] = data.get("resource_id") + + # Servers SHOULD set the value of the created field when a job has been + # accepted and queued for execution. + try: + created = datetime.fromtimestamp( + float(data.get("accept_timestamp")), + tz=timezone.utc, + ) + status_info["created"] = created.replace(microsecond=0).isoformat() + except (TypeError, ValueError): + pass + + # Whenever the status field of the job changes, servers SHOULD revise the + # value of the updated field. + # -> actinia-core updates this field every time anything was updated. + try: + updated = datetime.fromtimestamp( + float(data.get("timestamp")), + tz=timezone.utc, + ) + status_info["updated"] = updated.replace(microsecond=0).isoformat() + except (TypeError, ValueError): + pass + + # Servers SHOULD set the value of the started field when a job begins + # execution and is consuming compute resources. + # status_info["started"] = # TODO do we have this information? + + # Servers SHOULD set the value of the finished field when the execution of + # a job has completed and the process is no longer consuming compute + # resources. + # -> Returns none of job not finished. + finished_val = calculate_finished(data) + if finished_val is not None: + status_info["finished"] = finished_val + + prog_val = calculate_progress(data) + if prog_val is not None: + status_info["progress"] = prog_val + + links = data.get("links") + if not links: + links = [{"href": request.url, "rel": "self"}] + status_info["links"] = links + + return 200, status_info, resp diff --git a/src/actinia_ogc_api_processes_plugin/endpoints.py b/src/actinia_ogc_api_processes_plugin/endpoints.py index 6952097..2c4689f 100644 --- a/src/actinia_ogc_api_processes_plugin/endpoints.py +++ b/src/actinia_ogc_api_processes_plugin/endpoints.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -"""SPDX-FileCopyrightText: (c) 2018-2025 by mundialis GmbH & Co. KG. +"""SPDX-FileCopyrightText: (c) 2018-2026 by mundialis GmbH & Co. KG. SPDX-License-Identifier: GPL-3.0-or-later @@ -13,6 +13,7 @@ from flask_restful_swagger_2 import Api +from actinia_ogc_api_processes_plugin.api.job_status_info import JobStatusInfo from actinia_ogc_api_processes_plugin.api.process_description import ( ProcessDescription, ) @@ -25,5 +26,6 @@ def create_endpoints(flask_api: Api) -> None: # Endpoints following: https://docs.ogc.org/is/18-062r2/18-062r2.html#toc0 + apidoc.add_resource(JobStatusInfo, "/jobs/") apidoc.add_resource(ProcessList, "/processes") apidoc.add_resource(ProcessDescription, "/processes/") diff --git a/src/actinia_ogc_api_processes_plugin/model/response_models.py b/src/actinia_ogc_api_processes_plugin/model/response_models.py index ed405d7..c17b1b8 100644 --- a/src/actinia_ogc_api_processes_plugin/model/response_models.py +++ b/src/actinia_ogc_api_processes_plugin/model/response_models.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -"""SPDX-FileCopyrightText: (c) 2018-2025 by mundialis GmbH & Co. KG. +"""SPDX-FileCopyrightText: (c) 2018-2026 by mundialis GmbH & Co. KG. SPDX-License-Identifier: GPL-3.0-or-later @@ -7,8 +7,8 @@ """ __license__ = "GPL-3.0-or-later" -__author__ = "Anika Weinmann" -__copyright__ = "Copyright 2018-2025 mundialis GmbH & Co. KG" +__author__ = "Anika Weinmann, Carmen Tawalika" +__copyright__ = "Copyright 2018-2026 mundialis GmbH & Co. KG" __maintainer__ = "mundialis GmbH & Co. KG" @@ -39,3 +39,69 @@ class SimpleStatusCodeResponseModel(Schema): message="success", ) SimpleStatusCodeResponseModel.example = simple_response_example + + +class StatusInfoResponseModel(Schema): + """statusInfo schema from OGC API - Processes (part1).""" + + type: str = "object" + properties: ClassVar[dict] = { + "processID": {"type": "string"}, + "type": {"type": "string", "enum": ["process"]}, + "jobID": {"type": "string"}, + "status": { + "type": "string", + "enum": [ + "accepted", + "running", + "successful", + "failed", + "dismissed", + ], + }, + "message": {"type": "string"}, + "created": {"type": "string", "format": "date-time"}, + "started": {"type": "string", "format": "date-time"}, + "finished": {"type": "string", "format": "date-time"}, + "updated": {"type": "string", "format": "date-time"}, + "progress": {"type": "integer", "minimum": 0, "maximum": 100}, + "links": { + "type": "array", + "items": { + "type": "object", + "properties": { + "href": {"type": "string"}, + "rel": {"type": "string"}, + "type": {"type": "string"}, + "hreflang": {"type": "string"}, + "title": {"type": "string"}, + }, + "required": ["href"], + }, + }, + } + required: ClassVar[list[str]] = ["jobID", "status", "type"] + + +# attach examples +status_info_example = StatusInfoResponseModel( + jobID="96ed4cb9-1290-4409-b034-c162759c10a1", + status="successful", + type="process", + message="Processing successfully finished", + processID="resource_id-96ed4cb9-1290-4409-b034-c162759c10a1", + created="2026-01-06T11:02:14", + updated="2026-01-06T11:02:49", + finished="2026-01-06T11:02:49", + progress=100, + links=[ + { + "href": ( + "http://example.com/jobs/" + "96ed4cb9-1290-4409-b034-c162759c10a1" + ), + "rel": "self", + }, + ], +) +StatusInfoResponseModel.example = status_info_example From d5b97677ca2b29da3e0c7f1fa2047add1ec82f66 Mon Sep 17 00:00:00 2001 From: Carmen Date: Wed, 7 Jan 2026 17:53:19 +0100 Subject: [PATCH 2/9] lint --- .vscode/tasks.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 3bdfac1..87e7afa 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -15,7 +15,7 @@ "type": "docker-run", "label": "docker-run: debug", "dependsOn": [ - "docker-build", + "docker-build" ], "python": { "module": "flask", From 4364daf40d67350e57f1fdb245254dc6b15cd941 Mon Sep 17 00:00:00 2001 From: Carmen Date: Thu, 8 Jan 2026 15:02:48 +0100 Subject: [PATCH 3/9] add tests --- .github/workflows/test.yml | 26 +-- .../core/job_status_info.py | 57 +++--- .../integrationtests/test_job_status_info.py | 78 ++++++++ tests/unittests/test_core_job_status_info.py | 189 ++++++++++++++++++ 4 files changed, 303 insertions(+), 47 deletions(-) create mode 100644 tests/integrationtests/test_job_status_info.py create mode 100644 tests/unittests/test_core_job_status_info.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f3793d4..d491e1c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,29 +10,7 @@ on: branches: [ main ] jobs: - - # unittests: - # runs-on: ubuntu-latest - # steps: - # - name: Checkout - # uses: actions/checkout@v4 - # - name: Set up Docker Buildx - # uses: docker/setup-buildx-action@v3 - # - name: Replace run only unittest command - # run: | - # sed -i "s+# RUN make test+RUN make unittest+g" docker/Dockerfile - # - name: Unittests of actinia-ogc-api-processes-plugin - # id: docker_build - # uses: docker/build-push-action@v6 - # with: - # push: false - # tags: actinia-ogc-api-processes-plugin-tests:alpine - # context: . - # file: docker/Dockerfile - # no-cache: true - # # pull: true - - integration-tests: + tests: runs-on: ubuntu-latest steps: - uses: actions/checkout@v6 @@ -45,6 +23,6 @@ jobs: - name: Docker logs actinia-core run: docker logs docker-actinia-core-1 - name: Run integration test - run: docker exec -t docker-actinia-ogc-api-processes-1 make integrationtest + run: docker exec -t docker-actinia-ogc-api-processes-1 make test - name: Stop containers run: docker compose -f "docker/docker-compose.yml" down diff --git a/src/actinia_ogc_api_processes_plugin/core/job_status_info.py b/src/actinia_ogc_api_processes_plugin/core/job_status_info.py index bc4801e..59cae29 100644 --- a/src/actinia_ogc_api_processes_plugin/core/job_status_info.py +++ b/src/actinia_ogc_api_processes_plugin/core/job_status_info.py @@ -20,18 +20,18 @@ from actinia_ogc_api_processes_plugin.resources.config import ACTINIA -def get_job_status(job_id): - """Retrieve job status from actinia processing service.""" +def get_actinia_job(job_id): + """Retrieve job status from actinia.""" auth = request.authorization kwargs = dict() if auth: kwargs["auth"] = HTTPBasicAuth(auth.username, auth.password) - url_job = ( + url = ( f"{ACTINIA.processing_base_url}/resources/{auth.username}/" f"resource_id-{job_id}" ) - return requests.get(url_job, **kwargs) + return requests.get(url, **kwargs) def map_status(raw: object) -> str: @@ -65,9 +65,12 @@ def calculate_progress(data: dict): Supports nested object `progress: { num_of_steps, step }`. Returns None on invalid input. """ + status = data.get("status") progress = data.get("progress") if not isinstance(progress, dict): return None + if status == "finished": + return 100 try: raw_num = progress.get("num_of_steps") @@ -79,8 +82,8 @@ def calculate_progress(data: dict): if num <= 0: return None - - prog = round((cur / num) * 100) + # calculate percentage with total steps + 1 to avoid 100% before finished + prog = round((cur / (num + 1)) * 100) return max(0, min(100, prog)) @@ -89,9 +92,10 @@ def calculate_finished(data: dict): Calculate `finished` from accept_timestamp + time_delta (seconds) """ - start = data.get("accept_timestamp") - if start is None: + status = data.get("status") + if status != "finished": return None + start = data.get("accept_timestamp") try: start_dt = datetime.fromtimestamp(float(start), tz=timezone.utc) @@ -104,17 +108,8 @@ def calculate_finished(data: dict): return finished_dt.replace(microsecond=0).isoformat() -def get_job_status_info(job_id): - """Return a tuple (status_code, status_info_dict_or_None, response). - - Maps the actinia job response into the OGC `statusInfo` structure when - possible. `response` is the original requests.Response for logging. - """ - resp = get_job_status(job_id) - status = resp.status_code - if status != 200: - return status, None, resp - +def parse_actinia_job(job_id, resp): + """Parse actinia job response into status_info dict.""" try: data = resp.json() except (ValueError, TypeError): @@ -152,7 +147,7 @@ def get_job_status_info(job_id): # Servers SHOULD set the value of the started field when a job begins # execution and is consuming compute resources. - # status_info["started"] = # TODO do we have this information? + # status_info["started"] = # TODO implement in actinia-core # Servers SHOULD set the value of the finished field when the execution of # a job has completed and the process is no longer consuming compute @@ -162,13 +157,29 @@ def get_job_status_info(job_id): if finished_val is not None: status_info["finished"] = finished_val - prog_val = calculate_progress(data) - if prog_val is not None: - status_info["progress"] = prog_val + progress_val = calculate_progress(data) + if progress_val is not None: + status_info["progress"] = progress_val links = data.get("links") if not links: links = [{"href": request.url, "rel": "self"}] status_info["links"] = links + return status_info + + +def get_job_status_info(job_id): + """Return a tuple (status_code, status_info_dict_or_None, response). + + Maps the actinia job response into the OGC `statusInfo` structure when + possible. `response` is the original requests.Response for logging. + """ + resp = get_actinia_job(job_id) + + status_code = resp.status_code + if status_code not in {200, 400}: + return status_code, None, resp + + status_info = parse_actinia_job(job_id, resp) return 200, status_info, resp diff --git a/tests/integrationtests/test_job_status_info.py b/tests/integrationtests/test_job_status_info.py new file mode 100644 index 0000000..1da5b0f --- /dev/null +++ b/tests/integrationtests/test_job_status_info.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python +"""SPDX-FileCopyrightText: (c) 2026 by mundialis GmbH & Co. KG. + +SPDX-License-Identifier: GPL-3.0-or-later +""" + +__license__ = "GPL-3.0-or-later" +__author__ = "Carmen Tawalika" +__copyright__ = "Copyright 2026 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH & Co. KG" + + +import pytest +from flask import Response + +from tests.testsuite import TestCase + + +class JobStatusInfoTest(TestCase): + """Integration tests for /jobs/ endpoint.""" + + # @pytest.mark.integrationtest + # def test_get_job_status_success(self) -> None: + # """Successful query returns statusInfo-like structure.""" + # # example job id used in response model examples + # job_id = "96ed4cb9-1290-4409-b034-c162759c10a1" + # resp = self.app.get(f"/jobs/{job_id}", headers=self.HEADER_AUTH) + # assert isinstance(resp, Response) + # assert resp.status_code == 200 + # assert hasattr(resp, "json") + + # assert "jobID" in resp.json, "There is no 'jobID' in response" + # assert "status" in resp.json, "There is no 'status' in response" + # assert "links" in resp.json, "There is no 'links' in response" + + @pytest.mark.integrationtest + def test_get_job_status_missing_auth(self) -> None: + """Request without auth returns 401.""" + job_id = "96ed4cb9-1290-4409-b034-c162759c10a1" + resp = self.app.get(f"/jobs/{job_id}") + assert isinstance(resp, Response) + assert resp.status_code == 401 + assert hasattr(resp, "json") + assert "message" in resp.json + assert resp.json["message"] == "Authentication required" + + @pytest.mark.integrationtest + def test_get_job_status_false_auth(self) -> None: + """Wrong credentials return 401 and error message.""" + job_id = "96ed4cb9-1290-4409-b034-c162759c10a1" + resp = self.app.get(f"/jobs/{job_id}", headers=self.HEADER_AUTH_WRONG) + assert isinstance(resp, Response) + assert resp.status_code == 401 + assert hasattr(resp, "json") + assert "message" in resp.json + assert resp.json["message"] == "ERROR: Unauthorized Access" + + @pytest.mark.integrationtest + def test_get_job_status_not_found(self) -> None: + """Non-existent job id returns 404 with OGC exception type.""" + resp = self.app.get("/jobs/invalid_job_id", headers=self.HEADER_AUTH) + assert isinstance(resp, Response) + assert resp.status_code == 404 + assert hasattr(resp, "json") + assert "type" in resp.json + expected = ( + "http://www.opengis.net/def/exceptions/" + "ogcapi-processes-1/1.0/no-such-job" + ) + assert resp.json["type"] == expected + + @pytest.mark.integrationtest + def test_get_job_method_not_allowed(self) -> None: + """Other methods than GET return 405 Method Not Allowed.""" + resp = self.app.post("/jobs/invalid_job_id", headers=self.HEADER_AUTH) + assert isinstance(resp, Response) + assert resp.status_code == 405 + assert hasattr(resp, "json") diff --git a/tests/unittests/test_core_job_status_info.py b/tests/unittests/test_core_job_status_info.py new file mode 100644 index 0000000..43aac38 --- /dev/null +++ b/tests/unittests/test_core_job_status_info.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python +"""SPDX-FileCopyrightText: (c) 2026 by mundialis GmbH & Co. KG. + +SPDX-License-Identifier: GPL-3.0-or-later + +Unit tests for core.job_status_info helper functions. +""" + +__license__ = "GPL-3.0-or-later" +__author__ = "Carmen Tawalika" +__copyright__ = "Copyright 2026 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH & Co. KG" + + +import pytest + +from actinia_ogc_api_processes_plugin.core import job_status_info as core + + +class MockResp: + """Lightweight Response-like object for unit tests. + + Provides `status_code`, `text` and a `json()` method returning the + configured payload. + """ + + def __init__(self, status_code=200, json_data=None, text="") -> None: + """Initialise.""" + self.status_code = status_code + self._json = json_data or {} + self.text = text + + def json(self): + """Return the configured response json.""" + return self._json + + +@pytest.mark.unittest +def test_map_status_values(): + """Test mapping of actinia job status to OGC API Processes values.""" + assert core.map_status("accepted") == "accepted" + assert core.map_status("running") == "running" + assert core.map_status("finished") == "successful" + assert core.map_status("error") == "failed" + assert core.map_status("terminated") == "dismissed" + assert core.map_status(None) == "accepted" + assert core.map_status("unknown") == "accepted" + + +@pytest.mark.unittest +def test_calculate_progress_valid_and_invalid(): + """Test calculation of progress percentage from actinia progress info.""" + # valid integer steps + data = {"progress": {"num_of_steps": 4, "step": 2}} + # calculation uses num+1 denominator -> 2/(4+1) = 0.4 -> 40 + assert core.calculate_progress(data) == 40 + + # step > num -> clamp to 100 + data = {"progress": {"num_of_steps": 3, "step": 5}} + assert core.calculate_progress(data) == 100 + + # string floats + data = {"progress": {"num_of_steps": "4", "step": "1"}} + # 1/(4+1) = 0.2 -> 20 + assert core.calculate_progress(data) == 20 + + # not a dict -> None + assert core.calculate_progress({"progress": 50}) is None + # invalid values -> None + data = {"progress": {"num_of_steps": 0, "step": 0}} + assert core.calculate_progress(data) is None + + +@pytest.mark.unittest +def test_calculate_finished(): + """Test calculation of finished timestamp.""" + # 2021-01-01T00:00:00Z + 3600s -> 2021-01-01T01:00:00+00:00 + data = { + "accept_timestamp": 1609459200, + "time_delta": 3600, + "status": "finished", + } + assert core.calculate_finished(data) == "2021-01-01T01:00:00+00:00" + + +@pytest.mark.unittest +def test_calculate_finished_but_still_running(): + """Test that finished is None when job not finished.""" + # 2021-01-01T00:00:00Z + 3600s -> 2021-01-01T01:00:00+00:00 + data = { + "accept_timestamp": 1609459200, + "time_delta": 3600, + "status": "running", + } + assert core.calculate_finished(data) is None + + +@pytest.mark.unittest +def test_get_job_status_info_success_and_error(monkeypatch): + """Test get_job_status_info function for success and error cases.""" + job_id = "job-123" + sample = { + "status": "finished", + "resource_id": "proc-1", + "accept_timestamp": 1609459200, + "timestamp": 1609462800, + "time_delta": 3600, + "progress": {"num_of_steps": 4, "step": 2}, + "links": [{"href": "http://example.com/out", "rel": "alternate"}], + } + + resp = MockResp(200, json_data=sample, text="ok") + + # patch the network call inside the module to return our mock + monkeypatch.setattr(core, "get_actinia_job", lambda _jid: resp) + + status, status_info, r = core.get_job_status_info(job_id) + assert status == 200 + assert r is resp + assert status_info["jobID"] == job_id + assert status_info["status"] == "successful" + assert status_info["processID"] == "proc-1" + assert status_info["created"] == "2021-01-01T00:00:00+00:00" + assert status_info["updated"] == "2021-01-01T01:00:00+00:00" + # finished == accept + time_delta + assert status_info["finished"] == "2021-01-01T01:00:00+00:00" + # status is 'finished' -> progress should be 100 + assert status_info["progress"] == 100 + assert isinstance(status_info["links"], list) + + # non-200 is passed through + notfound = MockResp(404, json_data={}, text="not found") + monkeypatch.setattr(core, "get_actinia_job", lambda _jid: notfound) + status2, info2, r2 = core.get_job_status_info(job_id) + assert status2 == 404 + assert info2 is None + assert r2 is notfound + + +@pytest.mark.unittest +def test_parse_actinia_job_with_valid_data(): + """Test parsing of actinia job response into status_info dict.""" + sample = { + "status": "running", + "resource_id": "resource_id-96ed4cb9-1290-4409-b034-c162759c10a1", + "accept_timestamp": 1767697334.010796, + "timestamp": 1767697369.8468018, + "time_delta": 35.83603835105896, + "progress": {"num_of_steps": 4, "step": 2}, + "type": "process", + "message": "ok", + "links": [{"href": "http://example.com/out", "rel": "alternate"}], + } + + resp = MockResp(200, json_data=sample, text="ok") + info = core.parse_actinia_job("96ed4cb9-1290-4409-b034-c162759c10a1", resp) + assert info["jobID"] == "96ed4cb9-1290-4409-b034-c162759c10a1" + assert info["status"] == "running" + expected_proc = "resource_id-96ed4cb9-1290-4409-b034-c162759c10a1" + assert info["processID"] == expected_proc + assert info["created"] == "2026-01-06T11:02:14+00:00" + assert info["updated"] == "2026-01-06T11:02:49+00:00" + assert "finished" not in info + # progress -> 2/(4+1) = 0.4 -> 40 + assert info["progress"] == 40 + assert isinstance(info["links"], list) + assert info["links"][0]["href"] == "http://example.com/out" + + +@pytest.mark.unittest +def test_parse_actinia_job_finished_status(): + """Test parsing of actinia job response with finished status.""" + sample = { + "status": "finished", + "resource_id": "resource_id-96ed4cb9-1290-4409-b034-c162759c10a1", + "accept_timestamp": 1767697334.010796, + "timestamp": 1767697369.8468018, + "time_delta": 35.83603835105896, + "progress": {"num_of_steps": 4, "step": 4}, + "links": [{"href": "http://example.com/out", "rel": "alternate"}], + } + + resp = MockResp(200, json_data=sample, text="ok") + info = core.parse_actinia_job("96ed4cb9-1290-4409-b034-c162759c10a1", resp) + # finished maps to 'successful' + assert info["status"] == "successful" + assert info["finished"] == "2026-01-06T11:02:49+00:00" + # finished status should set progress to 100 + assert info["progress"] == 100 From 5158dba349a18531fe9eaba84109809920a8c117 Mon Sep 17 00:00:00 2001 From: Carmen Date: Thu, 8 Jan 2026 15:12:05 +0100 Subject: [PATCH 4/9] add comments to tests --- .../integrationtests/test_job_status_info.py | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/tests/integrationtests/test_job_status_info.py b/tests/integrationtests/test_job_status_info.py index 1da5b0f..4c25137 100644 --- a/tests/integrationtests/test_job_status_info.py +++ b/tests/integrationtests/test_job_status_info.py @@ -19,6 +19,7 @@ class JobStatusInfoTest(TestCase): """Integration tests for /jobs/ endpoint.""" + # Can only be activated when example job is available in actinia instance # @pytest.mark.integrationtest # def test_get_job_status_success(self) -> None: # """Successful query returns statusInfo-like structure.""" @@ -55,19 +56,20 @@ def test_get_job_status_false_auth(self) -> None: assert "message" in resp.json assert resp.json["message"] == "ERROR: Unauthorized Access" - @pytest.mark.integrationtest - def test_get_job_status_not_found(self) -> None: - """Non-existent job id returns 404 with OGC exception type.""" - resp = self.app.get("/jobs/invalid_job_id", headers=self.HEADER_AUTH) - assert isinstance(resp, Response) - assert resp.status_code == 404 - assert hasattr(resp, "json") - assert "type" in resp.json - expected = ( - "http://www.opengis.net/def/exceptions/" - "ogcapi-processes-1/1.0/no-such-job" - ) - assert resp.json["type"] == expected + # requires https://github.com/actinia-org/actinia-core/pull/685 + # @pytest.mark.integrationtest + # def test_get_job_status_not_found(self) -> None: + # """Non-existent job id returns 404 with OGC exception type.""" + # resp = self.app.get("/jobs/invalid_job_id", headers=self.HEADER_AUTH) + # assert isinstance(resp, Response) + # assert resp.status_code == 404 + # assert hasattr(resp, "json") + # assert "type" in resp.json + # expected = ( + # "http://www.opengis.net/def/exceptions/" + # "ogcapi-processes-1/1.0/no-such-job" + # ) + # assert resp.json["type"] == expected @pytest.mark.integrationtest def test_get_job_method_not_allowed(self) -> None: From ea767a751c6d04e5253cfa9bc729463aa1856a72 Mon Sep 17 00:00:00 2001 From: Carmen Date: Thu, 8 Jan 2026 15:33:39 +0100 Subject: [PATCH 5/9] fix test description --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d491e1c..8870341 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -22,7 +22,7 @@ jobs: run: docker logs docker-actinia-ogc-api-processes-1 - name: Docker logs actinia-core run: docker logs docker-actinia-core-1 - - name: Run integration test + - name: Run unit and integration tests run: docker exec -t docker-actinia-ogc-api-processes-1 make test - name: Stop containers run: docker compose -f "docker/docker-compose.yml" down From f5c98755ce68353d84f6d5e85cc4d16edb047169 Mon Sep 17 00:00:00 2001 From: Carmen Date: Fri, 9 Jan 2026 10:59:19 +0100 Subject: [PATCH 6/9] fix status code decisions --- src/actinia_ogc_api_processes_plugin/api/job_status_info.py | 2 +- src/actinia_ogc_api_processes_plugin/core/job_status_info.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/actinia_ogc_api_processes_plugin/api/job_status_info.py b/src/actinia_ogc_api_processes_plugin/api/job_status_info.py index 1146b75..ea1602b 100644 --- a/src/actinia_ogc_api_processes_plugin/api/job_status_info.py +++ b/src/actinia_ogc_api_processes_plugin/api/job_status_info.py @@ -71,7 +71,7 @@ def get(self, job_id): ), ) return make_response(res, 401) - elif status == 404: + elif status in {400, 404}: log.error("ERROR: No such job") log.debug(f"actinia response: {resp.text}") res = jsonify( diff --git a/src/actinia_ogc_api_processes_plugin/core/job_status_info.py b/src/actinia_ogc_api_processes_plugin/core/job_status_info.py index 59cae29..31c88ae 100644 --- a/src/actinia_ogc_api_processes_plugin/core/job_status_info.py +++ b/src/actinia_ogc_api_processes_plugin/core/job_status_info.py @@ -178,7 +178,7 @@ def get_job_status_info(job_id): resp = get_actinia_job(job_id) status_code = resp.status_code - if status_code not in {200, 400}: + if status_code != 200: return status_code, None, resp status_info = parse_actinia_job(job_id, resp) From cda48eec49f9c95e8b12a5127c812e4a23a46332 Mon Sep 17 00:00:00 2001 From: Carmen Date: Fri, 9 Jan 2026 11:52:49 +0100 Subject: [PATCH 7/9] readd test --- .../integrationtests/test_job_status_info.py | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/tests/integrationtests/test_job_status_info.py b/tests/integrationtests/test_job_status_info.py index 4c25137..4447488 100644 --- a/tests/integrationtests/test_job_status_info.py +++ b/tests/integrationtests/test_job_status_info.py @@ -56,20 +56,19 @@ def test_get_job_status_false_auth(self) -> None: assert "message" in resp.json assert resp.json["message"] == "ERROR: Unauthorized Access" - # requires https://github.com/actinia-org/actinia-core/pull/685 - # @pytest.mark.integrationtest - # def test_get_job_status_not_found(self) -> None: - # """Non-existent job id returns 404 with OGC exception type.""" - # resp = self.app.get("/jobs/invalid_job_id", headers=self.HEADER_AUTH) - # assert isinstance(resp, Response) - # assert resp.status_code == 404 - # assert hasattr(resp, "json") - # assert "type" in resp.json - # expected = ( - # "http://www.opengis.net/def/exceptions/" - # "ogcapi-processes-1/1.0/no-such-job" - # ) - # assert resp.json["type"] == expected + @pytest.mark.integrationtest + def test_get_job_status_not_found(self) -> None: + """Non-existent job id returns 404 with OGC exception type.""" + resp = self.app.get("/jobs/invalid_job_id", headers=self.HEADER_AUTH) + assert isinstance(resp, Response) + assert resp.status_code == 404 + assert hasattr(resp, "json") + assert "type" in resp.json + expected = ( + "http://www.opengis.net/def/exceptions/" + "ogcapi-processes-1/1.0/no-such-job" + ) + assert resp.json["type"] == expected @pytest.mark.integrationtest def test_get_job_method_not_allowed(self) -> None: From d48e94c84182c6530acdf39182a30f7de6b990dc Mon Sep 17 00:00:00 2001 From: Carmen Date: Fri, 9 Jan 2026 14:57:19 +0100 Subject: [PATCH 8/9] fix status code 400 for error resource --- .../core/job_status_info.py | 35 ++++++++++++++++--- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/src/actinia_ogc_api_processes_plugin/core/job_status_info.py b/src/actinia_ogc_api_processes_plugin/core/job_status_info.py index 31c88ae..72aa6d9 100644 --- a/src/actinia_ogc_api_processes_plugin/core/job_status_info.py +++ b/src/actinia_ogc_api_processes_plugin/core/job_status_info.py @@ -178,8 +178,35 @@ def get_job_status_info(job_id): resp = get_actinia_job(job_id) status_code = resp.status_code - if status_code != 200: - return status_code, None, resp - status_info = parse_actinia_job(job_id, resp) - return 200, status_info, resp + if status_code == 200: + status_info = parse_actinia_job(job_id, resp) + return 200, status_info, resp + + # Actinia returns HTTP 400 both for 'no such job' and for + # resources that include an error state. Distinguish by inspecting the + # JSON payload: if it looks like a job/resource object (contains + # identifiers or job fields) treat it as a valid resource and map it to + # a 200 + statusInfo. Otherwise return 404. + if status_code == 400: + try: + data = resp.json() + except (ValueError, TypeError): + return 404, None, resp + + indicative_keys = { + "accept_timestamp", + "message", + "status", + "resource_id", + "timestamp", + } + + if isinstance(data, dict) and indicative_keys.issubset(data.keys()): + status_info = parse_actinia_job(job_id, resp) + return 200, status_info, resp + + return 404, None, resp + + # Any other status codes return as-is + return status_code, None, resp From b42a91dae2fae2f4dfff3ea2334b5b9d96fdbfc9 Mon Sep 17 00:00:00 2001 From: Carmen Date: Tue, 13 Jan 2026 15:49:33 +0100 Subject: [PATCH 9/9] update actinia version in tests --- docker/docker-compose.yml | 2 +- tests/integrationtests/test_job_status_info.py | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index aecb520..d6c3827 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -25,7 +25,7 @@ services: actinia-core: - image: mundialis/actinia:2.12.2 + image: mundialis/actinia:2.13.1 # ports: # - "8088:8088" depends_on: diff --git a/tests/integrationtests/test_job_status_info.py b/tests/integrationtests/test_job_status_info.py index 4447488..0293a4e 100644 --- a/tests/integrationtests/test_job_status_info.py +++ b/tests/integrationtests/test_job_status_info.py @@ -19,7 +19,7 @@ class JobStatusInfoTest(TestCase): """Integration tests for /jobs/ endpoint.""" - # Can only be activated when example job is available in actinia instance + # # Can only be activated when example job is available in actinia instance # @pytest.mark.integrationtest # def test_get_job_status_success(self) -> None: # """Successful query returns statusInfo-like structure.""" @@ -29,10 +29,24 @@ class JobStatusInfoTest(TestCase): # assert isinstance(resp, Response) # assert resp.status_code == 200 # assert hasattr(resp, "json") + # assert "jobID" in resp.json, "There is no 'jobID' in response" + # assert "status" in resp.json, "There is no 'status' in response" + # assert "links" in resp.json, "There is no 'links' in response" + # # Can only be activated when example job is available in actinia instance + # @pytest.mark.integrationtest + # def test_get_job_status_failed(self) -> None: + # """Successful query returns statusInfo-like structure.""" + # # example job id used in response model examples + # job_id = "565f6bc9-7535-44c6-9826-864fbb2421f3" + # resp = self.app.get(f"/jobs/{job_id}", headers=self.HEADER_AUTH) + # assert isinstance(resp, Response) + # assert resp.status_code == 200 + # assert hasattr(resp, "json") # assert "jobID" in resp.json, "There is no 'jobID' in response" # assert "status" in resp.json, "There is no 'status' in response" # assert "links" in resp.json, "There is no 'links' in response" + # assert resp.json["status"] == "failed" @pytest.mark.integrationtest def test_get_job_status_missing_auth(self) -> None: