Commit 07ecedc (2 parents: d277011 + e106045)

Merge branch 'master' of https://github.com/Open-EO/openeo-python-client into issue505-Option_to_disable_printing_error_logs_on_failed_job

# Conflicts:
#	CHANGELOG.md

File tree: 17 files changed (+618 −106 lines)

CHANGELOG.md (18 additions, 3 deletions)

@@ -9,8 +9,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
+### Changed
+
+### Removed
+
+### Fixed
+
+
+## [0.36.0] - 2024-12-10
+
+### Added
+
 - Automatically use `load_url` when providing a URL as geometries to `DataCube.aggregate_spatial()`, `DataCube.mask_polygon()`, etc. ([#104](https://github.com/Open-EO/openeo-python-client/issues/104), [#457](https://github.com/Open-EO/openeo-python-client/issues/457))
-- Argument `spatial_extent` in `load_collection` supports Shapely objects and loading GeoJSON from a local path.
+- Allow specifying `limit` when listing batch jobs with `Connection.list_jobs()` ([#677](https://github.com/Open-EO/openeo-python-client/issues/677))
+- Add `additional` and `job_options` arguments to `Connection.download()`, `Datacube.download()` and related ([#681](https://github.com/Open-EO/openeo-python-client/issues/681))
 
 ### Changed
 
@@ -19,12 +31,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
   this is not translated automatically anymore to deprecated, non-standard `read_vector` usage.
   Instead, if it is a local GeoJSON file, the GeoJSON data will be loaded directly client-side.
   ([#104](https://github.com/Open-EO/openeo-python-client/issues/104), [#457](https://github.com/Open-EO/openeo-python-client/issues/457))
-
-### Removed
+- Move `read()` method from general `JobDatabaseInterface` to more specific `FullDataFrameJobDatabase` ([#680](https://github.com/Open-EO/openeo-python-client/issues/680))
+- Align `additional` and `job_options` arguments in `Connection.create_job()`, `DataCube.create_job()` and related.
+  Also, follow official spec more closely. ([#683](https://github.com/Open-EO/openeo-python-client/issues/683), [Open-EO/openeo-api#276](https://github.com/Open-EO/openeo-api/issues/276))
 
 ### Fixed
 
 - `load_stac`: use fallback temporal dimension when no "cube:dimensions" in STAC Collection ([#666](https://github.com/Open-EO/openeo-python-client/issues/666))
+- Fix usage of `Parameter.spatial_extent()` with `load_collection` and `filter_bbox` ([#676](https://github.com/Open-EO/openeo-python-client/issues/676))
+
 
 ## [0.35.0] - 2024-11-19

docs/udp.rst (1 addition, 1 deletion)

@@ -134,7 +134,7 @@ Some useful parameter helpers (class methods of the :py:class:`~openeo.api.proce
 - :py:meth:`Parameter.geojson() <openeo.api.process.Parameter.geojson>` to create
   a parameter for specifying a GeoJSON geometry.
 - :py:meth:`Parameter.spatial_extent() <openeo.api.process.Parameter.spatial_extent>` to create
-  a spatial_extent parameter that is exactly the same as the corresponding parameter in `load_collection` and `load_stac`.
+  a spatial_extent parameter that is exactly the same as the corresponding parameter in ``load_collection`` and ``load_stac``.
 
 

openeo/_version.py (1 addition, 1 deletion)

@@ -1 +1 @@
-__version__ = "0.36.0a1"
+__version__ = "0.37.0a1"

openeo/api/process.py (47 additions, 11 deletions)

@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import textwrap
 import warnings
 from typing import List, Optional, Union
 
@@ -279,23 +280,15 @@ def bounding_box(
         }
         return cls(name=name, description=description, schema=schema, **kwargs)
 
-    _spatial_extent_description = """Limits the data to process to the specified bounding box or polygons.
-
-For raster data, the process loads the pixel into the data cube if the point at the pixel center intersects with the bounding box or any of the polygons (as defined in the Simple Features standard by the OGC).
-For vector data, the process loads the geometry into the data cube if the geometry is fully within the bounding box or any of the polygons (as defined in the Simple Features standard by the OGC). Empty geometries may only be in the data cube if no spatial extent has been provided.
-
-Empty geometries are ignored.
-Set this parameter to null to set no limit for the spatial extent. """
-
     @classmethod
     def spatial_extent(
         cls,
         name: str = "spatial_extent",
-        description: str = _spatial_extent_description,
+        description: Optional[str] = None,
         **kwargs,
     ) -> Parameter:
         """
-        Helper to easily create a 'spatial_extent' parameter, which is compatible with the 'load_collection' argument of
+        Helper to easily create a 'spatial_extent' parameter, which is compatible with the ``load_collection`` argument of
         the same name. This allows to conveniently create user-defined processes that can be applied to a bounding box and vector data
         for spatial filtering. It is also possible for users to set to null, and define spatial filtering using other processes.
 
@@ -307,6 +300,26 @@ def spatial_extent(
 
         .. versionadded:: 0.32.0
         """
+        if description is None:
+            description = textwrap.dedent(
+                """
+                Limits the data to process to the specified bounding box or polygons.
+
+                For raster data, the process loads the pixel into the data cube if the point
+                at the pixel center intersects with the bounding box or any of the polygons
+                (as defined in the Simple Features standard by the OGC).
+
+                For vector data, the process loads the geometry into the data cube if the geometry
+                is fully within the bounding box or any of the polygons (as defined in the
+                Simple Features standard by the OGC). Empty geometries may only be in the
+                data cube if no spatial extent has been provided.
+
+                Empty geometries are ignored.
+
+                Set this parameter to null to set no limit for the spatial extent.
+                """
+            ).strip()
+
         schema = [
             {
                 "title": "Bounding Box",
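The `textwrap.dedent(...).strip()` idiom above lets the default description live inside the method body, indented to match the surrounding code, while the resulting string has no leading indentation or surrounding blank lines. A minimal standalone sketch of the same pattern (the `default_description` helper is hypothetical, not part of openeo):

```python
import textwrap


def default_description() -> str:
    # Hypothetical helper mirroring the pattern above: the triple-quoted block
    # is indented like normal code; dedent() removes the common indentation and
    # strip() drops the leading/trailing newlines introduced by the quotes.
    return textwrap.dedent(
        """
        Limits the data to process to the specified bounding box or polygons.

        Set this parameter to null to set no limit for the spatial extent.
        """
    ).strip()


text = default_description()
print(text.splitlines()[0])  # → Limits the data to process to the specified bounding box or polygons.
```

Building the default lazily (only when `description is None`) also keeps the long text out of the class namespace, which is why the commit drops the `_spatial_extent_description` class attribute.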
@@ -410,7 +423,7 @@ def geojson(cls, name: str, description: str = "Geometries specified as GeoJSON
     @classmethod
     def temporal_interval(
         cls,
-        name: str,
+        name: str = "temporal_extent",
         description: str = "Temporal extent specified as two-element array with start and end date/date-time.",
         **kwargs,
     ) -> Parameter:
@@ -441,3 +454,26 @@ def temporal_interval(
             },
         }
         return cls(name=name, description=description, schema=schema, **kwargs)
+
+
+def schema_supports(schema: Union[dict, List[dict]], type: str, subtype: Optional[str] = None) -> bool:
+    """Helper to check if parameter schema supports given type/subtype"""
+    # TODO: support checking item type in arrays
+    if isinstance(schema, dict):
+        actual_type = schema.get("type")
+        if isinstance(actual_type, str):
+            if actual_type != type:
+                return False
+        elif isinstance(actual_type, list):
+            if type not in actual_type:
+                return False
+        else:
+            raise ValueError(actual_type)
+        if subtype:
+            if schema.get("subtype") != subtype:
+                return False
+        return True
+    elif isinstance(schema, list):
+        return any(schema_supports(s, type=type, subtype=subtype) for s in schema)
+    else:
+        raise ValueError(schema)
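The new `schema_supports()` helper can be exercised in isolation; the sketch below copies the function from the diff (with the nested subtype check folded into one condition) so the example is self-contained:

```python
from typing import List, Optional, Union


def schema_supports(schema: Union[dict, List[dict]], type: str, subtype: Optional[str] = None) -> bool:
    """Check whether a parameter schema (single dict or list of alternatives) supports a type/subtype."""
    if isinstance(schema, dict):
        actual_type = schema.get("type")
        if isinstance(actual_type, str):
            if actual_type != type:
                return False
        elif isinstance(actual_type, list):
            # JSON Schema allows "type" to be a list of alternatives.
            if type not in actual_type:
                return False
        else:
            raise ValueError(actual_type)
        if subtype and schema.get("subtype") != subtype:
            return False
        return True
    elif isinstance(schema, list):
        # A list of schemas supports the type if any alternative does.
        return any(schema_supports(s, type=type, subtype=subtype) for s in schema)
    else:
        raise ValueError(schema)


# A "spatial_extent"-style parameter schema: either a bounding box object or null.
schema = [
    {"type": "object", "subtype": "bounding-box"},
    {"type": "null"},
]
print(schema_supports(schema, type="object", subtype="bounding-box"))  # True
print(schema_supports(schema, type="string"))  # False
```

Note that a schema without a `"type"` key raises `ValueError` rather than returning `False`, matching the behavior visible in the diff.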
Lines changed: 15 additions & 11 deletions

@@ -41,6 +41,7 @@
 
 _log = logging.getLogger(__name__)
 
+
 class _Backend(NamedTuple):
     """Container for backend info/settings"""
 
@@ -70,15 +71,6 @@ def exists(self) -> bool:
         """Does the job database already exist, to read job data from?"""
         ...
 
-    @abc.abstractmethod
-    def read(self) -> pd.DataFrame:
-        """
-        Read job data from the database as pandas DataFrame.
-
-        :return: loaded job data.
-        """
-        ...
-
     @abc.abstractmethod
     def persist(self, df: pd.DataFrame):
         """
@@ -364,9 +356,9 @@ def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabas
 
         # Resume from existing db
         _log.info(f"Resuming `run_jobs` from existing {job_db}")
-        df = job_db.read()
 
         self._stop_thread = False
+
         def run_loop():
 
             # TODO: support user-provided `stats`
@@ -810,6 +802,15 @@ def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
         # Return self to allow chaining with constructor.
         return self
 
+    @abc.abstractmethod
+    def read(self) -> pd.DataFrame:
+        """
+        Read job data from the database as pandas DataFrame.
+
+        :return: loaded job data.
+        """
+        ...
+
     @property
     def df(self) -> pd.DataFrame:
         if self._df is None:
@@ -856,6 +857,7 @@ class CsvJobDatabase(FullDataFrameJobDatabase):
 
     .. versionadded:: 0.31.0
     """
+
     def __init__(self, path: Union[str, Path]):
         super().__init__()
         self.path = Path(path)
@@ -912,6 +914,7 @@ class ParquetJobDatabase(FullDataFrameJobDatabase):
 
     .. versionadded:: 0.31.0
     """
+
     def __init__(self, path: Union[str, Path]):
         super().__init__()
         self.path = Path(path)
@@ -934,6 +937,7 @@ def read(self) -> pd.DataFrame:
         metadata = pyarrow.parquet.read_metadata(self.path)
         if b"geo" in metadata.metadata:
             import geopandas
+
             return geopandas.read_parquet(self.path)
         else:
             return pd.read_parquet(self.path)
@@ -1045,6 +1049,7 @@ class ProcessBasedJobCreator:
     `feedback and suggestions for improvement <https://github.com/Open-EO/openeo-python-client/issues>`_.
 
     """
+
     def __init__(
         self,
         *,
@@ -1077,7 +1082,6 @@ def _get_process_definition(self, connection: Connection) -> Process:
             f"Unsupported process definition source udp_id={self._process_id!r} namespace={self._namespace!r}"
         )
 
-
     def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
         """
         Implementation of the ``start_job`` callable interface
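The point of moving `read()` from `JobDatabaseInterface` down to `FullDataFrameJobDatabase` (changelog [#680]) is that not every job database has to be able to materialize its full contents; only "full DataFrame" databases promise that. A simplified, self-contained sketch of that interface split, with plain lists of dicts standing in for the pandas DataFrames of the real code:

```python
import abc


class JobDatabaseInterface(abc.ABC):
    """Minimal contract: any job database can report existence and persist updates."""

    @abc.abstractmethod
    def exists(self) -> bool: ...

    @abc.abstractmethod
    def persist(self, records: list) -> None: ...


class FullDataFrameJobDatabase(JobDatabaseInterface):
    """Databases holding the complete data set additionally expose read()."""

    @abc.abstractmethod
    def read(self) -> list: ...


class InMemoryJobDatabase(FullDataFrameJobDatabase):
    """Toy concrete implementation, for illustration only."""

    def __init__(self):
        self._records = None

    def exists(self) -> bool:
        return self._records is not None

    def persist(self, records: list) -> None:
        self._records = list(records)

    def read(self) -> list:
        return list(self._records)


db = InMemoryJobDatabase()
db.persist([{"job_id": "job-000", "status": "created"}])
print(db.exists(), db.read()[0]["status"])  # True created
```

This also explains the removal of `df = job_db.read()` in `start_job_thread()` above: that code only holds a `JobDatabaseInterface`, which no longer guarantees `read()`.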

openeo/rest/_testing.py (26 additions, 1 deletion)

@@ -32,13 +32,17 @@ class DummyBackend:
     """
 
     # TODO: move to openeo.testing
+    # TODO: unify "batch_jobs", "batch_jobs_full" and "extra_job_metadata_fields"?
+    # TODO: unify "sync_requests" and "sync_requests_full"?
 
     __slots__ = (
         "_requests_mock",
         "connection",
         "file_formats",
         "sync_requests",
+        "sync_requests_full",
         "batch_jobs",
+        "batch_jobs_full",
         "validation_requests",
         "next_result",
         "next_validation_errors",
@@ -60,7 +64,9 @@ def __init__(
         self.connection = connection
         self.file_formats = {"input": {}, "output": {}}
         self.sync_requests = []
+        self.sync_requests_full = []
         self.batch_jobs = {}
+        self.batch_jobs_full = {}
         self.validation_requests = []
         self.next_result = self.DEFAULT_RESULT
         self.next_validation_errors = []
@@ -163,7 +169,9 @@ def setup_file_format(self, name: str, type: str = "output", gis_data_types: Ite
 
     def _handle_post_result(self, request, context):
         """handler of `POST /result` (synchronous execute)"""
-        pg = request.json()["process"]["process_graph"]
+        post_data = request.json()
+        pg = post_data["process"]["process_graph"]
+        self.sync_requests_full.append(post_data)
         self.sync_requests.append(pg)
         result = self.next_result
         if isinstance(result, (dict, list)):
@@ -185,6 +193,10 @@ def _handle_post_jobs(self, request, context):
         job_id = f"job-{len(self.batch_jobs):03d}"
         assert job_id not in self.batch_jobs
 
+        # Full post data dump
+        self.batch_jobs_full[job_id] = post_data
+
+        # Batch job essentials
         job_data = {"job_id": job_id, "pg": pg, "status": "created"}
         for field in ["title", "description"]:
             if field in post_data:
@@ -272,6 +284,11 @@ def get_sync_pg(self) -> dict:
         assert len(self.sync_requests) == 1
         return self.sync_requests[0]
 
+    def get_sync_post_data(self) -> dict:
+        """Get post data of the one and only synchronous job."""
+        assert len(self.sync_requests_full) == 1
+        return self.sync_requests_full[0]
+
     def get_batch_pg(self) -> dict:
         """
         Get process graph of the one and only batch job.
@@ -280,6 +297,14 @@ def get_batch_pg(self) -> dict:
         assert len(self.batch_jobs) == 1
         return self.batch_jobs[max(self.batch_jobs.keys())]["pg"]
 
+    def get_batch_post_data(self) -> dict:
+        """
+        Get post data of the one and only batch job.
+        Fails when there is none or more than one.
+        """
+        assert len(self.batch_jobs_full) == 1
+        return self.batch_jobs_full[max(self.batch_jobs_full.keys())]
+
     def get_validation_pg(self) -> dict:
         """
         Get process graph of the one and only validation request.
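The `sync_requests` vs `sync_requests_full` split above records both the bare process graph and the complete POST body, so tests can also inspect top-level fields such as `job_options` (the feature added in changelog entries [#681]/[#683]). A stripped-down, self-contained sketch of that recording pattern (`MiniRecorder` is a hypothetical stand-in, not the actual `DummyBackend`):

```python
class MiniRecorder:
    """Records both full POST bodies and just the extracted process graphs."""

    def __init__(self):
        self.sync_requests = []       # just the process graphs
        self.sync_requests_full = []  # complete POST payloads

    def handle_post_result(self, post_data: dict) -> None:
        # Keep the full payload and the extracted process graph side by side,
        # mirroring _handle_post_result() in the diff above.
        pg = post_data["process"]["process_graph"]
        self.sync_requests_full.append(post_data)
        self.sync_requests.append(pg)

    def get_sync_post_data(self) -> dict:
        # Fails when there is no request, or more than one.
        assert len(self.sync_requests_full) == 1
        return self.sync_requests_full[0]


recorder = MiniRecorder()
recorder.handle_post_result({
    "process": {"process_graph": {"add1": {"process_id": "add"}}},
    "job_options": {"memory": "2GB"},  # extra field, only visible in the full dump
})
print(recorder.get_sync_post_data()["job_options"])  # {'memory': '2GB'}
```

Keeping both views means existing tests that assert on the process graph stay unchanged, while new tests can assert on the full payload.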
