Skip to content

Commit 927be23

Browse files
authored
Merge pull request #1175 from trws/qmanager-remove-pending
qmanager: split remove to separate pending
2 parents 4918eca + cc57e0d commit 927be23

File tree

4 files changed

+88
-33
lines changed

4 files changed

+88
-33
lines changed

qmanager/modules/qmanager_callbacks.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ void qmanager_cb_t::jobmanager_cancel_cb (flux_t *h, const flux_msg_t *msg,
316316
if ((job = queue->lookup (id)) == nullptr
317317
|| !job->is_pending ())
318318
return;
319-
if (queue->remove (id) < 0) {
319+
if (queue->remove_pending (job.get ()) < 0) {
320320
flux_log_error (h, "%s: remove job (%jd)", __FUNCTION__,
321321
static_cast<intmax_t> (id));
322322
return;

qmanager/policies/base/queue_policy_base.hpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ class queue_policy_base_impl_t
101101
{
102102
public:
103103
int insert (std::shared_ptr<job_t> job);
104-
int remove (flux_jobid_t id);
104+
int remove_pending(job_t *job);
105+
int remove(flux_jobid_t id);
105106
const std::shared_ptr<job_t> lookup (flux_jobid_t id);
106107
bool is_schedulable ();
107108
void set_schedulability (bool scheduable);
@@ -116,7 +117,7 @@ class queue_policy_base_impl_t
116117
int process_provisional_cancel ();
117118
int process_provisional_reprio ();
118119
int insert_pending_job (std::shared_ptr<job_t> &job, bool into_provisional);
119-
int erase_pending_job (std::shared_ptr<job_t> &job, bool &found_in_prov);
120+
int erase_pending_job (job_t *job, bool &found_in_prov);
120121
std::shared_ptr<job_t> pending_pop ();
121122
std::shared_ptr<job_t> alloced_pop ();
122123
std::shared_ptr<job_t> rejected_pop ();
@@ -270,6 +271,17 @@ class queue_policy_base_t : public detail::queue_policy_base_impl_t,
270271
*/
271272
int insert (std::shared_ptr<job_t> pending_job);
272273

274+
/*! Remove a job whose jobid is id from the pending or maybe_pending queues.
275+
* If succeeds, it changes the pending queue or resource state. This queue
276+
* becomes "schedulable" if pending job queue is not empty: i.e.,
277+
* is_schedulable() returns true;
278+
*
279+
* \param id jobid of flux_jobid_t type.
280+
* \return 0 on success; -1 on error.
281+
* ENOENT: unknown id.
282+
*/
283+
int remove_pending (job_t *job);
284+
273285
/*! Remove a job whose jobid is id from any internal queues
274286
* (e.g., pending queue, running queue, and alloced queue.)
275287
* If succeeds, it changes the pending queue or resource

qmanager/policies/base/queue_policy_base_impl.hpp

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,11 @@ int queue_policy_base_t::insert (std::shared_ptr<job_t> job)
186186
return detail::queue_policy_base_impl_t::insert (job);
187187
}
188188

189+
int queue_policy_base_t::remove_pending (job_t *job)
190+
{
191+
return detail::queue_policy_base_impl_t::remove_pending (job);
192+
}
193+
189194
int queue_policy_base_t::remove (flux_jobid_t id)
190195
{
191196
return detail::queue_policy_base_impl_t::remove (id);
@@ -336,6 +341,45 @@ int queue_policy_base_impl_t::insert (std::shared_ptr<job_t> job)
336341
return rc;
337342
}
338343

344+
int queue_policy_base_impl_t::remove_pending (job_t *job) {
345+
int rc = -1;
346+
347+
if (!job || job->state != job_state_kind_t::PENDING) {
348+
errno = EINVAL;
349+
return rc;
350+
}
351+
352+
job->t_stamps.canceled_ts = m_cancel_cnt++;
353+
if (is_sched_loop_active ()) {
354+
// if sched-loop is active, the job's pending state
355+
// cannot be determined. There is "MAYBE pending state" where
356+
// a request has been sent out to the match service.
357+
auto res = m_pending_cancel_provisional.insert (
358+
std::pair<uint64_t, flux_jobid_t> (
359+
job->t_stamps.canceled_ts, job->id));
360+
if (!res.second) {
361+
errno = EEXIST;
362+
goto out;
363+
}
364+
} else {
365+
bool found_in_provisional = false;
366+
if (erase_pending_job (job, found_in_provisional) < 0)
367+
goto out;
368+
job->state = job_state_kind_t::CANCELED;
369+
auto res = m_canceled.insert (
370+
std::pair<uint64_t, flux_jobid_t> (
371+
job->t_stamps.canceled_ts, job->id));
372+
if (!res.second) {
373+
errno = EEXIST;
374+
goto out;
375+
}
376+
m_schedulable = true;
377+
}
378+
rc = 0;
379+
out:
380+
return rc;
381+
}
382+
339383
int queue_policy_base_impl_t::remove (flux_jobid_t id)
340384
{
341385
int rc = -1;
@@ -349,32 +393,7 @@ int queue_policy_base_impl_t::remove (flux_jobid_t id)
349393
job = m_jobs[id];
350394
switch (job->state) {
351395
case job_state_kind_t::PENDING:
352-
job->t_stamps.canceled_ts = m_cancel_cnt++;
353-
if (is_sched_loop_active ()) {
354-
// if sched-loop is active, the job's pending state
355-
// cannot be determined. There is "MAYBE pending state" where
356-
// a request has been sent out to the match service.
357-
auto res = m_pending_cancel_provisional.insert (
358-
std::pair<uint64_t, flux_jobid_t> (
359-
job->t_stamps.canceled_ts, job->id));
360-
if (!res.second) {
361-
errno = EEXIST;
362-
goto out;
363-
}
364-
} else {
365-
bool found_in_provisional = false;
366-
if (erase_pending_job (job, found_in_provisional) < 0)
367-
goto out;
368-
job->state = job_state_kind_t::CANCELED;
369-
auto res = m_canceled.insert (
370-
std::pair<uint64_t, flux_jobid_t> (
371-
job->t_stamps.canceled_ts, job->id));
372-
if (!res.second) {
373-
errno = EEXIST;
374-
goto out;
375-
}
376-
m_schedulable = true;
377-
}
396+
this->remove_pending(job.get());
378397
break;
379398
case job_state_kind_t::ALLOC_RUNNING:
380399
m_alloced.erase (job->t_stamps.running_ts);
@@ -593,7 +612,7 @@ int queue_policy_base_impl_t::pending_reprioritize (flux_jobid_t id,
593612
}
594613
} else {
595614
bool found_in_prov = false;
596-
if (erase_pending_job (job, found_in_prov) < 0)
615+
if (erase_pending_job (job.get (), found_in_prov) < 0)
597616
return -1;
598617
job->priority = priority;
599618
if (insert_pending_job (job, found_in_prov) < 0)
@@ -614,7 +633,7 @@ int queue_policy_base_impl_t::process_provisional_cancel ()
614633
auto job = m_jobs[id];
615634
if (job->state == job_state_kind_t::PENDING) {
616635
bool found_in_provisional = false;
617-
if (erase_pending_job (job, found_in_provisional) < 0)
636+
if (erase_pending_job (job.get (), found_in_provisional) < 0)
618637
return -1;
619638
job->state = job_state_kind_t::CANCELED;
620639
auto res = m_canceled.insert (
@@ -644,7 +663,7 @@ int queue_policy_base_impl_t::process_provisional_reprio ()
644663
auto job = m_jobs[id];
645664
if (job->state == job_state_kind_t::PENDING) {
646665
bool found_in_provisional = false;
647-
if (erase_pending_job (job, found_in_provisional) < 0)
666+
if (erase_pending_job (job.get (), found_in_provisional) < 0)
648667
return -1;
649668
job->priority = priority;
650669
if (insert_pending_job (job, found_in_provisional) < 0)
@@ -685,7 +704,7 @@ int queue_policy_base_impl_t::insert_pending_job (std::shared_ptr<job_t> &job,
685704
return 0;
686705
}
687706

688-
int queue_policy_base_impl_t::erase_pending_job (std::shared_ptr<job_t> &job,
707+
int queue_policy_base_impl_t::erase_pending_job (job_t *job,
689708
bool &found_in_prov)
690709
{
691710
size_t s;

t/t1024-alloc-check.t

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,33 @@ test_expect_success 'some jobs received timeout exception' '
6060
test_expect_success 'no jobs received alloc-check exception' '
6161
test_must_fail grep "job.exception type=alloc-check" joberr2
6262
'
63+
test_expect_success 'clean up' '
64+
flux cancel --all &&
65+
flux queue idle &&
66+
(flux resource undrain 0 || true)
67+
'
68+
69+
send_sched_cancel() {
70+
local JOB_ID=$1
71+
shift
72+
flux python -c "import flux; from flux.job.JobID import id_parse; flux.Flux().rpc('sched.cancel', {'id': id_parse('$JOB_ID')})"
73+
}
74+
75+
# ensure sched.cancel doesn't free resources when an epilog is pending
76+
test_expect_success 'submit a job that cannot run, cancel it during epilog, submit another ' '
77+
(flux submit -N 1 --flags=waitable --wait-event epilog-start -c 4 /command/that/does/not/exist > ji1 || true ) &&
78+
send_sched_cancel $(cat ji1) &&
79+
flux submit -N 1 --exclusive hostname > ji2 &&
80+
(flux job wait-event $(cat ji1) epilog-finish || true) &&
81+
(flux job info $(cat ji1) eventlog | grep epilog-finish | jq ".timestamp" > time1) &&
82+
(flux job info $(cat ji2) eventlog | grep alloc | jq ".timestamp" > time2) &&
83+
awk -vt1=$(cat time1) -vt2=$(cat time2) "BEGIN {exit (t1 < t2) ? 0 : 1}"
84+
'
85+
6386
test_expect_success 'clean up' '
6487
cleanup_active_jobs
6588
'
89+
6690
test_expect_success 'remove fluxion modules' '
6791
remove_qmanager &&
6892
remove_resource

0 commit comments

Comments
 (0)