Merged · Changes from 3 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- `MultiBackendJobManager`: `costs` has been added as a column in tracking databases ([#588](https://github.com/Open-EO/openeo-python-client/issues/588))

### Removed

### Fixed
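For context, a minimal sketch of what this change means for users, assuming a CSV-backed tracking database (the `jobs.csv` path is hypothetical; the other columns shown are part of the existing tracking schema):

```python
import pandas as pd

# Hypothetical path to a MultiBackendJobManager tracking database (CSV-backed).
df = pd.read_csv("jobs.csv")

# The tracking database now includes a "costs" column
# next to the existing usage columns.
print(df[["id", "status", "cpu", "memory", "duration", "costs"]])

# For example, total credits spent on finished jobs:
print(df.loc[df["status"] == "finished", "costs"].sum())
```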
3 changes: 3 additions & 0 deletions openeo/extra/job_management.py
@@ -207,6 +207,7 @@ def start_job(
"cpu": _ColumnProperties(dtype="str"),
"memory": _ColumnProperties(dtype="str"),
"duration": _ColumnProperties(dtype="str"),
"costs": _ColumnProperties(dtype="float64"),
}

def __init__(
@@ -744,6 +745,8 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
for key in job_metadata.get("usage", {}).keys():
if key in active.columns:
active.loc[i, key] = _format_usage_stat(job_metadata, key)
if "costs" in job_metadata.keys():
active.loc[i, "costs"] = job_metadata.get("costs")

except OpenEoApiError as e:
# TODO: inspect status code and e.g. differentiate between 4xx/5xx
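Note the asymmetry with the existing `usage` handling: usage metrics are nested under `"usage"` and formatted via `_format_usage_stat` into the string-typed `cpu`/`memory`/`duration` columns, whereas `costs` is a top-level field of the job metadata and is copied as-is into the new `float64` column. A sketch of the metadata shape this code consumes (illustrative values, matching the dummy backend below; the exact fields a real backend returns may vary):

```python
# Illustrative `GET /jobs/{job_id}` response for a finished job:
# "costs" is a top-level number; usage metrics are nested under "usage".
job_metadata = {
    "id": "job-2024",
    "status": "finished",
    "costs": 4,
    "usage": {
        "cpu": {"unit": "cpu-seconds", "value": 30.0},
        "duration": {"unit": "seconds", "value": 55},
        "memory": {"unit": "mb-seconds", "value": 150000.0},
    },
}
```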
10 changes: 9 additions & 1 deletion openeo/rest/_testing.py
@@ -225,11 +225,19 @@ def _handle_get_job(self, request, context):
self.batch_jobs[job_id]["status"] = self._get_job_status(
job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
)
return {
result = {
# TODO: add some more required fields like "process" and "created"?
"id": job_id,
"status": self.batch_jobs[job_id]["status"],
}
if self.batch_jobs[job_id]["status"] == "finished":  # HACK: some realistic values for a small job
result["costs"] = 4
result["usage"] = {
"cpu": {"unit": "cpu-seconds", "value": 30.0},
"duration": {"unit": "seconds", "value": 55},
"memory": {"unit": "mb-seconds", "value": 150000.0},
}
return result

def _handle_get_job_results(self, request, context):
"""Handler of `GET /job/{job_id}/results` (list batch job results)."""
13 changes: 12 additions & 1 deletion tests/extra/test_job_management.py
@@ -174,6 +174,8 @@ def test_basic(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock):
("job-2022", "finished", "foo"),
]

assert not pd.read_csv(job_db_path)[["cpu", "memory", "duration", "costs"]].isnull().any().any()

# Check downloaded results and metadata.
assert set(p.relative_to(job_manager_root_dir) for p in job_manager_root_dir.glob("**/*.*")) == {
Path(f"job_{job_id}") / filename
@@ -204,6 +206,7 @@ def test_db_class(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock,
assert len(result) == 5
assert set(result.status) == {"finished"}
assert set(result.backend_name) == {"foo", "bar"}
assert not result[["cpu", "memory", "duration", "costs"]].isnull().any().any()

@pytest.mark.parametrize(
["filename", "expected_db_class"],
@@ -262,6 +265,8 @@ def test_basic_threading(self, tmp_path, job_manager, job_manager_root_dir, slee
("job-2022", "finished", "foo"),
]

assert not pd.read_csv(job_db_path)[["cpu", "memory", "duration", "costs"]].isnull().any().any()

# Check downloaded results and metadata.
assert set(p.relative_to(job_manager_root_dir) for p in job_manager_root_dir.glob("**/*.*")) == {
Path(f"job_{job_id}") / filename
@@ -283,6 +288,7 @@ def test_normalize_df(self):
"memory",
"duration",
"backend_name",
"costs",
]
)

@@ -333,13 +339,15 @@ def start_worker_thread():
)

# Also check that we got sensible end results in the job db.
assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [
result = pd.read_csv(job_db_path)
assert [(r.id, r.status, r.backend_name) for r in result.itertuples()] == [
("job-2018", "finished", "foo"),
("job-2019", "finished", "foo"),
("job-2020", "finished", "bar"),
("job-2021", "finished", "bar"),
("job-2022", "error", "foo"),
]
assert not result[result["status"] == "finished"][["cpu", "memory", "duration", "costs"]].isnull().any().any()

# Check downloaded results and metadata.
assert set(p.relative_to(job_manager_root_dir) for p in job_manager_root_dir.glob("**/*.*")) == {
@@ -673,6 +681,7 @@ def test_initialize_from_df(self, tmp_path, db_class):
"memory",
"duration",
"backend_name",
"costs",
}

actual_columns = set(db_class(path).read().columns)
@@ -852,6 +861,7 @@ def test_initialize_from_df(self, tmp_path):
"memory",
"duration",
"backend_name",
"costs",
}

# Raw file content check
@@ -930,6 +940,7 @@ def test_initialize_from_df(self, tmp_path):
"memory",
"duration",
"backend_name",
"costs",
}

df_from_disk = ParquetJobDatabase(path).read()