10 changes: 8 additions & 2 deletions openeo/rest/connection.py
@@ -1254,7 +1254,7 @@ def datacube_from_json(self, src: Union[str, Path], parameters: Optional[dict] =
def load_collection(
self,
collection_id: Union[str, Parameter],
spatial_extent: Union[Dict[str, float], Parameter, None] = None,
spatial_extent: Union[Dict[str, float], Parameter, shapely.geometry.base.BaseGeometry, None] = None,
temporal_extent: Union[Sequence[InputDate], Parameter, str, None] = None,
bands: Union[None, List[str], Parameter] = None,
properties: Union[
@@ -1267,7 +1267,13 @@ def load_collection(
Load a DataCube by collection id.

:param collection_id: image collection identifier
:param spatial_extent: limit data to specified bounding box or polygons
:param spatial_extent: limit data to specified bounding box or polygons. Can be provided in different ways:

- a bounding box dictionary
- a shapely geometry
- a GeoJSON-style dictionary
- a path (:py:class:`str` or :py:class:`~pathlib.Path`) to a local, client-side GeoJSON file,
which will be loaded automatically to get the geometries as a GeoJSON construct
- a :py:class:`~openeo.api.process.Parameter` instance
:param temporal_extent: limit data to specified temporal interval.
Typically, just a two-item list or tuple containing start and end date.
See :ref:`filtering-on-temporal-extent-section` for more details on temporal extent handling and shorthand notation.
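A quick usage sketch of the `spatial_extent` options documented above; the backend URL, collection id and GeoJSON file name below are placeholders, not taken from this diff:

import shapely.geometry
import openeo

connection = openeo.connect("https://openeo.example")  # placeholder backend URL

# Classic bounding box dictionary (unchanged behaviour)
cube = connection.load_collection(
    "SENTINEL2_L2A",  # placeholder collection id
    spatial_extent={"west": 3.0, "south": 50.0, "east": 4.0, "north": 51.0},
)

# Shapely geometry, converted client-side to a GeoJSON-style dictionary
aoi = shapely.geometry.box(3.0, 50.0, 4.0, 51.0)
cube = connection.load_collection("SENTINEL2_L2A", spatial_extent=aoi)

# Path to a local GeoJSON file, loaded client-side into a GeoJSON construct
cube = connection.load_collection("SENTINEL2_L2A", spatial_extent="fields.geojson")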
175 changes: 95 additions & 80 deletions openeo/rest/datacube.py
@@ -143,7 +143,7 @@ def load_collection(
cls,
collection_id: Union[str, Parameter],
connection: Optional[Connection] = None,
spatial_extent: Union[Dict[str, float], Parameter, None] = None,
spatial_extent: Union[Dict[str, float], Parameter, shapely.geometry.base.BaseGeometry, None] = None,
temporal_extent: Union[Sequence[InputDate], Parameter, str, None] = None,
bands: Union[None, List[str], Parameter] = None,
fetch_metadata: bool = True,
@@ -158,7 +158,13 @@ def load_collection(
:param collection_id: image collection identifier
:param connection: The backend connection to use.
Can be ``None`` to work without connection and collection metadata.
:param spatial_extent: limit data to specified bounding box or polygons
:param spatial_extent: limit data to specified bounding box or polygons. Can be provided in different ways:

- a bounding box dictionary
- a shapely geometry
- a GeoJSON-style dictionary
- a path (:py:class:`str` or :py:class:`~pathlib.Path`) to a local, client-side GeoJSON file,
which will be loaded automatically to get the geometries as a GeoJSON construct
- a :py:class:`~openeo.api.process.Parameter` instance
:param temporal_extent: limit data to specified temporal interval.
Typically, just a two-item list or tuple containing start and end date.
See :ref:`filtering-on-temporal-extent-section` for more details on temporal extent handling and shorthand notation.
@@ -187,9 +193,17 @@ def load_collection(
"Unexpected parameterized `spatial_extent` in `load_collection`:"
f" expected schema compatible with type 'object' but got {spatial_extent.schema!r}."
)
elif not spatial_extent or (isinstance(spatial_extent, dict) and spatial_extent.keys() & {"west", "east", "north", "south"}):
# Empty spatial extent or a classic bounding box dictionary: pass it through unchanged.
pass
else:
valid_geojson_types = ["Polygon", "MultiPolygon", "Feature", "FeatureCollection"]
spatial_extent = _get_geometry_argument(
argument=spatial_extent, valid_geojson_types=valid_geojson_types, connection=connection
)

arguments = {
'id': collection_id,
# TODO: spatial_extent could also be a "geojson" subtype object, so we might want to allow (and convert) shapely shapes as well here.
'spatial_extent': spatial_extent,
'temporal_extent': temporal_extent,
}
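A minimal sketch of the new dispatch logic above, mirroring the connectionless tests added further down in this diff ("T3" is the dummy collection id used there):

import shapely
from openeo.rest.datacube import DataCube

# A bounding box dictionary (keys "west"/"south"/"east"/"north") is passed through untouched.
bbox_cube = DataCube.load_collection(
    "T3", spatial_extent={"west": 3, "south": 50, "east": 4, "north": 51}
)

# Anything else, e.g. a shapely polygon, goes through _get_geometry_argument
# and ends up as a GeoJSON-style dictionary in the process graph.
polygon = shapely.Polygon([(3, 50), (4, 50), (4, 51), (3, 50)])
poly_cube = DataCube.load_collection("T3", spatial_extent=polygon)
assert poly_cube.flat_graph()["loadcollection1"]["arguments"]["spatial_extent"]["type"] == "Polygon"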
@@ -628,10 +642,9 @@ def filter_spatial(
(which will be loaded client-side to get the geometries as GeoJSON construct).
"""
valid_geojson_types = [
"Point", "MultiPoint", "LineString", "MultiLineString",
"Polygon", "MultiPolygon", "GeometryCollection", "FeatureCollection"
]
geometries = self._get_geometry_argument(geometries, valid_geojson_types=valid_geojson_types, crs=None)
geometries = _get_geometry_argument(geometries, valid_geojson_types=valid_geojson_types, connection=self.connection, crs=None)
return self.process(
process_id='filter_spatial',
arguments={
@@ -1058,75 +1071,6 @@ def _merge_operator_binary_cubes(
}
))

def _get_geometry_argument(
self,
argument: Union[
shapely.geometry.base.BaseGeometry,
dict,
str,
pathlib.Path,
Parameter,
_FromNodeMixin,
],
valid_geojson_types: List[str],
crs: Optional[str] = None,
) -> Union[dict, Parameter, PGNode]:
"""
Convert input to a geometry as "geojson" subtype object or vectorcube.

:param crs: value that encodes a coordinate reference system.
See :py:func:`openeo.util.normalize_crs` for more details about additional normalization that is applied to this argument.
"""
if isinstance(argument, Parameter):
return argument
elif isinstance(argument, _FromNodeMixin):
return argument.from_node()

if isinstance(argument, str) and re.match(r"^https?://", argument, flags=re.I):
# Geometry provided as URL: load with `load_url` (with best-effort format guess)
url = urllib.parse.urlparse(argument)
suffix = pathlib.Path(url.path.lower()).suffix
format = {
".json": "GeoJSON",
".geojson": "GeoJSON",
".pq": "Parquet",
".parquet": "Parquet",
".geoparquet": "Parquet",
}.get(suffix, suffix.split(".")[-1])
return self.connection.load_url(url=argument, format=format)

if (
isinstance(argument, (str, pathlib.Path))
and pathlib.Path(argument).is_file()
and pathlib.Path(argument).suffix.lower() in [".json", ".geojson"]
):
geometry = load_json(argument)
elif isinstance(argument, shapely.geometry.base.BaseGeometry):
geometry = mapping(argument)
elif isinstance(argument, dict):
geometry = argument
else:
raise OpenEoClientException(f"Invalid geometry argument: {argument!r}")

if geometry.get("type") not in valid_geojson_types:
raise OpenEoClientException("Invalid geometry type {t!r}, must be one of {s}".format(
t=geometry.get("type"), s=valid_geojson_types
))
if crs:
# TODO: don't warn when the crs is Lon-Lat like EPSG:4326?
warnings.warn(f"Geometry with non-Lon-Lat CRS {crs!r} is only supported by specific back-ends.")
# TODO #204 alternative for non-standard CRS in GeoJSON object?
epsg_code = normalize_crs(crs)
if epsg_code is not None:
# proj did recognize the CRS
crs_name = f"EPSG:{epsg_code}"
else:
# proj did not recognise this CRS
warnings.warn(f"non-Lon-Lat CRS {crs!r} is not known to the proj library and might not be supported.")
crs_name = crs
geometry["crs"] = {"type": "name", "properties": {"name": crs_name}}
return geometry

@openeo_process
def aggregate_spatial(
self,
@@ -1198,7 +1142,7 @@ def aggregate_spatial(
"Point", "MultiPoint", "LineString", "MultiLineString",
"Polygon", "MultiPolygon", "GeometryCollection", "Feature", "FeatureCollection"
]
geometries = self._get_geometry_argument(geometries, valid_geojson_types=valid_geojson_types, crs=crs)
geometries = _get_geometry_argument(geometries, valid_geojson_types=valid_geojson_types, connection=self.connection, crs=crs)
reducer = build_child_callback(reducer, parent_parameters=["data"])
return VectorCube(
graph=self._build_pgnode(
@@ -1478,8 +1422,8 @@ def chunk_polygon(
"Feature",
"FeatureCollection",
]
chunks = self._get_geometry_argument(
chunks, valid_geojson_types=valid_geojson_types
chunks = _get_geometry_argument(
chunks, valid_geojson_types=valid_geojson_types, connection=self.connection
)
mask_value = float(mask_value) if mask_value is not None else None
return self.process(
@@ -1568,7 +1512,7 @@ def apply_polygon(

process = build_child_callback(process, parent_parameters=["data"], connection=self.connection)
valid_geojson_types = ["Polygon", "MultiPolygon", "Feature", "FeatureCollection"]
geometries = self._get_geometry_argument(geometries, valid_geojson_types=valid_geojson_types)
geometries = _get_geometry_argument(geometries, valid_geojson_types=valid_geojson_types, connection=self.connection)
mask_value = float(mask_value) if mask_value is not None else None
return self.process(
process_id="apply_polygon",
@@ -2056,7 +2000,7 @@ def mask_polygon(
(which will be loaded client-side to get the geometries as GeoJSON construct).
"""
valid_geojson_types = ["Polygon", "MultiPolygon", "GeometryCollection", "Feature", "FeatureCollection"]
mask = self._get_geometry_argument(mask, valid_geojson_types=valid_geojson_types, crs=srs)
mask = _get_geometry_argument(mask, valid_geojson_types=valid_geojson_types, connection=self.connection, crs=srs)
return self.process(
process_id="mask_polygon",
arguments=dict_no_none(
@@ -2477,6 +2421,7 @@ def execute_batch(
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
enable_print: bool = True,
# TODO: deprecate `format_options` as keyword arguments
**format_options,
) -> BatchJob:
@@ -2494,6 +2439,7 @@ def execute_batch(
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:param enable_print: whether to print the batch job's error logs when it fails

.. versionchanged:: 0.32.0
Added ``auto_add_save_result`` option
@@ -2529,7 +2475,8 @@ def execute_batch(
)
return job.run_synchronous(
outputfile=outputfile,
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval,
enable_print=enable_print
)

def create_job(
@@ -2885,3 +2832,71 @@ def unflatten_dimension(self, dimension: str, target_dimensions: List[str], labe
label_separator=label_separator,
),
)


def _get_geometry_argument(
argument: Union[
shapely.geometry.base.BaseGeometry,
dict,
str,
pathlib.Path,
Parameter,
_FromNodeMixin,
],
valid_geojson_types: List[str],
connection: Optional[Connection] = None,
crs: Optional[str] = None,
) -> Union[dict, Parameter, PGNode]:
"""
Convert input to a geometry as "geojson" subtype object or vectorcube.

:param crs: value that encodes a coordinate reference system.
See :py:func:`openeo.util.normalize_crs` for more details about additional normalization that is applied to this argument.
"""
if isinstance(argument, Parameter):
return argument
elif isinstance(argument, _FromNodeMixin):
return argument.from_node()

if isinstance(argument, str) and re.match(r"^https?://", argument, flags=re.I):
# Geometry provided as URL: load with `load_url` (with best-effort format guess)
url = urllib.parse.urlparse(argument)
suffix = pathlib.Path(url.path.lower()).suffix
format = {
".json": "GeoJSON",
".geojson": "GeoJSON",
".pq": "Parquet",
".parquet": "Parquet",
".geoparquet": "Parquet",
}.get(suffix, suffix.split(".")[-1])
return connection.load_url(url=argument, format=format)
if (
isinstance(argument, (str, pathlib.Path))
and pathlib.Path(argument).is_file()
and pathlib.Path(argument).suffix.lower() in [".json", ".geojson"]
):
geometry = load_json(argument)
elif isinstance(argument, shapely.geometry.base.BaseGeometry):
geometry = mapping(argument)
elif isinstance(argument, dict):
geometry = argument
else:
raise OpenEoClientException(f"Invalid geometry argument: {argument!r}")

if geometry.get("type") not in valid_geojson_types:
raise OpenEoClientException("Invalid geometry type {t!r}, must be one of {s}".format(
t=geometry.get("type"), s=valid_geojson_types
))
if crs:
# TODO: don't warn when the crs is Lon-Lat like EPSG:4326?
warnings.warn(f"Geometry with non-Lon-Lat CRS {crs!r} is only supported by specific back-ends.")
# TODO #204 alternative for non-standard CRS in GeoJSON object?
epsg_code = normalize_crs(crs)
if epsg_code is not None:
# proj did recognize the CRS
crs_name = f"EPSG:{epsg_code}"
else:
# proj did not recognise this CRS
warnings.warn(f"non-Lon-Lat CRS {crs!r} is not known to the proj library and might not be supported.")
crs_name = crs
geometry["crs"] = {"type": "name", "properties": {"name": crs_name}}
return geometry
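A small sketch of how the (now module-level, private) _get_geometry_argument helper behaves for two common inputs; the URL in the trailing comment is a placeholder:

import shapely
from openeo.rest.datacube import _get_geometry_argument

# A shapely geometry is converted to a GeoJSON-style dictionary; no connection is needed for this branch.
geometry = _get_geometry_argument(
    shapely.Polygon([(3, 50), (4, 50), (4, 51), (3, 50)]),
    valid_geojson_types=["Polygon", "MultiPolygon"],
    connection=None,
)
assert geometry["type"] == "Polygon"

# An http(s) URL, by contrast, is delegated to connection.load_url() with a best-effort
# format guess, so that branch does require a live Connection:
#   _get_geometry_argument("https://example.com/fields.geojson", valid_geojson_types=["Polygon"], connection=connection)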
32 changes: 22 additions & 10 deletions openeo/rest/job.py
@@ -236,19 +236,30 @@ def logs(

def run_synchronous(
self, outputfile: Union[str, Path, None] = None,
print=print, max_poll_interval=60, connection_retry_interval=30
print=print, max_poll_interval=60, connection_retry_interval=30, enable_print: bool = True
) -> BatchJob:
"""Start the job, wait for it to finish and download result"""
"""
Start the job, wait for it to finish and download result

:param outputfile: The path of a file to which a result can be written
:param print: print/logging function to show progress/status
:param max_poll_interval: maximum number of seconds to sleep between status polls
:param connection_retry_interval: how long to wait when status poll failed due to connection issue
:param enable_print: whether to print the job's error logs when it fails
:return: this batch job instance
"""
self.start_and_wait(
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval,
enable_print=enable_print
)
# TODO #135 support multi file result sets too?
if outputfile is not None:
self.download_result(outputfile)
return self

def start_and_wait(
self, print=print, max_poll_interval: int = 60, connection_retry_interval: int = 30, soft_error_max=10
self, print=print, max_poll_interval: int = 60, connection_retry_interval: int = 30, soft_error_max=10,
enable_print: bool = True
) -> BatchJob:
"""
Start the batch job, poll its status and wait till it finishes (or fails)
@@ -257,6 +268,7 @@ def start_and_wait(
:param max_poll_interval: maximum number of seconds to sleep between status polls
:param connection_retry_interval: how long to wait when status poll failed due to connection issue
:param soft_error_max: maximum number of soft errors (e.g. temporary connection glitches) to allow
:param enable_print: whether to print the job's error logs when it fails
:return:
"""
# TODO rename `connection_retry_interval` to something more generic?
@@ -314,13 +326,13 @@ def soft_error(message: str):
poll_interval = min(1.25 * poll_interval, max_poll_interval)

if status != "finished":
# TODO: allow to disable this printing logs (e.g. in non-interactive contexts)?
# TODO: render logs jupyter-aware in a notebook context?
print(f"Your batch job {self.job_id!r} failed. Error logs:")
print(self.logs(level=logging.ERROR))
print(
f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`."
)
if enable_print:
print(f"Your batch job {self.job_id!r} failed. Error logs:")
print(self.logs(level=logging.ERROR))
print(
f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`."
)
raise JobFailedException(
f"Batch job {self.job_id!r} didn't finish successfully. Status: {status} (after {elapsed()}).",
job=self,
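A minimal sketch of the new enable_print switch, which execute_batch forwards via run_synchronous to start_and_wait; the backend URL and collection id are placeholders and an authenticated connection is assumed:

import openeo

connection = openeo.connect("https://openeo.example").authenticate_oidc()  # placeholder backend URL
cube = connection.load_collection(
    "SENTINEL2_L2A",  # placeholder collection id
    spatial_extent={"west": 3.0, "south": 50.0, "east": 4.0, "north": 51.0},
    temporal_extent=["2024-01-01", "2024-02-01"],
)

# With enable_print=False the client no longer prints the error-log dump when the batch
# job fails; the JobFailedException is still raised as before.
job = cube.execute_batch(outputfile="result.tif", out_format="GTiff", enable_print=False)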
27 changes: 27 additions & 0 deletions tests/rest/datacube/test_datacube.py
@@ -136,6 +136,33 @@ def test_load_collection_connectionless_temporal_extent_shortcut(self):
}
}

def test_load_collection_connectionless_shapely_spatial_extent(self):
polygon = shapely.Polygon(((0.0, 1.0), (2.0, 1.0), (3.0, 2.0), (1.5, 0.0), (0.0, 1.0)))
cube = DataCube.load_collection("T3", spatial_extent=polygon)
assert cube.flat_graph() == {
"loadcollection1": {
"arguments": {
"id": "T3",
"spatial_extent": {"type": "Polygon", "coordinates": (((0.0, 1.0), (2.0, 1.0), (3.0, 2.0), (1.5, 0.0), (0.0, 1.0)),)},
"temporal_extent": None,
},
"process_id": "load_collection",
"result": True,
}
}

@pytest.mark.parametrize("path_factory", [str, pathlib.Path])
def test_load_collection_connectionless_local_path_spatial_extent(self, path_factory, test_data):
path = path_factory(test_data.get_path("geojson/polygon02.json"))
cube = DataCube.load_collection("T3", spatial_extent=path)
assert cube.flat_graph() == {
"loadcollection1": {
"arguments": {
"id": "T3",
"spatial_extent": {"type": "Polygon", "coordinates": [[[3, 50], [4, 50], [4, 51], [3, 50]]]},
"temporal_extent": None,
},
"process_id": "load_collection",
"result": True,
}
}

def test_load_collection_connectionless_save_result(self):
cube = DataCube.load_collection("T3").save_result(format="GTiff")
assert cube.flat_graph() == {