
Commit ffa7be2

Merge branch 'async_jobmanager'
2 parents: 1b737fc + 20c453a

File tree

3 files changed: +208 -63 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Wrap OIDC token request failure in more descriptive `OidcException` (related to [#624](https://github.com/Open-EO/openeo-python-client/issues/624))
 - Added `auto_add_save_result` option (on by default) to disable automatic addition of `save_result` node on `download`/`create_job`/`execute_batch` ([#513](https://github.com/Open-EO/openeo-python-client/issues/513))
 - Add support for `apply_vectorcube` UDF signature in `run_udf_code` ([Open-EO/openeo-geopyspark-driver#881](https://github.com/Open-EO/openeo-geopyspark-driver/issues/811))
+- `MultiBackendJobManager`: add API to run the update loop in a separate thread, allowing controlled interruption.

 ### Changed
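For context, a minimal sketch of how the new thread API is meant to be used (not part of this commit; the backend URL, collection id and CSV path are placeholders, and the job database is assumed to already contain the tracked job dataframe, e.g. from an earlier `run_jobs` run):

```python
import openeo
from openeo.extra.job_management import CsvJobDatabase, MultiBackendJobManager

# Placeholder backend URL; the authentication method depends on the backend.
connection = openeo.connect("https://openeo.example").authenticate_oidc()

manager = MultiBackendJobManager(poll_sleep=30)
manager.add_backend("example-backend", connection=connection, parallel_jobs=2)

def start_job(row, connection, **kwargs):
    # Build a batch job for this dataframe row (hypothetical collection id).
    cube = connection.load_collection("EXAMPLE_COLLECTION")
    return cube.create_job(title=f"job for row {row.name}")

job_db = CsvJobDatabase("jobs.csv")  # assumed pre-populated with the jobs dataframe

manager.start_job_thread(start_job=start_job, job_db=job_db)
# ... do other work while job statuses are tracked in the background ...
manager.stop_job_thread()  # by default waits up to 2 * poll_sleep seconds
```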

openeo/extra/job_management.py

Lines changed: 126 additions & 29 deletions
@@ -6,7 +6,8 @@
 import time
 import warnings
 from pathlib import Path
-from typing import Callable, Dict, NamedTuple, Optional, Union, List
+from threading import Thread
+from typing import Callable, Dict, List, NamedTuple, Optional, Union

 import pandas as pd
 import requests
@@ -31,6 +32,9 @@ class _Backend(NamedTuple):

 MAX_RETRIES = 5

+# Sentinel value to indicate that a parameter was not set
+_UNSET = object()
+

 class JobDatabaseInterface(metaclass=abc.ABCMeta):
     """
@@ -161,6 +165,7 @@ def __init__(
         .. versionchanged:: 0.32.0
             Added ``cancel_running_job_after`` parameter.
         """
+        self._stop_thread = None
         self.backends: Dict[str, _Backend] = {}
         self.poll_sleep = poll_sleep
         self._connections: Dict[str, _Backend] = {}
@@ -171,6 +176,7 @@ def __init__(
         self._cancel_running_job_after = (
             datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None
         )
+        self._thread = None

     def add_backend(
         self,
@@ -253,6 +259,7 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
         :param df: The dataframe to normalize.
         :return: a new dataframe that is normalized.
         """
+        # TODO: this was originally an internal helper, but we need a clean public API for the user

         # check for some required columns.
         required_with_default = [
@@ -263,6 +270,7 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
             # TODO: columns "cpu", "memory", "duration" are not referenced directly
             #   within MultiBackendJobManager making it confusing to claim they are required.
             #   However, they are through assumptions about job "usage" metadata in `_track_statuses`.
+            #   => proposed solution: allow to configure usage columns when adding a backend
             ("cpu", None),
             ("memory", None),
             ("duration", None),
@@ -273,6 +281,89 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:

         return df

+    def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabaseInterface):
+        """
+        Start running the jobs in a separate thread; this method itself returns immediately.
+
+        :param start_job:
+            A callback which will be invoked with, amongst others,
+            the row of the dataframe for which a job should be created and/or started.
+            This callable should return a :py:class:`openeo.rest.job.BatchJob` object.
+
+            The following parameters will be passed to ``start_job``:
+
+            ``row`` (:py:class:`pandas.Series`):
+                The row in the pandas dataframe that stores the job's state and other tracked data.
+
+            ``connection_provider``:
+                A getter to get a connection by backend name.
+                Typically, you would need either the parameter ``connection_provider``
+                or the parameter ``connection``, but you will likely not need both.
+
+            ``connection`` (:py:class:`Connection`):
+                The :py:class:`Connection` itself, which has already been created.
+                Typically, you would need either the parameter ``connection_provider``
+                or the parameter ``connection``, but you will likely not need both.
+
+            ``provider`` (``str``):
+                The name of the backend that will run the job.
+
+            You do not have to define all the parameters described above, but if you leave
+            any of them out, then remember to include the ``*args`` and ``**kwargs`` parameters.
+            Otherwise you will get an exception, because :py:meth:`run_jobs` passes unknown parameters to ``start_job``.
+
+        :param job_db:
+            Job database to load/store existing job status data and other metadata from/to.
+            Can be specified as a path to a CSV or Parquet file,
+            or as a custom database object following the :py:class:`JobDatabaseInterface` interface.
+
+            .. note::
+                Support for Parquet files depends on the ``pyarrow`` package
+                as :ref:`optional dependency <installation-optional-dependencies>`.
+
+        .. versionadded:: 0.32.0
+        """
+
+        # Resume from existing db
+        _log.info(f"Resuming `run_jobs` from existing {job_db}")
+        df = job_db.read()
+
+        self._stop_thread = False
+
+        def run_loop():
+            while (
+                sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0
+                and not self._stop_thread
+            ):
+                self._job_update_loop(df, job_db, start_job)
+
+                # Do a sequence of micro-sleeps to allow for quick thread exit
+                for _ in range(int(max(1, self.poll_sleep))):
+                    time.sleep(1)
+                    if self._stop_thread:
+                        break
+
+        self._thread = Thread(target=run_loop)
+        self._thread.start()
+
+    def stop_job_thread(self, timeout_seconds: Optional[float] = _UNSET):
+        """
+        Stop the job polling thread.
+
+        :param timeout_seconds: The time to wait for the thread to stop.
+            By default, it will wait for 2 times the ``poll_sleep`` time.
+            Set to None to wait indefinitely.
+
+        .. versionadded:: 0.32.0
+        """
+        if self._thread is not None:
+            self._stop_thread = True
+            if timeout_seconds is _UNSET:
+                timeout_seconds = 2 * self.poll_sleep
+            self._thread.join(timeout_seconds)
+            if self._thread.is_alive():
+                _log.warning("Job thread did not stop after timeout")
+        else:
+            _log.error("No job thread to stop")
+
     def run_jobs(
         self,
         df: Optional[pd.DataFrame],
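To make the parameter-passing contract above concrete: a `start_job` implementation that ignores some of these parameters must still absorb them via `**kwargs`, because the job manager passes all of them on every call. A sketch (the collection id is a placeholder):

```python
def start_job(row, connection, provider, **kwargs):
    # `row` is the pandas Series tracking this job; `provider` names the backend.
    cube = connection.load_collection("EXAMPLE_COLLECTION")  # placeholder
    return cube.create_job(title=f"{provider}: row {row.name}")

def too_strict(row):
    # Would fail when invoked by the manager:
    # TypeError for unexpected `connection_provider`, `connection`, `provider`.
    ...
```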
@@ -362,32 +453,34 @@ def run_jobs(
         df = self._normalize_df(df)
         job_db.persist(df)

-        while (
-            sum(job_db.count_by_status(statuses=["not_started","created","queued","running"]).values()) > 0
-
-        ):
-
-            with ignore_connection_errors(context="get statuses"):
-                self._track_statuses(job_db)
-
-            not_started = job_db.get_by_status(statuses=["not_started"],max=200)
-
-            if len(not_started) > 0:
-                # Check number of jobs running at each backend
-                running = job_db.get_by_status(statuses=["created","queued","running"])
-                per_backend = running.groupby("backend_name").size().to_dict()
-                _log.info(f"Running per backend: {per_backend}")
-                for backend_name in self.backends:
-                    backend_load = per_backend.get(backend_name, 0)
-                    if backend_load < self.backends[backend_name].parallel_jobs:
-                        to_add = self.backends[backend_name].parallel_jobs - backend_load
-                        to_launch = not_started.iloc[0:to_add]
-                        for i in to_launch.index:
-                            self._launch_job(start_job, not_started, i, backend_name)
-                        job_db.persist(to_launch)
-
+        while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
+            self._job_update_loop(df, job_db, start_job)
             time.sleep(self.poll_sleep)

+    def _job_update_loop(self, df, job_db, start_job):
+        """
+        Inner loop logic of job management:
+        go through the necessary jobs to check for status updates,
+        trigger status events, start new jobs when there is room for them, etc.
+        """
+        with ignore_connection_errors(context="get statuses"):
+            self._track_statuses(job_db)
+
+        not_started = job_db.get_by_status(statuses=["not_started"], max=200)
+        if len(not_started) > 0:
+            # Check number of jobs running at each backend
+            running = job_db.get_by_status(statuses=["created", "queued", "running"])
+            per_backend = running.groupby("backend_name").size().to_dict()
+            _log.info(f"Running per backend: {per_backend}")
+            for backend_name in self.backends:
+                backend_load = per_backend.get(backend_name, 0)
+                if backend_load < self.backends[backend_name].parallel_jobs:
+                    to_add = self.backends[backend_name].parallel_jobs - backend_load
+                    to_launch = not_started.iloc[0:to_add]
+                    for i in to_launch.index:
+                        self._launch_job(start_job, not_started, i, backend_name)
+                    job_db.persist(to_launch)
+
     def _launch_job(self, start_job, df, i, backend_name):
         """Helper method for launching jobs
@@ -566,7 +659,8 @@ def _track_statuses(self, job_db: JobDatabaseInterface):

                 # TODO: there is well-hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df`
                 for key in job_metadata.get("usage", {}).keys():
-                    active.loc[i, key] = _format_usage_stat(job_metadata, key)
+                    if key in active.columns:
+                        active.loc[i, key] = _format_usage_stat(job_metadata, key)

             except OpenEoApiError as e:
                 print(f"error for job {job_id!r} on backend {backend_name}")
@@ -593,7 +687,6 @@ def ignore_connection_errors(context: Optional[str] = None, sleep: int = 5):

 class FullDataFrameJobDatabase(JobDatabaseInterface):

-
     def __init__(self):
         super().__init__()
         self._df = None
@@ -608,8 +701,6 @@ def count_by_status(self, statuses: List[str]) -> dict:
         status_histogram = self.df.groupby("status").size().to_dict()
         return {k:v for k,v in status_histogram.items() if k in statuses}

-
-
     def get_by_status(self, statuses, max=None) -> pd.DataFrame:
         """
         Returns a dataframe with jobs, filtered by status.
@@ -647,6 +738,9 @@ def __init__(self, path: Union[str, Path]):
         super().__init__()
         self.path = Path(path)

+    def __repr__(self):
+        return f"{self.__class__.__name__}({str(self.path)!r})"
+
     def exists(self) -> bool:
         return self.path.exists()

@@ -696,6 +790,9 @@ def __init__(self, path: Union[str, Path]):
         super().__init__()
         self.path = Path(path)

+    def __repr__(self):
+        return f"{self.__class__.__name__}({str(self.path)!r})"
+
     def exists(self) -> bool:
         return self.path.exists()
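The `__repr__` additions on the two file-based job databases (presumably `CsvJobDatabase` and `ParquetJobDatabase`; the class names fall outside the diff context shown here) mainly make log lines such as the "Resuming `run_jobs` from existing {job_db}" message in `start_job_thread` readable:

```python
db = CsvJobDatabase("jobs.csv")
print(repr(db))  # CsvJobDatabase('jobs.csv')
```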
