Skip to content

Commit aa3e653

Browse files
committed
Restore logging of job status histogram during run_jobs #655
1 parent 35b9517 commit aa3e653

File tree

3 files changed: 78 additions & 5 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2222
- `MultiBackendJobManager`: Avoid `SettingWithCopyWarning` ([#641](https://github.com/Open-EO/openeo-python-client/issues/641))
2323
- Avoid creating empty file if asset download request failed.
2424
- `MultiBackendJobManager`: avoid dtype loading mistakes in `CsvJobDatabase` on empty columns ([#656](https://github.com/Open-EO/openeo-python-client/issues/656))
25+
- `MultiBackendJobManager`: restore logging of job status histogram during `run_jobs` ([#655](https://github.com/Open-EO/openeo-python-client/issues/655))
2526

2627

2728
## [0.34.0] - 2024-10-31

openeo/extra/job_management.py

Lines changed: 29 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,17 @@
1010
import warnings
1111
from pathlib import Path
1212
from threading import Thread
13-
from typing import Any, Callable, Dict, List, Mapping, NamedTuple, Optional, Union
13+
from typing import (
14+
Any,
15+
Callable,
16+
Dict,
17+
Iterable,
18+
List,
19+
Mapping,
20+
NamedTuple,
21+
Optional,
22+
Union,
23+
)
1424

1525
import numpy
1626
import pandas as pd
@@ -80,10 +90,12 @@ def persist(self, df: pd.DataFrame):
8090
...
8191

8292
@abc.abstractmethod
83-
def count_by_status(self, statuses: List[str]) -> dict:
93+
def count_by_status(self, statuses: Iterable[str] = ()) -> dict:
8494
"""
8595
Retrieve the number of jobs per status.
8696
97+
:param statuses: List/set of statuses to include. If empty, all statuses are included.
98+
8799
:return: dictionary with status as key and the count as value.
88100
"""
89101
...
@@ -355,12 +367,18 @@ def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabas
355367

356368
self._stop_thread = False
357369
def run_loop():
370+
371+
# TODO: support user-provided `stats`
372+
stats = collections.defaultdict(int)
373+
358374
while (
359375
sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0
360376
and not self._stop_thread
361377
):
362378
self._job_update_loop(job_db=job_db, start_job=start_job)
379+
stats["run_jobs loop"] += 1
363380

381+
_log.info(f"Job status histogram: {job_db.count_by_status()}. Run stats: {dict(stats)}")
364382
# Do sequence of micro-sleeps to allow for quick thread exit
365383
for _ in range(int(max(1, self.poll_sleep))):
366384
time.sleep(1)
@@ -479,11 +497,15 @@ def run_jobs(
479497
# TODO: start showing deprecation warnings for this usage pattern?
480498
job_db.initialize_from_df(df)
481499

500+
# TODO: support user-provided `stats`
482501
stats = collections.defaultdict(int)
502+
483503
while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
484504
self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats)
485505
stats["run_jobs loop"] += 1
486506

507+
# Show current stats and sleep
508+
_log.info(f"Job status histogram: {job_db.count_by_status()}. Run stats: {dict(stats)}")
487509
time.sleep(self.poll_sleep)
488510
stats["sleep"] += 1
489511

@@ -791,9 +813,12 @@ def df(self) -> pd.DataFrame:
791813
self._df = self.read()
792814
return self._df
793815

794-
def count_by_status(self, statuses: List[str]) -> dict:
816+
def count_by_status(self, statuses: Iterable[str] = ()) -> dict:
795817
status_histogram = self.df.groupby("status").size().to_dict()
796-
return {k:v for k,v in status_histogram.items() if k in statuses}
818+
statuses = set(statuses)
819+
if statuses:
820+
status_histogram = {k: v for k, v in status_histogram.items() if k in statuses}
821+
return status_histogram
797822

798823
def get_by_status(self, statuses, max=None) -> pd.DataFrame:
799824
"""

tests/extra/test_job_management.py

Lines changed: 48 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
import copy
22
import json
3+
import logging
34
import re
45
import threading
56
from pathlib import Path
@@ -616,7 +617,17 @@ def test_empty_csv_handling(self, tmp_path, sleep_mock, recwarn, job_manager):
616617

617618
assert [(w.category, w.message, str(w)) for w in recwarn.list] == []
618619

620+
def test_status_logging(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock, caplog):
621+
caplog.set_level(logging.INFO)
622+
df = pd.DataFrame({"year": [2018, 2019, 2020, 2021, 2022]})
623+
job_db_path = tmp_path / "jobs.csv"
624+
job_db = CsvJobDatabase(job_db_path).initialize_from_df(df)
625+
626+
run_stats = job_manager.run_jobs(job_db=job_db, start_job=self._create_year_job)
627+
assert run_stats == dirty_equals.IsPartialDict({"start_job call": 5, "job finished": 5})
619628

629+
needle = re.compile(r"Job status histogram:.*'queued': 4.*Run stats:.*'start_job call': 4")
630+
assert needle.search(caplog.text)
620631

621632

622633
JOB_DB_DF_BASICS = pd.DataFrame(
@@ -681,7 +692,7 @@ def test_initialize_from_df_on_exists_error(self, tmp_path, db_class):
681692

682693
@pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
683694
def test_initialize_from_df_on_exists_skip(self, tmp_path, db_class):
684-
path = tmp_path / "jobs.csv"
695+
path = tmp_path / "jobs.db"
685696

686697
db = db_class(path).initialize_from_df(
687698
pd.DataFrame({"some_number": [3, 2, 1]}),
@@ -695,6 +706,42 @@ def test_initialize_from_df_on_exists_skip(self, tmp_path, db_class):
695706
)
696707
assert set(db.read()["some_number"]) == {1, 2, 3}
697708

709+
@pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
710+
def test_count_by_status(self, tmp_path, db_class):
711+
path = tmp_path / "jobs.db"
712+
713+
db = db_class(path).initialize_from_df(
714+
pd.DataFrame(
715+
{
716+
"status": [
717+
"not_started",
718+
"created",
719+
"queued",
720+
"queued",
721+
"queued",
722+
"running",
723+
"running",
724+
"finished",
725+
"finished",
726+
"error",
727+
]
728+
}
729+
)
730+
)
731+
assert db.count_by_status(statuses=["not_started"]) == {"not_started": 1}
732+
assert db.count_by_status(statuses=("not_started", "running")) == {"not_started": 1, "running": 2}
733+
assert db.count_by_status(statuses={"finished", "error"}) == {"error": 1, "finished": 2}
734+
735+
# All statuses by default
736+
assert db.count_by_status() == {
737+
"created": 1,
738+
"error": 1,
739+
"finished": 2,
740+
"not_started": 1,
741+
"queued": 3,
742+
"running": 2,
743+
}
744+
698745

699746
class TestCsvJobDatabase:
700747

0 commit comments

Comments
 (0)