diff --git a/openeo/rest/connection.py b/openeo/rest/connection.py index 28de15dfd..94c44fbeb 100644 --- a/openeo/rest/connection.py +++ b/openeo/rest/connection.py @@ -1254,7 +1254,7 @@ def datacube_from_json(self, src: Union[str, Path], parameters: Optional[dict] = def load_collection( self, collection_id: Union[str, Parameter], - spatial_extent: Union[Dict[str, float], Parameter, None] = None, + spatial_extent: Union[Dict[str, float], Parameter, shapely.geometry.base.BaseGeometry, None] = None, temporal_extent: Union[Sequence[InputDate], Parameter, str, None] = None, bands: Union[None, List[str], Parameter] = None, properties: Union[ @@ -1267,7 +1267,13 @@ def load_collection( Load a DataCube by collection id. :param collection_id: image collection identifier - :param spatial_extent: limit data to specified bounding box or polygons + :param spatial_extent: limit data to specified bounding box or polygons. Can be provided in different ways: + - a shapely geometry + - a GeoJSON-style dictionary, + - a path (:py:class:`str` or :py:class:`~pathlib.Path`) to a local, client-side GeoJSON file, + which will be loaded automatically to get the geometries as GeoJSON construct. + - a :py:class:`~openeo.api.process.Parameter` instance. + - a bounding box dictionary :param temporal_extent: limit data to specified temporal interval. Typically, just a two-item list or tuple containing start and end date. See :ref:`filtering-on-temporal-extent-section` for more details on temporal extent handling and shorthand notation. diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py index db087f854..9dcb69a37 100644 --- a/openeo/rest/datacube.py +++ b/openeo/rest/datacube.py @@ -143,7 +143,7 @@ def load_collection( cls, collection_id: Union[str, Parameter], connection: Optional[Connection] = None, - spatial_extent: Union[Dict[str, float], Parameter, None] = None, + spatial_extent: Union[Dict[str, float], Parameter, shapely.geometry.base.BaseGeometry, None] = None, temporal_extent: Union[Sequence[InputDate], Parameter, str, None] = None, bands: Union[None, List[str], Parameter] = None, fetch_metadata: bool = True, @@ -158,7 +158,13 @@ def load_collection( :param collection_id: image collection identifier :param connection: The backend connection to use. Can be ``None`` to work without connection and collection metadata. - :param spatial_extent: limit data to specified bounding box or polygons + :param spatial_extent: limit data to specified bounding box or polygons. Can be provided in different ways: + - a shapely geometry + - a GeoJSON-style dictionary, + - a path (:py:class:`str` or :py:class:`~pathlib.Path`) to a local, client-side GeoJSON file, + which will be loaded automatically to get the geometries as GeoJSON construct. + - a :py:class:`~openeo.api.process.Parameter` instance. + - a bounding box dictionary :param temporal_extent: limit data to specified temporal interval. Typically, just a two-item list or tuple containing start and end date. See :ref:`filtering-on-temporal-extent-section` for more details on temporal extent handling and shorthand notation. @@ -187,9 +193,17 @@ def load_collection( "Unexpected parameterized `spatial_extent` in `load_collection`:" f" expected schema compatible with type 'object' but got {spatial_extent.schema!r}." ) + elif not spatial_extent or (isinstance(spatial_extent, dict) and spatial_extent.keys() & {"west", "east", "north", "south"}): + pass + else: + valid_geojson_types = [ + "Polygon", "MultiPolygon", "Feature", "FeatureCollection" + ] + spatial_extent = _get_geometry_argument(argument=spatial_extent, valid_geojson_types=valid_geojson_types, + connection=connection) + arguments = { 'id': collection_id, - # TODO: spatial_extent could also be a "geojson" subtype object, so we might want to allow (and convert) shapely shapes as well here. 'spatial_extent': spatial_extent, 'temporal_extent': temporal_extent, } @@ -628,10 +642,9 @@ def filter_spatial( (which will be loaded client-side to get the geometries as GeoJSON construct). """ valid_geojson_types = [ - "Point", "MultiPoint", "LineString", "MultiLineString", "Polygon", "MultiPolygon", "GeometryCollection", "FeatureCollection" ] - geometries = self._get_geometry_argument(geometries, valid_geojson_types=valid_geojson_types, crs=None) + geometries = _get_geometry_argument(geometries, valid_geojson_types=valid_geojson_types, connection=self.connection, crs=None) return self.process( process_id='filter_spatial', arguments={ @@ -1058,75 +1071,6 @@ def _merge_operator_binary_cubes( } )) - def _get_geometry_argument( - self, - argument: Union[ - shapely.geometry.base.BaseGeometry, - dict, - str, - pathlib.Path, - Parameter, - _FromNodeMixin, - ], - valid_geojson_types: List[str], - crs: Optional[str] = None, - ) -> Union[dict, Parameter, PGNode]: - """ - Convert input to a geometry as "geojson" subtype object or vectorcube. - - :param crs: value that encodes a coordinate reference system. - See :py:func:`openeo.util.normalize_crs` for more details about additional normalization that is applied to this argument. - """ - if isinstance(argument, Parameter): - return argument - elif isinstance(argument, _FromNodeMixin): - return argument.from_node() - - if isinstance(argument, str) and re.match(r"^https?://", argument, flags=re.I): - # Geometry provided as URL: load with `load_url` (with best-effort format guess) - url = urllib.parse.urlparse(argument) - suffix = pathlib.Path(url.path.lower()).suffix - format = { - ".json": "GeoJSON", - ".geojson": "GeoJSON", - ".pq": "Parquet", - ".parquet": "Parquet", - ".geoparquet": "Parquet", - }.get(suffix, suffix.split(".")[-1]) - return self.connection.load_url(url=argument, format=format) - - if ( - isinstance(argument, (str, pathlib.Path)) - and pathlib.Path(argument).is_file() - and pathlib.Path(argument).suffix.lower() in [".json", ".geojson"] - ): - geometry = load_json(argument) - elif isinstance(argument, shapely.geometry.base.BaseGeometry): - geometry = mapping(argument) - elif isinstance(argument, dict): - geometry = argument - else: - raise OpenEoClientException(f"Invalid geometry argument: {argument!r}") - - if geometry.get("type") not in valid_geojson_types: - raise OpenEoClientException("Invalid geometry type {t!r}, must be one of {s}".format( - t=geometry.get("type"), s=valid_geojson_types - )) - if crs: - # TODO: don't warn when the crs is Lon-Lat like EPSG:4326? - warnings.warn(f"Geometry with non-Lon-Lat CRS {crs!r} is only supported by specific back-ends.") - # TODO #204 alternative for non-standard CRS in GeoJSON object? - epsg_code = normalize_crs(crs) - if epsg_code is not None: - # proj did recognize the CRS - crs_name = f"EPSG:{epsg_code}" - else: - # proj did not recognise this CRS - warnings.warn(f"non-Lon-Lat CRS {crs!r} is not known to the proj library and might not be supported.") - crs_name = crs - geometry["crs"] = {"type": "name", "properties": {"name": crs_name}} - return geometry - @openeo_process def aggregate_spatial( self, @@ -1198,7 +1142,7 @@ def aggregate_spatial( "Point", "MultiPoint", "LineString", "MultiLineString", "Polygon", "MultiPolygon", "GeometryCollection", "Feature", "FeatureCollection" ] - geometries = self._get_geometry_argument(geometries, valid_geojson_types=valid_geojson_types, crs=crs) + geometries = _get_geometry_argument(geometries, valid_geojson_types=valid_geojson_types, connection= self.connection, crs=crs) reducer = build_child_callback(reducer, parent_parameters=["data"]) return VectorCube( graph=self._build_pgnode( @@ -1478,8 +1422,8 @@ def chunk_polygon( "Feature", "FeatureCollection", ] - chunks = self._get_geometry_argument( - chunks, valid_geojson_types=valid_geojson_types + chunks = _get_geometry_argument( + chunks, valid_geojson_types=valid_geojson_types, connection=self.connection ) mask_value = float(mask_value) if mask_value is not None else None return self.process( @@ -1568,7 +1512,7 @@ def apply_polygon( process = build_child_callback(process, parent_parameters=["data"], connection=self.connection) valid_geojson_types = ["Polygon", "MultiPolygon", "Feature", "FeatureCollection"] - geometries = self._get_geometry_argument(geometries, valid_geojson_types=valid_geojson_types) + geometries = _get_geometry_argument(geometries, valid_geojson_types=valid_geojson_types, connection=self.connection) mask_value = float(mask_value) if mask_value is not None else None return self.process( process_id="apply_polygon", @@ -2056,7 +2000,7 @@ def mask_polygon( (which will be loaded client-side to get the geometries as GeoJSON construct). """ valid_geojson_types = ["Polygon", "MultiPolygon", "GeometryCollection", "Feature", "FeatureCollection"] - mask = self._get_geometry_argument(mask, valid_geojson_types=valid_geojson_types, crs=srs) + mask = _get_geometry_argument(mask, valid_geojson_types=valid_geojson_types, connection=self.connection, crs=srs) return self.process( process_id="mask_polygon", arguments=dict_no_none( @@ -2477,6 +2421,7 @@ def execute_batch( job_options: Optional[dict] = None, validate: Optional[bool] = None, auto_add_save_result: bool = True, + enable_print=True, # TODO: deprecate `format_options` as keyword arguments **format_options, ) -> BatchJob: @@ -2494,6 +2439,7 @@ def execute_batch( :param validate: Optional toggle to enable/prevent validation of the process graphs before execution (overruling the connection's ``auto_validate`` setting). :param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet. + :param enable_print: whether to print error logs .. versionchanged:: 0.32.0 Added ``auto_add_save_result`` option @@ -2529,7 +2475,8 @@ def execute_batch( ) return job.run_synchronous( outputfile=outputfile, - print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval + print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval, + enable_print=enable_print ) def create_job( @@ -2885,3 +2832,71 @@ def unflatten_dimension(self, dimension: str, target_dimensions: List[str], labe label_separator=label_separator, ), ) +def _get_geometry_argument( + argument: Union[ + shapely.geometry.base.BaseGeometry, + dict, + str, + pathlib.Path, + Parameter, + _FromNodeMixin, + ], + valid_geojson_types: List[str], + connection: Connection = None, + crs: Optional[str] = None, +) -> Union[dict, Parameter, PGNode]: + """ + Convert input to a geometry as "geojson" subtype object or vectorcube. + + :param crs: value that encodes a coordinate reference system. + See :py:func:`openeo.util.normalize_crs` for more details about additional normalization that is applied to this argument. + """ + if isinstance(argument, Parameter): + return argument + elif isinstance(argument, _FromNodeMixin): + return argument.from_node() + + if isinstance(argument, str) and re.match(r"^https?://", argument, flags=re.I): + # Geometry provided as URL: load with `load_url` (with best-effort format guess) + url = urllib.parse.urlparse(argument) + suffix = pathlib.Path(url.path.lower()).suffix + format = { + ".json": "GeoJSON", + ".geojson": "GeoJSON", + ".pq": "Parquet", + ".parquet": "Parquet", + ".geoparquet": "Parquet", + }.get(suffix, suffix.split(".")[-1]) + return connection.load_url(url=argument, format=format) + # + if ( + isinstance(argument, (str, pathlib.Path)) + and pathlib.Path(argument).is_file() + and pathlib.Path(argument).suffix.lower() in [".json", ".geojson"] + ): + geometry = load_json(argument) + elif isinstance(argument, shapely.geometry.base.BaseGeometry): + geometry = mapping(argument) + elif isinstance(argument, dict): + geometry = argument + else: + raise OpenEoClientException(f"Invalid geometry argument: {argument!r}") + + if geometry.get("type") not in valid_geojson_types: + raise OpenEoClientException("Invalid geometry type {t!r}, must be one of {s}".format( + t=geometry.get("type"), s=valid_geojson_types + )) + if crs: + # TODO: don't warn when the crs is Lon-Lat like EPSG:4326? + warnings.warn(f"Geometry with non-Lon-Lat CRS {crs!r} is only supported by specific back-ends.") + # TODO #204 alternative for non-standard CRS in GeoJSON object? + epsg_code = normalize_crs(crs) + if epsg_code is not None: + # proj did recognize the CRS + crs_name = f"EPSG:{epsg_code}" + else: + # proj did not recognise this CRS + warnings.warn(f"non-Lon-Lat CRS {crs!r} is not known to the proj library and might not be supported.") + crs_name = crs + geometry["crs"] = {"type": "name", "properties": {"name": crs_name}} + return geometry \ No newline at end of file diff --git a/openeo/rest/job.py b/openeo/rest/job.py index e3f307a71..fcaaebbd7 100644 --- a/openeo/rest/job.py +++ b/openeo/rest/job.py @@ -236,11 +236,21 @@ def logs( def run_synchronous( self, outputfile: Union[str, Path, None] = None, - print=print, max_poll_interval=60, connection_retry_interval=30 + print=print, max_poll_interval=60, connection_retry_interval=30, enable_print=True ) -> BatchJob: - """Start the job, wait for it to finish and download result""" + """ + Start the job, wait for it to finish and download result + + :param outputfile: The path of a file to which a result can be written + :param print: print/logging function to show progress/status + :param max_poll_interval: maximum number of seconds to sleep between status polls + :param connection_retry_interval: how long to wait when status poll failed due to connection issue + :param enable_print: whether to print error logs + :return: + """ self.start_and_wait( - print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval + print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval, + enable_print=enable_print ) # TODO #135 support multi file result sets too? if outputfile is not None: @@ -248,7 +258,8 @@ def run_synchronous( return self def start_and_wait( - self, print=print, max_poll_interval: int = 60, connection_retry_interval: int = 30, soft_error_max=10 + self, print=print, max_poll_interval: int = 60, connection_retry_interval: int = 30, soft_error_max=10, + enable_print=True ) -> BatchJob: """ Start the batch job, poll its status and wait till it finishes (or fails) @@ -257,6 +268,7 @@ def start_and_wait( :param max_poll_interval: maximum number of seconds to sleep between status polls :param connection_retry_interval: how long to wait when status poll failed due to connection issue :param soft_error_max: maximum number of soft errors (e.g. temporary connection glitches) to allow + :param enable_print: whether to print error logs :return: """ # TODO rename `connection_retry_interval` to something more generic? @@ -314,13 +326,13 @@ def soft_error(message: str): poll_interval = min(1.25 * poll_interval, max_poll_interval) if status != "finished": - # TODO: allow to disable this printing logs (e.g. in non-interactive contexts)? # TODO: render logs jupyter-aware in a notebook context? - print(f"Your batch job {self.job_id!r} failed. Error logs:") - print(self.logs(level=logging.ERROR)) - print( - f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`." - ) + if enable_print: + print(f"Your batch job {self.job_id!r} failed. Error logs:") + print(self.logs(level=logging.ERROR)) + print( + f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`." + ) raise JobFailedException( f"Batch job {self.job_id!r} didn't finish successfully. Status: {status} (after {elapsed()}).", job=self, diff --git a/tests/rest/datacube/test_datacube.py b/tests/rest/datacube/test_datacube.py index cd3afbcba..820f720a4 100644 --- a/tests/rest/datacube/test_datacube.py +++ b/tests/rest/datacube/test_datacube.py @@ -136,6 +136,33 @@ def test_load_collection_connectionless_temporal_extent_shortcut(self): } } + def test_load_collection_connectionless_shapely_spatial_extent(self): + polygon = shapely.Polygon(((0.0,1.0),(2.0,1.0),(3.0,2.0),(1.5,0.0),(0.0,1.0))) + cube = DataCube.load_collection("T3", spatial_extent=polygon) + assert cube.flat_graph() == { + "loadcollection1": { + "arguments": {"id": "T3", "spatial_extent": + {'coordinates': (((0.0,1.0),(2.0,1.0),(3.0,2.0),(1.5,0.0),(0.0,1.0)),),'type': 'Polygon'}, + "temporal_extent": None}, + "process_id": "load_collection", + "result": True, + } + } + + @pytest.mark.parametrize("path_factory", [str, pathlib.Path]) + def test_load_collection_connectionless_local_path_spatial_extent(self, path_factory, test_data): + path = path_factory(test_data.get_path("geojson/polygon02.json")) + cube = DataCube.load_collection("T3", spatial_extent=path) + assert cube.flat_graph() == { + "loadcollection1": { + "arguments": {"id": "T3", "spatial_extent": + {"type": "Polygon", "coordinates": [[[3, 50], [4, 50], [4, 51], [3, 50]]]}, + "temporal_extent": None}, + "process_id": "load_collection", + "result": True, + } + } + def test_load_collection_connectionless_save_result(self): cube = DataCube.load_collection("T3").save_result(format="GTiff") assert cube.flat_graph() == { diff --git a/tests/rest/test_job.py b/tests/rest/test_job.py index 775f1b38a..eba811903 100644 --- a/tests/rest/test_job.py +++ b/tests/rest/test_job.py @@ -223,6 +223,57 @@ def test_execute_batch_with_soft_errors(con100, requests_mock, tmpdir, error_res assert path.read() == "tiffdata" assert job.logs() == [] +def test_execute_batch_with_error_with_error_logs_disabled(con100, requests_mock, tmpdir): + requests_mock.get(API_URL + "/file_formats", json={"output": {"GTiff": {"gis_data_types": ["raster"]}}}) + requests_mock.get(API_URL + "/collections/SENTINEL2", json={"foo": "bar"}) + requests_mock.post(API_URL + "/jobs", status_code=201, headers={"OpenEO-Identifier": "f00ba5"}) + requests_mock.post(API_URL + "/jobs/f00ba5/results", status_code=202) + requests_mock.get( + API_URL + "/jobs/f00ba5", + [ + {"json": {"status": "submitted"}}, + {"json": {"status": "queued"}}, + {"json": {"status": "running", "progress": 15}}, + {"json": {"status": "running", "progress": 80}}, + {"json": {"status": "error", "progress": 100}}, + ], + ) + requests_mock.get( + API_URL + "/jobs/f00ba5/logs", + json={ + "logs": [ + {"id": "12", "level": "info", "message": "starting"}, + {"id": "34", "level": "error", "message": "nope"}, + ] + }, + ) + + path = tmpdir.join("tmp.tiff") + log = [] + + try: + with fake_time(): + con100.load_collection("SENTINEL2").execute_batch( + outputfile=path, out_format="GTIFF", + max_poll_interval=.1, print=log.append, enable_print=False + ) + pytest.fail("execute_batch should fail") + except JobFailedException as e: + assert e.job.status() == "error" + assert [(l.level, l.message) for l in e.job.logs()] == [ + ("info", "starting"), + ("error", "nope"), + ] + + assert log == [ + "0:00:01 Job 'f00ba5': send 'start'", + "0:00:02 Job 'f00ba5': submitted (progress N/A)", + "0:00:04 Job 'f00ba5': queued (progress N/A)", + "0:00:07 Job 'f00ba5': running (progress 15%)", + "0:00:12 Job 'f00ba5': running (progress 80%)", + "0:00:20 Job 'f00ba5': error (progress 100%)", + ] + @pytest.mark.parametrize(["error_response", "expected"], [ (