diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b83adabe..3000f1485 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - `MultiBackendJobManager`: costs has been added as a column in tracking databases ([[#588](https://github.com/Open-EO/openeo-python-client/issues/588)]) +- Eliminate usage of non-standard `read_vector` process in `DataCube.aggregate_spatial`, `DataCube.mask_polygon`, ..., and replace with standard `load_url` ([#104](https://github.com/Open-EO/openeo-python-client/issues/104)) ### Removed diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py index f0da58849..c989d718b 100644 --- a/openeo/rest/datacube.py +++ b/openeo/rest/datacube.py @@ -12,7 +12,9 @@ import datetime import logging import pathlib +import re import typing +import urllib.parse import warnings from builtins import staticmethod from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union @@ -57,7 +59,13 @@ from openeo.rest.service import Service from openeo.rest.udp import RESTUserDefinedProcess from openeo.rest.vectorcube import VectorCube -from openeo.util import dict_no_none, guess_format, normalize_crs, rfc3339 +from openeo.util import ( + dict_no_none, + guess_format, + load_json_resource, + normalize_crs, + rfc3339, +) if typing.TYPE_CHECKING: # Imports for type checking only (circular import issue at runtime). @@ -1035,7 +1043,7 @@ def _merge_operator_binary_cubes( def _get_geometry_argument( self, - geometry: Union[ + argument: Union[ shapely.geometry.base.BaseGeometry, dict, str, @@ -1047,30 +1055,47 @@ def _get_geometry_argument( crs: Optional[str] = None, ) -> Union[dict, Parameter, PGNode]: """ - Convert input to a geometry as "geojson" subtype object. + Convert input to a geometry as "geojson" subtype object or vector cube. :param crs: value that encodes a coordinate reference system. See :py:func:`openeo.util.normalize_crs` for more details about additional normalization that is applied to this argument. """ - if isinstance(geometry, (str, pathlib.Path)): - # Assumption: `geometry` is path to polygon is a path to vector file at backend. - # TODO #104: `read_vector` is non-standard process. - # TODO: If path exists client side: load it client side? - return PGNode(process_id="read_vector", arguments={"filename": str(geometry)}) - elif isinstance(geometry, Parameter): - return geometry - elif isinstance(geometry, _FromNodeMixin): - return geometry.from_node() - - if isinstance(geometry, shapely.geometry.base.BaseGeometry): - geometry = mapping(geometry) - if not isinstance(geometry, dict): - raise OpenEoClientException("Invalid geometry argument: {g!r}".format(g=geometry)) + # First handle (abstract) references, e.g. parameter or back-end side vector cube + if isinstance(argument, Parameter): + return argument + elif isinstance(argument, _FromNodeMixin): + return argument.from_node() + + if isinstance(argument, str) and re.match(r"^https?://", argument, flags=re.I): + # Geometry provided as URL: load with `load_url` (with best-effort format guess) + url = urllib.parse.urlparse(argument) + suffix = pathlib.Path(url.path.lower()).suffix + format = { + ".json": "GeoJSON", + ".geojson": "GeoJSON", + ".pq": "Parquet", + ".parquet": "Parquet", + ".geoparquet": "Parquet", + }.get(suffix, suffix.split(".")[-1]) + return self.connection.load_url(url=argument, format=format) + + if isinstance(argument, str): + geometry = load_json_resource(argument) + elif isinstance(argument, pathlib.Path): + geometry = load_json_resource(argument) + elif isinstance(argument, shapely.geometry.base.BaseGeometry): + geometry = mapping(argument) + elif isinstance(argument, dict): + geometry = argument + else: + raise OpenEoClientException(f"Invalid geometry argument: {argument!r}") if geometry.get("type") not in valid_geojson_types: - raise OpenEoClientException("Invalid geometry type {t!r}, must be one of {s}".format( - t=geometry.get("type"), s=valid_geojson_types - )) + raise OpenEoClientException( + f"Invalid geometry type {geometry.get('type')!r}, must be one of {valid_geojson_types}" + ) + + # TODO #671 get rid of this ad-hoc, inconsistent `crs` handling if crs: # TODO: don't warn when the crs is Lon-Lat like EPSG:4326? warnings.warn(f"Geometry with non-Lon-Lat CRS {crs!r} is only supported by specific back-ends.") @@ -1099,6 +1124,7 @@ def aggregate_spatial( ], reducer: Union[str, typing.Callable, PGNode], target_dimension: Optional[str] = None, + # TODO #671 deprecate/remove `crs` argument here crs: Optional[Union[int, str]] = None, context: Optional[dict] = None, # TODO arguments: target dimension, context @@ -1943,6 +1969,7 @@ def mask(self, mask: DataCube = None, replacement=None) -> DataCube: def mask_polygon( self, mask: Union[shapely.geometry.base.BaseGeometry, dict, str, pathlib.Path, Parameter, VectorCube], + # TODO #671 deprecate/remove `srs` argument here srs: str = None, replacement=None, inside: bool = None, diff --git a/openeo/util.py b/openeo/util.py index 44842124a..837946f42 100644 --- a/openeo/util.py +++ b/openeo/util.py @@ -441,6 +441,7 @@ def guess_format(filename: Union[str, Path]) -> Union[str, None]: def load_json(path: Union[Path, str]) -> dict: + """Load JSON serialized data from a local file""" with Path(path).open("r", encoding="utf-8") as f: return json.load(f) @@ -459,7 +460,7 @@ def load_json_resource(src: Union[str, Path]) -> dict: elif isinstance(src, str) and re.match(r"^https?://", src, flags=re.I): # URL to remote JSON resource return requests.get(src).json() - elif isinstance(src, Path) or (isinstance(src, str) and src.endswith(".json")): + elif isinstance(src, Path) or (isinstance(src, str) and Path(src).suffix.lower() in {".json", ".geojson"}): # Assume source is a local JSON file path return load_json(src) raise ValueError(src) diff --git a/tests/data/1.0.0/aggregate_zonal_path.json b/tests/data/1.0.0/aggregate_zonal_path.json index 29aaf784c..7efa78248 100644 --- a/tests/data/1.0.0/aggregate_zonal_path.json +++ b/tests/data/1.0.0/aggregate_zonal_path.json @@ -21,10 +21,11 @@ } } }, - "readvector1": { - "process_id": "read_vector", + "loadurl1": { + "process_id": "load_url", "arguments": { - "filename": "/some/path/to/GeometryCollection.geojson" + "url": "https://example.com/geometries.geojson", + "format": "GeoJSON" } }, "aggregatespatial1": { @@ -34,7 +35,7 @@ "from_node": "filterbbox1" }, "geometries": { - "from_node": "readvector1" + "from_node": "loadurl1" }, "reducer": { "process_graph": { diff --git a/tests/data/geojson/polygon02.json b/tests/data/geojson/polygon02.json new file mode 100644 index 000000000..1c9103d13 --- /dev/null +++ b/tests/data/geojson/polygon02.json @@ -0,0 +1,23 @@ +{ + "type": "Polygon", + "coordinates": [ + [ + [ + 3, + 50 + ], + [ + 4, + 50 + ], + [ + 4, + 51 + ], + [ + 3, + 50 + ] + ] + ] +} diff --git a/tests/rest/__init__.py b/tests/rest/__init__.py index dbd006530..313635252 100644 --- a/tests/rest/__init__.py +++ b/tests/rest/__init__.py @@ -1,13 +1,16 @@ import json +from typing import Union import mock -from openeo.rest.datacube import DataCube +from openeo import DataCube, VectorCube # TODO: move (some of) these to openeo.testing? -def get_download_graph(cube: DataCube, *, drop_save_result: bool = False, drop_load_collection: bool = False) -> dict: +def get_download_graph( + cube: Union[DataCube, VectorCube], *, drop_save_result: bool = False, drop_load_collection: bool = False +) -> dict: """ Do fake download of a cube and intercept the process graph :param cube: cube to download diff --git a/tests/rest/datacube/test_datacube100.py b/tests/rest/datacube/test_datacube100.py index d565fb119..51cbe38fc 100644 --- a/tests/rest/datacube/test_datacube100.py +++ b/tests/rest/datacube/test_datacube100.py @@ -595,6 +595,33 @@ def test_aggregate_spatial_geometry_from_node(con100: Connection, get_geometries } +def test_aggregate_spatial_geometry_url(con100: Connection): + cube = con100.load_collection("S2") + result = cube.aggregate_spatial(geometries="https://example.com/geometry.json", reducer="mean") + assert get_download_graph(result, drop_save_result=True, drop_load_collection=True) == { + "loadurl1": { + "process_id": "load_url", + "arguments": {"url": "https://example.com/geometry.json", "format": "GeoJSON"}, + }, + "aggregatespatial1": { + "process_id": "aggregate_spatial", + "arguments": { + "data": {"from_node": "loadcollection1"}, + "geometries": {"from_node": "loadurl1"}, + "reducer": { + "process_graph": { + "mean1": { + "process_id": "mean", + "arguments": {"data": {"from_parameter": "data"}}, + "result": True, + } + } + }, + }, + }, + } + + def test_aggregate_spatial_window(con100: Connection): img = con100.load_collection("S2") size = [5, 3] @@ -763,21 +790,19 @@ def test_mask_polygon_parameter(con100: Connection): } -def test_mask_polygon_path(con100: Connection): - img = con100.load_collection("S2") - masked = img.mask_polygon(mask="path/to/polygon.json") - assert sorted(masked.flat_graph().keys()) == ["loadcollection1", "maskpolygon1", "readvector1"] - assert masked.flat_graph()["maskpolygon1"] == { - "process_id": "mask_polygon", - "arguments": { - "data": {"from_node": "loadcollection1"}, - "mask": {"from_node": "readvector1"}, +@pytest.mark.parametrize("path_factory", [str, pathlib.Path]) +def test_mask_polygon_path(con100: Connection, path_factory, test_data): + path = path_factory(test_data.get_path("geojson/polygon02.json")) + cube = con100.load_collection("S2") + masked = cube.mask_polygon(mask=path) + assert get_download_graph(masked, drop_save_result=True, drop_load_collection=True) == { + "maskpolygon1": { + "process_id": "mask_polygon", + "arguments": { + "data": {"from_node": "loadcollection1"}, + "mask": {"type": "Polygon", "coordinates": [[[3, 50], [4, 50], [4, 51], [3, 50]]]}, + }, }, - "result": True, - } - assert masked.flat_graph()["readvector1"] == { - "process_id": "read_vector", - "arguments": {"filename": "path/to/polygon.json"}, } @@ -1490,18 +1515,19 @@ def test_chunk_polygon_parameter(con100: Connection): } -def test_chunk_polygon_path(con100: Connection): +@pytest.mark.parametrize("path_factory", [str, pathlib.Path]) +def test_chunk_polygon_path(con100: Connection, test_data, path_factory): + path = path_factory(test_data.get_path("geojson/polygon02.json")) cube = con100.load_collection("S2") process = lambda data: data.run_udf(udf="myfancycode", runtime="Python") with pytest.warns(UserDeprecationWarning, match="Use `apply_polygon`"): - result = cube.chunk_polygon(chunks="path/to/polygon.json", process=process) + result = cube.chunk_polygon(chunks=path, process=process) assert get_download_graph(result, drop_save_result=True, drop_load_collection=True) == { - "readvector1": {"process_id": "read_vector", "arguments": {"filename": "path/to/polygon.json"}}, "chunkpolygon1": { "process_id": "chunk_polygon", "arguments": { "data": {"from_node": "loadcollection1"}, - "chunks": {"from_node": "readvector1"}, + "chunks": {"type": "Polygon", "coordinates": [[[3, 50], [4, 50], [4, 51], [3, 50]]]}, "process": { "process_graph": { "runudf1": { @@ -1704,21 +1730,17 @@ def test_apply_polygon_parameter(con100: Connection, geometries_argument, geomet ("geometries", "geometries"), ], ) -def test_apply_polygon_path(con100: Connection, geometries_argument, geometries_parameter): +def test_apply_polygon_path(con100: Connection, geometries_argument, geometries_parameter, test_data): + path = test_data.get_path("geojson/polygon02.json") cube = con100.load_collection("S2") process = UDF(code="myfancycode", runtime="Python") - result = cube.apply_polygon(**{geometries_argument: "path/to/polygon.json"}, process=process) + result = cube.apply_polygon(**{geometries_argument: path}, process=process) assert get_download_graph(result, drop_save_result=True, drop_load_collection=True) == { - "readvector1": { - # TODO #104 #457 get rid of non-standard read_vector - "process_id": "read_vector", - "arguments": {"filename": "path/to/polygon.json"}, - }, "applypolygon1": { "process_id": "apply_polygon", "arguments": { "data": {"from_node": "loadcollection1"}, - geometries_parameter: {"from_node": "readvector1"}, + geometries_parameter: {"type": "Polygon", "coordinates": [[[3, 50], [4, 50], [4, 51], [3, 50]]]}, "process": { "process_graph": { "runudf1": { diff --git a/tests/rest/datacube/test_zonal_stats.py b/tests/rest/datacube/test_zonal_stats.py index bac0ccac2..bbb38735e 100644 --- a/tests/rest/datacube/test_zonal_stats.py +++ b/tests/rest/datacube/test_zonal_stats.py @@ -26,19 +26,19 @@ def test_aggregate_spatial(connection, api_version, reducer, test_data): def test_polygon_timeseries_path(connection, api_version, test_data): res = ( - connection.load_collection('S2') - .filter_bbox(west=3, east=6, north=52, south=50) - .polygonal_mean_timeseries(polygon="/some/path/to/GeometryCollection.geojson") + connection.load_collection("S2") + .filter_bbox(west=3, east=6, north=52, south=50) + .polygonal_mean_timeseries(polygon="https://example.com/geometries.geojson") ) assert get_execute_graph(res) == test_data.load_json("%s/aggregate_zonal_path.json" % api_version) @pytest.mark.parametrize("reducer", ["mean", openeo.processes.mean, lambda x: x.mean()]) -def test_aggregate_spatial_read_vector(connection, api_version, reducer, test_data): +def test_aggregate_spatial_with_geometry_url(connection, api_version, reducer, test_data): res = ( connection.load_collection("S2") - .filter_bbox(3, 6, 52, 50) - .aggregate_spatial(geometries="/some/path/to/GeometryCollection.geojson", reducer=reducer) + .filter_bbox(3, 6, 52, 50) + .aggregate_spatial(geometries="https://example.com/geometries.geojson", reducer=reducer) ) assert get_execute_graph(res) == test_data.load_json("%s/aggregate_zonal_path.json" % api_version)