Commit 49482bb

Issue #720/#402 Introduce StacResource/SaveResult returned from save_result
1 parent 5e70bdc commit 49482bb

File tree: 8 files changed, +253 -196 lines changed

openeo/rest/_datacube.py

Lines changed: 17 additions & 10 deletions

@@ -10,6 +10,7 @@
 
 import requests
 
+import openeo
 from openeo.internal.graph_building import FlatGraphableMixin, PGNode, _FromNodeMixin
 from openeo.internal.jupyter import render_component
 from openeo.internal.processes.builder import (
@@ -23,6 +24,8 @@
 if typing.TYPE_CHECKING:
     # Imports for type checking only (circular import issue at runtime).
     from openeo.rest.connection import Connection
+    from openeo.rest.result import SaveResult
+    from openeo.rest.stac_resource import StacResource
 
 log = logging.getLogger(__name__)
 
@@ -331,7 +334,7 @@ def _ensure_save_result(
     weak_format: Optional[str] = None,
     default_format: str,
     method: str,
-) -> _ProcessGraphAbstraction:
+) -> Union[SaveResult, StacResource]:
     """
     Make sure there is a `save_result` node in the process graph.
 
@@ -346,13 +349,17 @@
 
     if not save_result_nodes:
         # No `save_result` node yet: automatically add it.
-        # TODO: the `save_result` method is not defined on _ProcessGraphAbstraction, but it is on DataCube and VectorCube
-        cube = cube.save_result(format=format or weak_format or default_format, options=options)
-    elif format or options:
-        raise OpenEoClientException(
-            f"{method} with explicit output {'format' if format else 'options'} {format or options!r},"
-            f" but the process graph already has `save_result` node(s)"
-            f" which is ambiguous and should not be combined."
-        )
+        if isinstance(cube, (openeo.DataCube, openeo.VectorCube)):
+            pg_with_save_result = cube.save_result(format=format or weak_format or default_format, options=options)
+        else:
+            raise OpenEoClientException(f"No support to add `save_result` on {cube!r}.")
+    else:
+        if format or options:
+            raise OpenEoClientException(
+                f"{method} with explicit output {'format' if format else 'options'} {format or options!r},"
+                f" but the process graph already has `save_result` node(s)"
+                f" which is ambiguous and should not be combined."
+            )
+        pg_with_save_result = cube
 
-    return cube
+    return pg_with_save_result
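
The helper's contract thus changes: it now returns the `SaveResult`/`StacResource` produced by `save_result` (or the cube itself when the graph already ends in a `save_result` node), and it refuses to auto-append `save_result` for anything that is not a `DataCube` or `VectorCube`. A hedged sketch of exercising the private helper directly; the keyword signature is inferred from the hunks above, and the helper is normally only reached via `download()`/`create_job()`/`execute_batch()`:

# Sketch under stated assumptions: `cube` is a DataCube whose graph has
# no `save_result` node yet; signature inferred from the diff above.
from openeo.rest._datacube import _ensure_save_result

res = _ensure_save_result(
    cube=cube,
    format="GTiff",
    options=None,
    default_format="GTiff",
    method="example()",
)
# `res` now wraps the graph extended with a `save_result` node,
# as a SaveResult instead of a plain _ProcessGraphAbstraction.
print(type(res))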

openeo/rest/datacube.py

Lines changed: 28 additions & 18 deletions

@@ -63,7 +63,9 @@
 from openeo.rest.graph_building import CollectionProperty
 from openeo.rest.job import BatchJob, RESTJob
 from openeo.rest.mlmodel import MlModel
+from openeo.rest.result import SaveResult
 from openeo.rest.service import Service
+from openeo.rest.stac_resource import StacResource
 from openeo.rest.udp import RESTUserDefinedProcess
 from openeo.rest.vectorcube import VectorCube
 from openeo.util import dict_no_none, guess_format, load_json, normalize_crs, rfc3339
@@ -2332,21 +2334,23 @@ def save_result(
         self,
         format: str = _DEFAULT_RASTER_FORMAT,
         options: Optional[dict] = None,
-    ) -> DataCube:
+    ) -> SaveResult:
         if self._connection:
             formats = set(self._connection.list_output_formats().keys())
             # TODO: map format to correct casing too?
             if format.lower() not in {f.lower() for f in formats}:
                 raise ValueError("Invalid format {f!r}. Should be one of {s}".format(f=format, s=formats))
-        return self.process(
+
+        pg = self._build_pgnode(
             process_id="save_result",
             arguments={
-                "data": THIS,
+                "data": self,
                 "format": format,
                 # TODO: leave out options if unset?
-                "options": options or {}
-            }
+                "options": options or {},
+            },
         )
+        return SaveResult(pg, connection=self._connection)
 
     def download(
         self,
@@ -2384,18 +2388,19 @@ def download(
         Added arguments ``additional`` and ``job_options``.
         """
         # TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
-        cube = self
         if auto_add_save_result:
-            cube = _ensure_save_result(
-                cube=cube,
+            res = _ensure_save_result(
+                cube=self,
                 format=format,
                 options=options,
                 weak_format=guess_format(outputfile) if outputfile else None,
                 default_format=self._DEFAULT_RASTER_FORMAT,
                 method="DataCube.download()",
             )
+        else:
+            res = self
         return self._connection.download(
-            cube.flat_graph(), outputfile, validate=validate, additional=additional, job_options=job_options
+            res.flat_graph(), outputfile=outputfile, validate=validate, additional=additional, job_options=job_options
         )
 
     def validate(self) -> List[dict]:
@@ -2546,27 +2551,30 @@ def execute_batch(
         out_format = format_options["format"]  # align with 'download' call arg name
 
         # TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
-        cube = self
         if auto_add_save_result:
-            cube = _ensure_save_result(
-                cube=cube,
+            res = _ensure_save_result(
+                cube=self,
                 format=out_format,
                 options=format_options,
                 weak_format=guess_format(outputfile) if outputfile else None,
                 default_format=self._DEFAULT_RASTER_FORMAT,
                 method="DataCube.execute_batch()",
             )
+            create_kwargs = {}
+        else:
+            res = self
+            create_kwargs = {"auto_add_save_result": False}
 
-        job = cube.create_job(
+        job = res.create_job(
             title=title,
             description=description,
             plan=plan,
             budget=budget,
             additional=additional,
             job_options=job_options,
             validate=validate,
-            auto_add_save_result=False,
             log_level=log_level,
+            **create_kwargs,
         )
         return job.run_synchronous(
             outputfile=outputfile,
@@ -2629,17 +2637,19 @@ def create_job(
         # TODO: add option to also automatically start the job?
         # TODO: avoid using all kwargs as format_options
         # TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
-        cube = self
         if auto_add_save_result:
-            cube = _ensure_save_result(
-                cube=cube,
+            res = _ensure_save_result(
+                cube=self,
                 format=out_format,
                 options=format_options or None,
                 default_format=self._DEFAULT_RASTER_FORMAT,
                 method="DataCube.create_job()",
            )
+        else:
+            res = self
+
         return self._connection.create_job(
-            process_graph=cube.flat_graph(),
+            process_graph=res.flat_graph(),
             title=title,
             description=description,
             plan=plan,
openeo/rest/result.py

Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
+from openeo.rest.stac_resource import StacResource
+
+
+class SaveResult(StacResource):
+    """TODO"""
+
+    pass
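
`SaveResult` is deliberately an empty subclass for now: it gives `save_result` a dedicated return type while inheriting all behavior from `StacResource`. A quick illustration:

from openeo.rest.result import SaveResult
from openeo.rest.stac_resource import StacResource

# Empty subclass: code written against StacResource
# (download, create_job, execute_batch, export_workspace)
# accepts a SaveResult unchanged.
assert issubclass(SaveResult, StacResource)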

openeo/rest/stac_resource.py

Lines changed: 131 additions & 0 deletions

@@ -0,0 +1,131 @@
+from __future__ import annotations
+
+import typing
+from pathlib import Path
+from typing import Optional, Union
+
+from openeo.internal.documentation import openeo_process
+from openeo.internal.graph_building import PGNode
+from openeo.rest import (
+    DEFAULT_JOB_STATUS_POLL_CONNECTION_RETRY_INTERVAL,
+    DEFAULT_JOB_STATUS_POLL_INTERVAL_MAX,
+)
+from openeo.rest._datacube import _ProcessGraphAbstraction
+from openeo.rest.job import BatchJob
+
+if typing.TYPE_CHECKING:
+    from openeo.rest.connection import Connection
+
+
+class StacResource(_ProcessGraphAbstraction):
+    """
+    Handle for a process graph node that represents a STAC resource (object with subtype "stac"),
+    e.g. as returned by `save_result`, or handled by `export_workspace`/`stac_modify`.
+
+
+    Refers to a STAC resource of any type (Catalog, Collection, or Item).
+    It can refer to:
+    - static STAC resources, e.g. hosted on cloud storage
+    - dynamic STAC resources made available via a STAC API
+    - a STAC JSON representation embedded as an argument into an openEO user-defined process
+    """
+
+    def __init__(self, graph: PGNode, connection: Optional[Connection] = None):
+        super().__init__(pgnode=graph, connection=connection)
+
+    @openeo_process
+    def export_workspace(self, workspace: str, merge: Union[str, None] = None) -> StacResource:
+        """
+        Export data to a cloud user workspace.
+
+        Exports the given processing results made available through a STAC resource
+        (e.g., a STAC Collection) to the given user workspace.
+        The STAC resource itself is exported with all STAC resources and assets underneath.
+
+        :return: the potentially updated STAC resource.
+        """
+        return StacResource(
+            graph=self._build_pgnode(
+                process_id="export_workspace", arguments={"data": self, "workspace": workspace, "merge": merge}
+            ),
+            connection=self._connection,
+        )
+
+    def download(
+        self,
+        outputfile: Optional[Union[str, Path]] = None,
+        *,
+        validate: Optional[bool] = None,
+        additional: Optional[dict] = None,
+        job_options: Optional[dict] = None,
+    ):
+        """TODO"""
+        return self._connection.download(
+            graph=self.flat_graph(),
+            outputfile=outputfile,
+            validate=validate,
+            additional=additional,
+            job_options=job_options,
+        )
+
+    def create_job(
+        self,
+        *,
+        title: Optional[str] = None,
+        description: Optional[str] = None,
+        plan: Optional[str] = None,
+        budget: Optional[float] = None,
+        additional: Optional[dict] = None,
+        job_options: Optional[dict] = None,
+        validate: Optional[bool] = None,
+        log_level: Optional[str] = None,
+    ) -> BatchJob:
+        """TODO"""
+        return self._connection.create_job(
+            process_graph=self.flat_graph(),
+            title=title,
+            description=description,
+            plan=plan,
+            budget=budget,
+            validate=validate,
+            additional=additional,
+            job_options=job_options,
+            log_level=log_level,
+        )
+
+    def execute_batch(
+        self,
+        outputfile: Optional[Union[str, Path]] = None,
+        *,
+        title: Optional[str] = None,
+        description: Optional[str] = None,
+        plan: Optional[str] = None,
+        budget: Optional[float] = None,
+        print: typing.Callable[[str], None] = print,
+        max_poll_interval: float = DEFAULT_JOB_STATUS_POLL_INTERVAL_MAX,
+        connection_retry_interval: float = DEFAULT_JOB_STATUS_POLL_CONNECTION_RETRY_INTERVAL,
+        additional: Optional[dict] = None,
+        job_options: Optional[dict] = None,
+        validate: Optional[bool] = None,
+        auto_add_save_result: bool = True,
+        show_error_logs: bool = True,
+        log_level: Optional[str] = None,
+    ) -> BatchJob:
+        """TODO"""
+        job = self.create_job(
+            title=title,
+            description=description,
+            plan=plan,
+            budget=budget,
+            additional=additional,
+            job_options=job_options,
+            validate=validate,
+            log_level=log_level,
+        )
+        return job.run_synchronous(
+            outputfile=outputfile,
+            print=print,
+            max_poll_interval=max_poll_interval,
+            connection_retry_interval=connection_retry_interval,
+            show_error_logs=show_error_logs,
+        )
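
Since `StacResource` carries `download`/`create_job`/`execute_batch` itself, the new classes compose naturally with `export_workspace`. A hedged end-to-end sketch; backend URL, collection id and workspace name are hypothetical:

import openeo

connection = openeo.connect("https://openeo.example")  # hypothetical backend URL
cube = connection.load_collection("SENTINEL2_L2A")      # hypothetical collection id

result = cube.save_result(format="GTiff")                     # SaveResult (a StacResource)
exported = result.export_workspace(workspace="my-workspace")  # hypothetical workspace name
job = exported.execute_batch(title="export to workspace")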
