Skip to content

Commit 64ff3a3

Browse files
committed
Allow resubmit of failed jobs while others are running
1 parent 7706bfc commit 64ff3a3

File tree

2 files changed

+56
-17
lines changed

2 files changed

+56
-17
lines changed

babs/interaction.py

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""This is the main module."""
22

33
import numpy as np
4+
import pandas as pd
45

56
from babs.base import BABS
67
from babs.scheduler import (
@@ -30,20 +31,16 @@ def babs_submit(self, count=None, submit_df=None, skip_failed=False):
3031
default: None
3132
"""
3233

33-
# Check if there are still jobs running
34+
# Check if there are still jobs running/pending
3435
currently_running_df = self.get_currently_running_jobs_df()
35-
if currently_running_df.shape[0] > 0:
36-
non_cg_states = (
37-
currently_running_df['state'].fillna('').ne('CG')
38-
if 'state' in currently_running_df
39-
else np.array([True] * currently_running_df.shape[0])
40-
)
41-
if non_cg_states.any():
42-
raise Exception(
43-
'There are still jobs running. Please wait for them to finish or cancel them.'
44-
f' Current running jobs:\n{currently_running_df}'
45-
)
46-
print('All currently running jobs are in CG state; proceeding with submission.')
36+
if currently_running_df.empty:
37+
running_pending_df = currently_running_df
38+
elif 'state' in currently_running_df:
39+
running_pending_df = currently_running_df[
40+
currently_running_df['state'].fillna('').isin(['PD', 'R'])
41+
]
42+
else:
43+
running_pending_df = currently_running_df
4744

4845
# Find the rows that don't have results yet
4946
status_df = self.get_job_status_df()
@@ -54,6 +51,36 @@ def babs_submit(self, count=None, submit_df=None, skip_failed=False):
5451
if submit_df is not None:
5552
df_needs_submit = submit_df
5653

54+
# Remove any jobs that are still running/pending
55+
if not running_pending_df.empty:
56+
merge_cols = ['sub_id']
57+
if 'ses_id' in df_needs_submit and 'ses_id' in running_pending_df:
58+
merge_cols = ['sub_id', 'ses_id']
59+
if all(col in running_pending_df for col in merge_cols):
60+
skipped_df = pd.merge(
61+
df_needs_submit,
62+
running_pending_df,
63+
on=merge_cols,
64+
how='inner',
65+
suffixes=('', '_running'),
66+
)
67+
df_needs_submit = df_needs_submit.merge(
68+
running_pending_df[merge_cols].drop_duplicates(),
69+
on=merge_cols,
70+
how='left',
71+
indicator=True,
72+
)
73+
df_needs_submit = df_needs_submit[df_needs_submit['_merge'] == 'left_only']
74+
df_needs_submit = df_needs_submit.drop(columns=['_merge'])
75+
if not skipped_df.empty:
76+
if 'job_id_running' in skipped_df:
77+
job_ids = sorted(
78+
pd.Series(skipped_df['job_id_running']).dropna().unique().tolist()
79+
)
80+
print(f'Skipping running/pending jobs. Job IDs: {job_ids}')
81+
else:
82+
print('Skipping running/pending jobs without job IDs.')
83+
5784
# only run `babs submit` when there are subjects/sessions not yet submitted
5885
if df_needs_submit.empty:
5986
print('No jobs to submit')

tests/test_interaction.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
"""Tests for interaction behaviors."""
22

33
import pandas as pd
4-
import pytest
54

65
from babs.interaction import BABSInteraction
76

@@ -26,7 +25,7 @@ def _minimal_status_df():
2625
)
2726

2827

29-
def test_babs_submit_blocks_non_cg_jobs(babs_project_subjectlevel, monkeypatch):
28+
def test_babs_submit_skips_running_jobs(babs_project_subjectlevel, monkeypatch, capsys):
3029
babs_proj = BABSInteraction(project_root=babs_project_subjectlevel)
3130
running_df = pd.DataFrame(
3231
{
@@ -39,12 +38,25 @@ def test_babs_submit_blocks_non_cg_jobs(babs_project_subjectlevel, monkeypatch):
3938
'cpus': [1],
4039
'partition': ['normal'],
4140
'name': ['test_array_job'],
41+
'sub_id': ['sub-01'],
4242
}
4343
)
4444
monkeypatch.setattr(babs_proj, 'get_currently_running_jobs_df', lambda: running_df)
45+
monkeypatch.setattr(babs_proj, 'get_job_status_df', _minimal_status_df)
46+
47+
submit_calls = []
48+
49+
def _mock_submit_array(analysis_path, queue, total_jobs):
50+
submit_calls.append((analysis_path, queue, total_jobs))
51+
return 123
52+
53+
monkeypatch.setattr('babs.interaction.submit_array', _mock_submit_array)
54+
55+
babs_proj.babs_submit(count=2)
4556

46-
with pytest.raises(Exception, match='There are still jobs running'):
47-
babs_proj.babs_submit(count=1)
57+
assert submit_calls[0][2] == 1
58+
out = capsys.readouterr().out
59+
assert 'Job IDs: [1]' in out
4860

4961

5062
def test_babs_submit_allows_cg_jobs(babs_project_subjectlevel, monkeypatch):

0 commit comments

Comments
 (0)