From 7fa52c9209dcc3699a11d7b2db0fecd818880b6c Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Thu, 30 Oct 2025 11:11:10 +0100 Subject: [PATCH] Issue #741: Move ProcessBasedJobCreator to own submodule --- CHANGELOG.md | 2 + docs/cookbook/job_manager.rst | 20 +- openeo/extra/job_management/__init__.py | 197 +---- openeo/extra/job_management/process_based.py | 196 +++++ .../job_management/test_job_management.py | 690 +---------------- .../job_management/test_process_based.py | 709 ++++++++++++++++++ 6 files changed, 930 insertions(+), 884 deletions(-) create mode 100644 openeo/extra/job_management/process_based.py create mode 100644 tests/extra/job_management/test_process_based.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 679770ac0..4a7ee43e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Move `ProcessBasedJobCreator` to own submodule `openeo.extra.job_management.process_based` ([#741](https://github.com/Open-EO/openeo-python-client/issues/741)) + ### Removed - Remove unused/outdated `XarrayDataCube.plot()` and its related matplotlib dependency ([#472](https://github.com/Open-EO/openeo-python-client/issues/472)) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index 51c93cdbd..a2a7a0c93 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -21,7 +21,7 @@ API .. autoclass:: openeo.extra.job_management.ParquetJobDatabase -.. autoclass:: openeo.extra.job_management.ProcessBasedJobCreator +.. autoclass:: openeo.extra.job_management.process_based.ProcessBasedJobCreator :members: :special-members: __call__ @@ -41,7 +41,7 @@ define a "template" job as a parameterized process and let the job manager fill in the parameters from a given data frame. -The :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` helper class +The :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` helper class allows to do exactly that. Given a reference to a parameterized process, such as a user-defined process or remote process definition, @@ -49,21 +49,21 @@ it can be used directly as ``start_job`` callable to :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs` which will fill in the process parameters from the dataframe. -Basic :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` example ------------------------------------------------------------------------------ +Basic :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` example +-------------------------------------------------------------------------------------------- Basic usage example with a remote process definition: .. code-block:: python :linenos: - :caption: Basic :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` example snippet + :caption: Basic :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` example snippet :emphasize-lines: 10-15, 28 from openeo.extra.job_management import ( MultiBackendJobManager, create_job_db, - ProcessBasedJobCreator, ) + from openeo.extra.job_management.process_based import ProcessBasedJobCreator # Job creator, based on a parameterized openEO process # (specified by the remote process definition at given URL) @@ -90,7 +90,7 @@ Basic usage example with a remote process definition: job_manager = MultiBackendJobManager(...) 
job_manager.run_jobs(job_db=job_db, start_job=job_starter) -In this example, a :py:class:`ProcessBasedJobCreator` is instantiated +In this example, a :py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` is instantiated based on a remote process definition, which has parameters ``start_date`` and ``bands``. When passed to :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs`, @@ -103,11 +103,11 @@ with parameter values based on matching columns in the dataframe: and will get its value from the default specified in the ``parameter_defaults`` argument. -:py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` with geometry handling ---------------------------------------------------------------------------------------------- +:py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` with geometry handling +----------------------------------------------------------------------------------------------------- Apart from the intuitive name-based parameter-column linking, -:py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` +:py:class:`~openeo.extra.job_management.process_based.ProcessBasedJobCreator` also automatically links: - a process parameters that accepts inline GeoJSON geometries/features diff --git a/openeo/extra/job_management/__init__.py b/openeo/extra/job_management/__init__.py index 983db7221..9dfef353c 100644 --- a/openeo/extra/job_management/__init__.py +++ b/openeo/extra/job_management/__init__.py @@ -5,7 +5,6 @@ import datetime import json import logging -import re import time import warnings from pathlib import Path @@ -23,7 +22,6 @@ Union, ) -import numpy import pandas as pd import requests import shapely.errors @@ -37,14 +35,20 @@ _JobManagerWorkerThreadPool, _JobStartTask, ) -from openeo.internal.processes.parse import ( - Parameter, - Process, - parse_remote_process_definition, -) +from openeo.extra.job_management.process_based import ProcessBasedJobCreator from openeo.rest import OpenEoApiError from openeo.rest.auth.auth import BearerAuth -from openeo.util import LazyLoadCache, deep_get, repr_truncate, rfc3339 +from openeo.util import deep_get, rfc3339 + +__all__ = [ + "JobDatabaseInterface", + "FullDataFrameJobDatabase", + "ParquetJobDatabase", + "CsvJobDatabase", + "ProcessBasedJobCreator", + "create_job_db", + "get_job_db", +] _log = logging.getLogger(__name__) @@ -1157,180 +1161,3 @@ def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = else: raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.") return job_db - - -class ProcessBasedJobCreator: - """ - Batch job creator - (to be used together with :py:class:`MultiBackendJobManager`) - that takes a parameterized openEO process definition - (e.g a user-defined process (UDP) or a remote openEO process definition), - and creates a batch job - for each row of the dataframe managed by the :py:class:`MultiBackendJobManager` - by filling in the process parameters with corresponding row values. - - .. seealso:: - See :ref:`job-management-with-process-based-job-creator` - for more information and examples. - - Process parameters are linked to dataframe columns by name. - While this intuitive name-based matching should cover most use cases, - there are additional options for overrides or fallbacks: - - - When provided, ``parameter_column_map`` will be consulted - for resolving a process parameter name (key in the dictionary) - to a desired dataframe column name (corresponding value). 
- - One common case is handled automatically as convenience functionality. - - When: - - - ``parameter_column_map`` is not provided (or set to ``None``), - - and there is a *single parameter* that accepts inline GeoJSON geometries, - - and the dataframe is a GeoPandas dataframe with a *single geometry* column, - - then this parameter and this geometries column will be linked automatically. - - - If a parameter can not be matched with a column by name as described above, - a default value will be picked, - first by looking in ``parameter_defaults`` (if provided), - and then by looking up the default value from the parameter schema in the process definition. - - Finally if no (default) value can be determined and the parameter - is not flagged as optional, an error will be raised. - - - :param process_id: (optional) openEO process identifier. - Can be omitted when working with a remote process definition - that is fully defined with a URL in the ``namespace`` parameter. - :param namespace: (optional) openEO process namespace. - Typically used to provide a URL to a remote process definition. - :param parameter_defaults: (optional) default values for process parameters, - to be used when not available in the dataframe managed by - :py:class:`MultiBackendJobManager`. - :param parameter_column_map: Optional overrides - for linking process parameters to dataframe columns: - mapping of process parameter names as key - to dataframe column names as value. - - .. versionadded:: 0.33.0 - - .. warning:: - This is an experimental API subject to change, - and we greatly welcome - `feedback and suggestions for improvement `_. - - """ - - def __init__( - self, - *, - process_id: Optional[str] = None, - namespace: Union[str, None] = None, - parameter_defaults: Optional[dict] = None, - parameter_column_map: Optional[dict] = None, - ): - if process_id is None and namespace is None: - raise ValueError("At least one of `process_id` and `namespace` should be provided.") - self._process_id = process_id - self._namespace = namespace - self._parameter_defaults = parameter_defaults or {} - self._parameter_column_map = parameter_column_map - self._cache = LazyLoadCache() - - def _get_process_definition(self, connection: Connection) -> Process: - if isinstance(self._namespace, str) and re.match("https?://", self._namespace): - # Remote process definition handling - return self._cache.get( - key=("remote_process_definition", self._namespace, self._process_id), - load=lambda: parse_remote_process_definition(namespace=self._namespace, process_id=self._process_id), - ) - elif self._namespace is None: - # Handling of a user-specific UDP - udp_raw = connection.user_defined_process(self._process_id).describe() - return Process.from_dict(udp_raw) - else: - raise NotImplementedError( - f"Unsupported process definition source udp_id={self._process_id!r} namespace={self._namespace!r}" - ) - - def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: - """ - Implementation of the ``start_job`` callable interface - of :py:meth:`MultiBackendJobManager.run_jobs` - to create a job based on given dataframe row - - :param row: The row in the pandas dataframe that stores the jobs state and other tracked data. - :param connection: The connection to the backend. 
- """ - # TODO: refactor out some methods, for better reuse and decoupling: - # `get_arguments()` (to build the arguments dictionary), `get_cube()` (to create the cube), - - process_definition = self._get_process_definition(connection=connection) - process_id = process_definition.id - parameters = process_definition.parameters or [] - - if self._parameter_column_map is None: - self._parameter_column_map = self._guess_parameter_column_map(parameters=parameters, row=row) - - arguments = {} - for parameter in parameters: - param_name = parameter.name - column_name = self._parameter_column_map.get(param_name, param_name) - if column_name in row.index: - # Get value from dataframe row - value = row.loc[column_name] - elif param_name in self._parameter_defaults: - # Fallback on default values from constructor - value = self._parameter_defaults[param_name] - elif parameter.has_default(): - # Explicitly use default value from parameter schema - value = parameter.default - elif parameter.optional: - # Skip optional parameters without any fallback default value - continue - else: - raise ValueError(f"Missing required parameter {param_name!r} for process {process_id!r}") - - # Prepare some values/dtypes for JSON encoding - if isinstance(value, numpy.integer): - value = int(value) - elif isinstance(value, numpy.number): - value = float(value) - elif isinstance(value, shapely.geometry.base.BaseGeometry): - value = shapely.geometry.mapping(value) - - arguments[param_name] = value - - cube = connection.datacube_from_process(process_id=process_id, namespace=self._namespace, **arguments) - - title = row.get("title", f"Process {process_id!r} with {repr_truncate(arguments)}") - description = row.get("description", f"Process {process_id!r} (namespace {self._namespace}) with {arguments}") - job = connection.create_job(cube, title=title, description=description) - - return job - - def __call__(self, *arg, **kwargs) -> BatchJob: - """Syntactic sugar for calling :py:meth:`start_job`.""" - return self.start_job(*arg, **kwargs) - - @staticmethod - def _guess_parameter_column_map(parameters: List[Parameter], row: pd.Series) -> dict: - """ - Guess parameter-column mapping from given parameter list and dataframe row - """ - parameter_column_map = {} - # Geometry based mapping: try to automatically map geometry columns to geojson parameters - geojson_parameters = [p.name for p in parameters if p.schema.accepts_geojson()] - geometry_columns = [i for (i, v) in row.items() if isinstance(v, shapely.geometry.base.BaseGeometry)] - if geojson_parameters and geometry_columns: - if len(geojson_parameters) == 1 and len(geometry_columns) == 1: - # Most common case: one geometry parameter and one geometry column: can be mapped naively - parameter_column_map[geojson_parameters[0]] = geometry_columns[0] - elif all(p in geometry_columns for p in geojson_parameters): - # Each geometry param has geometry column with same name: easy to map - parameter_column_map.update((p, p) for p in geojson_parameters) - else: - raise RuntimeError( - f"Problem with mapping geometry columns ({geometry_columns}) to process parameters ({geojson_parameters})" - ) - _log.debug(f"Guessed parameter-column map: {parameter_column_map}") - return parameter_column_map diff --git a/openeo/extra/job_management/process_based.py b/openeo/extra/job_management/process_based.py new file mode 100644 index 000000000..051297612 --- /dev/null +++ b/openeo/extra/job_management/process_based.py @@ -0,0 +1,196 @@ +import logging +import re +from typing import List, Optional, 
Union + +import numpy +import pandas as pd +import shapely.errors +import shapely.geometry.base +import shapely.wkt + +from openeo import BatchJob, Connection +from openeo.internal.processes.parse import ( + Parameter, + Process, + parse_remote_process_definition, +) +from openeo.util import LazyLoadCache, repr_truncate + +_log = logging.getLogger(__name__) + + +class ProcessBasedJobCreator: + """ + Batch job creator + (to be used together with :py:class:`~openeo.extra.job_management.MultiBackendJobManager`) + that takes a parameterized openEO process definition + (e.g a user-defined process (UDP) or a remote openEO process definition), + and creates a batch job + for each row of the dataframe managed by the :py:class:`~openeo.extra.job_management.MultiBackendJobManager` + by filling in the process parameters with corresponding row values. + + .. seealso:: + See :ref:`job-management-with-process-based-job-creator` + for more information and examples. + + Process parameters are linked to dataframe columns by name. + While this intuitive name-based matching should cover most use cases, + there are additional options for overrides or fallbacks: + + - When provided, ``parameter_column_map`` will be consulted + for resolving a process parameter name (key in the dictionary) + to a desired dataframe column name (corresponding value). + - One common case is handled automatically as convenience functionality. + + When: + + - ``parameter_column_map`` is not provided (or set to ``None``), + - and there is a *single parameter* that accepts inline GeoJSON geometries, + - and the dataframe is a GeoPandas dataframe with a *single geometry* column, + + then this parameter and this geometries column will be linked automatically. + + - If a parameter can not be matched with a column by name as described above, + a default value will be picked, + first by looking in ``parameter_defaults`` (if provided), + and then by looking up the default value from the parameter schema in the process definition. + - Finally if no (default) value can be determined and the parameter + is not flagged as optional, an error will be raised. + + + :param process_id: (optional) openEO process identifier. + Can be omitted when working with a remote process definition + that is fully defined with a URL in the ``namespace`` parameter. + :param namespace: (optional) openEO process namespace. + Typically used to provide a URL to a remote process definition. + :param parameter_defaults: (optional) default values for process parameters, + to be used when not available in the dataframe managed by + :py:class:`~openeo.extra.job_management.MultiBackendJobManager`. + :param parameter_column_map: Optional overrides + for linking process parameters to dataframe columns: + mapping of process parameter names as key + to dataframe column names as value. + + .. versionadded:: 0.33.0 + + .. warning:: + This is an experimental API subject to change, + and we greatly welcome + `feedback and suggestions for improvement `_. 
+ + """ + + def __init__( + self, + *, + process_id: Optional[str] = None, + namespace: Union[str, None] = None, + parameter_defaults: Optional[dict] = None, + parameter_column_map: Optional[dict] = None, + ): + if process_id is None and namespace is None: + raise ValueError("At least one of `process_id` and `namespace` should be provided.") + self._process_id = process_id + self._namespace = namespace + self._parameter_defaults = parameter_defaults or {} + self._parameter_column_map = parameter_column_map + self._cache = LazyLoadCache() + + def _get_process_definition(self, connection: Connection) -> Process: + if isinstance(self._namespace, str) and re.match("https?://", self._namespace): + # Remote process definition handling + return self._cache.get( + key=("remote_process_definition", self._namespace, self._process_id), + load=lambda: parse_remote_process_definition(namespace=self._namespace, process_id=self._process_id), + ) + elif self._namespace is None: + # Handling of a user-specific UDP + udp_raw = connection.user_defined_process(self._process_id).describe() + return Process.from_dict(udp_raw) + else: + raise NotImplementedError( + f"Unsupported process definition source udp_id={self._process_id!r} namespace={self._namespace!r}" + ) + + def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: + """ + Implementation of the ``start_job`` callable interface + of :py:meth:`MultiBackendJobManager.run_jobs` + to create a job based on given dataframe row + + :param row: The row in the pandas dataframe that stores the jobs state and other tracked data. + :param connection: The connection to the backend. + """ + # TODO: refactor out some methods, for better reuse and decoupling: + # `get_arguments()` (to build the arguments dictionary), `get_cube()` (to create the cube), + + process_definition = self._get_process_definition(connection=connection) + process_id = process_definition.id + parameters = process_definition.parameters or [] + + if self._parameter_column_map is None: + self._parameter_column_map = self._guess_parameter_column_map(parameters=parameters, row=row) + + arguments = {} + for parameter in parameters: + param_name = parameter.name + column_name = self._parameter_column_map.get(param_name, param_name) + if column_name in row.index: + # Get value from dataframe row + value = row.loc[column_name] + elif param_name in self._parameter_defaults: + # Fallback on default values from constructor + value = self._parameter_defaults[param_name] + elif parameter.has_default(): + # Explicitly use default value from parameter schema + value = parameter.default + elif parameter.optional: + # Skip optional parameters without any fallback default value + continue + else: + raise ValueError(f"Missing required parameter {param_name!r} for process {process_id!r}") + + # Prepare some values/dtypes for JSON encoding + if isinstance(value, numpy.integer): + value = int(value) + elif isinstance(value, numpy.number): + value = float(value) + elif isinstance(value, shapely.geometry.base.BaseGeometry): + value = shapely.geometry.mapping(value) + + arguments[param_name] = value + + cube = connection.datacube_from_process(process_id=process_id, namespace=self._namespace, **arguments) + + title = row.get("title", f"Process {process_id!r} with {repr_truncate(arguments)}") + description = row.get("description", f"Process {process_id!r} (namespace {self._namespace}) with {arguments}") + job = connection.create_job(cube, title=title, description=description) + + return job + + def 
__call__(self, *arg, **kwargs) -> BatchJob: + """Syntactic sugar for calling :py:meth:`start_job`.""" + return self.start_job(*arg, **kwargs) + + @staticmethod + def _guess_parameter_column_map(parameters: List[Parameter], row: pd.Series) -> dict: + """ + Guess parameter-column mapping from given parameter list and dataframe row + """ + parameter_column_map = {} + # Geometry based mapping: try to automatically map geometry columns to geojson parameters + geojson_parameters = [p.name for p in parameters if p.schema.accepts_geojson()] + geometry_columns = [i for (i, v) in row.items() if isinstance(v, shapely.geometry.base.BaseGeometry)] + if geojson_parameters and geometry_columns: + if len(geojson_parameters) == 1 and len(geometry_columns) == 1: + # Most common case: one geometry parameter and one geometry column: can be mapped naively + parameter_column_map[geojson_parameters[0]] = geometry_columns[0] + elif all(p in geometry_columns for p in geojson_parameters): + # Each geometry param has geometry column with same name: easy to map + parameter_column_map.update((p, p) for p in geojson_parameters) + else: + raise RuntimeError( + f"Problem with mapping geometry columns ({geometry_columns}) to process parameters ({geojson_parameters})" + ) + _log.debug(f"Guessed parameter-column map: {parameter_column_map}") + return parameter_column_map diff --git a/tests/extra/job_management/test_job_management.py b/tests/extra/job_management/test_job_management.py index 806f78a5a..fab901548 100644 --- a/tests/extra/job_management/test_job_management.py +++ b/tests/extra/job_management/test_job_management.py @@ -1,5 +1,4 @@ import collections -import copy import dataclasses import datetime import json @@ -37,7 +36,6 @@ CsvJobDatabase, MultiBackendJobManager, ParquetJobDatabase, - ProcessBasedJobCreator, create_job_db, get_job_db, ) @@ -46,19 +44,12 @@ _JobManagerWorkerThreadPool, _TaskResult, ) -from openeo.rest._testing import OPENEO_BACKEND, DummyBackend, build_capabilities +from openeo.rest._testing import DummyBackend from openeo.rest.auth.testing import OidcMock from openeo.util import rfc3339 from openeo.utils.version import ComparableVersion -@pytest.fixture -def con(requests_mock) -> openeo.Connection: - requests_mock.get(OPENEO_BACKEND, json=build_capabilities(api_version="1.2.0", udp=True)) - con = openeo.Connection(OPENEO_BACKEND) - return con - - def _job_id_from_year(process_graph) -> Union[str, None]: """Job id generator that extracts the year from the process graph""" try: @@ -1275,682 +1266,3 @@ def test_create_job_db(tmp_path, filename, expected): db = create_job_db(path=path, df=df) assert isinstance(db, expected) assert path.exists() - - -class TestProcessBasedJobCreator: - @pytest.fixture - def dummy_backend(self, requests_mock, con) -> DummyBackend: - dummy = DummyBackend(requests_mock=requests_mock, connection=con) - dummy.setup_simple_job_status_flow(queued=2, running=3, final="finished") - return dummy - - PG_3PLUS5 = { - "id": "3plus5", - "process_graph": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}, - } - PG_INCREMENT = { - "id": "increment", - "parameters": [ - {"name": "data", "description": "data", "schema": {"type": "number"}}, - { - "name": "increment", - "description": "increment", - "schema": {"type": "number"}, - "optional": True, - "default": 1, - }, - ], - "process_graph": { - "process_id": "add", - "arguments": {"x": {"from_parameter": "data"}, "y": {"from_parameter": "increment"}}, - "result": True, - }, - } - PG_OFFSET_POLYGON = { - "id": 
"offset_polygon", - "parameters": [ - {"name": "data", "description": "data", "schema": {"type": "number"}}, - { - "name": "polygons", - "description": "polygons", - "schema": { - "title": "GeoJSON", - "type": "object", - "subtype": "geojson", - }, - }, - { - "name": "offset", - "description": "Offset", - "schema": {"type": "number"}, - "optional": True, - "default": 0, - }, - ], - } - - @pytest.fixture(autouse=True) - def remote_process_definitions(self, requests_mock) -> dict: - mocks = {} - processes = [self.PG_3PLUS5, self.PG_INCREMENT, self.PG_OFFSET_POLYGON] - mocks["_all"] = requests_mock.get("https://remote.test/_all", json={"processes": processes, "links": []}) - for pg in processes: - process_id = pg["id"] - mocks[process_id] = requests_mock.get(f"https://remote.test/{process_id}.json", json=pg) - return mocks - - def test_minimal(self, con, dummy_backend, remote_process_definitions): - """Bare minimum: just start a job, no parameters/arguments""" - job_factory = ProcessBasedJobCreator(process_id="3plus5", namespace="https://remote.test/3plus5.json") - - job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con) - assert isinstance(job, BatchJob) - assert dummy_backend.batch_jobs == { - "job-000": { - "job_id": "job-000", - "pg": { - "3plus51": { - "process_id": "3plus5", - "namespace": "https://remote.test/3plus5.json", - "arguments": {}, - "result": True, - } - }, - "status": "created", - "title": "Process '3plus5' with {}", - "description": "Process '3plus5' (namespace https://remote.test/3plus5.json) with {}", - } - } - - assert remote_process_definitions["3plus5"].call_count == 1 - - def test_basic(self, con, dummy_backend, remote_process_definitions): - """Basic parameterized UDP job generation""" - dummy_backend.extra_job_metadata_fields = ["title", "description"] - job_factory = ProcessBasedJobCreator(process_id="increment", namespace="https://remote.test/increment.json") - - job = job_factory.start_job(row=pd.Series({"data": 123}), connection=con) - assert isinstance(job, BatchJob) - assert dummy_backend.batch_jobs == { - "job-000": { - "job_id": "job-000", - "pg": { - "increment1": { - "process_id": "increment", - "namespace": "https://remote.test/increment.json", - "arguments": {"data": 123, "increment": 1}, - "result": True, - } - }, - "status": "created", - "title": "Process 'increment' with {'data': 123, 'increment': 1}", - "description": "Process 'increment' (namespace https://remote.test/increment.json) with {'data': 123, 'increment': 1}", - } - } - assert remote_process_definitions["increment"].call_count == 1 - - @pytest.mark.parametrize( - ["parameter_defaults", "row", "expected_arguments"], - [ - (None, {"data": 123}, {"data": 123, "increment": 1}), - (None, {"data": 123, "increment": 5}, {"data": 123, "increment": 5}), - ({"increment": 5}, {"data": 123}, {"data": 123, "increment": 5}), - ({"increment": 5}, {"data": 123, "increment": 1000}, {"data": 123, "increment": 1000}), - ], - ) - def test_basic_parameterization(self, con, dummy_backend, parameter_defaults, row, expected_arguments): - """Basic parameterized UDP job generation""" - job_factory = ProcessBasedJobCreator( - process_id="increment", - namespace="https://remote.test/increment.json", - parameter_defaults=parameter_defaults, - ) - - job = job_factory.start_job(row=pd.Series(row), connection=con) - assert isinstance(job, BatchJob) - assert dummy_backend.batch_jobs == { - "job-000": { - "job_id": "job-000", - "pg": { - "increment1": { - "process_id": "increment", - "namespace": 
"https://remote.test/increment.json", - "arguments": expected_arguments, - "result": True, - } - }, - "status": "created", - "title": dirty_equals.IsStr(regex="Process 'increment' with .*"), - "description": dirty_equals.IsStr(regex="Process 'increment' .*"), - } - } - - @pytest.mark.parametrize( - ["process_id", "namespace", "expected"], - [ - ( - # Classic UDP reference - "3plus5", - None, - {"process_id": "3plus5"}, - ), - ( - # Remote process definition (with "redundant" process_id) - "3plus5", - "https://remote.test/3plus5.json", - {"process_id": "3plus5", "namespace": "https://remote.test/3plus5.json"}, - ), - ( - # Remote process definition with just namespace (process_id should be inferred from that) - None, - "https://remote.test/3plus5.json", - {"process_id": "3plus5", "namespace": "https://remote.test/3plus5.json"}, - ), - ( - # Remote process definition from listing - "3plus5", - "https://remote.test/_all", - {"process_id": "3plus5", "namespace": "https://remote.test/_all"}, - ), - ], - ) - def test_process_references_in_constructor( - self, con, requests_mock, dummy_backend, remote_process_definitions, process_id, namespace, expected - ): - """Various ways to provide process references in the constructor""" - - # Register personal UDP - requests_mock.get(con.build_url("/process_graphs/3plus5"), json=self.PG_3PLUS5) - - job_factory = ProcessBasedJobCreator(process_id=process_id, namespace=namespace) - - job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con) - assert isinstance(job, BatchJob) - assert dummy_backend.batch_jobs == { - "job-000": { - "job_id": "job-000", - "pg": {"3plus51": {**expected, "arguments": {}, "result": True}}, - "status": "created", - "title": "Process '3plus5' with {}", - "description": f"Process '3plus5' (namespace {namespace}) with {{}}", - } - } - - def test_no_process_id_nor_namespace(self): - with pytest.raises(ValueError, match="At least one of `process_id` and `namespace` should be provided"): - _ = ProcessBasedJobCreator() - - @pytest.fixture - def job_manager(self, tmp_path, dummy_backend) -> MultiBackendJobManager: - job_manager = MultiBackendJobManager(root_dir=tmp_path / "job_mgr_root") - job_manager.add_backend("dummy", connection=dummy_backend.connection, parallel_jobs=1) - return job_manager - - def test_with_job_manager_remote_basic( - self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, remote_process_definitions - ): - job_starter = ProcessBasedJobCreator( - process_id="increment", - namespace="https://remote.test/increment.json", - parameter_defaults={"increment": 5}, - ) - - df = pd.DataFrame({"data": [1, 2, 3]}) - job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) - - stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) - assert stats == dirty_equals.IsPartialDict( - { - "sleep": dirty_equals.IsInt(gt=1), - "start_job call": 3, - "job start": 3, - "job started running": 3, - "job finished": 3, - } - ) - assert set(job_db.read().status) == {"finished"} - - # Verify caching of HTTP request of remote process definition - assert remote_process_definitions["increment"].call_count == 1 - - assert dummy_backend.batch_jobs == { - "job-000": { - "job_id": "job-000", - "pg": { - "increment1": { - "process_id": "increment", - "namespace": "https://remote.test/increment.json", - "arguments": {"data": 1, "increment": 5}, - "result": True, - } - }, - "status": "finished", - "title": "Process 'increment' with {'data': 1, 'increment': 5}", - "description": "Process 'increment' 
(namespace https://remote.test/increment.json) with {'data': 1, 'increment': 5}", - }, - "job-001": { - "job_id": "job-001", - "pg": { - "increment1": { - "process_id": "increment", - "namespace": "https://remote.test/increment.json", - "arguments": {"data": 2, "increment": 5}, - "result": True, - } - }, - "status": "finished", - "title": "Process 'increment' with {'data': 2, 'increment': 5}", - "description": "Process 'increment' (namespace https://remote.test/increment.json) with {'data': 2, 'increment': 5}", - }, - "job-002": { - "job_id": "job-002", - "pg": { - "increment1": { - "process_id": "increment", - "namespace": "https://remote.test/increment.json", - "arguments": {"data": 3, "increment": 5}, - "result": True, - } - }, - "status": "finished", - "title": "Process 'increment' with {'data': 3, 'increment': 5}", - "description": "Process 'increment' (namespace https://remote.test/increment.json) with {'data': 3, 'increment': 5}", - }, - } - - @pytest.mark.parametrize( - ["parameter_defaults", "df_data", "expected_arguments"], - [ - ( - {"increment": 5}, - {"data": [1, 2, 3]}, - { - "job-000": {"data": 1, "increment": 5}, - "job-001": {"data": 2, "increment": 5}, - "job-002": {"data": 3, "increment": 5}, - }, - ), - ( - None, - {"data": [1, 2, 3], "increment": [44, 55, 66]}, - { - "job-000": {"data": 1, "increment": 44}, - "job-001": {"data": 2, "increment": 55}, - "job-002": {"data": 3, "increment": 66}, - }, - ), - ( - {"increment": 5555}, - {"data": [1, 2, 3], "increment": [44, 55, 66]}, - { - "job-000": {"data": 1, "increment": 44}, - "job-001": {"data": 2, "increment": 55}, - "job-002": {"data": 3, "increment": 66}, - }, - ), - ], - ) - def test_with_job_manager_remote_parameter_handling( - self, - tmp_path, - requests_mock, - dummy_backend, - job_manager, - sleep_mock, - parameter_defaults, - df_data, - expected_arguments, - ): - job_starter = ProcessBasedJobCreator( - process_id="increment", - namespace="https://remote.test/increment.json", - parameter_defaults=parameter_defaults, - ) - - df = pd.DataFrame(df_data) - job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) - - stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) - assert stats == dirty_equals.IsPartialDict( - { - "sleep": dirty_equals.IsInt(gt=1), - "start_job call": 3, - "job start": 3, - "job finished": 3, - } - ) - assert set(job_db.read().status) == {"finished"} - - assert dummy_backend.batch_jobs == { - "job-000": { - "job_id": "job-000", - "pg": { - "increment1": { - "process_id": "increment", - "namespace": "https://remote.test/increment.json", - "arguments": expected_arguments["job-000"], - "result": True, - } - }, - "status": "finished", - "title": dirty_equals.IsStr(regex="Process 'increment' with .*"), - "description": dirty_equals.IsStr(regex="Process 'increment'.*"), - }, - "job-001": { - "job_id": "job-001", - "pg": { - "increment1": { - "process_id": "increment", - "namespace": "https://remote.test/increment.json", - "arguments": expected_arguments["job-001"], - "result": True, - } - }, - "status": "finished", - "title": dirty_equals.IsStr(regex="Process 'increment' with .*"), - "description": dirty_equals.IsStr(regex="Process 'increment'.*"), - }, - "job-002": { - "job_id": "job-002", - "pg": { - "increment1": { - "process_id": "increment", - "namespace": "https://remote.test/increment.json", - "arguments": expected_arguments["job-002"], - "result": True, - } - }, - "status": "finished", - "title": dirty_equals.IsStr(regex="Process 'increment' with .*"), - 
"description": dirty_equals.IsStr(regex="Process 'increment'.*"), - }, - } - - def test_with_job_manager_remote_geometry(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock): - job_starter = ProcessBasedJobCreator( - process_id="offset_polygon", - namespace="https://remote.test/offset_polygon.json", - parameter_defaults={"data": 123}, - ) - - df = geopandas.GeoDataFrame.from_features( - { - "type": "FeatureCollection", - "features": [ - { - "type": "Feature", - "id": "one", - "properties": {"offset": 11}, - "geometry": {"type": "Point", "coordinates": (1.0, 2.0)}, - }, - { - "type": "Feature", - "id": "two", - "properties": {"offset": 22}, - "geometry": {"type": "Point", "coordinates": (3.0, 4.0)}, - }, - ], - } - ) - - job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) - - stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) - assert stats == dirty_equals.IsPartialDict( - { - "sleep": dirty_equals.IsInt(gt=1), - "start_job call": 2, - "job start": 2, - "job finished": 2, - } - ) - assert set(job_db.read().status) == {"finished"} - - assert dummy_backend.batch_jobs == { - "job-000": { - "job_id": "job-000", - "pg": { - "offsetpolygon1": { - "process_id": "offset_polygon", - "namespace": "https://remote.test/offset_polygon.json", - "arguments": { - "data": 123, - "polygons": {"type": "Point", "coordinates": [1.0, 2.0]}, - "offset": 11, - }, - "result": True, - } - }, - "status": "finished", - "title": "Process 'offset_polygon' with {'data': 123, 'polygons': {'type': 'Point', 'coordinates': (1...", - "description": "Process 'offset_polygon' (namespace https://remote.test/offset_polygon.json) with {'data': 123, 'polygons': {'type': 'Point', 'coordinates': (1.0, 2.0)}, 'offset': 11}", - }, - "job-001": { - "job_id": "job-001", - "pg": { - "offsetpolygon1": { - "process_id": "offset_polygon", - "namespace": "https://remote.test/offset_polygon.json", - "arguments": { - "data": 123, - "polygons": {"type": "Point", "coordinates": [3.0, 4.0]}, - "offset": 22, - }, - "result": True, - } - }, - "status": "finished", - "title": "Process 'offset_polygon' with {'data': 123, 'polygons': {'type': 'Point', 'coordinates': (3...", - "description": "Process 'offset_polygon' (namespace https://remote.test/offset_polygon.json) with {'data': 123, 'polygons': {'type': 'Point', 'coordinates': (3.0, 4.0)}, 'offset': 22}", - }, - } - - @pytest.mark.parametrize( - ["db_class"], - [ - (CsvJobDatabase,), - (ParquetJobDatabase,), - ], - ) - def test_with_job_manager_remote_geometry_after_resume( - self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, db_class - ): - """Test if geometry handling works properly after resuming from CSV serialized job db.""" - job_starter = ProcessBasedJobCreator( - process_id="offset_polygon", - namespace="https://remote.test/offset_polygon.json", - parameter_defaults={"data": 123}, - ) - - df = geopandas.GeoDataFrame.from_features( - { - "type": "FeatureCollection", - "features": [ - { - "type": "Feature", - "id": "one", - "properties": {"offset": 11}, - "geometry": {"type": "Point", "coordinates": (1.0, 2.0)}, - }, - { - "type": "Feature", - "id": "two", - "properties": {"offset": 22}, - "geometry": {"type": "Point", "coordinates": (3.0, 4.0)}, - }, - ], - } - ) - - # Persist the job db to CSV/Parquet/... 
- job_db_path = tmp_path / "jobs.db" - _ = db_class(job_db_path).initialize_from_df(df) - assert job_db_path.exists() - - # Resume from persisted job db - job_db = db_class(job_db_path) - - stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) - assert stats == dirty_equals.IsPartialDict( - { - "sleep": dirty_equals.IsInt(gt=1), - "start_job call": 2, - "job start": 2, - "job finished": 2, - } - ) - assert set(job_db.read().status) == {"finished"} - - assert dummy_backend.batch_jobs == { - "job-000": { - "job_id": "job-000", - "pg": { - "offsetpolygon1": { - "process_id": "offset_polygon", - "namespace": "https://remote.test/offset_polygon.json", - "arguments": { - "data": 123, - "polygons": {"type": "Point", "coordinates": [1.0, 2.0]}, - "offset": 11, - }, - "result": True, - } - }, - "status": "finished", - "title": dirty_equals.IsStr(regex="Process 'offset_polygon' with.*"), - "description": dirty_equals.IsStr(regex="Process 'offset_polygon' .*"), - }, - "job-001": { - "job_id": "job-001", - "pg": { - "offsetpolygon1": { - "process_id": "offset_polygon", - "namespace": "https://remote.test/offset_polygon.json", - "arguments": { - "data": 123, - "polygons": {"type": "Point", "coordinates": [3.0, 4.0]}, - "offset": 22, - }, - "result": True, - } - }, - "status": "finished", - "title": dirty_equals.IsStr(regex="Process 'offset_polygon' with.*"), - "description": dirty_equals.IsStr(regex="Process 'offset_polygon' .*"), - }, - } - - def test_with_job_manager_udp_basic( - self, tmp_path, requests_mock, con, dummy_backend, job_manager, sleep_mock, remote_process_definitions - ): - # make deep copy - udp = copy.deepcopy(self.PG_INCREMENT) - # Register personal UDP - increment_udp_mock = requests_mock.get(con.build_url("/process_graphs/increment"), json=udp) - - job_starter = ProcessBasedJobCreator( - process_id="increment", - # No namespace to trigger personal UDP mode - namespace=None, - parameter_defaults={"increment": 5}, - ) - assert increment_udp_mock.call_count == 0 - - df = pd.DataFrame({"data": [3, 5]}) - job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) - - stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) - assert stats == dirty_equals.IsPartialDict( - { - "start_job call": 2, - "job finished": 2, - } - ) - assert increment_udp_mock.call_count == 2 - assert set(job_db.read().status) == {"finished"} - - assert dummy_backend.batch_jobs == { - "job-000": { - "job_id": "job-000", - "pg": { - "increment1": { - "process_id": "increment", - "arguments": {"data": 3, "increment": 5}, - "result": True, - } - }, - "status": "finished", - "title": "Process 'increment' with {'data': 3, 'increment': 5}", - "description": "Process 'increment' (namespace None) with {'data': 3, 'increment': 5}", - }, - "job-001": { - "job_id": "job-001", - "pg": { - "increment1": { - "process_id": "increment", - "arguments": {"data": 5, "increment": 5}, - "result": True, - } - }, - "status": "finished", - "title": "Process 'increment' with {'data': 5, 'increment': 5}", - "description": "Process 'increment' (namespace None) with {'data': 5, 'increment': 5}", - }, - } - - def test_with_job_manager_parameter_column_map( - self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, remote_process_definitions - ): - job_starter = ProcessBasedJobCreator( - process_id="increment", - namespace="https://remote.test/increment.json", - parameter_column_map={"data": "numberzzz", "increment": "add_thiz"}, - ) - - df = pd.DataFrame( - { - "data": [1, 2], - "increment": 
[-1, -2], - "numberzzz": [3, 5], - "add_thiz": [100, 200], - } - ) - job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) - - stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) - assert stats == dirty_equals.IsPartialDict( - { - "start_job call": 2, - "job finished": 2, - } - ) - assert dummy_backend.batch_jobs == { - "job-000": { - "job_id": "job-000", - "pg": { - "increment1": { - "process_id": "increment", - "namespace": "https://remote.test/increment.json", - "arguments": {"data": 3, "increment": 100}, - "result": True, - } - }, - "status": "finished", - "title": "Process 'increment' with {'data': 3, 'increment': 100}", - "description": "Process 'increment' (namespace https://remote.test/increment.json) with {'data': 3, 'increment': 100}", - }, - "job-001": { - "job_id": "job-001", - "pg": { - "increment1": { - "process_id": "increment", - "namespace": "https://remote.test/increment.json", - "arguments": {"data": 5, "increment": 200}, - "result": True, - } - }, - "status": "finished", - "title": "Process 'increment' with {'data': 5, 'increment': 200}", - "description": "Process 'increment' (namespace https://remote.test/increment.json) with {'data': 5, 'increment': 200}", - }, - } diff --git a/tests/extra/job_management/test_process_based.py b/tests/extra/job_management/test_process_based.py new file mode 100644 index 000000000..6a1105931 --- /dev/null +++ b/tests/extra/job_management/test_process_based.py @@ -0,0 +1,709 @@ +import copy +from unittest import mock + +import dirty_equals +import geopandas +import pandas as pd +import pytest + +import openeo +from openeo import BatchJob +from openeo.extra.job_management import ( + CsvJobDatabase, + MultiBackendJobManager, + ParquetJobDatabase, +) +from openeo.extra.job_management.process_based import ProcessBasedJobCreator +from openeo.rest._testing import OPENEO_BACKEND, DummyBackend, build_capabilities + + +@pytest.fixture +def sleep_mock(): + with mock.patch("time.sleep") as sleep: + yield sleep + + +@pytest.fixture +def con(requests_mock) -> openeo.Connection: + requests_mock.get(OPENEO_BACKEND, json=build_capabilities(api_version="1.2.0", udp=True)) + con = openeo.Connection(OPENEO_BACKEND) + return con + + +class TestProcessBasedJobCreator: + @pytest.fixture + def dummy_backend(self, requests_mock, con) -> DummyBackend: + dummy = DummyBackend(requests_mock=requests_mock, connection=con) + dummy.setup_simple_job_status_flow(queued=2, running=3, final="finished") + return dummy + + PG_3PLUS5 = { + "id": "3plus5", + "process_graph": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}, + } + PG_INCREMENT = { + "id": "increment", + "parameters": [ + {"name": "data", "description": "data", "schema": {"type": "number"}}, + { + "name": "increment", + "description": "increment", + "schema": {"type": "number"}, + "optional": True, + "default": 1, + }, + ], + "process_graph": { + "process_id": "add", + "arguments": {"x": {"from_parameter": "data"}, "y": {"from_parameter": "increment"}}, + "result": True, + }, + } + PG_OFFSET_POLYGON = { + "id": "offset_polygon", + "parameters": [ + {"name": "data", "description": "data", "schema": {"type": "number"}}, + { + "name": "polygons", + "description": "polygons", + "schema": { + "title": "GeoJSON", + "type": "object", + "subtype": "geojson", + }, + }, + { + "name": "offset", + "description": "Offset", + "schema": {"type": "number"}, + "optional": True, + "default": 0, + }, + ], + } + + @pytest.fixture(autouse=True) + def 
remote_process_definitions(self, requests_mock) -> dict: + mocks = {} + processes = [self.PG_3PLUS5, self.PG_INCREMENT, self.PG_OFFSET_POLYGON] + mocks["_all"] = requests_mock.get("https://remote.test/_all", json={"processes": processes, "links": []}) + for pg in processes: + process_id = pg["id"] + mocks[process_id] = requests_mock.get(f"https://remote.test/{process_id}.json", json=pg) + return mocks + + def test_minimal(self, con, dummy_backend, remote_process_definitions): + """Bare minimum: just start a job, no parameters/arguments""" + job_factory = ProcessBasedJobCreator(process_id="3plus5", namespace="https://remote.test/3plus5.json") + + job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con) + assert isinstance(job, BatchJob) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "3plus51": { + "process_id": "3plus5", + "namespace": "https://remote.test/3plus5.json", + "arguments": {}, + "result": True, + } + }, + "status": "created", + "title": "Process '3plus5' with {}", + "description": "Process '3plus5' (namespace https://remote.test/3plus5.json) with {}", + } + } + + assert remote_process_definitions["3plus5"].call_count == 1 + + def test_basic(self, con, dummy_backend, remote_process_definitions): + """Basic parameterized UDP job generation""" + dummy_backend.extra_job_metadata_fields = ["title", "description"] + job_factory = ProcessBasedJobCreator(process_id="increment", namespace="https://remote.test/increment.json") + + job = job_factory.start_job(row=pd.Series({"data": 123}), connection=con) + assert isinstance(job, BatchJob) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 123, "increment": 1}, + "result": True, + } + }, + "status": "created", + "title": "Process 'increment' with {'data': 123, 'increment': 1}", + "description": "Process 'increment' (namespace https://remote.test/increment.json) with {'data': 123, 'increment': 1}", + } + } + assert remote_process_definitions["increment"].call_count == 1 + + @pytest.mark.parametrize( + ["parameter_defaults", "row", "expected_arguments"], + [ + (None, {"data": 123}, {"data": 123, "increment": 1}), + (None, {"data": 123, "increment": 5}, {"data": 123, "increment": 5}), + ({"increment": 5}, {"data": 123}, {"data": 123, "increment": 5}), + ({"increment": 5}, {"data": 123, "increment": 1000}, {"data": 123, "increment": 1000}), + ], + ) + def test_basic_parameterization(self, con, dummy_backend, parameter_defaults, row, expected_arguments): + """Basic parameterized UDP job generation""" + job_factory = ProcessBasedJobCreator( + process_id="increment", + namespace="https://remote.test/increment.json", + parameter_defaults=parameter_defaults, + ) + + job = job_factory.start_job(row=pd.Series(row), connection=con) + assert isinstance(job, BatchJob) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": expected_arguments, + "result": True, + } + }, + "status": "created", + "title": dirty_equals.IsStr(regex="Process 'increment' with .*"), + "description": dirty_equals.IsStr(regex="Process 'increment' .*"), + } + } + + @pytest.mark.parametrize( + ["process_id", "namespace", "expected"], + [ + ( + # Classic UDP reference + "3plus5", + None, + {"process_id": "3plus5"}, + ), + ( + 
# Remote process definition (with "redundant" process_id) + "3plus5", + "https://remote.test/3plus5.json", + {"process_id": "3plus5", "namespace": "https://remote.test/3plus5.json"}, + ), + ( + # Remote process definition with just namespace (process_id should be inferred from that) + None, + "https://remote.test/3plus5.json", + {"process_id": "3plus5", "namespace": "https://remote.test/3plus5.json"}, + ), + ( + # Remote process definition from listing + "3plus5", + "https://remote.test/_all", + {"process_id": "3plus5", "namespace": "https://remote.test/_all"}, + ), + ], + ) + def test_process_references_in_constructor( + self, con, requests_mock, dummy_backend, remote_process_definitions, process_id, namespace, expected + ): + """Various ways to provide process references in the constructor""" + + # Register personal UDP + requests_mock.get(con.build_url("/process_graphs/3plus5"), json=self.PG_3PLUS5) + + job_factory = ProcessBasedJobCreator(process_id=process_id, namespace=namespace) + + job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con) + assert isinstance(job, BatchJob) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": {"3plus51": {**expected, "arguments": {}, "result": True}}, + "status": "created", + "title": "Process '3plus5' with {}", + "description": f"Process '3plus5' (namespace {namespace}) with {{}}", + } + } + + def test_no_process_id_nor_namespace(self): + with pytest.raises(ValueError, match="At least one of `process_id` and `namespace` should be provided"): + _ = ProcessBasedJobCreator() + + @pytest.fixture + def job_manager(self, tmp_path, dummy_backend) -> MultiBackendJobManager: + job_manager = MultiBackendJobManager(root_dir=tmp_path / "job_mgr_root") + job_manager.add_backend("dummy", connection=dummy_backend.connection, parallel_jobs=1) + return job_manager + + def test_with_job_manager_remote_basic( + self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, remote_process_definitions + ): + job_starter = ProcessBasedJobCreator( + process_id="increment", + namespace="https://remote.test/increment.json", + parameter_defaults={"increment": 5}, + ) + + df = pd.DataFrame({"data": [1, 2, 3]}) + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=1), + "start_job call": 3, + "job start": 3, + "job started running": 3, + "job finished": 3, + } + ) + assert set(job_db.read().status) == {"finished"} + + # Verify caching of HTTP request of remote process definition + assert remote_process_definitions["increment"].call_count == 1 + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 1, "increment": 5}, + "result": True, + } + }, + "status": "finished", + "title": "Process 'increment' with {'data': 1, 'increment': 5}", + "description": "Process 'increment' (namespace https://remote.test/increment.json) with {'data': 1, 'increment': 5}", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 2, "increment": 5}, + "result": True, + } + }, + "status": "finished", + "title": "Process 'increment' with {'data': 2, 'increment': 5}", + "description": "Process 'increment' 
(namespace https://remote.test/increment.json) with {'data': 2, 'increment': 5}", + }, + "job-002": { + "job_id": "job-002", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 3, "increment": 5}, + "result": True, + } + }, + "status": "finished", + "title": "Process 'increment' with {'data': 3, 'increment': 5}", + "description": "Process 'increment' (namespace https://remote.test/increment.json) with {'data': 3, 'increment': 5}", + }, + } + + @pytest.mark.parametrize( + ["parameter_defaults", "df_data", "expected_arguments"], + [ + ( + {"increment": 5}, + {"data": [1, 2, 3]}, + { + "job-000": {"data": 1, "increment": 5}, + "job-001": {"data": 2, "increment": 5}, + "job-002": {"data": 3, "increment": 5}, + }, + ), + ( + None, + {"data": [1, 2, 3], "increment": [44, 55, 66]}, + { + "job-000": {"data": 1, "increment": 44}, + "job-001": {"data": 2, "increment": 55}, + "job-002": {"data": 3, "increment": 66}, + }, + ), + ( + {"increment": 5555}, + {"data": [1, 2, 3], "increment": [44, 55, 66]}, + { + "job-000": {"data": 1, "increment": 44}, + "job-001": {"data": 2, "increment": 55}, + "job-002": {"data": 3, "increment": 66}, + }, + ), + ], + ) + def test_with_job_manager_remote_parameter_handling( + self, + tmp_path, + requests_mock, + dummy_backend, + job_manager, + sleep_mock, + parameter_defaults, + df_data, + expected_arguments, + ): + job_starter = ProcessBasedJobCreator( + process_id="increment", + namespace="https://remote.test/increment.json", + parameter_defaults=parameter_defaults, + ) + + df = pd.DataFrame(df_data) + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=1), + "start_job call": 3, + "job start": 3, + "job finished": 3, + } + ) + assert set(job_db.read().status) == {"finished"} + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": expected_arguments["job-000"], + "result": True, + } + }, + "status": "finished", + "title": dirty_equals.IsStr(regex="Process 'increment' with .*"), + "description": dirty_equals.IsStr(regex="Process 'increment'.*"), + }, + "job-001": { + "job_id": "job-001", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": expected_arguments["job-001"], + "result": True, + } + }, + "status": "finished", + "title": dirty_equals.IsStr(regex="Process 'increment' with .*"), + "description": dirty_equals.IsStr(regex="Process 'increment'.*"), + }, + "job-002": { + "job_id": "job-002", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": expected_arguments["job-002"], + "result": True, + } + }, + "status": "finished", + "title": dirty_equals.IsStr(regex="Process 'increment' with .*"), + "description": dirty_equals.IsStr(regex="Process 'increment'.*"), + }, + } + + def test_with_job_manager_remote_geometry(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock): + job_starter = ProcessBasedJobCreator( + process_id="offset_polygon", + namespace="https://remote.test/offset_polygon.json", + parameter_defaults={"data": 123}, + ) + + df = geopandas.GeoDataFrame.from_features( + { + "type": "FeatureCollection", + 
"features": [ + { + "type": "Feature", + "id": "one", + "properties": {"offset": 11}, + "geometry": {"type": "Point", "coordinates": (1.0, 2.0)}, + }, + { + "type": "Feature", + "id": "two", + "properties": {"offset": 22}, + "geometry": {"type": "Point", "coordinates": (3.0, 4.0)}, + }, + ], + } + ) + + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=1), + "start_job call": 2, + "job start": 2, + "job finished": 2, + } + ) + assert set(job_db.read().status) == {"finished"} + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "offsetpolygon1": { + "process_id": "offset_polygon", + "namespace": "https://remote.test/offset_polygon.json", + "arguments": { + "data": 123, + "polygons": {"type": "Point", "coordinates": [1.0, 2.0]}, + "offset": 11, + }, + "result": True, + } + }, + "status": "finished", + "title": "Process 'offset_polygon' with {'data': 123, 'polygons': {'type': 'Point', 'coordinates': (1...", + "description": "Process 'offset_polygon' (namespace https://remote.test/offset_polygon.json) with {'data': 123, 'polygons': {'type': 'Point', 'coordinates': (1.0, 2.0)}, 'offset': 11}", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "offsetpolygon1": { + "process_id": "offset_polygon", + "namespace": "https://remote.test/offset_polygon.json", + "arguments": { + "data": 123, + "polygons": {"type": "Point", "coordinates": [3.0, 4.0]}, + "offset": 22, + }, + "result": True, + } + }, + "status": "finished", + "title": "Process 'offset_polygon' with {'data': 123, 'polygons': {'type': 'Point', 'coordinates': (3...", + "description": "Process 'offset_polygon' (namespace https://remote.test/offset_polygon.json) with {'data': 123, 'polygons': {'type': 'Point', 'coordinates': (3.0, 4.0)}, 'offset': 22}", + }, + } + + @pytest.mark.parametrize( + ["db_class"], + [ + (CsvJobDatabase,), + (ParquetJobDatabase,), + ], + ) + def test_with_job_manager_remote_geometry_after_resume( + self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, db_class + ): + """Test if geometry handling works properly after resuming from CSV serialized job db.""" + job_starter = ProcessBasedJobCreator( + process_id="offset_polygon", + namespace="https://remote.test/offset_polygon.json", + parameter_defaults={"data": 123}, + ) + + df = geopandas.GeoDataFrame.from_features( + { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "id": "one", + "properties": {"offset": 11}, + "geometry": {"type": "Point", "coordinates": (1.0, 2.0)}, + }, + { + "type": "Feature", + "id": "two", + "properties": {"offset": 22}, + "geometry": {"type": "Point", "coordinates": (3.0, 4.0)}, + }, + ], + } + ) + + # Persist the job db to CSV/Parquet/... 
+ job_db_path = tmp_path / "jobs.db" + _ = db_class(job_db_path).initialize_from_df(df) + assert job_db_path.exists() + + # Resume from persisted job db + job_db = db_class(job_db_path) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=1), + "start_job call": 2, + "job start": 2, + "job finished": 2, + } + ) + assert set(job_db.read().status) == {"finished"} + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "offsetpolygon1": { + "process_id": "offset_polygon", + "namespace": "https://remote.test/offset_polygon.json", + "arguments": { + "data": 123, + "polygons": {"type": "Point", "coordinates": [1.0, 2.0]}, + "offset": 11, + }, + "result": True, + } + }, + "status": "finished", + "title": dirty_equals.IsStr(regex="Process 'offset_polygon' with.*"), + "description": dirty_equals.IsStr(regex="Process 'offset_polygon' .*"), + }, + "job-001": { + "job_id": "job-001", + "pg": { + "offsetpolygon1": { + "process_id": "offset_polygon", + "namespace": "https://remote.test/offset_polygon.json", + "arguments": { + "data": 123, + "polygons": {"type": "Point", "coordinates": [3.0, 4.0]}, + "offset": 22, + }, + "result": True, + } + }, + "status": "finished", + "title": dirty_equals.IsStr(regex="Process 'offset_polygon' with.*"), + "description": dirty_equals.IsStr(regex="Process 'offset_polygon' .*"), + }, + } + + def test_with_job_manager_udp_basic( + self, tmp_path, requests_mock, con, dummy_backend, job_manager, sleep_mock, remote_process_definitions + ): + # make deep copy + udp = copy.deepcopy(self.PG_INCREMENT) + # Register personal UDP + increment_udp_mock = requests_mock.get(con.build_url("/process_graphs/increment"), json=udp) + + job_starter = ProcessBasedJobCreator( + process_id="increment", + # No namespace to trigger personal UDP mode + namespace=None, + parameter_defaults={"increment": 5}, + ) + assert increment_udp_mock.call_count == 0 + + df = pd.DataFrame({"data": [3, 5]}) + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "start_job call": 2, + "job finished": 2, + } + ) + assert increment_udp_mock.call_count == 2 + assert set(job_db.read().status) == {"finished"} + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "arguments": {"data": 3, "increment": 5}, + "result": True, + } + }, + "status": "finished", + "title": "Process 'increment' with {'data': 3, 'increment': 5}", + "description": "Process 'increment' (namespace None) with {'data': 3, 'increment': 5}", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "increment1": { + "process_id": "increment", + "arguments": {"data": 5, "increment": 5}, + "result": True, + } + }, + "status": "finished", + "title": "Process 'increment' with {'data': 5, 'increment': 5}", + "description": "Process 'increment' (namespace None) with {'data': 5, 'increment': 5}", + }, + } + + def test_with_job_manager_parameter_column_map( + self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, remote_process_definitions + ): + job_starter = ProcessBasedJobCreator( + process_id="increment", + namespace="https://remote.test/increment.json", + parameter_column_map={"data": "numberzzz", "increment": "add_thiz"}, + ) + + df = pd.DataFrame( + { + "data": [1, 2], + "increment": 
[-1, -2], + "numberzzz": [3, 5], + "add_thiz": [100, 200], + } + ) + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "start_job call": 2, + "job finished": 2, + } + ) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 3, "increment": 100}, + "result": True, + } + }, + "status": "finished", + "title": "Process 'increment' with {'data': 3, 'increment': 100}", + "description": "Process 'increment' (namespace https://remote.test/increment.json) with {'data': 3, 'increment': 100}", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 5, "increment": 200}, + "result": True, + } + }, + "status": "finished", + "title": "Process 'increment' with {'data': 5, 'increment': 200}", + "description": "Process 'increment' (namespace https://remote.test/increment.json) with {'data': 5, 'increment': 200}", + }, + }
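
For illustration, a minimal sketch of the geometry handling described in the job manager docs above, using the import path introduced by this patch (``openeo.extra.job_management.process_based``). The remote process definition URL reuses the hypothetical ``offset_polygon`` fixture from the tests; the backend URL and authentication call are placeholders, not part of this change.

.. code-block:: python

    import geopandas
    import openeo
    from openeo.extra.job_management import MultiBackendJobManager, create_job_db
    from openeo.extra.job_management.process_based import ProcessBasedJobCreator

    # Parameterized process with a single GeoJSON-accepting "polygons" parameter
    # (cf. the "offset_polygon" remote process definition used in the tests above).
    job_starter = ProcessBasedJobCreator(
        namespace="https://remote.test/offset_polygon.json",
        parameter_defaults={"data": 123},
    )

    # GeoPandas dataframe with a single geometry column: that column is linked
    # automatically to the GeoJSON parameter, while "offset" is matched by name.
    df = geopandas.GeoDataFrame.from_features(
        {
            "type": "FeatureCollection",
            "features": [
                {
                    "type": "Feature",
                    "properties": {"offset": 11},
                    "geometry": {"type": "Point", "coordinates": (1.0, 2.0)},
                },
            ],
        }
    )
    job_db = create_job_db(path="jobs.csv", df=df)

    connection = openeo.connect("openeo.example").authenticate_oidc()
    job_manager = MultiBackendJobManager()
    job_manager.add_backend("example", connection=connection)
    job_manager.run_jobs(job_db=job_db, start_job=job_starter)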