Skip to content

Commit e33347c

Browse files
HansVRP authored and soxofaan committed
improve error messaging on processing futures
1 parent 79886b6 commit e33347c

File tree

3 files changed

+19
-23
lines changed

3 files changed

+19
-23
lines changed

openeo/extra/job_management/__init__.py

Lines changed: 8 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -677,36 +677,31 @@ def _process_threadworker_updates(
677677
"""
678678
# Retrieve completed task results immediately
679679
results, _ = worker_pool.process_futures(timeout=0)
680-
if not isinstance(results, list):
681-
raise TypeError(f"Expected list of TaskResult, got {results}")
682680

683681
# Collect update dicts
684682
updates: List[Dict[str, Any]] = []
685683
for res in results:
686684
# Process database updates
687685
if res.db_update:
688686
try:
689-
if 'id' in res.db_update or 'df_idx' in res.db_update:
690-
raise KeyError("db_update must not override 'id' or 'df_idx'")
691687
updates.append({
692688
'id': res.job_id,
693689
'df_idx': res.df_idx,
694690
**res.db_update,
695691
})
696692
except Exception as e:
697-
_log.error(f"Skipping invalid db_update for job '{res.job_id}': {e}")
693+
_log.error(f"Skipping invalid db_update '{res.db_update}' for job '{res.job_id}': {e}", )
698694

699695
# Process stats updates
700696
if res.stats_update:
701-
for key, val in res.stats_update.items():
702-
try:
697+
try:
698+
for key, val in res.stats_update.items():
703699
count = int(val)
704700
stats[key] = stats.get(key, 0) + count
705-
except Exception:
706-
_log.error(
707-
f"Skipping invalid stats_update for job '{res.job_id}': "
708-
f"key={key!r}, val={val!r}"
709-
)
701+
except Exception as e:
702+
_log.error(
703+
f"Skipping invalid stats_update {res.stats_update} for job '{res.job_id}': {e}"
704+
)
710705

711706
# No valid updates: nothing to persist
712707
if not updates:
@@ -982,7 +977,7 @@ def get_by_status(self, statuses, max=None) -> pd.DataFrame:
982977

983978
def _merge_into_df(self, df: pd.DataFrame):
984979
if self._df is not None:
985-
self._df.update(df, overwrite=True) #TODO index is not consistent so this creates an issue. --> when we get a row, we need to give the right index to the row. Best solution when creating a task; pass this one along
980+
self._df.update(df, overwrite=True)
986981
else:
987982
self._df = df
988983

tests/extra/job_management/test_job_management.py

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -803,7 +803,7 @@ def test_no_results_leaves_db_and_stats_untouched(self, tmp_path, caplog):
803803
assert stats == {}
804804

805805

806-
def test_logs_on_invalid_db_update(self, tmp_path, caplog):
806+
def test_logs_on_invalid_update(self, tmp_path, caplog):
807807
pool = _JobManagerWorkerThreadPool(max_workers=2)
808808
stats = collections.defaultdict(int)
809809

@@ -812,7 +812,7 @@ class BadTask:
812812
job_id = "bad-task"
813813
df_idx = 0
814814
db_update = "invalid" # invalid
815-
stats_update = None
815+
stats_update = "a"
816816

817817
def execute(self):
818818
return self
@@ -834,7 +834,8 @@ def execute(self):
834834
assert stats == {}
835835

836836
# Assert log about invalid db update
837-
assert any("Skipping invalid db_update for job" in msg for msg in caplog.messages)
837+
assert any("Skipping invalid db_update" in msg for msg in caplog.messages)
838+
assert any("Skipping invalid stats_update" in msg for msg in caplog.messages)
838839

839840
JOB_DB_DF_BASICS = pd.DataFrame(
840841
{

tests/extra/job_management/test_thread_worker.py

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -84,7 +84,7 @@ def execute(self) -> _TaskResult:
8484
raise ValueError("Oh no!")
8585
return _TaskResult(
8686
job_id=self.job_id,
87-
df_idx= self.df_idx,
87+
df_idx=self.df_idx,
8888
db_update={"status": "dummified"},
8989
stats_update={"dummy": 1},
9090
)
@@ -104,7 +104,7 @@ def execute(self) -> _TaskResult:
104104
raise TimeoutError("Waiting for event timed out")
105105
if not self.success:
106106
raise ValueError("Oh no!")
107-
return _TaskResult(job_id=self.job_id, df_idx= self.df_idx, db_update={"status": "all fine"})
107+
return _TaskResult(job_id=self.job_id, df_idx=self.df_idx, db_update={"status": "all fine"})
108108

109109

110110

@@ -154,16 +154,16 @@ def test_submit_and_process_with_error(self, worker_pool):
154154
assert remaining == 0
155155

156156
def test_submit_and_process_iterative(self, worker_pool):
157-
worker_pool.submit_task(NopTask(job_id="j-1", df_idx= 1))
157+
worker_pool.submit_task(NopTask(job_id="j-1", df_idx=1))
158158
results, remaining = worker_pool.process_futures(timeout=1)
159-
assert results == [_TaskResult(job_id="j-1", df_idx= 1)]
159+
assert results == [_TaskResult(job_id="j-1", df_idx=1)]
160160
assert remaining == 0
161161

162162
# Add some more
163-
worker_pool.submit_task(NopTask(job_id="j-22", df_idx= 22))
164-
worker_pool.submit_task(NopTask(job_id="j-222", df_idx= 222))
163+
worker_pool.submit_task(NopTask(job_id="j-22", df_idx=22))
164+
worker_pool.submit_task(NopTask(job_id="j-222", df_idx=222))
165165
results, remaining = worker_pool.process_futures(timeout=1)
166-
assert results == [_TaskResult(job_id="j-22", df_idx= 22), _TaskResult(job_id="j-222", df_idx= 222)]
166+
assert results == [_TaskResult(job_id="j-22", df_idx=22), _TaskResult(job_id="j-222", df_idx=222)]
167167
assert remaining == 0
168168

169169
def test_submit_multiple_simple(self, worker_pool):

0 commit comments

Comments
 (0)