Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added `show_error_logs` argument to `cube.execute_batch()`/`job.start_and_wait()`/... to toggle the automatic printing of error logs on failure ([#505](https://github.com/Open-EO/openeo-python-client/issues/505))

### Changed

### Removed
Expand Down
4 changes: 2 additions & 2 deletions docs/batch_jobs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ When using
:py:meth:`job.start_and_wait() <openeo.rest.job.BatchJob.start_and_wait>`
or :py:meth:`cube.execute_batch() <openeo.rest.datacube.DataCube.execute_batch>`
to run a batch job and it fails,
the openEO Python client library will automatically
print the batch job logs and instructions to help with further investigation:
the openEO Python client library will print (by default)
the batch job's error logs and instructions to help with further investigation:

.. code-block:: pycon

Expand Down
10 changes: 9 additions & 1 deletion openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -2477,6 +2477,7 @@ def execute_batch(
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
show_error_logs: bool = True,
# TODO: deprecate `format_options` as keyword arguments
**format_options,
) -> BatchJob:
Expand All @@ -2494,12 +2495,16 @@ def execute_batch(
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:param show_error_logs: whether to automatically print error logs when the batch job failed.

.. versionchanged:: 0.32.0
Added ``auto_add_save_result`` option

.. versionadded:: 0.36.0
Added argument ``additional``.

.. versionchanged:: 0.37.0
Added argument ``show_error_logs``.
"""
# TODO: start showing deprecation warnings about these inconsistent argument names
if "format" in format_options and not out_format:
Expand Down Expand Up @@ -2529,7 +2534,10 @@ def execute_batch(
)
return job.run_synchronous(
outputfile=outputfile,
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
print=print,
max_poll_interval=max_poll_interval,
connection_retry_interval=connection_retry_interval,
show_error_logs=show_error_logs,
)

def create_job(
Expand Down
50 changes: 38 additions & 12 deletions openeo/rest/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,20 +235,43 @@ def logs(
return VisualList("logs", data=entries)

def run_synchronous(
self, outputfile: Union[str, Path, None] = None,
print=print, max_poll_interval=60, connection_retry_interval=30
self,
outputfile: Union[str, Path, None] = None,
print=print,
max_poll_interval=60,
connection_retry_interval=30,
show_error_logs: bool = True,
) -> BatchJob:
"""Start the job, wait for it to finish and download result"""
"""
Start the job, wait for it to finish and download result

:param outputfile: The path of a file to which a result can be written
:param print: print/logging function to show progress/status
:param max_poll_interval: maximum number of seconds to sleep between status polls
:param connection_retry_interval: how long to wait when status poll failed due to connection issue
:param show_error_logs: whether to automatically print error logs when the batch job failed.

.. versionchanged:: 0.37.0
Added argument ``show_error_logs``.
"""
self.start_and_wait(
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
print=print,
max_poll_interval=max_poll_interval,
connection_retry_interval=connection_retry_interval,
show_error_logs=show_error_logs,
)
# TODO #135 support multi file result sets too?
if outputfile is not None:
self.download_result(outputfile)
return self

def start_and_wait(
self, print=print, max_poll_interval: int = 60, connection_retry_interval: int = 30, soft_error_max=10
self,
print=print,
max_poll_interval: int = 60,
connection_retry_interval: int = 30,
soft_error_max=10,
show_error_logs: bool = True,
) -> BatchJob:
"""
Start the batch job, poll its status and wait till it finishes (or fails)
Expand All @@ -257,7 +280,10 @@ def start_and_wait(
:param max_poll_interval: maximum number of seconds to sleep between status polls
:param connection_retry_interval: how long to wait when status poll failed due to connection issue
:param soft_error_max: maximum number of soft errors (e.g. temporary connection glitches) to allow
:return:
:param show_error_logs: whether to automatically print error logs when the batch job failed.

.. versionchanged:: 0.37.0
Added argument ``show_error_logs``.
"""
# TODO rename `connection_retry_interval` to something more generic?
start_time = time.time()
Expand Down Expand Up @@ -314,13 +340,13 @@ def soft_error(message: str):
poll_interval = min(1.25 * poll_interval, max_poll_interval)

if status != "finished":
# TODO: allow to disable this printing logs (e.g. in non-interactive contexts)?
# TODO: render logs jupyter-aware in a notebook context?
print(f"Your batch job {self.job_id!r} failed. Error logs:")
print(self.logs(level=logging.ERROR))
print(
f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`."
)
if show_error_logs:
print(f"Your batch job {self.job_id!r} failed. Error logs:")
print(self.logs(level=logging.ERROR))
print(
f"Full logs can be inspected in an openEO (web) editor or with `connection.job({self.job_id!r}).logs()`."
)
raise JobFailedException(
f"Batch job {self.job_id!r} didn't finish successfully. Status: {status} (after {elapsed()}).",
job=self,
Expand Down
12 changes: 10 additions & 2 deletions openeo/rest/mlmodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ def execute_batch(
connection_retry_interval=30,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
show_error_logs: bool = True,
) -> BatchJob:
"""
Evaluate the process graph by creating a batch job, and retrieving the results when it is finished.
This method is mostly recommended if the batch job is expected to run in a reasonable amount of time.

For very long running jobs, you probably do not want to keep the client running.
For very long-running jobs, you probably do not want to keep the client running.

:param job_options:
:param outputfile: The path of a file to which a result can be written
Expand All @@ -85,9 +86,13 @@ def execute_batch(
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:param show_error_logs: whether to automatically print error logs when the batch job failed.

.. versionadded:: 0.36.0
Added argument ``additional``.

.. versionchanged:: 0.37.0
Added argument ``show_error_logs``.
"""
job = self.create_job(
title=title,
Expand All @@ -100,7 +105,10 @@ def execute_batch(
return job.run_synchronous(
# TODO #135 support multi file result sets too
outputfile=outputfile,
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
print=print,
max_poll_interval=max_poll_interval,
connection_retry_interval=connection_retry_interval,
show_error_logs=show_error_logs,
)

def create_job(
Expand Down
10 changes: 9 additions & 1 deletion openeo/rest/vectorcube.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def execute_batch(
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
show_error_logs: bool = True,
# TODO: avoid using kwargs as format options
**format_options,
) -> BatchJob:
Expand All @@ -277,6 +278,7 @@ def execute_batch(
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:param show_error_logs: whether to automatically print error logs when the batch job failed.

.. versionchanged:: 0.21.0
When not specified explicitly, output format is guessed from output file extension.
Expand All @@ -286,6 +288,9 @@ def execute_batch(

.. versionadded:: 0.36.0
Added argument ``additional``.

.. versionchanged:: 0.37.0
Added argument ``show_error_logs``.
"""
cube = self
if auto_add_save_result:
Expand All @@ -310,7 +315,10 @@ def execute_batch(
return job.run_synchronous(
# TODO #135 support multi file result sets too
outputfile=outputfile,
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
print=print,
max_poll_interval=max_poll_interval,
connection_retry_interval=connection_retry_interval,
show_error_logs=show_error_logs,
)

def create_job(
Expand Down
31 changes: 31 additions & 0 deletions tests/rest/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,37 @@ def test_execute_batch_with_error(con100, requests_mock, tmpdir):
]


@pytest.mark.parametrize("show_error_logs", [True, False])
def test_execute_batch_show_error_logs(con100, requests_mock, show_error_logs):
requests_mock.get(API_URL + "/file_formats", json={"output": {"GTiff": {"gis_data_types": ["raster"]}}})
requests_mock.get(API_URL + "/collections/SENTINEL2", json={"foo": "bar"})
requests_mock.post(API_URL + "/jobs", status_code=201, headers={"OpenEO-Identifier": "f00ba5"})
requests_mock.post(API_URL + "/jobs/f00ba5/results", status_code=202)
requests_mock.get(API_URL + "/jobs/f00ba5", json={"status": "error", "progress": 100})
requests_mock.get(
API_URL + "/jobs/f00ba5/logs",
json={"logs": [{"id": "34", "level": "error", "message": "nope"}]},
)

stdout = []
with fake_time(), pytest.raises(JobFailedException):
con100.load_collection("SENTINEL2").execute_batch(
max_poll_interval=0.1, print=stdout.append, show_error_logs=show_error_logs
)

expected = [
"0:00:01 Job 'f00ba5': send 'start'",
"0:00:02 Job 'f00ba5': error (progress 100%)",
]
if show_error_logs:
expected += [
"Your batch job 'f00ba5' failed. Error logs:",
[{"id": "34", "level": "error", "message": "nope"}],
"Full logs can be inspected in an openEO (web) editor or with `connection.job('f00ba5').logs()`.",
]
assert stdout == expected


@pytest.mark.parametrize(["error_response", "expected"], [
(
{"exc": requests.ConnectionError("time out")},
Expand Down
Loading