2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 
+- Move `ProcessBasedJobCreator` to own submodule `openeo.extra.job_management.process_based` ([#741](https://github.com/Open-EO/openeo-python-client/issues/741))
+
 ### Removed
 
 - Remove unused/outdated `XarrayDataCube.plot()` and its related matplotlib dependency ([#472](https://github.com/Open-EO/openeo-python-client/issues/472))
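In practice this is just an import path change. Note that `ProcessBasedJobCreator` is still re-exported from the package root (it remains listed in the `__all__` of `openeo/extra/job_management/__init__.py`, see the diff further down), so a minimal migration sketch looks like this:

# New, canonical import location introduced by this PR:
from openeo.extra.job_management.process_based import ProcessBasedJobCreator

# The old import path should keep working, since __init__.py still
# re-exports the class (see the __all__ diff below):
from openeo.extra.job_management import ProcessBasedJobCreator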
20 changes: 10 additions & 10 deletions docs/cookbook/job_manager.rst
@@ -21,7 +21,7 @@ API
.. autoclass:: openeo.extra.job_management.ParquetJobDatabase


-.. autoclass:: openeo.extra.job_management.ProcessBasedJobCreator
+.. autoclass:: openeo.extra.job_management.process_based.ProcessBasedJobCreator
   :members:
   :special-members: __call__

@@ -41,29 +41,29 @@ define a "template" job as a parameterized process
and let the job manager fill in the parameters
from a given data frame.

-The :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` helper class
+The :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` helper class
allows you to do exactly that.
Given a reference to a parameterized process,
such as a user-defined process or remote process definition,
it can be used directly as ``start_job`` callable to
:py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs`
which will fill in the process parameters from the dataframe.

-Basic :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` example
------------------------------------------------------------------------------
+Basic :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` example
+--------------------------------------------------------------------------------------------

Basic usage example with a remote process definition:

.. code-block:: python
   :linenos:
-   :caption: Basic :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` example snippet
+   :caption: Basic :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` example snippet
   :emphasize-lines: 10-15, 28

from openeo.extra.job_management import (
    MultiBackendJobManager,
    create_job_db,
-    ProcessBasedJobCreator,
)
+from openeo.extra.job_management.process_based import ProcessBasedJobCreator

# Job creator, based on a parameterized openEO process
# (specified by the remote process definition at given URL)
@@ -90,7 +90,7 @@
job_manager = MultiBackendJobManager(...)
job_manager.run_jobs(job_db=job_db, start_job=job_starter)

-In this example, a :py:class:`ProcessBasedJobCreator` is instantiated
+In this example, a :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` is instantiated
based on a remote process definition,
which has parameters ``start_date`` and ``bands``.
When passed to :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs`,
@@ -103,11 +103,11 @@ with parameter values based on matching columns in the dataframe:
and will get its value from the default specified in the ``parameter_defaults`` argument.
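To make the name-based matching concrete, here is a minimal sketch of a dataframe that could drive the example above (the parameter names ``start_date`` and ``bands`` come from the surrounding text; the URL and values are illustrative assumptions):

import pandas as pd

from openeo.extra.job_management.process_based import ProcessBasedJobCreator

# One job per row; the "start_date" column matches the process parameter
# of the same name, so each job gets its own value.
df = pd.DataFrame({"start_date": ["2024-01-01", "2024-02-01", "2024-03-01"]})

# There is no "bands" column, so that parameter falls back to the
# default given here (URL and band names are made up for illustration):
job_starter = ProcessBasedJobCreator(
    namespace="https://example.test/my_process.json",
    parameter_defaults={"bands": ["B02", "B03", "B04"]},
)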


-:py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` with geometry handling
----------------------------------------------------------------------------------------------
+:py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` with geometry handling
+-----------------------------------------------------------------------------------------------------

Apart from the intuitive name-based parameter-column linking,
-:py:class:`~openeo.extra.job_management.ProcessBasedJobCreator`
+:py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator`
also automatically links:

- a process parameter that accepts inline GeoJSON geometries/features
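As a sketch of this convenience behavior (assuming ``geopandas`` is installed; the process URL is illustrative): with a single geometry column and a single GeoJSON-accepting process parameter, the two are linked automatically, and shapely geometries are converted to GeoJSON-style dicts when the job is created:

import geopandas as gpd
import shapely.geometry

from openeo.extra.job_management.process_based import ProcessBasedJobCreator

# GeoPandas dataframe with a single geometry column:
gdf = gpd.GeoDataFrame(
    {"start_date": ["2024-01-01", "2024-02-01"]},
    geometry=[
        shapely.geometry.box(5.0, 51.0, 5.1, 51.1),
        shapely.geometry.box(5.2, 51.2, 5.3, 51.3),
    ],
    crs="EPSG:4326",
)

# If the referenced process definition has exactly one parameter that
# accepts inline GeoJSON, the geometry column above is mapped to it
# without any explicit parameter_column_map:
job_starter = ProcessBasedJobCreator(namespace="https://example.test/my_process.json")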
197 changes: 12 additions & 185 deletions openeo/extra/job_management/__init__.py
@@ -5,7 +5,6 @@
import datetime
import json
import logging
-import re
import time
import warnings
from pathlib import Path
@@ -23,7 +22,6 @@
    Union,
)

-import numpy
import pandas as pd
import requests
import shapely.errors
@@ -37,14 +35,20 @@
    _JobManagerWorkerThreadPool,
    _JobStartTask,
)
-from openeo.internal.processes.parse import (
-    Parameter,
-    Process,
-    parse_remote_process_definition,
-)
+from openeo.extra.job_management.process_based import ProcessBasedJobCreator
from openeo.rest import OpenEoApiError
from openeo.rest.auth.auth import BearerAuth
-from openeo.util import LazyLoadCache, deep_get, repr_truncate, rfc3339
+from openeo.util import deep_get, rfc3339

+__all__ = [
+    "JobDatabaseInterface",
+    "FullDataFrameJobDatabase",
+    "ParquetJobDatabase",
+    "CsvJobDatabase",
+    "ProcessBasedJobCreator",
+    "create_job_db",
+    "get_job_db",
+]
+
_log = logging.getLogger(__name__)

@@ -1157,180 +1161,3 @@ def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str =
    else:
        raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.")
    return job_db


-class ProcessBasedJobCreator:
-    """
-    Batch job creator
-    (to be used together with :py:class:`MultiBackendJobManager`)
-    that takes a parameterized openEO process definition
-    (e.g. a user-defined process (UDP) or a remote openEO process definition),
-    and creates a batch job
-    for each row of the dataframe managed by the :py:class:`MultiBackendJobManager`
-    by filling in the process parameters with corresponding row values.
-
-    .. seealso::
-        See :ref:`job-management-with-process-based-job-creator`
-        for more information and examples.
-
-    Process parameters are linked to dataframe columns by name.
-    While this intuitive name-based matching should cover most use cases,
-    there are additional options for overrides or fallbacks:
-
-    - When provided, ``parameter_column_map`` will be consulted
-      for resolving a process parameter name (key in the dictionary)
-      to a desired dataframe column name (corresponding value).
-    - One common case is handled automatically as convenience functionality.
-
-      When:
-
-      - ``parameter_column_map`` is not provided (or set to ``None``),
-      - and there is a *single parameter* that accepts inline GeoJSON geometries,
-      - and the dataframe is a GeoPandas dataframe with a *single geometry* column,
-
-      then this parameter and this geometries column will be linked automatically.
-
-    - If a parameter cannot be matched with a column by name as described above,
-      a default value will be picked,
-      first by looking in ``parameter_defaults`` (if provided),
-      and then by looking up the default value from the parameter schema in the process definition.
-    - Finally, if no (default) value can be determined and the parameter
-      is not flagged as optional, an error will be raised.
-
-
-    :param process_id: (optional) openEO process identifier.
-        Can be omitted when working with a remote process definition
-        that is fully defined with a URL in the ``namespace`` parameter.
-    :param namespace: (optional) openEO process namespace.
-        Typically used to provide a URL to a remote process definition.
-    :param parameter_defaults: (optional) default values for process parameters,
-        to be used when not available in the dataframe managed by
-        :py:class:`MultiBackendJobManager`.
-    :param parameter_column_map: Optional overrides
-        for linking process parameters to dataframe columns:
-        mapping of process parameter names as key
-        to dataframe column names as value.
-
-    .. versionadded:: 0.33.0
-
-    .. warning::
-        This is an experimental API subject to change,
-        and we greatly welcome
-        `feedback and suggestions for improvement <https://github.com/Open-EO/openeo-python-client/issues>`_.
-
-    """
-
-    def __init__(
-        self,
-        *,
-        process_id: Optional[str] = None,
-        namespace: Union[str, None] = None,
-        parameter_defaults: Optional[dict] = None,
-        parameter_column_map: Optional[dict] = None,
-    ):
-        if process_id is None and namespace is None:
-            raise ValueError("At least one of `process_id` and `namespace` should be provided.")
-        self._process_id = process_id
-        self._namespace = namespace
-        self._parameter_defaults = parameter_defaults or {}
-        self._parameter_column_map = parameter_column_map
-        self._cache = LazyLoadCache()
-
-    def _get_process_definition(self, connection: Connection) -> Process:
-        if isinstance(self._namespace, str) and re.match("https?://", self._namespace):
-            # Remote process definition handling
-            return self._cache.get(
-                key=("remote_process_definition", self._namespace, self._process_id),
-                load=lambda: parse_remote_process_definition(namespace=self._namespace, process_id=self._process_id),
-            )
-        elif self._namespace is None:
-            # Handling of a user-specific UDP
-            udp_raw = connection.user_defined_process(self._process_id).describe()
-            return Process.from_dict(udp_raw)
-        else:
-            raise NotImplementedError(
-                f"Unsupported process definition source udp_id={self._process_id!r} namespace={self._namespace!r}"
-            )
-
-    def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
-        """
-        Implementation of the ``start_job`` callable interface
-        of :py:meth:`MultiBackendJobManager.run_jobs`
-        to create a job based on a given dataframe row.
-
-        :param row: The row in the pandas dataframe that stores the job's state and other tracked data.
-        :param connection: The connection to the backend.
-        """
-        # TODO: refactor out some methods, for better reuse and decoupling:
-        #   `get_arguments()` (to build the arguments dictionary), `get_cube()` (to create the cube),
-
-        process_definition = self._get_process_definition(connection=connection)
-        process_id = process_definition.id
-        parameters = process_definition.parameters or []
-
-        if self._parameter_column_map is None:
-            self._parameter_column_map = self._guess_parameter_column_map(parameters=parameters, row=row)
-
-        arguments = {}
-        for parameter in parameters:
-            param_name = parameter.name
-            column_name = self._parameter_column_map.get(param_name, param_name)
-            if column_name in row.index:
-                # Get value from dataframe row
-                value = row.loc[column_name]
-            elif param_name in self._parameter_defaults:
-                # Fall back on default values from the constructor
-                value = self._parameter_defaults[param_name]
-            elif parameter.has_default():
-                # Explicitly use default value from parameter schema
-                value = parameter.default
-            elif parameter.optional:
-                # Skip optional parameters without any fallback default value
-                continue
-            else:
-                raise ValueError(f"Missing required parameter {param_name!r} for process {process_id!r}")
-
-            # Prepare some values/dtypes for JSON encoding
-            if isinstance(value, numpy.integer):
-                value = int(value)
-            elif isinstance(value, numpy.number):
-                value = float(value)
-            elif isinstance(value, shapely.geometry.base.BaseGeometry):
-                value = shapely.geometry.mapping(value)
-
-            arguments[param_name] = value
-
-        cube = connection.datacube_from_process(process_id=process_id, namespace=self._namespace, **arguments)
-
-        title = row.get("title", f"Process {process_id!r} with {repr_truncate(arguments)}")
-        description = row.get("description", f"Process {process_id!r} (namespace {self._namespace}) with {arguments}")
-        job = connection.create_job(cube, title=title, description=description)
-
-        return job
-
-    def __call__(self, *arg, **kwargs) -> BatchJob:
-        """Syntactic sugar for calling :py:meth:`start_job`."""
-        return self.start_job(*arg, **kwargs)
-
-    @staticmethod
-    def _guess_parameter_column_map(parameters: List[Parameter], row: pd.Series) -> dict:
-        """
-        Guess parameter-column mapping from given parameter list and dataframe row.
-        """
-        parameter_column_map = {}
-        # Geometry-based mapping: try to automatically map geometry columns to GeoJSON parameters
-        geojson_parameters = [p.name for p in parameters if p.schema.accepts_geojson()]
-        geometry_columns = [i for (i, v) in row.items() if isinstance(v, shapely.geometry.base.BaseGeometry)]
-        if geojson_parameters and geometry_columns:
-            if len(geojson_parameters) == 1 and len(geometry_columns) == 1:
-                # Most common case: one geometry parameter and one geometry column: can be mapped naively
-                parameter_column_map[geojson_parameters[0]] = geometry_columns[0]
-            elif all(p in geometry_columns for p in geojson_parameters):
-                # Each geometry param has a geometry column with the same name: easy to map
-                parameter_column_map.update((p, p) for p in geojson_parameters)
-            else:
-                raise RuntimeError(
-                    f"Problem with mapping geometry columns ({geometry_columns}) to process parameters ({geojson_parameters})"
-                )
-        _log.debug(f"Guessed parameter-column map: {parameter_column_map}")
-        return parameter_column_map
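Because of the ``__call__`` alias above, a creator instance can also be invoked directly for a single row, outside of a full ``run_jobs`` loop. A hedged sketch (backend URL, process URL, and row contents are assumptions):

import openeo
import pandas as pd

from openeo.extra.job_management.process_based import ProcessBasedJobCreator

connection = openeo.connect("openeo.example.test").authenticate_oidc()
job_starter = ProcessBasedJobCreator(namespace="https://example.test/my_process.json")

# A single dataframe row, as run_jobs would pass it:
row = pd.Series({"start_date": "2024-01-01"})

# Calling the instance delegates to start_job and returns a created
# (but not yet started) BatchJob:
job = job_starter(row, connection=connection)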