diff --git a/CHANGES.rst b/CHANGES.rst index c36498ff4..8e42c2ad7 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,11 +12,17 @@ Changes Changes: -------- -- No change. +- Add support for various GeoTIFF formats, allowing flexible handling and representation of GeoTIFFs in outputs + (fixes `#100 `_). +- Add support for ``GET /jobs/{jobId}/results/{id}`` and ``GET /jobs/{jobId}/outputs/{id}`` routes to enable direct access to + individual `Job` result items by ID. This enhancement includes support for alternate representations based on the ``Accept`` header. + If an alternate format (e.g., YAML for a JSON source) is requested, it will be automatically generated and returned. +- Return ``Link`` headers containing all possible output formats, allowing retrieval via query parameters + (e.g., ``/jobs/{jobId}/outputs/{id}?f=application/x-yaml``) (fixes `#18 `_). Fixes: ------ -- No change. +- Fix duplicated result ``Link`` headers causing the CLI to download the same output file multiple times. .. _changes_6.4.1: diff --git a/Makefile b/Makefile index ff7b19cd3..fa35d8a73 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,7 @@ DOCKER_REPO ?= pavics/weaver # guess OS (Linux, Darwin,...) OS_NAME := $(shell uname -s 2>/dev/null || echo "unknown") CPU_ARCH := $(shell uname -m 2>/dev/null || uname -p 2>/dev/null || echo "unknown") +SUDO ?= # conda CONDA_CMD ?= __EMPTY__ @@ -229,10 +230,10 @@ conda-env-export: ## export the conda environment install: install-all ## alias for 'install-all' target .PHONY: install-run -install-run: conda-install install-sys install-pkg install-raw ## install requirements and application to run locally +install-run: conda-install install-sys install-pkg install-raw install-dev install-transform ## install requirements and application to run locally .PHONY: install-all -install-all: conda-install install-sys install-pkg install-pip install-dev ## install application with all dependencies +install-all: conda-install install-sys install-pkg install-pip install-dev install-transform ## install application with all dependencies .PHONY: install-doc install-doc: install-pip ## install documentation dependencies @@ -275,7 +276,7 @@ install-raw: ## install without any requirements or dependencies (suppose everyt install-npm: ## install npm package manager and dependencies if they cannot be found @[ -f "$(shell which npm)" ] || ( \ echo "Binary package manager npm not found. Attempting to install it."; \ - apt-get install npm \ + $(SUDO) apt-get install npm \ ) .PHONY: install-npm-stylelint @@ -292,6 +293,16 @@ install-npm-remarklint: install-npm ## install remark-lint dependency for 'chec npm install --save-dev \ ) +.PHONY: install-transform +install-transform: install-cairo-dependencies ## install transform dependencies + +.PHONY: install-cairo-dependencies +install-cairo-dependencies: ## install Cairo dependencies required for transform operations + @ldconfig -p 2>/dev/null | grep -q libpangocairo || ( \ + echo "Required library libpangocairo not found. 
Attempting to install it."; \ + $(SUDO) apt-get install libpangocairo-1.0-0 \ + ) + .PHONY: install-dev-npm install-dev-npm: install-npm install-npm-remarklint install-npm-remarklint ## install all npm development dependencies diff --git a/docker/Dockerfile-base b/docker/Dockerfile-base index 0c5b0b69d..c9e4bd260 100644 --- a/docker/Dockerfile-base +++ b/docker/Dockerfile-base @@ -23,6 +23,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ g++ \ git \ nodejs \ + libpangocairo-1.0-0 \ && pip install --no-cache-dir --upgrade -r requirements-sys.txt \ && pip install --no-cache-dir -r requirements.txt \ && pip install --no-cache-dir -e ${APP_DIR} \ diff --git a/requirements.txt b/requirements.txt index 67cd2c17a..e1831ecd3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,7 @@ boto3-stubs[s3] # https://github.com/celery/billiard/issues/313 billiard>2; sys_platform != "win32" # avoid issue with use_2to3 billiard>3.2,<3.4; sys_platform == "win32" +cairosvg # pymongo>=4 breaks for some kombu combinations corresponding to pinned Celery # - https://github.com/crim-ca/weaver/issues/386 # - https://github.com/celery/kombu/pull/1536 @@ -52,6 +53,7 @@ duration esgf-compute-api @ git+https://github.com/ESGF/esgf-compute-api.git@v2.3.7 # invalid 'zarr' requirement in 'geotiff' dependencies required by 'pywps' fail to install # (https://github.com/KipCrossing/geotiff/pull/59) +fpdf geotiff>=0.2.8 # gunicorn >20 breaks some config.ini loading parameters (paste) # use pserve to continue supporting config.ini with paste settings @@ -60,6 +62,7 @@ gunicorn>=22 # even more reduced dependency constraints (https://github.com/vinitkumar/json2xml/pull/195) json2xml==4.1.0 jsonschema>=3.0.1 + # FIXME: kombu for pymongo>=4 not yet released as 5.3.0 (only pre-releases available) # - https://github.com/crim-ca/weaver/issues/386 # - https://github.com/celery/kombu/pull/1536 @@ -70,12 +73,15 @@ mako # force use of later mistune (https://github.com/common-workflow-language/schema_salad/pull/619#issuecomment-1346025607) # employed by cwltool -> schema-salad -> mistune #mistune>=2.0.3,<2.1 +multipagetiff mypy_boto3_s3 numpy>=1.22.2 # esgf-compute-api (cwt) needs oauthlib but doesn't add it in their requirements oauthlib owslib==0.32.1 PasteDeploy>=3.1.0; python_version >= "3.12" +pandas +Pillow pint psutil # notes: https://github.com/geopython/pygeofilter @@ -103,9 +109,11 @@ pystac pystac_client python-box python-dateutil +python-magic pytz pywps==4.6.0 pyyaml>=5.2 +rasterio rdflib>=5 # pyup: ignore requests>=2.32.2 requests_file diff --git a/tests/functional/test_cli.py b/tests/functional/test_cli.py index 17ac8d13d..8e334e421 100644 --- a/tests/functional/test_cli.py +++ b/tests/functional/test_cli.py @@ -475,7 +475,11 @@ def test_describe(self): for out_fmt in output_formats: out_fmt.pop("$schema", None) out_fmt.pop("$id", None) - assert output_formats == [{"default": True, "mediaType": ContentType.TEXT_PLAIN}] + assert output_formats == [ + {"default": True, "mediaType": ContentType.TEXT_PLAIN}, + {"mediaType": ContentType.TEXT_HTML}, + {"mediaType": ContentType.APP_PDF} + ] assert "undefined" not in result.message, "CLI should not have confused process description as response detail." assert result.body["description"] == ( "Dummy process that simply echo's back the input message for testing purposes." 
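The updated expectations in ``test_describe`` above follow from the ``CONVERSION_DICT`` mapping added in ``weaver/transform/transform.py`` later in this diff: a ``text/plain`` output gains ``text/html`` and ``application/pdf`` alternates. The ``extend_alternate_formats`` helper itself lives in ``weaver/transform/utils.py`` and is not shown in this diff, so the following is only a sketch of the behavior the tests imply (the helper name and exact deduplication rules are assumptions)::

    def extend_alternate_formats(formats, conversions):
        # Append an entry for each media-type convertible from a declared format,
        # skipping media-types already present in the original list.
        known = {fmt["mediaType"] for fmt in formats}
        extended = list(formats)
        for fmt in formats:
            for alt in conversions.get(fmt["mediaType"], []):
                if alt not in known:
                    extended.append({"mediaType": alt})
                    known.add(alt)
        return extended

    # "text/plain" maps to [text/plain, text/html, application/pdf] in CONVERSION_DICT,
    # which yields the three formats expected by the CLI describe test above.
    assert extend_alternate_formats(
        [{"default": True, "mediaType": "text/plain"}],
        {"text/plain": ["text/plain", "text/html", "application/pdf"]},
    ) == [
        {"default": True, "mediaType": "text/plain"},
        {"mediaType": "text/html"},
        {"mediaType": "application/pdf"},
    ]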
@@ -1493,6 +1497,13 @@ def test_deploy_payload_process_info_merged(self): out_cwl_fmt = {"default": False, "mediaType": io_fmt} out_oas_fmt = {"default": True, "mediaType": ContentType.APP_JSON} out_any_fmt = [out_cwl_fmt, out_oas_fmt] + # Alternative format added in process description + out_alt_fmt = [ + {"mediaType": ContentType.TEXT_CSV}, + {"mediaType": ContentType.APP_XML}, + {"mediaType": ContentType.APP_YAML}, + ] + out_any_fmt.extend(out_alt_fmt) # ignore schema specifications for comparison only of contents for field in ["$id", "$schema"]: in_schema.pop(field, None) diff --git a/tests/functional/test_wps_package.py b/tests/functional/test_wps_package.py index 342492792..d3b4b196f 100644 --- a/tests/functional/test_wps_package.py +++ b/tests/functional/test_wps_package.py @@ -569,6 +569,14 @@ def test_deploy_process_io_no_format_default(self): expect_outputs["file"]["formats"][0]["default"] = False expect_outputs["file"]["formats"][1]["default"] = True expect_outputs["file"]["formats"][2]["default"] = False + # Alternate type added automatically in offering. + alternative_formats = [ + {"mediaType": ContentType.IMAGE_GIF}, + {"mediaType": ContentType.IMAGE_TIFF}, + {"mediaType": ContentType.IMAGE_SVG_XML}, + {"mediaType": ContentType.APP_PDF} + ] + expect_outputs["file"]["formats"].extend(alternative_formats) expect_outputs["file"]["schema"] = { "oneOf": [ {"type": "string", "format": "binary", @@ -1510,14 +1518,22 @@ def test_deploy_merge_complex_io_with_multiple_formats_and_defaults(self): # assert "default" not in format_spec assert proc["outputs"][0]["id"] == "single_value_single_format" - assert len(proc["outputs"][0]["formats"]) == 1 + assert len(proc["outputs"][0]["formats"]) == 4 # Alternative format added in process assert proc["outputs"][0]["formats"][0]["mediaType"] == ContentType.APP_JSON assert proc["outputs"][0]["formats"][0]["default"] is True + assert proc["outputs"][0]["formats"][1]["mediaType"] == ContentType.TEXT_CSV + assert proc["outputs"][0]["formats"][2]["mediaType"] == ContentType.APP_XML + assert proc["outputs"][0]["formats"][3]["mediaType"] == ContentType.APP_YAML assert proc["outputs"][1]["id"] == "single_value_multi_format" - assert len(proc["outputs"][1]["formats"]) == 3 + assert len(proc["outputs"][1]["formats"]) == 8 # Alternative format added in process assert proc["outputs"][1]["formats"][0]["mediaType"] == ContentType.APP_JSON assert proc["outputs"][1]["formats"][1]["mediaType"] == ContentType.TEXT_PLAIN assert proc["outputs"][1]["formats"][2]["mediaType"] == ContentType.APP_NETCDF + assert proc["outputs"][1]["formats"][3]["mediaType"] == ContentType.TEXT_CSV + assert proc["outputs"][1]["formats"][4]["mediaType"] == ContentType.APP_XML + assert proc["outputs"][1]["formats"][5]["mediaType"] == ContentType.APP_YAML + assert proc["outputs"][1]["formats"][6]["mediaType"] == ContentType.TEXT_HTML + assert proc["outputs"][1]["formats"][7]["mediaType"] == ContentType.APP_PDF assert proc["outputs"][1]["formats"][0]["default"] is True # mandatory assert proc["outputs"][1]["formats"][1].get("default", False) is False # omission is allowed assert proc["outputs"][1]["formats"][2].get("default", False) is False # omission is allowed @@ -1526,10 +1542,15 @@ def test_deploy_merge_complex_io_with_multiple_formats_and_defaults(self): assert proc["outputs"][2]["formats"][0]["mediaType"] == ContentType.APP_NETCDF assert proc["outputs"][2]["formats"][0]["default"] is True assert proc["outputs"][3]["id"] == "multi_value_multi_format" - assert 
len(proc["outputs"][3]["formats"]) == 3 + assert len(proc["outputs"][3]["formats"]) == 8 assert proc["outputs"][3]["formats"][0]["mediaType"] == ContentType.APP_NETCDF assert proc["outputs"][3]["formats"][1]["mediaType"] == ContentType.TEXT_PLAIN assert proc["outputs"][3]["formats"][2]["mediaType"] == ContentType.APP_JSON + assert proc["outputs"][3]["formats"][3]["mediaType"] == ContentType.TEXT_HTML + assert proc["outputs"][3]["formats"][4]["mediaType"] == ContentType.APP_PDF + assert proc["outputs"][3]["formats"][5]["mediaType"] == ContentType.TEXT_CSV + assert proc["outputs"][3]["formats"][6]["mediaType"] == ContentType.APP_XML + assert proc["outputs"][3]["formats"][7]["mediaType"] == ContentType.APP_YAML assert proc["outputs"][3]["formats"][0]["default"] is True # mandatory assert proc["outputs"][3]["formats"][1].get("default", False) is False # omission is allowed assert proc["outputs"][3]["formats"][2].get("default", False) is False # omission is allowed @@ -3301,10 +3322,12 @@ def test_deploy_merge_complex_io_from_package(self): assert "minOccurs" not in proc["outputs"][0] assert "maxOccurs" not in proc["outputs"][0] assert isinstance(proc["outputs"][0]["formats"], list) - assert len(proc["outputs"][0]["formats"]) == 1 + assert len(proc["outputs"][0]["formats"]) == 3 assert isinstance(proc["outputs"][0]["formats"][0], dict) assert proc["outputs"][0]["formats"][0]["mediaType"] == ContentType.TEXT_PLAIN assert proc["outputs"][0]["formats"][0]["default"] is True + assert proc["outputs"][0]["formats"][1]["mediaType"] == ContentType.TEXT_HTML + assert proc["outputs"][0]["formats"][2]["mediaType"] == ContentType.APP_PDF expect = KNOWN_PROCESS_DESCRIPTION_FIELDS fields = set(proc.keys()) - expect assert len(fields) == 0, f"Unexpected fields found:\n Unknown: {fields}\n Expected: {expect}" @@ -3404,15 +3427,23 @@ def test_deploy_merge_complex_io_from_package_and_offering(self): assert isinstance(proc["outputs"], list) assert len(proc["outputs"]) == 2 assert proc["outputs"][0]["id"] == "complex_output_only_cwl_minimal" - assert len(proc["outputs"][0]["formats"]) == 1, \ - "Default format should be added to process definition when omitted from both CWL and WPS" + assert len(proc["outputs"][0]["formats"]) == 3, ( + "Default format and alternate formats should be added " + "to process definition when omitted from both CWL and WPS" + ) assert proc["outputs"][0]["formats"][0]["mediaType"] == ContentType.TEXT_PLAIN assert proc["outputs"][0]["formats"][0]["default"] is True + assert proc["outputs"][0]["formats"][1]["mediaType"] == ContentType.TEXT_HTML + assert proc["outputs"][0]["formats"][2]["mediaType"] == ContentType.APP_PDF assert proc["outputs"][1]["id"] == "complex_output_both_cwl_and_wps" - assert len(proc["outputs"][1]["formats"]) == 1, \ - "Default format should be added to process definition when omitted from both CWL and WPS" + assert len(proc["outputs"][1]["formats"]) == 3, ( + "Default format and alternate formats should be added " + "to process definition when omitted from both CWL and WPS" + ) assert proc["outputs"][1]["formats"][0]["mediaType"] == ContentType.TEXT_PLAIN assert proc["outputs"][1]["formats"][0]["default"] is True + assert proc["outputs"][1]["formats"][1]["mediaType"] == ContentType.TEXT_HTML + assert proc["outputs"][1]["formats"][2]["mediaType"] == ContentType.APP_PDF assert proc["outputs"][1]["title"] == "Additional detail only within WPS output", \ "Additional details defined only in WPS matching CWL I/O by ID should be preserved" @@ -3530,9 +3561,11 @@ def 
test_deploy_literal_and_complex_io_from_wps_xml_reference(self): assert proc["outputs"][1]["description"] == "Collected logs during process run." assert "minOccurs" not in proc["outputs"][1] assert "maxOccurs" not in proc["outputs"][1] - assert len(proc["outputs"][1]["formats"]) == 1 + assert len(proc["outputs"][1]["formats"]) == 3 assert proc["outputs"][1]["formats"][0]["default"] is True assert proc["outputs"][1]["formats"][0]["mediaType"] == ContentType.TEXT_PLAIN + assert proc["outputs"][1]["formats"][1]["mediaType"] == ContentType.TEXT_HTML + assert proc["outputs"][1]["formats"][2]["mediaType"] == ContentType.APP_PDF def test_deploy_enum_array_and_multi_format_inputs_from_wps_xml_reference(self): body = { @@ -3992,7 +4025,7 @@ def test_execute_single_output_prefer_header_return_minimal_literal_accept_defau path = f"/processes/{p_id}/execution" resp = mocked_sub_requests(self.app, "post_json", path, timeout=5, data=exec_content, headers=exec_headers, only_local=True) - assert resp.status_code == 200, f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}" + assert resp.status_code == 200, f"Failed with: [{resp.status_code}]\nReason:\n{resp.text}" assert "Preference-Applied" in resp.headers assert resp.headers["Preference-Applied"] == prefer_header.replace(",", ";") @@ -4377,8 +4410,9 @@ def test_execute_single_output_response_raw_reference_literal(self): assert results.content_type is None assert results.headers["Content-Location"] == results_href assert ("Link", output_data_link) in results.headerlist + rel_pattern = re.compile(r"rel=\"?([^\"]+)\"?") assert not any( - any(out_id in link[-1] for out_id in ["output_json", "output_text"]) + any(out_id in rel_pattern.search(link[1]).group(1) for out_id in ["output_json", "output_text"]) for link in results.headerlist if link[0] == "Link" ), "Filtered outputs should not be found in results response links." outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT}) @@ -4604,9 +4638,7 @@ def test_execute_single_output_multipart_accept_link(self): }, } - # FIXME: implement (https://github.com/crim-ca/weaver/pull/548) @pytest.mark.oap_part1 - @pytest.mark.xfail(reason="not implemented") def test_execute_single_output_multipart_accept_alt_format(self): """ Validate the returned contents combining an ``Accept`` header as ``multipart`` and a ``format`` in ``outputs``. 
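The two tests below were previously marked ``xfail``; with the transformer in place, an alternate representation can also be fetched directly against a running instance. Roughly, as a sketch using ``requests`` (the host, job ID, and output ID are placeholders)::

    import requests

    job_url = "https://weaver.example.com/jobs/797c0c5e-0000"  # hypothetical job

    # content negotiation via the "Accept" header on the new results route...
    resp = requests.get(f"{job_url}/results/output_json",
                        headers={"Accept": "application/x-yaml"})
    assert resp.headers["Content-Type"].startswith("application/x-yaml")

    # ...or via the "f" query parameter advertised in the job "Link" headers
    resp = requests.get(f"{job_url}/outputs/output_json",
                        params={"f": "application/x-yaml"})
    assert resp.headers["Content-Type"].startswith("application/x-yaml")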
@@ -4661,23 +4693,25 @@ def test_execute_single_output_multipart_accept_alt_format(self): output_json_as_yaml = yaml.safe_dump({"data": "test"}) results_body = self.fix_result_multipart_indent(f""" --{boundary} + Content-Disposition: attachment; name="output_json"; filename="result.yml" Content-Type: {ContentType.APP_YAML} + Content-Location: {out_url}/{job_id}/output_json/result.yml Content-ID: - Content-Length: 12 + Content-Length: 11 {output_json_as_yaml} --{boundary}-- """) results_text = self.remove_result_multipart_variable(results.text) assert results.content_type.startswith(ContentType.MULTIPART_MIXED) - assert results_text == results_body + for line1, line2 in zip(results_text.splitlines(), results_body.splitlines()): + assert line1 == line2 outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT}) assert outputs.content_type.startswith(ContentType.APP_JSON) assert outputs.json["outputs"] == { - "output_data": "test", "output_json": { - "href": f"{out_url}/{job_id}/output_json/output.yml", - "type": ContentType.APP_YAML, + "href": f"{out_url}/{job_id}/output_json/result.json", + "type": ContentType.APP_JSON, }, } @@ -4687,9 +4721,7 @@ def test_execute_single_output_multipart_accept_alt_format(self): assert result_json.content_type == ContentType.APP_JSON assert result_json.text == "{\"data\":\"test\"}" - # FIXME: implement (https://github.com/crim-ca/weaver/pull/548) @pytest.mark.oap_part1 - @pytest.mark.xfail(reason="not implemented") def test_execute_single_output_response_document_alt_format_yaml(self): proc = "EchoResultsTester" p_id = self.fully_qualified_test_name(proc) @@ -4738,27 +4770,29 @@ def test_execute_single_output_response_document_alt_format_yaml(self): output_json_as_yaml = yaml.safe_dump({"data": "test"}) results_body = self.fix_result_multipart_indent(f""" --{boundary} + Content-Disposition: attachment; name="output_json"; filename="result.yml" Content-Type: {ContentType.APP_YAML} + Content-Location: {out_url}/{job_id}/output_json/result.yml Content-ID: - Content-Length: 12 + Content-Length: 11 {output_json_as_yaml} --{boundary}-- """) results_text = self.remove_result_multipart_variable(results.text) assert results.content_type.startswith(ContentType.MULTIPART_MIXED) - assert results_text == results_body + for line1, line2 in zip(results_text.splitlines(), results_body.splitlines()): + assert line1 == line2 + outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT}) assert outputs.content_type.startswith(ContentType.APP_JSON) assert outputs.json["outputs"] == { - "output_data": "test", "output_json": { - "href": f"{out_url}/{job_id}/output_json/output.yml", - "type": ContentType.APP_YAML, + "href": f"{out_url}/{job_id}/output_json/result.json", + "type": ContentType.APP_JSON, }, } - # FIXME: implement (https://github.com/crim-ca/weaver/pull/548) # validate the results can be obtained with the "real" representation result_json = self.app.get(f"/jobs/{job_id}/results/output_json", headers=self.json_headers) assert result_json.status_code == 200, f"Failed with: [{resp.status_code}]\nReason:\n{resp.text}" @@ -4830,12 +4864,11 @@ def test_execute_single_output_response_document_alt_format_json_raw_literal(sel }, } - # FIXME: add check of direct request of output (https://github.com/crim-ca/weaver/pull/548) # validate the results can be obtained with the "real" representation - # result_json = self.app.get(f"/jobs/{job_id}/results/output_json", headers=self.json_headers) - # 
assert result_json.status_code == 200, f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}" - # assert result_json.content_type == ContentType.APP_JSON - # assert result_json.json == {"data": "test"} + result_json = self.app.get(f"/jobs/{job_id}/results/output_json", headers=self.json_headers) + assert result_json.status_code == 200, f"Failed with: [{result_json.status_code}]\nReason:\n{result_json.json}" + assert result_json.content_type == ContentType.APP_JSON + assert result_json.json == {"data": "test"} @pytest.mark.oap_part1 def test_execute_single_output_response_document_default_format_json_special(self): diff --git a/tests/functional/test_wps_provider.py b/tests/functional/test_wps_provider.py index e7a2566d4..5869f6008 100644 --- a/tests/functional/test_wps_provider.py +++ b/tests/functional/test_wps_provider.py @@ -150,9 +150,11 @@ def test_register_describe_execute_ncdump(self, mock_responses): assert "outputs" in body and len(body["outputs"]) == 1 assert "output" in body["outputs"] assert "formats" in body["outputs"]["output"] - assert len(body["outputs"]["output"]["formats"]) == 1 + assert len(body["outputs"]["output"]["formats"]) == 3 assert body["outputs"]["output"]["formats"][0]["default"] is True assert body["outputs"]["output"]["formats"][0]["mediaType"] == ContentType.TEXT_PLAIN + assert body["outputs"]["output"]["formats"][1]["mediaType"] == ContentType.TEXT_HTML + assert body["outputs"]["output"]["formats"][2]["mediaType"] == ContentType.APP_PDF assert "literalDataDomains" not in body["outputs"]["output"] assert body["processDescriptionURL"] == proc_desc_url diff --git a/tests/resources/__init__.py b/tests/resources/__init__.py index db9b94e7f..c2f41f123 100644 --- a/tests/resources/__init__.py +++ b/tests/resources/__init__.py @@ -10,6 +10,7 @@ from weaver.typedefs import JSON RESOURCES_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "")) +TRANSFORM_PATH = os.path.join(RESOURCES_PATH, "transform") EXAMPLES_PATH = os.path.join(WEAVER_MODULE_DIR, "wps_restapi/examples") FUNCTIONAL_APP_PKG = os.path.abspath(os.path.join(RESOURCES_PATH, "../functional/application-packages")) diff --git a/tests/resources/transform/basic.yaml b/tests/resources/transform/basic.yaml new file mode 100644 index 000000000..e50c17cf0 --- /dev/null +++ b/tests/resources/transform/basic.yaml @@ -0,0 +1,5 @@ +name: John Doe +age: 30 +address: + street: 123 Main St + city: Anytown diff --git a/tests/resources/transform/dubai.tif b/tests/resources/transform/dubai.tif new file mode 100644 index 000000000..148fc3016 Binary files /dev/null and b/tests/resources/transform/dubai.tif differ diff --git a/tests/resources/transform/logo_crim.png b/tests/resources/transform/logo_crim.png new file mode 100644 index 000000000..d42f02881 Binary files /dev/null and b/tests/resources/transform/logo_crim.png differ diff --git a/tests/resources/transform/multi.tif b/tests/resources/transform/multi.tif new file mode 100644 index 000000000..953e65a2c Binary files /dev/null and b/tests/resources/transform/multi.tif differ diff --git a/tests/resources/transform/test-books.xml b/tests/resources/transform/test-books.xml new file mode 100644 index 000000000..3f5bc0ba2 --- /dev/null +++ b/tests/resources/transform/test-books.xml @@ -0,0 +1,18 @@ +<?xml version="1.0" encoding="UTF-8"?> +<books> + <book> + <title>Introduction to XML</title> + <author>John Doe</author> + <price>29.99</price> + </book> + <book> + <title>Learning Python</title> + <author>Jane Smith</author> + <price>39.99</price> + </book> + <book> + <title>Data Science Fundamentals</title> + <author>Emily Johnson</author> + <price>49.99</price> + </book> +</books> diff --git a/tests/resources/transform/test.csv b/tests/resources/transform/test.csv new file mode 100644 index 
000000000..58d74db12 --- /dev/null +++ b/tests/resources/transform/test.csv @@ -0,0 +1,3 @@ +title,value +abc,123 +def,456 \ No newline at end of file diff --git a/tests/resources/transform/test.json b/tests/resources/transform/test.json new file mode 100644 index 000000000..210cb0e75 --- /dev/null +++ b/tests/resources/transform/test.json @@ -0,0 +1,3 @@ +{ + "test": ["test"] +} diff --git a/tests/resources/transform/test_circle.svg b/tests/resources/transform/test_circle.svg new file mode 100644 index 000000000..79a5cef14 --- /dev/null +++ b/tests/resources/transform/test_circle.svg @@ -0,0 +1,4 @@ +<?xml version="1.0" encoding="UTF-8"?> +<svg xmlns="http://www.w3.org/2000/svg" width="100" height="100"> + <circle cx="50" cy="50" r="40" fill="red"/> +</svg> diff --git a/tests/resources/transform/text.txt b/tests/resources/transform/text.txt new file mode 100644 index 000000000..08e00ed29 --- /dev/null +++ b/tests/resources/transform/text.txt @@ -0,0 +1 @@ +Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. \ No newline at end of file diff --git a/tests/test_datatype.py b/tests/test_datatype.py index 30b86b78f..9028b8de2 100644 --- a/tests/test_datatype.py +++ b/tests/test_datatype.py @@ -1,11 +1,14 @@ +import contextlib import uuid from copy import deepcopy from datetime import datetime, timedelta +import mock import pytest from visibility import Visibility from tests import resources +from tests.utils import setup_mongodb_processstore from weaver.datatype import Authentication, AuthenticationTypes, DockerAuthentication, Job, Process, Service from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteResponse, ExecuteReturnPreference from weaver.formats import ContentType @@ -215,6 +218,49 @@ def test_process_split_version(process_id, result): assert Process.split_version(process_id) == result +def test_process_outputs_alt(): + from weaver.processes.utils import get_settings as real_get_settings + setup_mongodb_processstore() + + def _get_mocked(req=None): + return req.registry.settings if req else real_get_settings(None) + + # mock db functions called by offering + with contextlib.ExitStack() as stack: + stack.enter_context(mock.patch("weaver.processes.utils.get_settings", side_effect=_get_mocked)) + + process = Process(id=f"test-{uuid.uuid4()!s}", package={}, + outputs=[{"identifier": "output1", "formats": [{"mediaType": ContentType.IMAGE_TIFF}]}], + inputs=[{"identifier": "input_1", "formats": [{"mediaType": ContentType.APP_ZIP}]}]) + offer = process.offering() + + # Assert that process outputs in the offering contain the alternate representations + assert offer["outputs"]["output1"]["formats"] == [ + {"mediaType": ContentType.IMAGE_TIFF}, + {"mediaType": ContentType.IMAGE_PNG}, + {"mediaType": ContentType.IMAGE_GIF}, + {"mediaType": ContentType.IMAGE_JPEG}, + {"mediaType": ContentType.IMAGE_SVG_XML}, + {"mediaType": ContentType.APP_PDF}, + ] + + # Assert that process outputs are unchanged + assert process.outputs[0]["formats"] == [{"mediaType": ContentType.IMAGE_TIFF}] + + @pytest.mark.parametrize( ["attribute", "value", "result"], [ diff --git a/tests/transform/__init__.py b/tests/transform/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/transform/test_tif.py 
b/tests/transform/test_tif.py new file mode 100644 index 000000000..84c929e9d --- /dev/null +++ b/tests/transform/test_tif.py @@ -0,0 +1,103 @@ +from unittest.mock import MagicMock, patch + +import numpy as np +from PIL import Image + +from weaver.transform.tiff import Tiff, normalize_band + + +@patch("rasterio.open", autospec=True) +@patch("multipagetiff.read_stack", autospec=True) +def test_tiff_init(mock_read_stack, mock_rasterio_open): + mock_dataset = MagicMock() + mock_dataset.crs = None # Ensure it is NOT a GeoTIFF + mock_rasterio_open.return_value = mock_dataset + + dummy_stack = np.random.randint(0, 255, (2, 2, 3), dtype=np.uint8) + mock_read_stack.return_value = dummy_stack + + tiff = Tiff("dummy.tiff") + + assert tiff.file_path == "dummy.tiff" + assert tiff.is_geotiff is False, f"Expected False but got {tiff.is_geotiff}" + assert isinstance(tiff.images, np.ndarray) + assert tiff.images.shape == (2, 2, 3) + + +def test_normalize_band(): + image_band = np.array([[10, 20], [30, 40]], dtype=np.float32) + normalized = normalize_band(image_band) + assert normalized.min() == 0.0 + assert normalized.max() == 1.0 + assert np.allclose(normalized, np.array([[0.0, 0.3333], [0.6667, 1.0]], dtype=np.float32), atol=1e-4) + + +@patch("rasterio.open", autospec=True) +@patch("multipagetiff.read_stack", autospec=True) +def test_get_images_non_geotiff(mock_read_stack, mock_rasterio_open): + mock_dataset = MagicMock() + mock_dataset.crs = None # Ensure it is NOT a GeoTIFF + mock_rasterio_open.return_value = mock_dataset + + dummy_stack = np.random.randint(0, 255, (2, 2, 3, 2), dtype=np.uint8) + + mock_images = MagicMock() + mock_images.pages = dummy_stack + + mock_read_stack.return_value = mock_images + + tiff = Tiff("dummy_multipage.tiff") + images = tiff.get_images() + + assert len(images) == 2, f"Expected 2 images but got {len(images)}" + assert isinstance(images[0], Image.Image), "First item is not an Image object" + assert images[0].size == (3, 2), f"First image size is not (3, 2): {images[0].size}" + assert images[1].size == (3, 2), f"Second image size is not (3, 2): {images[1].size}" + + +@patch("rasterio.open") +def test_tiff_geotiff(mock_rasterio_open): + mock_dataset = MagicMock() + mock_dataset.crs = "EPSG:4326" + mock_dataset.count = 3 + mock_dataset.width = 100 + mock_dataset.height = 100 + mock_dataset.indexes = [1, 2, 3] + mock_dataset.dtypes = ["uint8", "uint8", "uint8"] + mock_dataset.transform.__mul__.side_effect = lambda x: (x, x) + mock_rasterio_open.return_value = mock_dataset + + tiff = Tiff("dummy_geotiff.tif") + assert tiff.is_geotiff is True + assert tiff.nb_bands == 3 + assert tiff.width == 100 + assert tiff.height == 100 + assert tiff.bands == {1: "uint8", 2: "uint8", 3: "uint8"} + + +@patch("rasterio.open") +def test_get_band(mock_rasterio_open): + mock_dataset = MagicMock() + mock_dataset.count = 3 + mock_dataset.read.return_value = np.array([[1, 2], [3, 4]]) + mock_rasterio_open.return_value = mock_dataset + + tiff = Tiff("dummy.tiff") + band = tiff.get_band(1) + assert band is not None + assert band.shape == (2, 2) + assert (band == np.array([[1, 2], [3, 4]])).all() + + +@patch("rasterio.open") +def test_get_images(mock_rasterio_open): + mock_dataset = MagicMock() + mock_dataset.count = 3 + mock_dataset.read.side_effect = [np.ones((2, 2)), np.ones((2, 2)) * 2, np.ones((2, 2)) * 3] + mock_rasterio_open.return_value = mock_dataset + + tiff = Tiff("dummy.tiff") + images = tiff.get_images() + assert isinstance(images, list) + assert isinstance(images[0], Image.Image) + 
assert images[0].size == (2, 2) diff --git a/tests/transform/test_transform.py b/tests/transform/test_transform.py new file mode 100644 index 000000000..fc1b28d11 --- /dev/null +++ b/tests/transform/test_transform.py @@ -0,0 +1,41 @@ +import mimetypes +import os +import shutil +import tempfile + +import pytest +from pyramid.response import FileResponse + +from tests.resources import TRANSFORM_PATH +from weaver.transform.transform import CONVERSION_DICT, Transform + +# Register the MIME type for .yaml files +mimetypes.add_type("application/x-yaml", ".yaml") + + +def using_mimes(func): + def wrapper(*args, **kwargs): + cmt = mimetypes.guess_type(args[0])[0] + if cmt in CONVERSION_DICT: + for wmt in CONVERSION_DICT[cmt]: + func(args[0], cmt, wmt) + + return wrapper + + +@using_mimes +def transform(f, cmt="", wmt=""): + with tempfile.TemporaryDirectory() as tmp_path: + shutil.copy(f, os.path.join(tmp_path, os.path.basename(f))) + f = os.path.join(tmp_path, os.path.basename(f)) + trans = Transform(file_path=f, current_media_type=cmt, wanted_media_type=wmt) + assert isinstance(trans.get(), FileResponse), f"{cmt} -> {wmt}" + print(f"{cmt} -> {wmt} passed") + return trans.output_path + + +@pytest.mark.parametrize("file_name", [f for f in os.listdir(TRANSFORM_PATH) + if os.path.isfile(os.path.join(TRANSFORM_PATH, f))]) +def test_transformations(file_name): + file_path = os.path.join(TRANSFORM_PATH, file_name) + transform(file_path) diff --git a/tests/transform/test_utils.py b/tests/transform/test_utils.py new file mode 100644 index 000000000..9520d5ba1 --- /dev/null +++ b/tests/transform/test_utils.py @@ -0,0 +1,65 @@ +import os +import tarfile +import tempfile + +import pytest +from PIL import Image + +from weaver.transform.utils import write_images + +# pylint: disable=redefined-outer-name + + +@pytest.fixture(scope="module") +def test_sample_images(): + """ + Generate a reusable list of sample images for tests. 
+ """ + images = [] + for color in [(255, 0, 0), (0, 255, 0), (0, 0, 255)]: + img = Image.new("RGB", (10, 10), color) + images.append(img) + return images + + +def test_write_images_multiple_images(test_sample_images): + with tempfile.NamedTemporaryFile(suffix=".tar.gz") as temp_file_multi: + write_images(test_sample_images, temp_file_multi.name) + assert tarfile.is_tarfile(temp_file_multi.name) + with tarfile.open(temp_file_multi.name, "r:gz") as tar: + members = tar.getmembers() + assert len(members) == len(test_sample_images) + for i, member in enumerate(members): + assert member.name == f"{str(i).zfill(4)}.png" + + +def test_write_images_single_image(test_sample_images): + single_image = test_sample_images[:1] + with tempfile.NamedTemporaryFile(suffix=".png") as temp_file_single: + write_images(single_image, temp_file_single.name) + assert os.path.isfile(temp_file_single.name) + assert not tarfile.is_tarfile(temp_file_single.name) + + +def test_write_images_custom_extension(test_sample_images): + with tempfile.NamedTemporaryFile(suffix=".jpg") as temp_file_custom: + output_tar_path = f"{temp_file_custom.name}.tar.gz" + try: + write_images(test_sample_images, temp_file_custom.name, ext="jpg") + assert tarfile.is_tarfile(output_tar_path) + with tarfile.open(output_tar_path, "r:gz") as tar: + members = tar.getmembers() + assert len(members) == len(test_sample_images) + for i, member in enumerate(members): + assert member.name == f"{str(i).zfill(4)}.jpg" + finally: + if os.path.exists(output_tar_path): + os.remove(output_tar_path) + + +def test_write_images_output_file_naming(test_sample_images): + with tempfile.NamedTemporaryFile(suffix=".output") as temp_file_naming: + write_images(test_sample_images, temp_file_naming.name) + expected_output = f"{temp_file_naming.name}.tar.gz" + assert os.path.isfile(expected_output) + assert tarfile.is_tarfile(expected_output) diff --git a/tests/utils.py b/tests/utils.py index 1c9144073..644af404d 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -950,6 +950,7 @@ class FileServer(SimpleHTTPTestServer): This server takes more time to start than usual mocks. Use it sparingly, and consider maintaining a single instance over multiple tests of a complete test suite rather than recreating a server for each test. 
""" + def __init__(self): # pylint: disable=W0231 self._port = self.get_port() self._uri = f"http://0.0.0.0:{self._port}" diff --git a/tests/wps_restapi/test_jobs.py b/tests/wps_restapi/test_jobs.py index 46b11cec8..5782df9b9 100644 --- a/tests/wps_restapi/test_jobs.py +++ b/tests/wps_restapi/test_jobs.py @@ -461,7 +461,7 @@ def test_get_jobs_links_navigation(self): base_url = self.settings["weaver.url"] jobs_url = base_url + sd.jobs_service.path limit = 2 # expect 11 jobs to be visible, making 6 pages of 2 each (except last that is 1) - last = 5 # zero-based index of last page + last = 5 # zero-based index of last page last_page = f"page={last}" prev_last_page = f"page={last - 1}" limit_kvp = f"limit={limit}" @@ -907,7 +907,7 @@ def test_get_jobs_public_service_no_processes(self): service=self.service_public.name, process=self.process_private.identifier) with contextlib.ExitStack() as stack: - for patch in mocked_remote_wps([]): # process invisible (not returned by remote) + for patch in mocked_remote_wps([]): # process invisible (not returned by remote) stack.enter_context(patch) resp = self.app.get(path, headers=self.json_headers, expect_errors=True) assert resp.status_code == 404 @@ -1378,7 +1378,7 @@ def test_get_job_invalid_uuid(self): """ # to make sure UUID is applied, use the "same format" (8-4-4-4-12), but with invalid definitions base_path = sd.job_service.path.format(job_id="thisisnt-some-real-uuid-allerrordata") - for sub_path in ["", "/inputs", "/outputs", "/results", "/logs", "exceptions"]: + for sub_path in ["", "/inputs", "/outputs", "/results", "/logs", "/exceptions"]: path = f"{base_path}{sub_path}" resp = self.app.get(path, headers=self.json_headers, expect_errors=True) assert resp.status_code == 400 @@ -1565,7 +1565,7 @@ def test_job_results_errors(self): assert resp.status_code == code, case assert resp.json["title"] == title, case assert resp.json["cause"] == cause, case - assert resp.json["type"].endswith(error_type), case # ignore http full reference, not always there + assert resp.json["type"].endswith(error_type), case # ignore http full reference, not always there assert "links" in resp.json def test_jobs_inputs_outputs_validations(self): diff --git a/tests/wps_restapi/test_processes.py b/tests/wps_restapi/test_processes.py index fbcfd5a48..cf6bcdb47 100644 --- a/tests/wps_restapi/test_processes.py +++ b/tests/wps_restapi/test_processes.py @@ -835,8 +835,11 @@ def assert_deployed_wps3(response_json, expected_process_id, assert_io=True): # assert proc["outputs"][0]["minOccurs"] == "1" # assert proc["outputs"][0]["maxOccurs"] == "1" assert isinstance(proc["outputs"][0]["formats"], list) - assert len(proc["outputs"][0]["formats"]) == 1 + assert len(proc["outputs"][0]["formats"]) == 4 # Alternate format representation added assert proc["outputs"][0]["formats"][0]["mediaType"] == ContentType.APP_JSON + assert proc["outputs"][0]["formats"][1]["mediaType"] == ContentType.TEXT_CSV + assert proc["outputs"][0]["formats"][2]["mediaType"] == ContentType.APP_XML + assert proc["outputs"][0]["formats"][3]["mediaType"] == ContentType.APP_YAML def deploy_process_make_visible_and_fetch_deployed(self, deploy_payload, # type: JSON @@ -1002,8 +1005,26 @@ def deploy_process_CWL_direct(self, assert proc["outputs"] == [{ "id": "output", "title": "output", - "schema": {"type": "string", "contentMediaType": "text/plain"}, - "formats": [{"default": True, "mediaType": "text/plain"}] + "schema": { + "oneOf": [ + { + "type": "string", + "contentMediaType": ContentType.TEXT_PLAIN + }, + { + 
"type": "string", + "contentMediaType": ContentType.TEXT_HTML + }, + { + "type": "string", + "format": "binary", + "contentMediaType": ContentType.APP_PDF, + "contentEncoding": "base64" + } + ] + }, + "formats": [{"default": True, "mediaType": ContentType.TEXT_PLAIN}, {"mediaType": ContentType.TEXT_HTML}, + {"mediaType": ContentType.APP_PDF}] }] return cwl, desc # type: ignore @@ -1101,8 +1122,27 @@ def test_deploy_process_CWL_DockerRequirement_href(self, exec_unit_style): assert proc["outputs"] == [{ "id": "output", "title": "output", - "schema": {"type": "string", "contentMediaType": "text/plain"}, - "formats": [{"default": True, "mediaType": "text/plain"}] + "schema": { + "oneOf": [ + { + "type": "string", + "contentMediaType": ContentType.TEXT_PLAIN + }, + { + "type": "string", + "contentMediaType": ContentType.TEXT_HTML + }, + { + "type": "string", + "format": "binary", + "contentMediaType": ContentType.APP_PDF, + "contentEncoding": "base64" + } + ] + }, + "formats": [{"default": True, "mediaType": ContentType.TEXT_PLAIN}, + {"mediaType": ContentType.TEXT_HTML}, + {"mediaType": ContentType.APP_PDF}] }] def test_deploy_process_CWL_DockerRequirement_owsContext(self): @@ -1141,8 +1181,27 @@ def test_deploy_process_CWL_DockerRequirement_owsContext(self): assert proc["outputs"] == [{ "id": "output", "title": "output", - "schema": {"type": "string", "contentMediaType": "text/plain"}, - "formats": [{"default": True, "mediaType": "text/plain"}] + "schema": { + "oneOf": [ + { + "type": "string", + "contentMediaType": ContentType.TEXT_PLAIN + }, + { + "type": "string", + "contentMediaType": ContentType.TEXT_HTML + }, + { + "type": "string", + "format": "binary", + "contentMediaType": ContentType.APP_PDF, + "contentEncoding": "base64" + } + ] + }, + "formats": [{"default": True, "mediaType": ContentType.TEXT_PLAIN}, + {"mediaType": ContentType.TEXT_HTML}, + {"mediaType": ContentType.APP_PDF}] }] def test_deploy_process_CWL_DockerRequirement_executionUnit(self): @@ -1179,8 +1238,27 @@ def test_deploy_process_CWL_DockerRequirement_executionUnit(self): assert proc["outputs"] == [{ "id": "output", "title": "output", - "schema": {"type": "string", "contentMediaType": "text/plain"}, - "formats": [{"default": True, "mediaType": "text/plain"}] + "schema": { + "oneOf": [ + { + "type": "string", + "contentMediaType": ContentType.TEXT_PLAIN + }, + { + "type": "string", + "contentMediaType": ContentType.TEXT_HTML + }, + { + "type": "string", + "format": "binary", + "contentMediaType": ContentType.APP_PDF, + "contentEncoding": "base64" + } + ] + }, + "formats": [{"default": True, "mediaType": ContentType.TEXT_PLAIN}, + {"mediaType": ContentType.TEXT_HTML}, + {"mediaType": ContentType.APP_PDF}] }] def test_deploy_process_CWL_DockerRequirement_executionUnit_DirectUnit(self): @@ -1217,8 +1295,27 @@ def test_deploy_process_CWL_DockerRequirement_executionUnit_DirectUnit(self): assert proc["outputs"] == [{ "id": "output", "title": "output", - "schema": {"type": "string", "contentMediaType": "text/plain"}, - "formats": [{"default": True, "mediaType": "text/plain"}] + "schema": { + "oneOf": [ + { + "type": "string", + "contentMediaType": ContentType.TEXT_PLAIN + }, + { + "type": "string", + "contentMediaType": ContentType.TEXT_HTML + }, + { + "type": "string", + "format": "binary", + "contentMediaType": ContentType.APP_PDF, + "contentEncoding": "base64" + } + ] + }, + "formats": [{"default": True, "mediaType": ContentType.TEXT_PLAIN}, + {"mediaType": ContentType.TEXT_HTML}, + {"mediaType": ContentType.APP_PDF}] }] 
def test_deploy_process_CWL_DockerRequirement_executionUnit_UnitWithMediaType(self): @@ -1255,8 +1352,27 @@ def test_deploy_process_CWL_DockerRequirement_executionUnit_UnitWithMediaType(se assert proc["outputs"] == [{ "id": "output", "title": "output", - "schema": {"type": "string", "contentMediaType": "text/plain"}, - "formats": [{"default": True, "mediaType": "text/plain"}] + "schema": { + "oneOf": [ + { + "type": "string", + "contentMediaType": ContentType.TEXT_PLAIN + }, + { + "type": "string", + "contentMediaType": ContentType.TEXT_HTML + }, + { + "type": "string", + "format": "binary", + "contentMediaType": ContentType.APP_PDF, + "contentEncoding": "base64" + } + ] + }, + "formats": [{"default": True, "mediaType": ContentType.TEXT_PLAIN}, + {"mediaType": ContentType.TEXT_HTML}, + {"mediaType": ContentType.APP_PDF}] }] @pytest.mark.usefixtures("assert_cwl_no_warn_unknown_hint") diff --git a/tests/wps_restapi/test_providers.py b/tests/wps_restapi/test_providers.py index f3f1d248d..9d0267f48 100644 --- a/tests/wps_restapi/test_providers.py +++ b/tests/wps_restapi/test_providers.py @@ -593,8 +593,10 @@ def test_get_provider_process_literal_values(self): assert outputs[0]["formats"][1]["default"] is False assert "maximumMegabytes" not in outputs[0]["formats"][1] # never applies, even with OWSLib update assert outputs[1]["id"] == "output_log" - assert len(outputs[1]["formats"]) == 1 + assert len(outputs[1]["formats"]) == 3 assert outputs[1]["formats"][0]["mediaType"] == ContentType.TEXT_PLAIN + assert outputs[1]["formats"][1]["mediaType"] == ContentType.TEXT_HTML + assert outputs[1]["formats"][2]["mediaType"] == ContentType.APP_PDF assert "encoding" not in outputs[1]["formats"][0] assert outputs[1]["formats"][0]["default"] is True assert "maximumMegabytes" not in outputs[1]["formats"][0] # never applies, even with OWSLib update diff --git a/weaver/cli.py b/weaver/cli.py index 547f18a69..e845893c7 100644 --- a/weaver/cli.py +++ b/weaver/cli.py @@ -2165,8 +2165,12 @@ def _download_references(self, outputs, out_links, out_dir, job_id, auth=None): # download links from headers LOGGER.debug("%s outputs in results link headers.", "Processing" if len(out_links) else "No") + downloaded_links = set() for _, link_header in ResponseHeaders(out_links).items(): link = parse_link_header(link_header) + if link["href"] in downloaded_links: + continue + downloaded_links.add(link["href"]) rel = link["rel"].rsplit(".", 1) output = rel[0] is_array = len(rel) > 1 and str.isnumeric(rel[1]) @@ -2847,6 +2851,7 @@ class ValidateAuthHandlerAction(argparse.Action): """ Action that will validate that the input argument references an authentication handler that can be resolved. """ + def __call__(self, parser, namespace, auth_handler_ref, option_string=None): # type: (argparse.ArgumentParser, argparse.Namespace, Optional[str], Optional[str]) -> None """ @@ -2896,6 +2901,7 @@ class ValidateHeaderAction(argparse._AppendAction): # noqa: W0212 Header-Name: Header-Value """ + def __call__(self, parser, namespace, values, option_string=None): # type: (argparse.ArgumentParser, argparse.Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None """ @@ -2930,6 +2936,7 @@ class ValidateNonZeroPositiveNumberAction(argparse.Action): """ Action that will validate that the input argument is a positive number greater than zero. 
""" + def __call__(self, parser, namespace, values, option_string=None): # type: (argparse.ArgumentParser, argparse.Namespace, Union[str, Sequence[Any], None], Optional[str]) -> None """ diff --git a/weaver/datatype.py b/weaver/datatype.py index 5839559b3..7b5de275d 100644 --- a/weaver/datatype.py +++ b/weaver/datatype.py @@ -61,6 +61,8 @@ from weaver.quotation.status import QuoteStatus from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status from weaver.store.base import StoreProcesses +from weaver.transform import transform +from weaver.transform.utils import extend_alternate_formats from weaver.utils import localize_datetime # for backward compatibility of previously saved jobs not time-locale-aware from weaver.utils import ( LoggerHandler, @@ -1473,6 +1475,38 @@ def result_path(self, job_id=None, output_id=None, file_name=None): result_job_path = os.path.join(result_job_path, file_name) return result_job_path + def get_all_possible_formats_links(self, url, results): + """ + Get direct links to all outputs in any possible format. + + Args: + url (str): The base URL for constructing links. + results (List[Dict[str, Any]]): A list of result dictionaries containing + "mimeType" and "identifier". + + Returns: + List[Dict[str, str]]: A list of dictionaries representing the links to + all possible output formats. + """ + links = [] + for result in results: + media_type = get_field(result, "mimeType", search_variations=True) + if media_type and media_type not in transform.EXCLUDED_TYPES: + id = get_field(result, "identifier", search_variations=True) + formats = [{"mediaType": media_type}] + extended_formats = extend_alternate_formats(formats, transform.CONVERSION_DICT) + links.extend([ + { + "href": f"{url}/{id}?f={fmt['mediaType']}", + "rel": "output", + "id": id, + "type": fmt["mediaType"], + "title": f"Link to job {id} result in {fmt['mediaType']}" + } + for fmt in extended_formats + ]) + return links + def prov_url(self, container=None, extra_path=None): # type: (Optional[AnySettingsContainer], Optional[ProvenancePathType]) -> str extra_path = str(extra_path or "") @@ -1598,6 +1632,9 @@ def links(self, container=None, self_link=None): {"href": f"{job_url}/prov", "rel": "https://www.w3.org/ns/prov", # unofficial "title": "Job provenance collected following process execution."}, ]) + f_links = self.get_all_possible_formats_links(url=job_url, results=self.results) + if len(f_links) > 0: + job_links.extend(f_links) else: job_links.append({ "href": f"{job_url}/exceptions", "rel": "http://www.opengis.net/def/rel/ogc/1.0/exceptions", @@ -2825,7 +2862,7 @@ def offering(self, schema=ProcessSchema.OGC, request=None): if schema == ProcessSchema.WPS: return self.xml(request) - process = self.dict() + process = copy.deepcopy(self.dict()) links = self.links() process.update({ "deploymentProfile": self.deployment_profile, @@ -2846,6 +2883,10 @@ def offering(self, schema=ProcessSchema.OGC, request=None): # In this situation, the lack of WPS I/O altogether requires to generate OAS from I/O merge/conversion. # Deployment with OAS should have generated this field already to save time or for more precise definitions. 
for io_def in process[io_type].values(): + if io_type == "outputs": + formats = io_def.get("formats", []) + if formats: + io_def["formats"] = extend_alternate_formats(formats, transform.CONVERSION_DICT) io_schema = get_field(io_def, "schema", search_variations=False) if not isinstance(io_schema, dict): io_def["schema"] = json2oas_io(io_def) diff --git a/weaver/formats.py b/weaver/formats.py index 000e572ef..8e6b09614 100644 --- a/weaver/formats.py +++ b/weaver/formats.py @@ -118,11 +118,13 @@ class ContentType(Constants): IMAGE_GIF = "image/gif" IMAGE_PNG = "image/png" IMAGE_TIFF = "image/tiff" + IMAGE_SVG_XML = "image/svg+xml" MULTIPART_ANY = "multipart/*" MULTIPART_FORM = "multipart/form-data" # data/file upload MULTIPART_MIXED = "multipart/mixed" # content of various types MULTIPART_RELATED = "multipart/related" # content that contain cross-references with Content-ID (CID) TEXT_ENRICHED = "text/enriched" + TEXT_CSV = "text/csv" TEXT_HTML = "text/html" TEXT_PLAIN = "text/plain" TEXT_RICHTEXT = "text/richtext" @@ -456,6 +458,7 @@ class SchemaRole(Constants): ContentType.APP_OCTET_STREAM: ".bin", ContentType.APP_FORM: "", ContentType.MULTIPART_FORM: "", + ContentType.IMAGE_SVG_XML: ".svg", } _CONTENT_TYPE_EXCLUDE = [ ContentType.APP_OCTET_STREAM, @@ -494,9 +497,8 @@ class SchemaRole(Constants): _CONTENT_TYPE_EXT_PATTERN = re.compile(r"^[a-z]+/(x-)?(?P<ext>([a-z]+)).*$") _CONTENT_TYPE_LOCALS_MISSING = [ (ctype, _CONTENT_TYPE_EXT_PATTERN.match(ctype)) - for name, ctype in locals().items() - if name.startswith("ContentType.") - and isinstance(ctype, str) + for ctype in ContentType.values() + if isinstance(ctype, str) and ctype not in _CONTENT_TYPE_EXCLUDE and ctype not in _CONTENT_TYPE_FORMAT_MAPPING and ctype not in _CONTENT_TYPE_EXTENSION_MAPPING diff --git a/weaver/transform/__init__.py b/weaver/transform/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/weaver/transform/png2svg.py b/weaver/transform/png2svg.py new file mode 100644 index 000000000..8fcd1feb3 --- /dev/null +++ b/weaver/transform/png2svg.py @@ -0,0 +1,198 @@ +""" +This module provides utilities for converting PNG images to SVG format. 
+ +Cross-reference: +This work draws inspiration from png2svg.py, available at: +https://github.com/ianmackinnon/png2svg/blob/master/png2svg.py +""" +import operator +from collections import deque +from io import StringIO +from typing import TYPE_CHECKING + +from PIL import Image + +if TYPE_CHECKING: + from typing import List, Optional, Tuple + + +def add_tuple(first_tuple, second_tuple): + # type: (Tuple[int, int], Tuple[int, int]) -> Tuple[int, int] + return tuple(map(operator.add, first_tuple, second_tuple)) + + +def sub_tuple(first_tuple, second_tuple): + # type: (Tuple[int, int], Tuple[int, int]) -> Tuple[int, int] + return tuple(map(operator.sub, first_tuple, second_tuple)) + + +def neg_tuple(first_tuple): + # type: (Tuple[int, int]) -> Tuple[int, int] + return tuple(map(operator.neg, first_tuple)) + + +def direction(edge): + # type: (Tuple[Tuple[int, int], Tuple[int, int]]) -> Tuple[int, int] + return sub_tuple(edge[1], edge[0]) + + +def magnitude(tpl): + # type: (Tuple[int, int]) -> int + return int(pow(pow(tpl[0], 2) + pow(tpl[1], 2), .5)) + + +def normalize(tpl): + # type: (Tuple[int, int]) -> Tuple[int, int] + mag = magnitude(tpl) + assert mag > 0, "Cannot normalize a zero-length vector" + return tuple(map(operator.truediv, tpl, [mag] * len(tpl))) + + +def svg_header(width, height): + # type: (int, int) -> str + return f"""<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"> +<svg width="{width}" height="{height}" xmlns="http://www.w3.org/2000/svg" version="1.1"> +""" + + +def joined_edges(assorted_edges, keep_every_point=False): + # type: (List[Tuple[Tuple[int, int], Tuple[int, int]]], bool) -> List[List[Tuple[Tuple[int, int], Tuple[int, int]]]] + pieces = [] + piece = [] + directions = deque([ + (0, 1), + (1, 0), + (0, -1), + (-1, 0), + ]) + while assorted_edges: + if not piece: + piece.append(assorted_edges.pop()) + current_direction = normalize(direction(piece[-1])) + while current_direction != directions[2]: + directions.rotate() + for i in range(1, 4): + next_end = add_tuple(piece[-1][1], directions[i]) + next_edge = (piece[-1][1], next_end) + if next_edge in assorted_edges: + assorted_edges.remove(next_edge) + if i == 2 and not keep_every_point: + # same direction + piece[-1] = (piece[-1][0], next_edge[1]) + else: + piece.append(next_edge) + if piece[0][0] == piece[-1][1]: + if not keep_every_point and normalize(direction(piece[0])) == normalize(direction(piece[-1])): + piece[-1] = (piece[-1][0], piece.pop(0)[1]) + # same direction + pieces.append(piece) + piece = [] + break + else: + raise Exception("Failed to find connecting edge") + return pieces + + +def rgba_image_to_svg_contiguous(img, opaque=None, keep_every_point=False): + # type: (Image.Image, Optional[bool], bool) -> str + # collect contiguous pixel groups + + adjacent = ((1, 0), (0, 1), (-1, 0), (0, -1)) + visited = Image.new("1", img.size, 0) + + color_pixel_lists = {} + + width, height = img.size + for x in range(width): + for y in range(height): + here = (x, y) + if visited.getpixel(here): + continue + rgba = img.getpixel((x, y)) + if opaque and not rgba[3]: + continue + piece = [] + queue = [here] + visited.putpixel(here, 1) + while queue: + here = queue.pop() + for offset in adjacent: + neighbour = add_tuple(here, offset) + if not 0 <= neighbour[0] < width or not 0 <= neighbour[1] < height: + continue + if visited.getpixel(neighbour): + continue + neighbour_rgba = img.getpixel(neighbour) + if neighbour_rgba != rgba: + continue + queue.append(neighbour) + visited.putpixel(neighbour, 1) + piece.append(here) + + if rgba not in color_pixel_lists: + color_pixel_lists[rgba] = [] + color_pixel_lists[rgba].append(piece) + + del adjacent + del visited + + # calculate clockwise edges of pixel groups + + edges = { + (-1, 0): ((0, 0), (0, 1)), + (0, 1): ((0, 1), (1, 1)), + (1, 0): ((1, 1), (1, 0)), + (0, -1): ((1, 0), (0, 0)), + } + + color_edge_lists = {} + + for rgba, pieces in color_pixel_lists.items(): + for piece_pixel_list in pieces: + edge_set = set() + for coord in piece_pixel_list: + for offset, (start_offset, end_offset) in edges.items(): + neighbour = add_tuple(coord, offset) + start = add_tuple(coord, start_offset) + end = add_tuple(coord, end_offset) + edge = (start, end) + if neighbour in piece_pixel_list: + continue + edge_set.add(edge) + if rgba not in color_edge_lists: + color_edge_lists[rgba] = [] + color_edge_lists[rgba].append(edge_set) + + del color_pixel_lists + del edges + + # join edges of pixel groups + + color_joined_pieces = {} + + for color, pieces in color_edge_lists.items(): + color_joined_pieces[color] = [] + for assorted_edges in pieces: + color_joined_pieces[color].append(joined_edges(assorted_edges, keep_every_point)) + + svg = StringIO() + svg.write(svg_header(*img.size)) + + for color, shapes in color_joined_pieces.items(): + for shape in shapes: + svg.write(""" <path d=" """) + for sub_shape in shape: + here = sub_shape.pop(0)[0] + svg.write(f" M {here[0]},{here[1]} ") + for edge in sub_shape: + here = edge[0] + svg.write(f" L {here[0]},{here[1]} ") + svg.write(" Z ") + svg.write(f""" " style="fill:rgb{color[0:3]}; fill-opacity:{color[3] / 255.0:.3f}; stroke:none;" />\n""") + + svg.write("""</svg>\n""") + return svg.getvalue() diff --git a/weaver/transform/tiff.py b/weaver/transform/tiff.py new file mode 100644 index 000000000..21ae7c6d6 --- /dev/null +++ b/weaver/transform/tiff.py @@ -0,0 +1,133 @@ +from typing import List, Optional + +import multipagetiff as mtif +import numpy as np +import rasterio +from PIL import Image, UnidentifiedImageError + + +def normalize_band(image_band: np.ndarray) -> np.ndarray: + """ + Normalize a single band of an image to the range [0, 1]. + + :param image_band: The image band to normalize. + :type image_band: np.ndarray + :return: The normalized image band. + :rtype: np.ndarray + """ + band_min, band_max = image_band.min(), image_band.max() # type: ignore # IDE type stub error + return (image_band - band_min) / (band_max - band_min) + + +class Tiff: + """ + A class for working with TIFF files, including GeoTIFFs and multi-page TIFFs. + + :ivar file_path: The file path to the TIFF image. + :vartype file_path: str + :ivar dataset: The rasterio dataset for GeoTIFFs. + :vartype dataset: rasterio.Dataset + :ivar is_geotiff: A flag indicating whether the image is a GeoTIFF. + :vartype is_geotiff: bool + :ivar images: The list of image arrays for multi-page TIFFs. + :vartype images: List[np.ndarray] + :ivar _images: A copy of the list of image arrays. + :vartype _images: List[np.ndarray] + :ivar nb_bands: The number of bands in the GeoTIFF. + :vartype nb_bands: int + :ivar width: The width of the image. + :vartype width: int + :ivar height: The height of the image. + :vartype height: int + :ivar bands: A dictionary of band indexes and their data types. + :vartype bands: dict + :ivar coordinates: The coordinates for the GeoTIFF. + :vartype coordinates: Tuple[Tuple[float, float], Tuple[float, float]] + :ivar crs: The coordinate reference system for the GeoTIFF. + :vartype crs: rasterio.crs.CRS + """ + + def __init__(self, file_path: str): + """ + Initialize the Tiff object with the given file path. + + :param file_path: The file path to the TIFF image. 
+ :type file_path: str + """ + self.file_path = file_path + self.dataset = rasterio.open(self.file_path) + + self.is_geotiff = self.dataset.crs is not None + + if not self.is_geotiff: + try: + self.images = mtif.read_stack(self.file_path) + self._images = self.images.copy() + except Exception as ex: + if isinstance(ex, UnidentifiedImageError): + self.is_geotiff = True + else: + raise + + if self.is_geotiff: + self.nb_bands = self.dataset.count + self.width = self.dataset.width + self.height = self.dataset.height + + self.bands = {i: dtype for i, dtype in zip(self.dataset.indexes, self.dataset.dtypes)} + self.coordinates = (self.dataset.transform * (0, 0), self.dataset.transform * (self.width, self.height)) + + self.crs = self.dataset.crs + + @property + def range(self) -> range: + """ + Get the range of valid band indexes for the TIFF file. + + :return: A range object representing valid band indexes. + :rtype: range + """ + return range(1, self.nb_bands + 1) + + def get_band(self, index: int) -> Optional[np.ndarray]: + """ + Retrieve a specific band of the image by index. + + :param index: The band index to retrieve. + :type index: int + :return: The band as a NumPy array, or None if not found. + :rtype: Optional[np.ndarray] + :raises RuntimeError: If the band index is invalid or data cannot be read. + """ + if index in self.range: + return self.dataset.read(index) + return None + + def get_images(self, red_band: int = 1, green_band: int = 2, blue_band: int = 3) -> List[Image.Image]: + """ + Retrieve RGB images by combining bands from a GeoTIFF or multi-page TIFF. + + :param red_band: The band index for the red channel. + :type red_band: int + :param green_band: The band index for the green channel. + :type green_band: int + :param blue_band: The band index for the blue channel. + :type blue_band: int + :return: A list of PIL Image objects representing the RGB image(s). + :rtype: List[Image.Image] + """ + if self.is_geotiff: + indexes = [i for i in [red_band, green_band, blue_band] if i in self.range] + array = ( + np.dstack([normalize_band(self.get_band(idx)) for idx in indexes]) + * 255 + ).astype(np.uint8) + if len(indexes) < 3: + array = np.squeeze(array, axis=2) + + return [Image.fromarray(array)] + else: + imlist = [] + for page in self.images.pages: + imlist.append(Image.fromarray(page)) + return imlist diff --git a/weaver/transform/transform.py b/weaver/transform/transform.py new file mode 100644 index 000000000..7747e957c --- /dev/null +++ b/weaver/transform/transform.py @@ -0,0 +1,605 @@ +import base64 +import csv +import json +import os.path +import shutil +import tarfile +import tempfile +from typing import List + +import pandas as pd +import xmltodict +import yaml +from bs4 import BeautifulSoup +from cairosvg import svg2png +from celery.utils.log import get_task_logger +from fpdf import FPDF +from json2xml import json2xml +from json2xml.utils import readfromjson +from markupsafe import escape +from PIL import Image +from pyramid.httpexceptions import HTTPUnprocessableEntity +from pyramid.response import FileResponse + +from weaver.formats import ContentType, get_extension +from weaver.transform.png2svg import rgba_image_to_svg_contiguous +from weaver.transform.tiff import Tiff +from weaver.transform.utils import get_content, is_gif, is_image, is_png, is_svg, is_tiff, write_content + +LOGGER = get_task_logger(__name__) + +HTML_CONTENT = """ + +
diff --git a/weaver/transform/transform.py b/weaver/transform/transform.py
new file mode 100644
index 000000000..7747e957c
--- /dev/null
+++ b/weaver/transform/transform.py
@@ -0,0 +1,605 @@
+import base64
+import csv
+import functools
+import json
+import os.path
+import shutil
+import tarfile
+import tempfile
+from typing import List
+
+import pandas as pd
+import xmltodict
+import yaml
+from bs4 import BeautifulSoup
+from cairosvg import svg2png
+from celery.utils.log import get_task_logger
+from fpdf import FPDF
+from json2xml import json2xml
+from json2xml.utils import readfromjson
+from markupsafe import escape
+from PIL import Image
+from pyramid.httpexceptions import HTTPUnprocessableEntity
+from pyramid.response import FileResponse
+
+from weaver.formats import ContentType, get_extension
+from weaver.transform.png2svg import rgba_image_to_svg_contiguous
+from weaver.transform.tiff import Tiff
+from weaver.transform.utils import get_content, is_gif, is_image, is_png, is_svg, is_tiff, write_content
+
+LOGGER = get_task_logger(__name__)
+
+HTML_CONTENT = """
+<html>
+  <body>
+    <pre>%CONTENT%</pre>
+  </body>
+</html>
+ """ + +CONVERSION_DICT = { + ContentType.TEXT_PLAIN: [ContentType.TEXT_PLAIN, ContentType.TEXT_HTML, ContentType.APP_PDF], + ContentType.TEXT_HTML: [ContentType.TEXT_PLAIN, ContentType.APP_PDF], + ContentType.IMAGE_PNG: [ContentType.IMAGE_GIF, ContentType.IMAGE_JPEG, ContentType.IMAGE_TIFF, + ContentType.IMAGE_SVG_XML, ContentType.APP_PDF], + ContentType.IMAGE_GIF: [ContentType.IMAGE_PNG, ContentType.IMAGE_JPEG, ContentType.IMAGE_TIFF, + ContentType.IMAGE_SVG_XML, ContentType.APP_PDF], + ContentType.IMAGE_JPEG: [ContentType.IMAGE_PNG, ContentType.IMAGE_GIF, ContentType.IMAGE_TIFF, + ContentType.IMAGE_SVG_XML, ContentType.APP_PDF], + ContentType.IMAGE_TIFF: [ContentType.IMAGE_PNG, ContentType.IMAGE_GIF, ContentType.IMAGE_JPEG, + ContentType.IMAGE_SVG_XML, ContentType.APP_PDF], + ContentType.IMAGE_SVG_XML: [ContentType.IMAGE_PNG, ContentType.IMAGE_GIF, ContentType.IMAGE_JPEG, + ContentType.IMAGE_TIFF, ContentType.APP_PDF], + ContentType.TEXT_CSV: [ContentType.APP_XML, ContentType.APP_YAML, ContentType.APP_JSON], + ContentType.APP_XML: [ContentType.APP_YAML, ContentType.APP_JSON], + ContentType.APP_YAML: [ContentType.TEXT_CSV, ContentType.APP_XML, ContentType.APP_JSON], + ContentType.APP_JSON: [ContentType.TEXT_CSV, ContentType.APP_XML, ContentType.APP_YAML] +} +EXCLUDED_TYPES = {ContentType.APP_RAW_JSON, ContentType.APP_OCTET_STREAM, ContentType.TEXT_PLAIN} + + +def exception_handler(func): + """ + Decorator to handle exceptions in functions and log them. + + :param func: Function to wrap with exception handling. + :return: The wrapped function. + """ + def inner_function(*args, **kwargs): + try: + if "_to_" in func.__name__: + LOGGER.debug(f"{func.__name__} operation: [%s] -> [%s]", os.path.basename(args[0]), + os.path.basename(args[1])) + func(*args, **kwargs) + except Exception: + raise + + return inner_function + + +@exception_handler +def image_to_any(i: str, out: str) -> None: + """ + Converts image files to a specified output format. If no conversion is needed, it copies the file. + + :param i: Input image file path. + :param out: Output file path. + """ + # exit if no transformation needed + if os.path.splitext(i)[1] == os.path.splitext(out)[1]: + if not os.path.exists(out): + shutil.copy(i, out) + return + + if is_tiff(i): + tif = Tiff(i) + return images_to_any(tif.get_images(), out) + + if is_gif(i): + return images_to_any([Image.open(i).convert("RGB")], out) + + if is_svg(i): + png = f"{i}.png" + with open(i, "rb") as svg_file: + svg_data = svg_file.read() + with open(png, "wb") as png_file: + svg2png(svg_data, write_to=png_file) + i = png + + return images_to_any([Image.open(i)], out) + + +def images_to_any(ims: List[Image.Image], out: str) -> None: + """ + Processes a list of images and converts them to the desired format, saving them in the specified output path. + + :param ims: List of Image objects to process. + :param out: Output file path. 
+ """ + ret = [] + with tempfile.TemporaryDirectory() as tmp_path: + _o = os.path.join(tmp_path, str(len(ret)).zfill(4) + get_extension(out)) + for img in ims: + clrs = img.getpixel((0, 0)) + if not isinstance(clrs, tuple): + img = img.convert("RGB") + clrs = img.getpixel((0, 0)) + if is_svg(_o): + width, height = img.size + basewidth = 300 + if max(width, height) > basewidth: + wpercent = basewidth / float(img.size[0]) + hsize = int((float(img.size[1]) * float(wpercent))) + img = img.resize((basewidth, hsize), Image.Resampling.LANCZOS) + if len(clrs) == 3: + img.putalpha(0) + + write_content(_o, rgba_image_to_svg_contiguous(img)) + elif is_image(_o): + if is_png(_o) and len(clrs) == 3: + img.putalpha(0) + img.save(_o) + + if not is_png(_o) and len(clrs) == 4: + img.load() + rbg = Image.new("RGB", img.size, (255, 255, 255)) + rbg.paste(img, mask=img.split()[3]) + rbg.save(_o) + else: + img.save(_o) + else: + raise RuntimeError(f"Unsupported format: {_o}") + ret.append(_o) + + if len(ret) == 1: + shutil.copy(ret[0], out) + else: + if not out.endswith(".tar.gz"): + out += ".tar.gz" + + with tarfile.open(out, "w:gz") as tar: + for file_name in ret: + path = os.path.join(tmp_path, file_name) + tar.add(path, arcname=file_name) + + +@exception_handler +def any_to_html(i: str, out: str) -> None: + """ + Converts any content type (text or image) to HTML format. + + :param i: Input file path. + :param out: Output file path. + """ + try: + if not is_image(i): + content = get_content(i) + # Escape and replace content in HTML + html_content = HTML_CONTENT.replace("%CONTENT%", escape(content)) # Use escape from markupsafe + write_content(out, html_content) + else: + jpg = f"{i}.jpg" + image_to_any(i, jpg) + with open(jpg, "rb") as img_file: + img_data = base64.b64encode(img_file.read()).decode("utf-8") # Base64 encode the image content + write_content(out, HTML_CONTENT.replace( + "%CONTENT%", f"\"Result\"")) + except Exception as err: + print(f"An error occurred: {str(err)}") # Print the error message + raise RuntimeError(f"Error processing file {i}: {str(err)}") + + +@exception_handler +def any_to_pdf(i: str, out: str) -> None: + """ + Converts a file to PDF format. If the file is an image, it is embedded in the PDF, otherwise, it is treated as text. + + :param i: Input file path. + :param out: Output PDF file path. 
+ """ + image = Image.open(i) if is_image(i) and not is_svg(i) else None + new_pdf = FPDF(orientation="P", unit="pt", format="A4") + if image is None: + # If input is not an image, treat it as text + new_pdf.add_page() + new_pdf.set_font("Arial", size=12) + new_pdf.multi_cell(0, 10, txt=get_content(i), align="L") + else: + if is_tiff(i): + tiff = Tiff(i) + ims = tiff.get_images() # For TIFF files with multiple pages + else: + ims = [image.convert("RGB")] + + new_pdf.set_margins(10, 10) + + pdf_width = new_pdf.w - 20 + pdf_height = new_pdf.h - 20 + + for img in ims: + image_w, image_h = img.size + + if image_w > image_h: + new_pdf.add_page(orientation="L") + _w, _h = pdf_height, pdf_width + else: + new_pdf.add_page(orientation="P") + _w, _h = pdf_width, pdf_height + + # Scale image down to fit within the PDF page while keeping aspect ratio + aspect_ratio = image_w / image_h + if image_w > _w: + image_w = _w + image_h = image_w / aspect_ratio + if image_h > _h: + image_h = _h + image_w = image_h * aspect_ratio + + # Center the image on the page + x_offset = (_w - image_w) / 2 + y_offset = (_h - image_h) / 2 + + # Add the image to the PDF + im_path = os.path.join(tempfile.gettempdir(), "temp_image.jpg") + img.save(im_path) # Save image to temp path for FPDF + new_pdf.image(im_path, x=x_offset, y=y_offset, w=image_w, h=image_h) + + new_pdf.output(out, "F") + + +@exception_handler +def csv_to_json(i: str, out: str) -> None: + """ + Converts a CSV file to a JSON file with a 'datas' key containing the rows. + + :param i: Path to the input CSV file. + :param out: Path to the output JSON file. + """ + with open(i, encoding="utf-8") as csvf: + csv_reader = csv.DictReader(csvf) + + for idx, fieldname in enumerate(csv_reader.fieldnames): + if fieldname == "": + csv_reader.fieldnames[idx] = f"unknown_{idx}" + ret = [] + for rows in csv_reader: + ret.append({"data": rows}) + write_content(out, {"datas": ret}) + + +@exception_handler +def csv_to_xml(i: str, out: str) -> None: + """ + Converts a CSV file to an XML file by first converting it to JSON. + + :param i: Path to the input CSV file. + :param out: Path to the output XML file. + """ + file = f"{i}.json" + csv_to_json(i, file) + data = readfromjson(file) + write_content(out, json2xml.Json2xml(data, item_wrap=False).to_xml()) + + +@exception_handler +def json_to_xml(i: str, out: str) -> None: + """ + Converts a JSON file to an XML file. + + :param i: Path to the input JSON file. + :param out: Path to the output XML file. + """ + data = readfromjson(i) + write_content(out, json2xml.Json2xml(data, item_wrap=False).to_xml()) + + +@exception_handler +def json_to_txt(i: str, out: str) -> None: + """ + Converts a JSON file to a text file. + + :param i: Path to the input JSON file. + :param out: Path to the output text file. + """ + with open(i, "r", encoding="utf-8") as file: + data = json.load(file) + with open(out, "w", encoding="utf-8") as txt_file: + json.dump(data, txt_file, indent=4) + + +@exception_handler +def json_to_yaml(i: str, out: str) -> None: + """ + Converts a JSON file to a YAML file. + + :param i: Path to the input JSON file. + :param out: Path to the output YAML file. + """ + with open(i, "r", encoding="utf-8") as file: + configuration = json.load(file) + with open(out, "w", encoding="utf-8") as yaml_file: + yaml.dump(configuration, yaml_file) + + +@exception_handler +def yaml_to_json(i: str, out: str) -> None: + """ + Converts a YAML file to a JSON file. + + :param i: Path to the input YAML file. 
+ :param out: Path to the output JSON file. + """ + with open(i, "r", encoding="utf-8") as file: + configuration = yaml.safe_load(file) + with open(out, "w", encoding="utf-8") as json_file: + json.dump(configuration, json_file) + + +@exception_handler +def json_to_csv(i: str, out: str) -> None: + """ + Converts a JSON file to a CSV file. + + :param i: Path to the input JSON file. + :param out: Path to the output CSV file. + """ + with open(i, encoding="utf-8") as file: + data_file = pd.read_json(file, encoding="utf-8") + data_file.to_csv(out, encoding="utf-8", index=False) + + +@exception_handler +def xml_to_json(i: str, out: str) -> None: + """ + Converts an XML file to a JSON file. + + :param i: Path to the input XML file. + :param out: Path to the output JSON file. + """ + write_content(out, xmltodict.parse(get_content(i))) + + +@exception_handler +def html_to_txt(i: str, out: str) -> None: + """ + Converts an HTML file to a text file. + + :param i: Path to the input HTML file. + :param out: Path to the output text file. + """ + write_content(out, " ".join(BeautifulSoup(get_content(i), "html.parser").stripped_strings)) + + +@exception_handler +def yaml_to_csv(i: str, out: str) -> None: + """ + Converts a YAML file to a CSV file by first converting it to JSON. + + :param i: Path to the input YAML file. + :param out: Path to the output CSV file. + """ + yaml_to_json(i, f"{i}.json") + json_to_csv(f"{i}.json", out) + + +@exception_handler +def yaml_to_xml(i: str, out: str) -> None: + """ + Converts a YAML file to an XML file by first converting it to JSON. + + :param i: Path to the input YAML file. + :param out: Path to the output XML file. + """ + yaml_to_json(i, f"{i}.json") + json_to_xml(f"{i}.json", out) + + +@exception_handler +def xml_to_yaml(i: str, out: str) -> None: + """ + Converts an XML file to a YAML file by first converting it to JSON. + + :param i: Path to the input XML file. + :param out: Path to the output YAML file. + """ + xml_to_json(i, f"{i}.json") + json_to_yaml(f"{i}.json", out) + + +@exception_handler +def csv_to_yaml(i: str, out: str) -> None: + """ + Converts a CSV file to a YAML file by first converting it to JSON. + + :param i: Path to the input CSV file. + :param out: Path to the output YAML file. + """ + csv_to_json(i, f"{i}.json") + json_to_yaml(f"{i}.json", out) + + +class Transform: + """ + Class for handling the transformation of files between different media types (e.g., text, image, application). + + :param file_path: The path to the input file to be transformed. + :param current_media_type: The media type of the input file. + :param wanted_media_type: The desired media type after transformation. + + Attributes: + file_path (str): The path to the input file to be transformed. + current_media_type (str): The media type of the input file. + wanted_media_type (str): The desired media type after transformation. + output_path (str): The path where the transformed file will be saved. + ext (str): The extension of the output file based on the wanted media type. + + Methods: + process(): + Initiates the file transformation process based on the input and output media types. + get(): + Returns a `FileResponse` with the transformed file for download. + """ + + def __init__(self, file_path: str, current_media_type: str, wanted_media_type: str): + """ + Initializes the Transform object with file paths and media types. + + :param file_path: Path to the file to be transformed. + :param current_media_type: The media type of the input file. 
+ :param wanted_media_type: The desired media type for the output file. + """ + self.file_path = file_path + self.cmt = current_media_type.lower() + self.wmt = wanted_media_type.lower() + self.output_path = self.file_path + + self.ext = get_extension(self.wmt) + + if self.cmt != self.wmt: + base_path, _ = os.path.splitext(self.file_path) + self.output_path = base_path + self.ext + if os.path.exists(self.output_path): + try: + os.remove(self.output_path) + except OSError as exc: + LOGGER.warning("Failed to delete [%s] err: %s", os.path.basename(self.output_path), exc) + + def process(self) -> None: + """ + Processes the file based on the current and wanted media types and performs the transformation. + + :raises RuntimeError: If an error occurs during the file transformation process. + """ + try: + if self.output_path != self.file_path: + if "text/" in self.cmt: + self.process_text() + elif "application/" in self.cmt: + self.process_application() + elif "image/" in self.cmt: + self.process_image() + except Exception as e: + raise RuntimeError(f"Error processing file {self.file_path}: {str(e)}") + + def process_text(self) -> None: + """ + Handles the transformation of text-based files (e.g., plain text, HTML, CSV). + + :raises RuntimeError: If a conversion type is unsupported. + """ + if "plain" in self.cmt: + if "html" in self.wmt: + any_to_html(self.file_path, self.output_path) + if "pdf" in self.wmt: + any_to_pdf(self.file_path, self.output_path) + if "html" in self.cmt: + if "plain" in self.wmt: + html_to_txt(self.file_path, self.output_path) + if "csv" in self.cmt: + self.process_csv() + + def process_csv(self) -> None: + """ + Handles the conversion of CSV files to other formats like JSON, XML, and YAML. + + :raises RuntimeError: If a conversion type is unsupported. + """ + if "json" in self.wmt: + csv_to_json(self.file_path, self.output_path) + elif "xml" in self.wmt: + csv_to_xml(self.file_path, self.output_path) + elif "yaml" in self.wmt: + csv_to_yaml(self.file_path, self.output_path) + else: + raise RuntimeError(f"Conversion from CSV to {self.wmt} is not supported.") + + def process_application(self) -> None: + """ + Handles the conversion of application files (e.g., JSON, XML, YAML). + + :raises RuntimeError: If a conversion type is unsupported. + """ + if "json" in self.cmt: + self.process_json() + if "yaml" in self.cmt: + self.process_yaml() + if "xml" in self.cmt: + self.process_xml() + + def process_json(self) -> None: + """ + Handles the transformation of JSON files to other formats like CSV, XML, YAML, and plain text. + + :raises RuntimeError: If a conversion type is unsupported. + """ + if "csv" in self.wmt: + json_to_csv(self.file_path, self.output_path) + elif "xml" in self.wmt: + json_to_xml(self.file_path, self.output_path) + elif "yaml" in self.wmt: + json_to_yaml(self.file_path, self.output_path) + elif "plain" in self.wmt: + json_to_txt(self.file_path, self.output_path) + else: + raise RuntimeError(f"Conversion from JSON to {self.wmt} is not supported.") + + def process_yaml(self) -> None: + """ + Handles the conversion of YAML files to other formats like CSV, JSON, and XML. + + :raises RuntimeError: If a conversion type is unsupported. 
+ """ + if "csv" in self.wmt: + yaml_to_csv(self.file_path, self.output_path) + elif "json" in self.wmt: + yaml_to_json(self.file_path, self.output_path) + elif "xml" in self.wmt: + yaml_to_xml(self.file_path, self.output_path) + else: + raise RuntimeError(f"Conversion from YAML to {self.wmt} is not supported.") + + def process_xml(self) -> None: + """ + Handles the conversion of XML files to JSON or YAML. + + :raises RuntimeError: If a conversion type is unsupported. + """ + if "json" in self.wmt: + xml_to_json(self.file_path, self.output_path) + elif "yaml" in self.wmt: + xml_to_yaml(self.file_path, self.output_path) + else: + raise RuntimeError(f"Conversion from XML to {self.wmt} is not supported.") + + def process_image(self) -> None: + """ + Handles the conversion of image files to other formats (e.g., image to image or image to PDF). + + :raises RuntimeError: If a conversion type is unsupported. + """ + if "image/" in self.wmt: + image_to_any(self.file_path, self.output_path) + if not os.path.exists(self.output_path) and os.path.exists(f"{self.output_path}.tar.gz"): + self.output_path += ".tar.gz" + elif "pdf" in self.wmt: + any_to_pdf(self.file_path, self.output_path) + else: + raise RuntimeError(f"Conversion from img to {self.wmt} is not supported.") + + def get(self) -> FileResponse: + """ + Returns the transformed file as a response for download. + + :returns: The response containing the transformed file. + :raises HTTPUnprocessableEntity: If an error occurs during file transformation. + """ + try: + if not os.path.exists(self.output_path): + self.process() + response = FileResponse(self.output_path) + response.headers["Content-Disposition"] = f"attachment; filename={os.path.basename(self.output_path)}" + return response + except Exception as err: + raise HTTPUnprocessableEntity(json={ + "code": "JobOutputProcessingError", + "description": "An error occurred while treating the output data", + "cause": str(err), + "error": type(err).__name__, + "value": "" + }) diff --git a/weaver/transform/utils.py b/weaver/transform/utils.py new file mode 100644 index 000000000..85d7cd9cd --- /dev/null +++ b/weaver/transform/utils.py @@ -0,0 +1,155 @@ +import json +import os +import shutil +import tarfile +import tempfile +from typing import List, Union + +from celery.utils.log import get_task_logger +from PIL import Image +from processes.convert import get_field +from weaver.formats import ContentType, get_content_type + +LOGGER = get_task_logger(__name__) + + +def is_image(image: str) -> bool: + """ + Check if the file is an image based on its MIME content type. + + :param image: The file name or path. + :return: True if the file is an image, False otherwise. + """ + ext = os.path.splitext(image)[1] + content_type = get_content_type(ext) + return content_type.startswith("image/") + + +def is_svg(image: str) -> bool: + """ + Check if the file is an SVG image based on its MIME content type. + + :param image: The file name or path. + :return: True if the file is SVG, False otherwise. + """ + ext = os.path.splitext(image)[1] + return get_content_type(ext) == ContentType.IMAGE_SVG_XML + + +def is_png(image: str) -> bool: + """ + Check if the file is a PNG image based on its MIME content type. + + :param image: The file name or path. + :return: True if the file is PNG, False otherwise. + """ + ext = os.path.splitext(image)[1] + return get_content_type(ext) == ContentType.IMAGE_PNG + + +def is_tiff(image: str) -> bool: + """ + Check if the file is a TIFF image based on its MIME content type. 
diff --git a/weaver/transform/utils.py b/weaver/transform/utils.py
new file mode 100644
index 000000000..85d7cd9cd
--- /dev/null
+++ b/weaver/transform/utils.py
@@ -0,0 +1,155 @@
+import json
+import os
+import shutil
+import tarfile
+import tempfile
+from typing import List, Union
+
+from celery.utils.log import get_task_logger
+from PIL import Image
+
+from weaver.formats import ContentType, get_content_type
+from weaver.processes.convert import get_field
+
+LOGGER = get_task_logger(__name__)
+
+
+def is_image(image: str) -> bool:
+    """
+    Check if the file is an image based on its MIME content type.
+
+    :param image: The file name or path.
+    :return: True if the file is an image, False otherwise.
+    """
+    ext = os.path.splitext(image)[1]
+    content_type = get_content_type(ext)
+    return content_type.startswith("image/")
+
+
+def is_svg(image: str) -> bool:
+    """
+    Check if the file is an SVG image based on its MIME content type.
+
+    :param image: The file name or path.
+    :return: True if the file is SVG, False otherwise.
+    """
+    ext = os.path.splitext(image)[1]
+    return get_content_type(ext) == ContentType.IMAGE_SVG_XML
+
+
+def is_png(image: str) -> bool:
+    """
+    Check if the file is a PNG image based on its MIME content type.
+
+    :param image: The file name or path.
+    :return: True if the file is PNG, False otherwise.
+    """
+    ext = os.path.splitext(image)[1]
+    return get_content_type(ext) == ContentType.IMAGE_PNG
+
+
+def is_tiff(image: str) -> bool:
+    """
+    Check if the file is a TIFF image based on its MIME content type.
+
+    :param image: The file name or path.
+    :return: True if the file is TIFF, False otherwise.
+    """
+    ext = os.path.splitext(image)[1]
+    return get_content_type(ext) in {
+        ContentType.IMAGE_TIFF,
+        ContentType.IMAGE_GEOTIFF,
+        ContentType.IMAGE_OGC_GEOTIFF,
+        ContentType.IMAGE_COG,
+    }
+
+
+def is_gif(image: str) -> bool:
+    """
+    Check if the file is a GIF image based on its MIME content type.
+
+    :param image: The file name or path.
+    :return: True if the file is GIF, False otherwise.
+    """
+    ext = os.path.splitext(image)[1]
+    return get_content_type(ext) == ContentType.IMAGE_GIF
+
+
+def get_content(file_path: str, mode: str = "r") -> str:
+    """
+    Retrieve the content of a file.
+
+    :param file_path: The path to the file.
+    :param mode: The mode in which to open the file. Defaults to "r".
+    :return: The content of the file as a string.
+    """
+    # an explicit encoding is only valid for text modes
+    encoding = None if "b" in mode else "utf-8"
+    with open(file_path, mode, encoding=encoding) as f:
+        return f.read()
+
+
+def write_content(file_path: str, content: Union[str, dict]) -> None:
+    """
+    Write content to a file.
+
+    :param file_path: The path to the file.
+    :param content: The content to write, can be a string or dictionary.
+    """
+    if isinstance(content, dict):
+        content = json.dumps(content)
+
+    with open(file_path, "w", encoding="utf-8") as f:
+        f.write(content)
+
+
+def write_images(images: List[Image.Image], output_file: str, ext: str = "png") -> None:
+    """
+    Save a list of images to an archive or single file.
+
+    :param images: A list of images to save.
+    :param output_file: The output file name or path.
+    :param ext: The image format (extension). Defaults to "png".
+    """
+    with tempfile.TemporaryDirectory() as tmp_path:
+        img_paths = []
+        for i, img in enumerate(images):
+            img_path = os.path.join(tmp_path, f"{str(i).zfill(4)}.{ext}")
+            img.save(img_path)
+            img_paths.append(img_path)
+        if len(img_paths) > 1:
+            if not output_file.endswith(".tar.gz"):
+                output_file += ".tar.gz"
+            with tarfile.open(output_file, "w:gz") as tar:
+                for img_path in img_paths:
+                    tar.add(img_path, arcname=os.path.basename(img_path))
+        else:
+            shutil.copy(img_paths[0], output_file)
+
+
+def extend_alternate_formats(formats, conversion_dict):
+    """
+    Extend a list of formats with missing alternate formats while preserving the original order.
+
+    :param formats: A list of format dictionaries containing the "mediaType" key.
+    :param conversion_dict: A dictionary mapping media types to their alternate formats.
+    :return: The extended list of formats with alternate formats added in a consistent order.
+ """ + if not formats or not all(isinstance(fmt, dict) for fmt in formats): + return formats # No formats or invalid structure, return as-is + + # Extract existing media types while preserving order + existing_media_types = [] + seen = set() + for format_entry in formats: + media_type = get_field(format_entry, "mediaType", search_variations=True) + if media_type and media_type not in seen: + existing_media_types.append(media_type) + seen.add(media_type) + + # Collect missing alternate formats while preserving original order + missing_formats = [] + for media_type in existing_media_types: + for alt_format in conversion_dict.get(media_type, []): + if alt_format not in seen: + missing_formats.append({"mediaType": alt_format}) + seen.add(alt_format) + + return formats + missing_formats diff --git a/weaver/utils.py b/weaver/utils.py index 4c2661244..4cdd84d52 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -475,6 +475,7 @@ class SchemaRefResolver(JsonSchemaRefResolver): """ # only need to override the remote resolution to add YAML support # init overload used to patch invalid typing definition + def __init__(self, base_uri, referrer, *_, **__): # type: (str, OpenAPISchema, *Any, **Any) -> None super(SchemaRefResolver, self).__init__(base_uri, referrer, *_, **__) # type: ignore @@ -1191,6 +1192,16 @@ def get_file_header_datetime(dt): return dt_str +def create_content_id(first_id, second_id): + # type: (AnyUUID, AnyUUID) -> str + """ + Generate a unique content id from passed ids. + + Both ids can be strings or UUIDs. + """ + return f"<{first_id}@{second_id}>" + + def get_href_headers( path, # type: str download_headers=False, # type: bool @@ -1318,9 +1329,8 @@ def get_href_headers( headers = {} if content_headers: - content_id = content_id.strip("<>") if isinstance(content_id, str) else "" if content_id: - headers["Content-ID"] = f"<{content_id}>" + headers["Content-ID"] = content_id if location_headers: headers["Content-Location"] = content_location or href c_type, c_enc = guess_file_contents(href) @@ -1361,6 +1371,7 @@ def make_link_header( type=None, # type: Optional[str] # noqa title=None, # type: Optional[str] charset=None, # type: Optional[str] + **kwargs, # type: Optional[str] ): # type: (...) -> str """ Creates the HTTP Link (:rfc:`8288`) header value from input parameters or a dictionary representation. @@ -1373,13 +1384,16 @@ def make_link_header( Parameter :paramref:`rel` is optional to allow unpacking with a single parameter, but its value is required to form a valid ``Link`` header. 
""" + params = {} if isinstance(href, dict): - rel = rel or href.get("rel") - type = type or href.get("type") # noqa - title = title or href.get("title") - charset = charset or href.get("charset") # noqa - hreflang = hreflang or href.get("hreflang") - href = href["href"] + rel = rel or href.pop("rel", None) + type = type or href.pop("type", None) # noqa + title = title or href.pop("title", None) + charset = charset or href.pop("charset", None) # noqa + hreflang = hreflang or href.pop("hreflang", None) + params = {key: val for key, val in href.items() if val and isinstance(val, str)} + href = params.pop("href", None) + params.update(kwargs) link = f"<{href}>; rel=\"{rel}\"" if type: link += f"; type=\"{type}\"" @@ -1389,6 +1403,9 @@ def make_link_header( link += f"; title=\"{title}\"" if hreflang: link += f"; hreflang={hreflang}" + if params: + for key, val in params.items(): + link += f"; {key}={val}" return link diff --git a/weaver/wps_restapi/colander_extras.py b/weaver/wps_restapi/colander_extras.py index 524a1032a..59cdc1a5b 100644 --- a/weaver/wps_restapi/colander_extras.py +++ b/weaver/wps_restapi/colander_extras.py @@ -648,6 +648,7 @@ class NoneType(colander.SchemaType): """ Type representing an explicit :term:`JSON` ``null`` value. """ + def serialize(self, node, appstruct): # noqa # type: (colander.SchemaNode, Any) -> Union[None, colander.null, colander.drop] if appstruct in (colander.null, colander.drop): @@ -671,6 +672,7 @@ class AnyType(colander.SchemaType): """ Type representing any :term:`JSON` structure. """ + def serialize(self, node, appstruct): # noqa # type: (colander.SchemaNode, Any) -> Any return appstruct @@ -2634,6 +2636,7 @@ class SchemaRefConverter(TypeConverter): """ Converter that will add :term:`OpenAPI` ``$schema`` and ``$id`` references if they are provided in the schema node. """ + def convert_type(self, schema_node): # type: (colander.SchemaNode) -> OpenAPISchema result = super(SchemaRefConverter, self).convert_type(schema_node) @@ -2650,6 +2653,7 @@ class ExtendedTypeConverter(SchemaRefConverter): """ Base converter with support of `Extended` schema type definitions. 
""" + def convert_type(self, schema_node): # type: (colander.SchemaNode) -> OpenAPISchema # base type converters expect raw pattern string diff --git a/weaver/wps_restapi/examples/job_output.json b/weaver/wps_restapi/examples/job_output.json new file mode 100644 index 000000000..98d82d177 --- /dev/null +++ b/weaver/wps_restapi/examples/job_output.json @@ -0,0 +1,52 @@ +{ + "outputs": [ + { + "href": "http://schema-example.com/wpsoutputs/14c68477-c3ed-4784-9c0f-a4c9e1344db5/output.txt", + "id": "output" + } + ], + "links": [ + { + "type": "application/json", + "title": "Job status.", + "hreflang": "en-US", + "href": "http://schema-example.com/providers/hummingbird/processes/ncdump/jobs/14c68477-c3ed-4784-9c0f-a4c9e1344db5", + "rel": "status" + }, + { + "type": "application/json", + "title": "Job logs.", + "hreflang": "en-US", + "href": "http://schema-example.com/providers/hummingbird/processes/ncdump/jobs/14c68477-c3ed-4784-9c0f-a4c9e1344db5/logs", + "rel": "logs" + }, + { + "type": "application/json", + "title": "Job inputs.", + "hreflang": "en-US", + "href": "http://schema-example.com/providers/hummingbird/processes/ncdump/jobs/14c68477-c3ed-4784-9c0f-a4c9e1344db5/inputs", + "rel": "inputs" + }, + { + "type": "application/json", + "title": "Job outputs.", + "hreflang": "en-US", + "href": "http://schema-example.com/providers/hummingbird/processes/ncdump/jobs/14c68477-c3ed-4784-9c0f-a4c9e1344db5/outputs", + "rel": "outputs" + }, + { + "type": "application/json", + "title": "Job results.", + "hreflang": "en-US", + "href": "http://schema-example.com/providers/hummingbird/processes/ncdump/jobs/14c68477-c3ed-4784-9c0f-a4c9e1344db5/results", + "rel": "results" + }, + { + "type": "application/json", + "title": "Job outputs.", + "hreflang": "en-US", + "href": "http://schema-example.com/providers/hummingbird/processes/ncdump/jobs/14c68477-c3ed-4784-9c0f-a4c9e1344db5/outputs", + "rel": "self" + } + ] +} diff --git a/weaver/wps_restapi/jobs/jobs.py b/weaver/wps_restapi/jobs/jobs.py index bb5da7c66..72523f1cd 100644 --- a/weaver/wps_restapi/jobs/jobs.py +++ b/weaver/wps_restapi/jobs/jobs.py @@ -6,6 +6,7 @@ from pyramid.httpexceptions import ( HTTPBadRequest, HTTPNoContent, + HTTPNotFound, HTTPOk, HTTPPermanentRedirect, HTTPUnprocessableEntity, @@ -26,7 +27,7 @@ repr_json ) from weaver.processes.constants import JobInputsOutputsSchema, JobStatusSchema -from weaver.processes.convert import convert_input_values_schema, convert_output_params_schema +from weaver.processes.convert import convert_input_values_schema, convert_output_params_schema, get_field from weaver.processes.execution import ( submit_job, submit_job_dispatch_task, @@ -37,14 +38,17 @@ from weaver.processes.wps_package import mask_process_inputs from weaver.status import JOB_STATUS_CATEGORIES, StatusCategory, StatusCompliant, map_status from weaver.store.base import StoreJobs -from weaver.utils import get_header, get_settings, make_link_header +from weaver.transform import transform +from weaver.utils import get_any_value, get_header, get_settings, make_link_header from weaver.wps_restapi import swagger_definitions as sd from weaver.wps_restapi.jobs.utils import ( dismiss_job_task, get_job, get_job_io_schema_query, get_job_list_links, + get_job_output_transmission, get_job_results_response, + get_job_results_single, get_job_status_schema, get_results, raise_job_bad_status_locked, @@ -582,11 +586,97 @@ def get_job_outputs(request): schema = get_job_io_schema_query(request.params.get("schema"), default=JobInputsOutputsSchema.OGC) results, _ = 
get_results(job, request, schema=schema, link_references=False)
     outputs = {"outputs": results}
-    outputs.update({"links": job.links(request, self_link="outputs")})
+    links = job.links(request, self_link="outputs")
+    outputs.update({"links": links})
     outputs = sd.JobOutputsBody().deserialize(outputs)
     return HTTPOk(json=outputs)
 
 
+@sd.provider_result_value_service.get(
+    tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROVIDERS],
+    schema=sd.ProviderResultValueEndpoint(),
+    response_schemas=sd.get_prov_result_responses
+)
+@sd.process_result_value_service.get(
+    tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES],
+    schema=sd.ProcessResultValueEndpoint(),
+    response_schemas=sd.get_proc_result_responses
+)
+@sd.job_result_value_service.get(
+    tags=[sd.TAG_JOBS, sd.TAG_RESULTS],
+    schema=sd.JobResultValueEndpoint(),
+    response_schemas=sd.get_job_result_responses
+)
+@sd.provider_output_service.get(
+    tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES],
+    schema=sd.ProviderAnyOutputEndpoint(),
+    response_schemas=sd.get_prov_result_responses
+)
+@sd.process_output_service.get(
+    tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES],
+    schema=sd.ProcessAnyOutputEndpoint(),
+    response_schemas=sd.get_proc_result_responses
+)
+@sd.job_output_service.get(
+    tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROCESSES],
+    schema=sd.JobAnyOutputEndpoint(),
+    response_schemas=sd.get_job_result_responses
+)
+@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description)
+def get_job_output(request):
+    # type: (PyramidRequest) -> AnyResponseType
+    """
+    Retrieve a specific output from a job execution.
+    """
+    job = get_job(request)
+    raise_job_dismissed(job, request)
+    raise_job_bad_status_success(job, request)
+    settings = get_settings(request)
+    output_id = request.matchdict.get("output_id")
+    # requested media-type from the 'Accept' header (None when omitted or "*/*")
+    accept = str(request.accept) if request.accept and str(request.accept) != "*/*" else None
+    headers = request.headers
+    results = [o for o in job.results if str(o["identifier"]) == output_id]
+    if results:
+        result = results[0]
+        mime_type = get_field(result, "mime_type", search_variations=True, default="")
+        # copy the mapped list to avoid mutating the module-level conversion dictionary
+        possible_media_types = list(transform.CONVERSION_DICT.get(mime_type, []))
+        possible_media_types.append(mime_type)
+    else:
+        raise HTTPNotFound(
+            json={
+                "code": "OutputNotFound",
+                "description": "The requested output ID is not available in the job results.",
+                "cause": "The output ID is not available.",
+                "error": "OutputNotFound",
+                "value": str(output_id)
+            }
+        )
+    result_media_type = get_field(result, "mimeType", search_variations=True)
+    result_media_type = guess_target_format(request, default=result_media_type)
+
+    if result_media_type not in possible_media_types:
+        raise HTTPUnprocessableEntity(json={
+            "code": "InvalidMimeTypeRequested",
+            "description": "The requested output format is not in the possible output formats.",
+            "cause": "Incompatible media-types.",
+            "error": "InvalidMimeTypeRequested",
+            "value": str(result_media_type)
+        })
+
+    is_reference = bool(get_any_value(result, key=True, file=True))
+    _, output_format = get_job_output_transmission(job, output_id, is_reference)
+    output_format = accept or output_format or result_media_type
+
+    # To ensure consistency and avoid type mismatches, all output formats are converted to a dictionary.
+    # This standardization prevents errors when handling cases in `get_job_results_single`
+    # where some formats are dictionaries and others are different types (e.g., strings or ContentType).
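+    # e.g., a bare "application/x-yaml" selection becomes {"mime_type": "application/x-yaml"}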
+ if not isinstance(output_format, dict): + output_format = {"mime_type": output_format} + + return get_job_results_single(job, result, output_id, output_format, headers=headers, settings=settings) + + @sd.provider_results_service.get( tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROVIDERS], schema=sd.ProviderResultsEndpoint(), @@ -799,27 +889,33 @@ def includeme(config): config.add_cornice_service(sd.jobs_service) config.add_cornice_service(sd.job_service) config.add_cornice_service(sd.job_results_service) + config.add_cornice_service(sd.job_result_value_service) config.add_cornice_service(sd.job_outputs_service) + config.add_cornice_service(sd.job_output_service) config.add_cornice_service(sd.job_inputs_service) config.add_cornice_service(sd.job_exceptions_service) config.add_cornice_service(sd.job_logs_service) config.add_cornice_service(sd.job_stats_service) - config.add_cornice_service(sd.process_jobs_service) - config.add_cornice_service(sd.process_job_service) - config.add_cornice_service(sd.process_results_service) - config.add_cornice_service(sd.process_outputs_service) - config.add_cornice_service(sd.process_inputs_service) - config.add_cornice_service(sd.process_exceptions_service) - config.add_cornice_service(sd.process_logs_service) - config.add_cornice_service(sd.process_stats_service) config.add_cornice_service(sd.provider_job_service) config.add_cornice_service(sd.provider_jobs_service) config.add_cornice_service(sd.provider_results_service) + config.add_cornice_service(sd.provider_result_value_service) config.add_cornice_service(sd.provider_outputs_service) + config.add_cornice_service(sd.provider_output_service) config.add_cornice_service(sd.provider_inputs_service) config.add_cornice_service(sd.provider_exceptions_service) config.add_cornice_service(sd.provider_logs_service) config.add_cornice_service(sd.provider_stats_service) + config.add_cornice_service(sd.process_jobs_service) + config.add_cornice_service(sd.process_job_service) + config.add_cornice_service(sd.process_results_service) + config.add_cornice_service(sd.process_result_value_service) + config.add_cornice_service(sd.process_outputs_service) + config.add_cornice_service(sd.process_output_service) + config.add_cornice_service(sd.process_inputs_service) + config.add_cornice_service(sd.process_exceptions_service) + config.add_cornice_service(sd.process_logs_service) + config.add_cornice_service(sd.process_stats_service) # backward compatibility routes (deprecated) config.add_cornice_service(sd.job_result_service) diff --git a/weaver/wps_restapi/jobs/utils.py b/weaver/wps_restapi/jobs/utils.py index 53990f990..5e6fece3e 100644 --- a/weaver/wps_restapi/jobs/utils.py +++ b/weaver/wps_restapi/jobs/utils.py @@ -54,7 +54,9 @@ from weaver.processes.convert import any2wps_literal_datatype, convert_output_params_schema, get_field from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status from weaver.store.base import StoreJobs, StoreProcesses, StoreServices +from weaver.transform import transform from weaver.utils import ( + create_content_id, data2str, fetch_file, get_any_id, @@ -288,6 +290,7 @@ def get_job_list_links(job_total, filters, request): "href": parent_url, "rel": "up", "type": ContentType.APP_JSON, "title": "Parent collection for which listed jobs apply." 
}) + return links @@ -396,8 +399,9 @@ def make_result_link( url = headers["Content-Location"] typ = headers["Content-Type"] enc = headers.get("Content-Encoding", None) - link_header = make_link_header(url, rel=result_id, type=typ, charset=enc) - links.append(link_header) + link_header_result = make_link_header(url, rel=result_id, type=typ, charset=enc) + link_header_output = make_link_header(url, rel="output", type=typ, charset=enc, id=result_id) + links.extend([link_header_result, link_header_output]) return links @@ -510,7 +514,7 @@ def get_results( # pylint: disable=R1260 else: output["dataType"] = dtype - if schema == JobInputsOutputsSchema.OGC_STRICT: + if strict: out_fmt = output.pop("format", {}) for fmt_key, fmt_val in out_fmt.items(): output.setdefault(fmt_key, fmt_val) @@ -537,6 +541,32 @@ def get_results( # pylint: disable=R1260 return outputs, headers +def get_job_output_transmission(job, output_id, is_reference): + # type: (Job, str, bool) -> Tuple[AnyExecuteTransmissionMode, Optional[JobValueFormat]] + """ + Obtain the requested :term:`Job` output ``transmissionMode`` and ``format``. + """ + outputs = job.outputs or {} + outputs = convert_output_params_schema(outputs, JobInputsOutputsSchema.OGC) + out = outputs.get(output_id) or {} + out_mode = cast("AnyExecuteTransmissionMode", out.get("transmissionMode")) + out_fmt = cast("JobValueFormat", out.get("format")) + + # raw/representation can change the output transmission mode if they are not overriding it + # document/minimal return is not checked, since it is our default, and will resolve as such anyway + if ( + not out_mode and + job.execution_return == ExecuteReturnPreference.REPRESENTATION and + job.execution_response == ExecuteResponse.RAW + ): + return ExecuteTransmissionMode.VALUE, out_fmt + + # because mode can be omitted, resolve their default explicitly + if not out_mode: + out_mode = ExecuteTransmissionMode.REFERENCE if is_reference else ExecuteTransmissionMode.VALUE + return out_mode, out_fmt + + def get_job_return( job=None, # type: Optional[Job] body=None, # type: Optional[JSON] @@ -565,32 +595,6 @@ def get_job_return( return job.execution_response, job.execution_return -def get_job_output_transmission(job, output_id, is_reference): - # type: (Job, str, bool) -> Tuple[AnyExecuteTransmissionMode, Optional[JobValueFormat]] - """ - Obtain the requested :term:`Job` output ``transmissionMode`` and ``format``. 
- """ - outputs = job.outputs or {} - outputs = convert_output_params_schema(outputs, JobInputsOutputsSchema.OGC) - out = outputs.get(output_id) or {} - out_mode = cast("AnyExecuteTransmissionMode", out.get("transmissionMode")) - out_fmt = cast("JobValueFormat", out.get("format")) - - # raw/representation can change the output transmission mode if they are not overriding it - # document/minimal return is not checked, since it is our default, and will resolve as such anyway - if ( - not out_mode and - job.execution_return == ExecuteReturnPreference.REPRESENTATION and - job.execution_response == ExecuteResponse.RAW - ): - return ExecuteTransmissionMode.VALUE, out_fmt - - # because mode can be omitted, resolve their default explicitly - if not out_mode: - out_mode = ExecuteTransmissionMode.REFERENCE if is_reference else ExecuteTransmissionMode.VALUE - return out_mode, out_fmt - - def get_job_results_response( job, # type: Job *, # force named keyword arguments after @@ -752,12 +756,12 @@ def get_job_results_response( # https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-value-one res_id = out_vals[0][0] - # FIXME: add transform for requested output format (https://github.com/crim-ca/weaver/pull/548) - # req_fmt = guess_target_format(container) where container=request - # out_fmt (see above) - # out_type = result.get("type") - # out_select = req_fmt or out_fmt or out_type (resolution order/precedence) - out_fmt = None + # check accept header + req_fmt = (request_headers or {}).get("accept") + out_fmt = out_transmissions[res_id][1] + out_type = get_field(results[res_id], "mime_type", search_variations=True, default=None) # a voir en debuggant + out_select = req_fmt or out_fmt or out_type # (resolution order/precedence) + out_fmt = out_select return get_job_results_single(job, out_info, res_id, out_fmt, headers=headers, settings=settings) @@ -767,7 +771,7 @@ def generate_or_resolve_result( result_id, # type: str output_id, # type: str output_mode, # type: AnyExecuteTransmissionMode - output_format, # type: Optional[JobValueFormat] # FIXME: implement (https://github.com/crim-ca/weaver/pull/548) + output_format, # type: Optional[JobValueFormat] settings, # type: SettingsType ): # type: (...) 
-> Tuple[HeadersType, Optional[AnyDataStream]] """ @@ -788,7 +792,7 @@ def generate_or_resolve_result( is_val = bool(get_any_value(result, key=True, file=False, data=True)) is_ref = bool(get_any_value(result, key=True, file=True, data=False)) val = get_any_value(result) - cid = f"{result_id}@{job.id}" + cid = create_content_id(result_id, job.id) url = None loc = None res_data = None @@ -814,6 +818,16 @@ def generate_or_resolve_result( else: typ = get_field(result, "mime_type", search_variations=True, default=ContentType.TEXT_PLAIN) + out = clean_media_type_format(get_field(output_format, "mime_type", search_variations=True, default=None)) + + # Apply transform if type is different from desired output and desired output is different from plain + if out and out not in transform.EXCLUDED_TYPES and out != typ: + file_transform = transform.Transform(file_path=loc, current_media_type=typ, wanted_media_type=out) + typ = out + file_transform.get() + loc = file_transform.output_path + url = map_wps_output_location(loc, settings, exists=True, url=True) + if not url: out_dir = get_wps_output_dir(settings) out_name = f"{result_id}.txt" diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index d2bcb1b05..0b40f2dd7 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -162,7 +162,6 @@ ViewInfo = TypedDict("ViewInfo", {"name": str, "pattern": str}) - WEAVER_CONFIG_REMOTE_LIST = f"[{', '.join(WeaverFeature.REMOTE)}]" API_TITLE = "Weaver REST API" @@ -311,7 +310,6 @@ else: EXAMPLES[_name] = f.read() - ######################################################### # API tags ######################################################### @@ -358,7 +356,11 @@ jobs_service = Service(name="jobs", path="/jobs") job_service = Service(name="job", path=f"{jobs_service.path}/{{job_id}}") job_results_service = Service(name="job_results", path=f"{job_service.path}/results") +job_result_value_service = Service(name="job_result_value", path=f"{job_results_service.path}/{{output_id}}") +job_exceptions_service = Service(name="job_exceptions", path=f"{job_service.path}/exceptions") job_outputs_service = Service(name="job_outputs", path=f"{job_service.path}/outputs") +job_output_service = Service(name="job_output", path=f"{job_outputs_service.path}/{{output_id}}") + job_inputs_service = Service(name="job_inputs", path=f"{job_service.path}/inputs") job_exceptions_service = Service(name="job_exceptions", path=f"{job_service.path}/exceptions") job_logs_service = Service(name="job_logs", path=f"{job_service.path}/logs") @@ -386,8 +388,11 @@ process_jobs_service = Service(name="process_jobs", path=process_service.path + jobs_service.path) process_job_service = Service(name="process_job", path=process_service.path + job_service.path) process_results_service = Service(name="process_results", path=process_service.path + job_results_service.path) +process_result_value_service = Service(name="process_result_value", path=process_service.path + + job_result_value_service.path) process_inputs_service = Service(name="process_inputs", path=process_service.path + job_inputs_service.path) process_outputs_service = Service(name="process_outputs", path=process_service.path + job_outputs_service.path) +process_output_service = Service(name="process_output", path=process_service.path + job_output_service.path) process_exceptions_service = Service(name="process_exceptions", path=process_service.path + job_exceptions_service.path) process_logs_service = 
Service(name="process_logs", path=process_service.path + job_logs_service.path) process_stats_service = Service(name="process_stats", path=process_service.path + job_stats_service.path) @@ -432,8 +437,11 @@ provider_jobs_service = Service(name="provider_jobs", path=provider_service.path + process_jobs_service.path) provider_job_service = Service(name="provider_job", path=provider_service.path + process_job_service.path) provider_results_service = Service(name="provider_results", path=provider_service.path + process_results_service.path) +provider_result_value_service = Service(name="provider_result_value", path=provider_service.path + + process_result_value_service.path) provider_inputs_service = Service(name="provider_inputs", path=provider_service.path + process_inputs_service.path) provider_outputs_service = Service(name="provider_outputs", path=provider_service.path + process_outputs_service.path) +provider_output_service = Service(name="provider_output", path=provider_service.path + process_output_service.path) provider_exceptions_service = Service( name="provider_exceptions", path=provider_service.path + process_exceptions_service.path, @@ -486,6 +494,7 @@ vault_service = Service(name="vault", path="/vault") vault_file_service = Service(name="vault_file", path=f"{vault_service.path}/{{file_id}}") + ######################################################### # Generic schemas ######################################################### @@ -521,6 +530,14 @@ class URL(ExtendedSchemaNode): format = "url" +class URN(ExtendedSchemaNode): + schema_type = String + description = "Universal ressource name." + example = "urn:ogc:def:objectType:authority:version:code" + pattern = re.compile(r"^urn\:[A-Za-z0-9]+(:[A-Za-z0-9]+)+$") + title = "NamespacedRelationshipType" + + class URI(ExtendedSchemaNode): """ String format that will be automatically mapped to a URI-pattern validator. @@ -738,6 +755,14 @@ class AcceptHeader(ExtendedSchemaNode): default = ContentType.APP_JSON # defaults to JSON for easy use within browsers +class AcceptAnyHeader(ExtendedSchemaNode): + # ok to use 'name' in this case because target 'key' in the mapping must + # be that specific value but cannot have a field named with this format + name = "Accept" + schema_type = String + missing = drop + + class AcceptLanguageHeader(ExtendedSchemaNode): # ok to use 'name' in this case because target 'key' in the mapping must # be that specific value but cannot have a field named with this format @@ -821,6 +846,13 @@ class RequestHeaders(ExtendedMappingSchema): content_type = RequestContentTypeHeader() +class RequestHeadersAcceptAny(RequestHeaders): + """ + Headers that can indicate how to adjust the behavior and/or result to be provided in the response. + """ + accept = AcceptAnyHeader() + + class ResponseHeaders(ExtendedMappingSchema): """ Headers describing resulting response. @@ -913,8 +945,9 @@ class FileResponseHeaders(NoContent): content_type = ContentTypeHeader(example=ContentType.APP_JSON) content_length = ContentLengthHeader() content_disposition = ContentDispositionHeader() - date = DateHeader() - last_modified = LastModifiedHeader() + content_location = ReferenceURL() + date = DateHeader(missing=drop) + last_modified = LastModifiedHeader(missing=drop) class AccessToken(ExtendedSchemaNode): @@ -966,10 +999,16 @@ class LinkRelationshipType(OneOfKeywordSchema): "Relationship of the link to the current content. " "This should be one item amongst registered relations https://www.iana.org/assignments/link-relations/." 
)), - URL(description="Fully qualified extension link relation to the current content.") + URL(description="Fully qualified extension link relation to the current content."), ] +class LinkId(ExtendedMappingSchema): + # https://datatracker.ietf.org/doc/html/rfc8288#section-3.4 (Target Attributes) + # https://datatracker.ietf.org/doc/html/rfc8288#section-3.4.2 (Extension Attributes) + id = SLUG(name="id", missing=drop) + + class LinkRelationship(ExtendedMappingSchema): rel = LinkRelationshipType() @@ -979,7 +1018,8 @@ class LinkBase(LinkLanguage, MetadataBase): type = MediaType(description="IANA identifier of content-type located at the link.", missing=drop) -class Link(LinkRelationship, LinkBase): +class Link(LinkRelationship, LinkBase, LinkId): + # https://datatracker.ietf.org/doc/html/rfc2068#section-19.6.2.4 (Link Header) _schema = f"{OGC_API_COMMON_PART1_SCHEMAS}/link.json" _schema_include_deserialize = False # only in OpenAPI otherwise too verbose @@ -1223,8 +1263,8 @@ class AdditionalParametersList(ExtendedSequenceSchema): class Content(ExtendedMappingSchema): href = ReferenceURL(description="URL to CWL file.", title="OWSContentURL", - default=drop, # if invalid, drop it completely, - missing=required, # but still mark as 'required' for parent objects + default=drop, # if invalid, drop it completely, + missing=required, # but still mark as 'required' for parent objects example="http://some.host/applications/cwl/multisensor_ndvi.cwl") @@ -2503,8 +2543,8 @@ class QuotePath(ExtendedMappingSchema): quote_id = UUID(description="Quote ID") -class ResultPath(ExtendedMappingSchema): - result_id = UUID(description="Result ID") +class OutputPath(ExtendedMappingSchema): + output_id = UUID(description="Output ID") ######################################################### @@ -3403,6 +3443,38 @@ class GetJobEndpoint(JobPath): querystring = GetJobQuery() +class OutputEndpoint(OutputPath): + header = RequestHeadersAcceptAny() + + +class ResultValueEndpoint(OutputEndpoint): + pass + + +class JobAnyOutputEndpoint(JobPath, OutputPath): + header = RequestHeadersAcceptAny() + + +class JobResultValueEndpoint(JobAnyOutputEndpoint): + pass + + +class ProcessAnyOutputEndpoint(LocalProcessPath, JobPath, OutputPath): + header = RequestHeadersAcceptAny() + + +class ProcessResultValueEndpoint(ProcessAnyOutputEndpoint): + pass + + +class ProviderAnyOutputEndpoint(ProviderProcessPath, LocalProcessPath, JobPath, OutputPath): + header = RequestHeadersAcceptAny() + + +class ProviderResultValueEndpoint(ProviderAnyOutputEndpoint): + pass + + class ProcessInputsEndpoint(LocalProcessPath, JobPath): header = RequestHeaders() @@ -3464,16 +3536,31 @@ class JobOutputsEndpoint(JobPath): querystring = LocalProcessJobResultsQuery() +class JobOutputEndpoint(JobPath): + header = RequestHeaders() + querystring = LocalProcessJobResultsQuery() + + class ProcessOutputsEndpoint(LocalProcessPath, JobPath): header = RequestHeaders() querystring = LocalProcessJobResultsQuery() +class ProcessOutputEndpoint(LocalProcessPath, JobPath): + header = RequestHeaders() + querystring = LocalProcessJobResultsQuery() + + class ProviderOutputsEndpoint(ProviderProcessPath, JobPath): header = RequestHeaders() querystring = JobResultsQuery() +class ProviderOutputEndpoint(ProviderProcessPath, JobPath): + header = RequestHeaders() + querystring = JobResultsQuery() + + class ProcessResultEndpoint(ProcessOutputsEndpoint): deprecated = True header = RequestHeaders() @@ -7896,6 +7983,11 @@ class NoContentJobResultsResponse(ExtendedMappingSchema): body = 
NoContent(default="") +class JobResultContentResponse(ExtendedMappingSchema): + header = FileResponseHeaders() + body = ResultData(default="") + + class CreatedQuoteExecuteResponse(ExtendedMappingSchema): header = ResponseHeaders() body = CreatedQuotedJobStatusSchema() @@ -8468,6 +8560,20 @@ class GoneVaultFileDownloadResponse(ExtendedMappingSchema): get_provider_outputs_responses.update({ "403": ForbiddenProviderLocalResponseSchema(), }) + +get_job_output_responses = { + "200": OkGetJobOutputsResponse(description="success"), + "400": InvalidJobResponseSchema(), + "404": NotFoundJobResponseSchema(), + "405": MethodNotAllowedErrorResponseSchema(), + "410": GoneJobResponseSchema(), + "500": InternalServerErrorResponseSchema(), +} +get_prov_output_responses = copy(get_job_output_responses) +get_prov_output_responses.update({ + "403": ForbiddenProviderLocalResponseSchema(), +}) + get_result_redirect_responses = { "308": RedirectResultResponse(description="Redirects '/result' (without 's') to corresponding '/results' path."), } @@ -8490,6 +8596,24 @@ class GoneVaultFileDownloadResponse(ExtendedMappingSchema): get_provider_results_responses.update({ "403": ForbiddenProviderLocalResponseSchema(), }) + +get_job_result_responses = { + "200": JobResultContentResponse(description="success by value"), + "204": NoContentJobResultsResponse(description="success by reference"), + "404": NotFoundJobResponseSchema(), + "405": MethodNotAllowedErrorResponseSchema(), + "406": NotAcceptableErrorResponseSchema(), + "410": GoneJobResponseSchema(), + "500": InternalServerErrorResponseSchema(), +} +get_prov_result_responses = copy(get_job_result_responses) +get_prov_result_responses.update({ + "403": ForbiddenProviderLocalResponseSchema(), +}) + +get_proc_result_responses = get_job_result_responses + + get_job_exceptions_responses = { "200": OkGetJobExceptionsResponse(description="success", examples={ "JobExceptions": {