diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f3793d4..8870341 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,29 +10,7 @@ on: branches: [ main ] jobs: - - # unittests: - # runs-on: ubuntu-latest - # steps: - # - name: Checkout - # uses: actions/checkout@v4 - # - name: Set up Docker Buildx - # uses: docker/setup-buildx-action@v3 - # - name: Replace run only unittest command - # run: | - # sed -i "s+# RUN make test+RUN make unittest+g" docker/Dockerfile - # - name: Unittests of actinia-ogc-api-processes-plugin - # id: docker_build - # uses: docker/build-push-action@v6 - # with: - # push: false - # tags: actinia-ogc-api-processes-plugin-tests:alpine - # context: . - # file: docker/Dockerfile - # no-cache: true - # # pull: true - - integration-tests: + tests: runs-on: ubuntu-latest steps: - uses: actions/checkout@v6 @@ -44,7 +22,7 @@ jobs: run: docker logs docker-actinia-ogc-api-processes-1 - name: Docker logs actinia-core run: docker logs docker-actinia-core-1 - - name: Run integration test - run: docker exec -t docker-actinia-ogc-api-processes-1 make integrationtest + - name: Run unit and integration tests + run: docker exec -t docker-actinia-ogc-api-processes-1 make test - name: Stop containers run: docker compose -f "docker/docker-compose.yml" down diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..f257107 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,24 @@ +{ + "configurations": [ + { + "name": "Docker: Python - Flask", + "type": "docker", + "request": "launch", + "preLaunchTask": "docker-run: debug", + "python": { + "pathMappings": [ + { + "localRoot": "${workspaceFolder}", + "remoteRoot": "/src/actinia-ogc-api-processes-plugin" + } + ], + "projectType": "flask" + }, + "dockerServerReadyAction": { + "action": "openExternally", + "pattern": "Running on (https?://\\S+|[0-9]+)", + "uriFormat": "%s://localhost:%s/processes" + } + } + ] +} diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..87e7afa --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,61 @@ +{ + "version": "2.0.0", + "tasks": [ + { + "type": "docker-build", + "label": "docker-build", + "platform": "python", + "dockerBuild": { + "tag": "actinia-ogc-api-processes-plugin:latest", + "dockerfile": "${workspaceFolder}/docker/Dockerfile", + "context": "${workspaceFolder}" + } + }, + { + "type": "docker-run", + "label": "docker-run: debug", + "dependsOn": [ + "docker-build" + ], + "python": { + "module": "flask", + "args": [ + "-e", + "/src/.env", + "run", + "--no-debugger", + "--host", + "0.0.0.0", + "--port", + "4044" + ] + }, + "dockerRun": { + "containerName": "actinia-ogc-api-processes", + "remove": true, + "network": "actinia-docker_actinia-dev", + "ports": [ + { + "containerPort": 4044, + "hostPort": 4044 + } + ], + "env": { + "PYTHONUNBUFFERED": "1", + "PYTHONDONWRITEBYTECODE": "1", + "FLASK_APP": "actinia_ogc_api_processes_plugin.main", + "FLASK_DEBUG": "1", + "FLASK_ENV": "development" + }, + "customOptions": "--ip 172.18.0.12", + "volumes": [ + { + "localPath": "${workspaceFolder}", + "containerPath": "/src/actinia-ogc-api-processes-plugin", + "permissions": "rw" + } + ] + } + } + ] +} diff --git a/README.md b/README.md index 97b471a..17a8876 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ docker compose -f docker/docker-compose.yml up # -- only current plugin (Note: need to start actinia + valkey separately) docker compose -f docker/docker-compose.yml run --rm --service-ports --entrypoint sh actinia-ogc-api-processes # within docker -gunicorn -b 0.0.0.0:3003 -w 8 --access-logfile=- -k gthread actinia_ogc_api_processes_plugin.main:flask_app +gunicorn -b 0.0.0.0:4044 -w 8 --access-logfile=- -k gthread actinia_ogc_api_processes_plugin.main:flask_app ``` ### DEV setup @@ -20,7 +20,7 @@ gunicorn -b 0.0.0.0:3003 -w 8 --access-logfile=- -k gthread actinia_ogc_api_proc # Uncomment the volume mount of the ogc-api-processes-plugin and additional marked sections of actinia-ogc-api-processes service within docker/docker-compose.yml, # Note: might also need to set: # - within config/mount/sample.ini: processing_base_url = http://127.0.0.1:8088/api/v3 -# - within src/actinia_ogc_api_processes_plugin/main.py set port: flask_app.run(..., port=3003) +# - within src/actinia_ogc_api_processes_plugin/main.py set port: flask_app.run(..., port=4044) # then: docker compose -f docker/docker-compose.yml down docker compose -f docker/docker-compose.yml up --build @@ -29,7 +29,7 @@ docker compose -f docker/docker-compose.yml up --build docker attach $(docker ps | grep docker-actinia-ogc-api-processes | cut -d " " -f1) # In another terminal: example call of processes-endpoint: -curl -u actinia-gdi:actinia-gdi -X GET http://localhost:3003/processes +curl -u actinia-gdi:actinia-gdi -X GET http://localhost:4044/processes ``` ### Installation hints diff --git a/docker/Dockerfile b/docker/Dockerfile index 5bbd9ea..504b3cf 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -19,11 +19,8 @@ RUN pip3 install --no-cache-dir gunicorn && \ COPY . /src/actinia-ogc-api-processes-plugin/ RUN pip3 install --no-cache-dir -e /src/actinia-ogc-api-processes-plugin/ -# TODO -# For tests, when created: -# RUN chmod a+x /src/actinia-ogc-api-processes-plugin/tests.sh WORKDIR /src/actinia-ogc-api-processes-plugin # RUN make test -CMD ["gunicorn", "-b", "0.0.0.0:3003", "-w", "8", "--access-logfile=-", "-k", "gthread", "actinia_ogc_api_processes_plugin.main:flask_app"] +CMD ["gunicorn", "-b", "0.0.0.0:4044", "-w", "8", "--access-logfile=-", "-k", "gthread", "actinia_ogc_api_processes_plugin.main:flask_app"] diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 345beaa..d6c3827 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -15,17 +15,17 @@ services: cap_add: - SYS_PTRACE ports: - - "3003:3003" + - "4044:4044" # -- For dev-setup/debugging uncomment following: # network_mode: "host" # stdin_open: true # tty: true # command: > - # sh -c "python -m actinia_ogc_api_processes_plugin.main --workers 1 --timeout 3600 --bind 0.0.0.0:3003" + # sh -c "python -m actinia_ogc_api_processes_plugin.main --workers 1 --timeout 3600 --bind 0.0.0.0:4044" actinia-core: - image: mundialis/actinia:2.12.2 + image: mundialis/actinia:2.13.1 # ports: # - "8088:8088" depends_on: diff --git a/src/actinia_ogc_api_processes_plugin/api/job_status_info.py b/src/actinia_ogc_api_processes_plugin/api/job_status_info.py new file mode 100644 index 0000000..ea1602b --- /dev/null +++ b/src/actinia_ogc_api_processes_plugin/api/job_status_info.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python +"""SPDX-FileCopyrightText: (c) 2026 by mundialis GmbH & Co. KG. + +SPDX-License-Identifier: GPL-3.0-or-later + +JobStatusInfo endpoint implementation. +""" + +__license__ = "GPL-3.0-or-later" +__author__ = "Carmen Tawalika" +__copyright__ = "Copyright 2026 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH & Co. KG" + +from flask import jsonify, make_response +from flask_restful_swagger_2 import Resource, swagger +from requests.exceptions import ConnectionError as req_ConnectionError + +from actinia_ogc_api_processes_plugin.apidocs import job_status_info +from actinia_ogc_api_processes_plugin.authentication import require_basic_auth +from actinia_ogc_api_processes_plugin.core.job_status_info import ( + get_job_status_info, +) +from actinia_ogc_api_processes_plugin.model.response_models import ( + SimpleStatusCodeResponseModel, + StatusInfoResponseModel, +) +from actinia_ogc_api_processes_plugin.resources.logging import log + + +class JobStatusInfo(Resource): + """JobStatusInfo handling.""" + + def __init__(self) -> None: + """Initialise.""" + self.msg = "Return job status information" + + @require_basic_auth() + @swagger.doc(job_status_info.describe_job_status_info_get_docs) + def get(self, job_id): + """Return status information for a given job id.""" + try: + status, status_info, resp = get_job_status_info(job_id) + if status == 200: + # build StatusInfoResponseModel from status_info dict + model_kwargs = {} + for k in ( + "processID", + "type", + "jobID", + "status", + "message", + "created", + "started", + "finished", + "updated", + "progress", + "links", + ): + if k in status_info: + model_kwargs[k] = status_info[k] + + res = jsonify(StatusInfoResponseModel(**model_kwargs)) + return make_response(res, 200) + elif status == 401: + log.error("ERROR: Unauthorized Access") + log.debug(f"actinia response: {resp.text}") + res = jsonify( + SimpleStatusCodeResponseModel( + status=401, + message="ERROR: Unauthorized Access", + ), + ) + return make_response(res, 401) + elif status in {400, 404}: + log.error("ERROR: No such job") + log.debug(f"actinia response: {resp.text}") + res = jsonify( + { + "type": ( + "http://www.opengis.net/def/exceptions/" + "ogcapi-processes-1/1.0/no-such-job" + ), + "title": "No Such Job", + "status": 404, + "detail": f"Job '{job_id}' not found", + }, + ) + return make_response(res, 404) + else: + log.error("ERROR: Internal Server Error") + code = getattr(resp, "status_code", status) + text = getattr(resp, "text", "") + log.debug(f"actinia status code: {code}") + log.debug(f"actinia response: {text}") + res = jsonify( + SimpleStatusCodeResponseModel( + status=500, + message="ERROR: Internal Server Error", + ), + ) + return make_response(res, 500) + except req_ConnectionError as e: + log.error(f"Connection ERROR: {e}") + res = jsonify( + SimpleStatusCodeResponseModel( + status=503, + message=f"Connection ERROR: {e}", + ), + ) + return make_response(res, 503) diff --git a/src/actinia_ogc_api_processes_plugin/apidocs/job_status_info.py b/src/actinia_ogc_api_processes_plugin/apidocs/job_status_info.py new file mode 100644 index 0000000..c36e30d --- /dev/null +++ b/src/actinia_ogc_api_processes_plugin/apidocs/job_status_info.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +"""SPDX-FileCopyrightText: (c) 2026 by mundialis GmbH & Co. KG. + +SPDX-License-Identifier: GPL-3.0-or-later + +API docs for JobStatusInfo endpoint. +""" + +__license__ = "GPL-3.0-or-later" +__author__ = "Carmen Tawalika" +__copyright__ = "Copyright 2026 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH & Co. KG" + +from actinia_ogc_api_processes_plugin.model.response_models import ( + SimpleStatusCodeResponseModel, + StatusInfoResponseModel, +) + +describe_job_status_info_get_docs = { + "tags": ["job_status_info"], + "description": "Retrieves the status information for a job.", + "responses": { + "200": { + "description": "This response returns the job status information.", + "schema": StatusInfoResponseModel, + }, + "401": { + "description": "Unauthorized Access", + "schema": SimpleStatusCodeResponseModel, + }, + "404": { + "description": "Job not found", + "schema": SimpleStatusCodeResponseModel, + }, + "500": { + "description": "Internal Server Error", + "schema": SimpleStatusCodeResponseModel, + }, + "503": { + "description": "Connection Error", + "schema": SimpleStatusCodeResponseModel, + }, + }, +} diff --git a/src/actinia_ogc_api_processes_plugin/core/job_status_info.py b/src/actinia_ogc_api_processes_plugin/core/job_status_info.py new file mode 100644 index 0000000..72aa6d9 --- /dev/null +++ b/src/actinia_ogc_api_processes_plugin/core/job_status_info.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python +"""SPDX-FileCopyrightText: (c) 2026 by mundialis GmbH & Co. KG. + +SPDX-License-Identifier: GPL-3.0-or-later + +Core helper to fetch job status from actinia processing API. +""" + +__license__ = "GPL-3.0-or-later" +__author__ = "Carmen Tawalika" +__copyright__ = "Copyright 2026 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH & Co. KG" + +from datetime import datetime, timedelta, timezone + +import requests +from flask import request +from requests.auth import HTTPBasicAuth + +from actinia_ogc_api_processes_plugin.resources.config import ACTINIA + + +def get_actinia_job(job_id): + """Retrieve job status from actinia.""" + auth = request.authorization + kwargs = dict() + if auth: + kwargs["auth"] = HTTPBasicAuth(auth.username, auth.password) + + url = ( + f"{ACTINIA.processing_base_url}/resources/{auth.username}/" + f"resource_id-{job_id}" + ) + return requests.get(url, **kwargs) + + +def map_status(raw: object) -> str: + """Map actinia status values to OGC statusInfo values. + + Mapping: + - accepted -> accepted + - running -> running + - finished -> successful + - error -> failed + - terminated -> dismissed + + Default is 'accepted' when input is falsy or unknown. + """ + if not raw: + return "accepted" + s = str(raw).strip().lower() + mapping = { + "accepted": "accepted", + "running": "running", + "finished": "successful", + "error": "failed", + "terminated": "dismissed", + } + return mapping.get(s, "accepted") + + +def calculate_progress(data: dict): + """Return integer progress 0..100 from data or None. + + Supports nested object `progress: { num_of_steps, step }`. + Returns None on invalid input. + """ + status = data.get("status") + progress = data.get("progress") + if not isinstance(progress, dict): + return None + if status == "finished": + return 100 + + try: + raw_num = progress.get("num_of_steps") + raw_cur = progress.get("step") + num = int(raw_num) if isinstance(raw_num, int) else int(float(raw_num)) + cur = int(raw_cur) if isinstance(raw_cur, int) else int(float(raw_cur)) + except (TypeError, ValueError): + return None + + if num <= 0: + return None + # calculate percentage with total steps + 1 to avoid 100% before finished + prog = round((cur / (num + 1)) * 100) + return max(0, min(100, prog)) + + +def calculate_finished(data: dict): + """Return finished time as ISO string or None. + + Calculate `finished` from accept_timestamp + time_delta (seconds) + """ + status = data.get("status") + if status != "finished": + return None + start = data.get("accept_timestamp") + + try: + start_dt = datetime.fromtimestamp(float(start), tz=timezone.utc) + td = float(data.get("time_delta")) + except (TypeError, ValueError): + return None + + finished_dt = start_dt + timedelta(seconds=td) + # format without microseconds + return finished_dt.replace(microsecond=0).isoformat() + + +def parse_actinia_job(job_id, resp): + """Parse actinia job response into status_info dict.""" + try: + data = resp.json() + except (ValueError, TypeError): + data = {} + + status_info = {} + status_info["jobID"] = job_id + status_info["status"] = map_status(data.get("status")) + status_info["type"] = data.get("type", "process") + status_info["message"] = data.get("message") + status_info["processID"] = data.get("resource_id") + + # Servers SHOULD set the value of the created field when a job has been + # accepted and queued for execution. + try: + created = datetime.fromtimestamp( + float(data.get("accept_timestamp")), + tz=timezone.utc, + ) + status_info["created"] = created.replace(microsecond=0).isoformat() + except (TypeError, ValueError): + pass + + # Whenever the status field of the job changes, servers SHOULD revise the + # value of the updated field. + # -> actinia-core updates this field every time anything was updated. + try: + updated = datetime.fromtimestamp( + float(data.get("timestamp")), + tz=timezone.utc, + ) + status_info["updated"] = updated.replace(microsecond=0).isoformat() + except (TypeError, ValueError): + pass + + # Servers SHOULD set the value of the started field when a job begins + # execution and is consuming compute resources. + # status_info["started"] = # TODO implement in actinia-core + + # Servers SHOULD set the value of the finished field when the execution of + # a job has completed and the process is no longer consuming compute + # resources. + # -> Returns none of job not finished. + finished_val = calculate_finished(data) + if finished_val is not None: + status_info["finished"] = finished_val + + progress_val = calculate_progress(data) + if progress_val is not None: + status_info["progress"] = progress_val + + links = data.get("links") + if not links: + links = [{"href": request.url, "rel": "self"}] + status_info["links"] = links + + return status_info + + +def get_job_status_info(job_id): + """Return a tuple (status_code, status_info_dict_or_None, response). + + Maps the actinia job response into the OGC `statusInfo` structure when + possible. `response` is the original requests.Response for logging. + """ + resp = get_actinia_job(job_id) + + status_code = resp.status_code + + if status_code == 200: + status_info = parse_actinia_job(job_id, resp) + return 200, status_info, resp + + # Actinia returns HTTP 400 both for 'no such job' and for + # resources that include an error state. Distinguish by inspecting the + # JSON payload: if it looks like a job/resource object (contains + # identifiers or job fields) treat it as a valid resource and map it to + # a 200 + statusInfo. Otherwise return 404. + if status_code == 400: + try: + data = resp.json() + except (ValueError, TypeError): + return 404, None, resp + + indicative_keys = { + "accept_timestamp", + "message", + "status", + "resource_id", + "timestamp", + } + + if isinstance(data, dict) and indicative_keys.issubset(data.keys()): + status_info = parse_actinia_job(job_id, resp) + return 200, status_info, resp + + return 404, None, resp + + # Any other status codes return as-is + return status_code, None, resp diff --git a/src/actinia_ogc_api_processes_plugin/endpoints.py b/src/actinia_ogc_api_processes_plugin/endpoints.py index 6952097..2c4689f 100644 --- a/src/actinia_ogc_api_processes_plugin/endpoints.py +++ b/src/actinia_ogc_api_processes_plugin/endpoints.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -"""SPDX-FileCopyrightText: (c) 2018-2025 by mundialis GmbH & Co. KG. +"""SPDX-FileCopyrightText: (c) 2018-2026 by mundialis GmbH & Co. KG. SPDX-License-Identifier: GPL-3.0-or-later @@ -13,6 +13,7 @@ from flask_restful_swagger_2 import Api +from actinia_ogc_api_processes_plugin.api.job_status_info import JobStatusInfo from actinia_ogc_api_processes_plugin.api.process_description import ( ProcessDescription, ) @@ -25,5 +26,6 @@ def create_endpoints(flask_api: Api) -> None: # Endpoints following: https://docs.ogc.org/is/18-062r2/18-062r2.html#toc0 + apidoc.add_resource(JobStatusInfo, "/jobs/") apidoc.add_resource(ProcessList, "/processes") apidoc.add_resource(ProcessDescription, "/processes/") diff --git a/src/actinia_ogc_api_processes_plugin/model/response_models.py b/src/actinia_ogc_api_processes_plugin/model/response_models.py index ed405d7..c17b1b8 100644 --- a/src/actinia_ogc_api_processes_plugin/model/response_models.py +++ b/src/actinia_ogc_api_processes_plugin/model/response_models.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -"""SPDX-FileCopyrightText: (c) 2018-2025 by mundialis GmbH & Co. KG. +"""SPDX-FileCopyrightText: (c) 2018-2026 by mundialis GmbH & Co. KG. SPDX-License-Identifier: GPL-3.0-or-later @@ -7,8 +7,8 @@ """ __license__ = "GPL-3.0-or-later" -__author__ = "Anika Weinmann" -__copyright__ = "Copyright 2018-2025 mundialis GmbH & Co. KG" +__author__ = "Anika Weinmann, Carmen Tawalika" +__copyright__ = "Copyright 2018-2026 mundialis GmbH & Co. KG" __maintainer__ = "mundialis GmbH & Co. KG" @@ -39,3 +39,69 @@ class SimpleStatusCodeResponseModel(Schema): message="success", ) SimpleStatusCodeResponseModel.example = simple_response_example + + +class StatusInfoResponseModel(Schema): + """statusInfo schema from OGC API - Processes (part1).""" + + type: str = "object" + properties: ClassVar[dict] = { + "processID": {"type": "string"}, + "type": {"type": "string", "enum": ["process"]}, + "jobID": {"type": "string"}, + "status": { + "type": "string", + "enum": [ + "accepted", + "running", + "successful", + "failed", + "dismissed", + ], + }, + "message": {"type": "string"}, + "created": {"type": "string", "format": "date-time"}, + "started": {"type": "string", "format": "date-time"}, + "finished": {"type": "string", "format": "date-time"}, + "updated": {"type": "string", "format": "date-time"}, + "progress": {"type": "integer", "minimum": 0, "maximum": 100}, + "links": { + "type": "array", + "items": { + "type": "object", + "properties": { + "href": {"type": "string"}, + "rel": {"type": "string"}, + "type": {"type": "string"}, + "hreflang": {"type": "string"}, + "title": {"type": "string"}, + }, + "required": ["href"], + }, + }, + } + required: ClassVar[list[str]] = ["jobID", "status", "type"] + + +# attach examples +status_info_example = StatusInfoResponseModel( + jobID="96ed4cb9-1290-4409-b034-c162759c10a1", + status="successful", + type="process", + message="Processing successfully finished", + processID="resource_id-96ed4cb9-1290-4409-b034-c162759c10a1", + created="2026-01-06T11:02:14", + updated="2026-01-06T11:02:49", + finished="2026-01-06T11:02:49", + progress=100, + links=[ + { + "href": ( + "http://example.com/jobs/" + "96ed4cb9-1290-4409-b034-c162759c10a1" + ), + "rel": "self", + }, + ], +) +StatusInfoResponseModel.example = status_info_example diff --git a/tests/integrationtests/test_job_status_info.py b/tests/integrationtests/test_job_status_info.py new file mode 100644 index 0000000..0293a4e --- /dev/null +++ b/tests/integrationtests/test_job_status_info.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python +"""SPDX-FileCopyrightText: (c) 2026 by mundialis GmbH & Co. KG. + +SPDX-License-Identifier: GPL-3.0-or-later +""" + +__license__ = "GPL-3.0-or-later" +__author__ = "Carmen Tawalika" +__copyright__ = "Copyright 2026 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH & Co. KG" + + +import pytest +from flask import Response + +from tests.testsuite import TestCase + + +class JobStatusInfoTest(TestCase): + """Integration tests for /jobs/ endpoint.""" + + # # Can only be activated when example job is available in actinia instance + # @pytest.mark.integrationtest + # def test_get_job_status_success(self) -> None: + # """Successful query returns statusInfo-like structure.""" + # # example job id used in response model examples + # job_id = "96ed4cb9-1290-4409-b034-c162759c10a1" + # resp = self.app.get(f"/jobs/{job_id}", headers=self.HEADER_AUTH) + # assert isinstance(resp, Response) + # assert resp.status_code == 200 + # assert hasattr(resp, "json") + # assert "jobID" in resp.json, "There is no 'jobID' in response" + # assert "status" in resp.json, "There is no 'status' in response" + # assert "links" in resp.json, "There is no 'links' in response" + + # # Can only be activated when example job is available in actinia instance + # @pytest.mark.integrationtest + # def test_get_job_status_failed(self) -> None: + # """Successful query returns statusInfo-like structure.""" + # # example job id used in response model examples + # job_id = "565f6bc9-7535-44c6-9826-864fbb2421f3" + # resp = self.app.get(f"/jobs/{job_id}", headers=self.HEADER_AUTH) + # assert isinstance(resp, Response) + # assert resp.status_code == 200 + # assert hasattr(resp, "json") + # assert "jobID" in resp.json, "There is no 'jobID' in response" + # assert "status" in resp.json, "There is no 'status' in response" + # assert "links" in resp.json, "There is no 'links' in response" + # assert resp.json["status"] == "failed" + + @pytest.mark.integrationtest + def test_get_job_status_missing_auth(self) -> None: + """Request without auth returns 401.""" + job_id = "96ed4cb9-1290-4409-b034-c162759c10a1" + resp = self.app.get(f"/jobs/{job_id}") + assert isinstance(resp, Response) + assert resp.status_code == 401 + assert hasattr(resp, "json") + assert "message" in resp.json + assert resp.json["message"] == "Authentication required" + + @pytest.mark.integrationtest + def test_get_job_status_false_auth(self) -> None: + """Wrong credentials return 401 and error message.""" + job_id = "96ed4cb9-1290-4409-b034-c162759c10a1" + resp = self.app.get(f"/jobs/{job_id}", headers=self.HEADER_AUTH_WRONG) + assert isinstance(resp, Response) + assert resp.status_code == 401 + assert hasattr(resp, "json") + assert "message" in resp.json + assert resp.json["message"] == "ERROR: Unauthorized Access" + + @pytest.mark.integrationtest + def test_get_job_status_not_found(self) -> None: + """Non-existent job id returns 404 with OGC exception type.""" + resp = self.app.get("/jobs/invalid_job_id", headers=self.HEADER_AUTH) + assert isinstance(resp, Response) + assert resp.status_code == 404 + assert hasattr(resp, "json") + assert "type" in resp.json + expected = ( + "http://www.opengis.net/def/exceptions/" + "ogcapi-processes-1/1.0/no-such-job" + ) + assert resp.json["type"] == expected + + @pytest.mark.integrationtest + def test_get_job_method_not_allowed(self) -> None: + """Other methods than GET return 405 Method Not Allowed.""" + resp = self.app.post("/jobs/invalid_job_id", headers=self.HEADER_AUTH) + assert isinstance(resp, Response) + assert resp.status_code == 405 + assert hasattr(resp, "json") diff --git a/tests/unittests/test_core_job_status_info.py b/tests/unittests/test_core_job_status_info.py new file mode 100644 index 0000000..43aac38 --- /dev/null +++ b/tests/unittests/test_core_job_status_info.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python +"""SPDX-FileCopyrightText: (c) 2026 by mundialis GmbH & Co. KG. + +SPDX-License-Identifier: GPL-3.0-or-later + +Unit tests for core.job_status_info helper functions. +""" + +__license__ = "GPL-3.0-or-later" +__author__ = "Carmen Tawalika" +__copyright__ = "Copyright 2026 mundialis GmbH & Co. KG" +__maintainer__ = "mundialis GmbH & Co. KG" + + +import pytest + +from actinia_ogc_api_processes_plugin.core import job_status_info as core + + +class MockResp: + """Lightweight Response-like object for unit tests. + + Provides `status_code`, `text` and a `json()` method returning the + configured payload. + """ + + def __init__(self, status_code=200, json_data=None, text="") -> None: + """Initialise.""" + self.status_code = status_code + self._json = json_data or {} + self.text = text + + def json(self): + """Return the configured response json.""" + return self._json + + +@pytest.mark.unittest +def test_map_status_values(): + """Test mapping of actinia job status to OGC API Processes values.""" + assert core.map_status("accepted") == "accepted" + assert core.map_status("running") == "running" + assert core.map_status("finished") == "successful" + assert core.map_status("error") == "failed" + assert core.map_status("terminated") == "dismissed" + assert core.map_status(None) == "accepted" + assert core.map_status("unknown") == "accepted" + + +@pytest.mark.unittest +def test_calculate_progress_valid_and_invalid(): + """Test calculation of progress percentage from actinia progress info.""" + # valid integer steps + data = {"progress": {"num_of_steps": 4, "step": 2}} + # calculation uses num+1 denominator -> 2/(4+1) = 0.4 -> 40 + assert core.calculate_progress(data) == 40 + + # step > num -> clamp to 100 + data = {"progress": {"num_of_steps": 3, "step": 5}} + assert core.calculate_progress(data) == 100 + + # string floats + data = {"progress": {"num_of_steps": "4", "step": "1"}} + # 1/(4+1) = 0.2 -> 20 + assert core.calculate_progress(data) == 20 + + # not a dict -> None + assert core.calculate_progress({"progress": 50}) is None + # invalid values -> None + data = {"progress": {"num_of_steps": 0, "step": 0}} + assert core.calculate_progress(data) is None + + +@pytest.mark.unittest +def test_calculate_finished(): + """Test calculation of finished timestamp.""" + # 2021-01-01T00:00:00Z + 3600s -> 2021-01-01T01:00:00+00:00 + data = { + "accept_timestamp": 1609459200, + "time_delta": 3600, + "status": "finished", + } + assert core.calculate_finished(data) == "2021-01-01T01:00:00+00:00" + + +@pytest.mark.unittest +def test_calculate_finished_but_still_running(): + """Test that finished is None when job not finished.""" + # 2021-01-01T00:00:00Z + 3600s -> 2021-01-01T01:00:00+00:00 + data = { + "accept_timestamp": 1609459200, + "time_delta": 3600, + "status": "running", + } + assert core.calculate_finished(data) is None + + +@pytest.mark.unittest +def test_get_job_status_info_success_and_error(monkeypatch): + """Test get_job_status_info function for success and error cases.""" + job_id = "job-123" + sample = { + "status": "finished", + "resource_id": "proc-1", + "accept_timestamp": 1609459200, + "timestamp": 1609462800, + "time_delta": 3600, + "progress": {"num_of_steps": 4, "step": 2}, + "links": [{"href": "http://example.com/out", "rel": "alternate"}], + } + + resp = MockResp(200, json_data=sample, text="ok") + + # patch the network call inside the module to return our mock + monkeypatch.setattr(core, "get_actinia_job", lambda _jid: resp) + + status, status_info, r = core.get_job_status_info(job_id) + assert status == 200 + assert r is resp + assert status_info["jobID"] == job_id + assert status_info["status"] == "successful" + assert status_info["processID"] == "proc-1" + assert status_info["created"] == "2021-01-01T00:00:00+00:00" + assert status_info["updated"] == "2021-01-01T01:00:00+00:00" + # finished == accept + time_delta + assert status_info["finished"] == "2021-01-01T01:00:00+00:00" + # status is 'finished' -> progress should be 100 + assert status_info["progress"] == 100 + assert isinstance(status_info["links"], list) + + # non-200 is passed through + notfound = MockResp(404, json_data={}, text="not found") + monkeypatch.setattr(core, "get_actinia_job", lambda _jid: notfound) + status2, info2, r2 = core.get_job_status_info(job_id) + assert status2 == 404 + assert info2 is None + assert r2 is notfound + + +@pytest.mark.unittest +def test_parse_actinia_job_with_valid_data(): + """Test parsing of actinia job response into status_info dict.""" + sample = { + "status": "running", + "resource_id": "resource_id-96ed4cb9-1290-4409-b034-c162759c10a1", + "accept_timestamp": 1767697334.010796, + "timestamp": 1767697369.8468018, + "time_delta": 35.83603835105896, + "progress": {"num_of_steps": 4, "step": 2}, + "type": "process", + "message": "ok", + "links": [{"href": "http://example.com/out", "rel": "alternate"}], + } + + resp = MockResp(200, json_data=sample, text="ok") + info = core.parse_actinia_job("96ed4cb9-1290-4409-b034-c162759c10a1", resp) + assert info["jobID"] == "96ed4cb9-1290-4409-b034-c162759c10a1" + assert info["status"] == "running" + expected_proc = "resource_id-96ed4cb9-1290-4409-b034-c162759c10a1" + assert info["processID"] == expected_proc + assert info["created"] == "2026-01-06T11:02:14+00:00" + assert info["updated"] == "2026-01-06T11:02:49+00:00" + assert "finished" not in info + # progress -> 2/(4+1) = 0.4 -> 40 + assert info["progress"] == 40 + assert isinstance(info["links"], list) + assert info["links"][0]["href"] == "http://example.com/out" + + +@pytest.mark.unittest +def test_parse_actinia_job_finished_status(): + """Test parsing of actinia job response with finished status.""" + sample = { + "status": "finished", + "resource_id": "resource_id-96ed4cb9-1290-4409-b034-c162759c10a1", + "accept_timestamp": 1767697334.010796, + "timestamp": 1767697369.8468018, + "time_delta": 35.83603835105896, + "progress": {"num_of_steps": 4, "step": 4}, + "links": [{"href": "http://example.com/out", "rel": "alternate"}], + } + + resp = MockResp(200, json_data=sample, text="ok") + info = core.parse_actinia_job("96ed4cb9-1290-4409-b034-c162759c10a1", resp) + # finished maps to 'successful' + assert info["status"] == "successful" + assert info["finished"] == "2026-01-06T11:02:49+00:00" + # finished status should set progress to 100 + assert info["progress"] == 100