Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- `MultiBackendJobManager`: a `costs` column has been added to tracking databases ([#588](https://github.com/Open-EO/openeo-python-client/issues/588))
- Eliminate usage of non-standard `read_vector` process in `DataCube.aggregate_spatial`, `DataCube.mask_polygon`, ..., and replace with standard `load_url` ([#104](https://github.com/Open-EO/openeo-python-client/issues/104))

### Removed

Expand Down
67 changes: 47 additions & 20 deletions openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import datetime
import logging
import pathlib
import re
import typing
import urllib.parse
import warnings
from builtins import staticmethod
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union
Expand Down Expand Up @@ -57,7 +59,13 @@
from openeo.rest.service import Service
from openeo.rest.udp import RESTUserDefinedProcess
from openeo.rest.vectorcube import VectorCube
from openeo.util import dict_no_none, guess_format, normalize_crs, rfc3339
from openeo.util import (
dict_no_none,
guess_format,
load_json_resource,
normalize_crs,
rfc3339,
)

if typing.TYPE_CHECKING:
# Imports for type checking only (circular import issue at runtime).
Expand Down Expand Up @@ -1035,7 +1043,7 @@ def _merge_operator_binary_cubes(

def _get_geometry_argument(
self,
geometry: Union[
argument: Union[
shapely.geometry.base.BaseGeometry,
dict,
str,
Expand All @@ -1047,30 +1055,47 @@ def _get_geometry_argument(
crs: Optional[str] = None,
) -> Union[dict, Parameter, PGNode]:
"""
Convert input to a geometry as "geojson" subtype object.
Convert input to a geometry as "geojson" subtype object or vector cube.

:param crs: value that encodes a coordinate reference system.
See :py:func:`openeo.util.normalize_crs` for more details about additional normalization that is applied to this argument.
"""
if isinstance(geometry, (str, pathlib.Path)):
# Assumption: `geometry` is path to polygon is a path to vector file at backend.
# TODO #104: `read_vector` is non-standard process.
# TODO: If path exists client side: load it client side?
return PGNode(process_id="read_vector", arguments={"filename": str(geometry)})
elif isinstance(geometry, Parameter):
return geometry
elif isinstance(geometry, _FromNodeMixin):
return geometry.from_node()

if isinstance(geometry, shapely.geometry.base.BaseGeometry):
geometry = mapping(geometry)
if not isinstance(geometry, dict):
raise OpenEoClientException("Invalid geometry argument: {g!r}".format(g=geometry))
# First handle (abstract) references, e.g. parameter or back-end side vector cube
if isinstance(argument, Parameter):
return argument
elif isinstance(argument, _FromNodeMixin):
return argument.from_node()

if isinstance(argument, str) and re.match(r"^https?://", argument, flags=re.I):
# Geometry provided as URL: load with `load_url` (with best-effort format guess)
url = urllib.parse.urlparse(argument)
suffix = pathlib.Path(url.path.lower()).suffix
format = {
".json": "GeoJSON",
".geojson": "GeoJSON",
".pq": "Parquet",
".parquet": "Parquet",
".geoparquet": "Parquet",
}.get(suffix, suffix.split(".")[-1])
return self.connection.load_url(url=argument, format=format)

if isinstance(argument, str):
geometry = load_json_resource(argument)
elif isinstance(argument, pathlib.Path):
geometry = load_json_resource(argument)
elif isinstance(argument, shapely.geometry.base.BaseGeometry):
geometry = mapping(argument)
elif isinstance(argument, dict):
geometry = argument
else:
raise OpenEoClientException(f"Invalid geometry argument: {argument!r}")

if geometry.get("type") not in valid_geojson_types:
raise OpenEoClientException("Invalid geometry type {t!r}, must be one of {s}".format(
t=geometry.get("type"), s=valid_geojson_types
))
raise OpenEoClientException(
f"Invalid geometry type {geometry.get('type')!r}, must be one of {valid_geojson_types}"
)

# TODO #671 get rid of this ad-hoc, inconsistent `crs` handling
if crs:
# TODO: don't warn when the crs is Lon-Lat like EPSG:4326?
warnings.warn(f"Geometry with non-Lon-Lat CRS {crs!r} is only supported by specific back-ends.")
Expand Down Expand Up @@ -1099,6 +1124,7 @@ def aggregate_spatial(
],
reducer: Union[str, typing.Callable, PGNode],
target_dimension: Optional[str] = None,
# TODO #671 deprecate/remove `crs` argument here
crs: Optional[Union[int, str]] = None,
context: Optional[dict] = None,
# TODO arguments: target dimension, context
Expand Down Expand Up @@ -1943,6 +1969,7 @@ def mask(self, mask: DataCube = None, replacement=None) -> DataCube:
def mask_polygon(
self,
mask: Union[shapely.geometry.base.BaseGeometry, dict, str, pathlib.Path, Parameter, VectorCube],
# TODO #671 deprecate/remove `srs` argument here
srs: str = None,
replacement=None,
inside: bool = None,
Expand Down
3 changes: 2 additions & 1 deletion openeo/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ def guess_format(filename: Union[str, Path]) -> Union[str, None]:


def load_json(path: Union[Path, str]) -> dict:
    """Read and parse JSON-serialized data from a local file path."""
    file_path = Path(path)
    with file_path.open(mode="r", encoding="utf-8") as f:
        return json.load(f)

Expand All @@ -459,7 +460,7 @@ def load_json_resource(src: Union[str, Path]) -> dict:
elif isinstance(src, str) and re.match(r"^https?://", src, flags=re.I):
# URL to remote JSON resource
return requests.get(src).json()
elif isinstance(src, Path) or (isinstance(src, str) and src.endswith(".json")):
elif isinstance(src, Path) or (isinstance(src, str) and Path(src).suffix.lower() in {".json", ".geojson"}):
# Assume source is a local JSON file path
return load_json(src)
raise ValueError(src)
Expand Down
9 changes: 5 additions & 4 deletions tests/data/1.0.0/aggregate_zonal_path.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
}
}
},
"readvector1": {
"process_id": "read_vector",
"loadurl1": {
"process_id": "load_url",
"arguments": {
"filename": "/some/path/to/GeometryCollection.geojson"
"url": "https://example.com/geometries.geojson",
"format": "GeoJSON"
}
},
"aggregatespatial1": {
Expand All @@ -34,7 +35,7 @@
"from_node": "filterbbox1"
},
"geometries": {
"from_node": "readvector1"
"from_node": "loadurl1"
},
"reducer": {
"process_graph": {
Expand Down
23 changes: 23 additions & 0 deletions tests/data/geojson/polygon02.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"type": "Polygon",
"coordinates": [
[
[
3,
50
],
[
4,
50
],
[
4,
51
],
[
3,
50
]
]
]
}
7 changes: 5 additions & 2 deletions tests/rest/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import json
from typing import Union

import mock

from openeo.rest.datacube import DataCube
from openeo import DataCube, VectorCube

# TODO: move (some of) these to openeo.testing?


def get_download_graph(cube: DataCube, *, drop_save_result: bool = False, drop_load_collection: bool = False) -> dict:
def get_download_graph(
cube: Union[DataCube, VectorCube], *, drop_save_result: bool = False, drop_load_collection: bool = False
) -> dict:
"""
Do fake download of a cube and intercept the process graph
:param cube: cube to download
Expand Down
74 changes: 48 additions & 26 deletions tests/rest/datacube/test_datacube100.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,33 @@ def test_aggregate_spatial_geometry_from_node(con100: Connection, get_geometries
}


def test_aggregate_spatial_geometry_url(con100: Connection):
    """A URL geometry argument should be loaded through a standard `load_url` node."""
    s2 = con100.load_collection("S2")
    aggregated = s2.aggregate_spatial(geometries="https://example.com/geometry.json", reducer="mean")
    expected_reducer = {
        "mean1": {
            "process_id": "mean",
            "arguments": {"data": {"from_parameter": "data"}},
            "result": True,
        }
    }
    expected = {
        "loadurl1": {
            "process_id": "load_url",
            "arguments": {"url": "https://example.com/geometry.json", "format": "GeoJSON"},
        },
        "aggregatespatial1": {
            "process_id": "aggregate_spatial",
            "arguments": {
                "data": {"from_node": "loadcollection1"},
                "geometries": {"from_node": "loadurl1"},
                "reducer": {"process_graph": expected_reducer},
            },
        },
    }
    assert get_download_graph(aggregated, drop_save_result=True, drop_load_collection=True) == expected


def test_aggregate_spatial_window(con100: Connection):
img = con100.load_collection("S2")
size = [5, 3]
Expand Down Expand Up @@ -763,21 +790,19 @@ def test_mask_polygon_parameter(con100: Connection):
}


def test_mask_polygon_path(con100: Connection):
img = con100.load_collection("S2")
masked = img.mask_polygon(mask="path/to/polygon.json")
assert sorted(masked.flat_graph().keys()) == ["loadcollection1", "maskpolygon1", "readvector1"]
assert masked.flat_graph()["maskpolygon1"] == {
"process_id": "mask_polygon",
"arguments": {
"data": {"from_node": "loadcollection1"},
"mask": {"from_node": "readvector1"},
@pytest.mark.parametrize("path_factory", [str, pathlib.Path])
def test_mask_polygon_path(con100: Connection, path_factory, test_data):
path = path_factory(test_data.get_path("geojson/polygon02.json"))
cube = con100.load_collection("S2")
masked = cube.mask_polygon(mask=path)
assert get_download_graph(masked, drop_save_result=True, drop_load_collection=True) == {
"maskpolygon1": {
"process_id": "mask_polygon",
"arguments": {
"data": {"from_node": "loadcollection1"},
"mask": {"type": "Polygon", "coordinates": [[[3, 50], [4, 50], [4, 51], [3, 50]]]},
},
},
"result": True,
}
assert masked.flat_graph()["readvector1"] == {
"process_id": "read_vector",
"arguments": {"filename": "path/to/polygon.json"},
}


Expand Down Expand Up @@ -1490,18 +1515,19 @@ def test_chunk_polygon_parameter(con100: Connection):
}


def test_chunk_polygon_path(con100: Connection):
@pytest.mark.parametrize("path_factory", [str, pathlib.Path])
def test_chunk_polygon_path(con100: Connection, test_data, path_factory):
path = path_factory(test_data.get_path("geojson/polygon02.json"))
cube = con100.load_collection("S2")
process = lambda data: data.run_udf(udf="myfancycode", runtime="Python")
with pytest.warns(UserDeprecationWarning, match="Use `apply_polygon`"):
result = cube.chunk_polygon(chunks="path/to/polygon.json", process=process)
result = cube.chunk_polygon(chunks=path, process=process)
assert get_download_graph(result, drop_save_result=True, drop_load_collection=True) == {
"readvector1": {"process_id": "read_vector", "arguments": {"filename": "path/to/polygon.json"}},
"chunkpolygon1": {
"process_id": "chunk_polygon",
"arguments": {
"data": {"from_node": "loadcollection1"},
"chunks": {"from_node": "readvector1"},
"chunks": {"type": "Polygon", "coordinates": [[[3, 50], [4, 50], [4, 51], [3, 50]]]},
"process": {
"process_graph": {
"runudf1": {
Expand Down Expand Up @@ -1704,21 +1730,17 @@ def test_apply_polygon_parameter(con100: Connection, geometries_argument, geomet
("geometries", "geometries"),
],
)
def test_apply_polygon_path(con100: Connection, geometries_argument, geometries_parameter):
def test_apply_polygon_path(con100: Connection, geometries_argument, geometries_parameter, test_data):
path = test_data.get_path("geojson/polygon02.json")
cube = con100.load_collection("S2")
process = UDF(code="myfancycode", runtime="Python")
result = cube.apply_polygon(**{geometries_argument: "path/to/polygon.json"}, process=process)
result = cube.apply_polygon(**{geometries_argument: path}, process=process)
assert get_download_graph(result, drop_save_result=True, drop_load_collection=True) == {
"readvector1": {
# TODO #104 #457 get rid of non-standard read_vector
"process_id": "read_vector",
"arguments": {"filename": "path/to/polygon.json"},
},
"applypolygon1": {
"process_id": "apply_polygon",
"arguments": {
"data": {"from_node": "loadcollection1"},
geometries_parameter: {"from_node": "readvector1"},
geometries_parameter: {"type": "Polygon", "coordinates": [[[3, 50], [4, 50], [4, 51], [3, 50]]]},
"process": {
"process_graph": {
"runudf1": {
Expand Down
12 changes: 6 additions & 6 deletions tests/rest/datacube/test_zonal_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@ def test_aggregate_spatial(connection, api_version, reducer, test_data):

def test_polygon_timeseries_path(connection, api_version, test_data):
res = (
connection.load_collection('S2')
.filter_bbox(west=3, east=6, north=52, south=50)
.polygonal_mean_timeseries(polygon="/some/path/to/GeometryCollection.geojson")
connection.load_collection("S2")
.filter_bbox(west=3, east=6, north=52, south=50)
.polygonal_mean_timeseries(polygon="https://example.com/geometries.geojson")
)
assert get_execute_graph(res) == test_data.load_json("%s/aggregate_zonal_path.json" % api_version)


@pytest.mark.parametrize("reducer", ["mean", openeo.processes.mean, lambda x: x.mean()])
def test_aggregate_spatial_read_vector(connection, api_version, reducer, test_data):
def test_aggregate_spatial_with_geometry_url(connection, api_version, reducer, test_data):
res = (
connection.load_collection("S2")
.filter_bbox(3, 6, 52, 50)
.aggregate_spatial(geometries="/some/path/to/GeometryCollection.geojson", reducer=reducer)
.filter_bbox(3, 6, 52, 50)
.aggregate_spatial(geometries="https://example.com/geometries.geojson", reducer=reducer)
)
assert get_execute_graph(res) == test_data.load_json("%s/aggregate_zonal_path.json" % api_version)

Expand Down