Skip to content

Commit d0d3e10

Browse files
bentshermanjorgee
andauthored
Fix submitted count when aborting submitted tasks (#6281)
Signed-off-by: Ben Sherman <[email protected]> Co-authored-by: Jorge Ejarque <[email protected]>
1 parent ababdda commit d0d3e10

File tree

3 files changed

+97
-14
lines changed

3 files changed

+97
-14
lines changed

modules/nextflow/src/main/groovy/nextflow/trace/WorkflowStats.groovy

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import groovy.util.logging.Slf4j
2626
import nextflow.processor.ErrorStrategy
2727
import nextflow.processor.TaskProcessor
2828
import nextflow.processor.TaskRun
29+
import nextflow.processor.TaskStatus
2930
import nextflow.util.Duration
3031
import nextflow.util.MemoryUnit
3132
/**
@@ -345,17 +346,24 @@ class WorkflowStats implements Cloneable {
345346

346347
}
347348

348-
void markCompleted(TaskRun task, TraceRecord trace) {
349+
void markCompleted(TaskRun task, TraceRecord trace, TaskStatus status) {
349350
ProgressRecord state = getOrCreateRecord(task.processor)
350351
state.taskName = task.name
351352
state.hash = task.hashLog
352-
state.running --
353-
state.loadCpus -= task.getConfig().getCpus()
354-
state.loadMemory -= (task.getConfig().getMemory()?.toBytes() ?: 0)
355353

356-
this.runningCount --
357-
this.loadCpus -= task.getConfig().getCpus()
358-
this.loadMemory -= (task.getConfig().getMemory()?.toBytes() ?: 0)
354+
if( status == TaskStatus.SUBMITTED ) {
355+
state.submitted --
356+
this.submittedCount --
357+
}
358+
else {
359+
state.running --
360+
state.loadCpus -= task.getConfig().getCpus()
361+
state.loadMemory -= (task.getConfig().getMemory()?.toBytes() ?: 0)
362+
363+
this.runningCount --
364+
this.loadCpus -= task.getConfig().getCpus()
365+
this.loadMemory -= (task.getConfig().getMemory()?.toBytes() ?: 0)
366+
}
359367

360368
if( task.failed ) {
361369
state.failed ++

modules/nextflow/src/main/groovy/nextflow/trace/WorkflowStatsObserver.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class WorkflowStatsObserver implements TraceObserverV2 {
8181
@Override
8282
void onTaskComplete(TaskEvent event) {
8383
log.trace "== event complete pid=${event.handler.task.processor.id}; status=$event.handler.status"
84-
agent.send { data.markCompleted(event.handler.task, event.trace) }
84+
agent.send { data.markCompleted(event.handler.task, event.trace, event.handler.status) }
8585
}
8686

8787
@Override

modules/nextflow/src/test/groovy/nextflow/trace/WorkflowStatsTest.groovy

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import nextflow.processor.ErrorStrategy
2020
import nextflow.processor.TaskConfig
2121
import nextflow.processor.TaskProcessor
2222
import nextflow.processor.TaskRun
23+
import nextflow.processor.TaskStatus
2324
import nextflow.util.Duration
2425
import nextflow.util.MemoryUnit
2526
import spock.lang.Specification
@@ -339,7 +340,7 @@ class WorkflowStatsTest extends Specification {
339340
def trace = Mock(TraceRecord)
340341

341342
when:
342-
stats.markCompleted(task, trace)
343+
stats.markCompleted(task, trace, TaskStatus.COMPLETED)
343344

344345
then:
345346
1 * trace.get('realtime') >> DURATION.millis
@@ -406,7 +407,7 @@ class WorkflowStatsTest extends Specification {
406407
def trace = Mock(TraceRecord)
407408

408409
when:
409-
stats.markCompleted(task, trace)
410+
stats.markCompleted(task, trace, TaskStatus.COMPLETED)
410411
then:
411412
task.failed >> true
412413
task.getHashLog() >> HASH
@@ -474,7 +475,7 @@ class WorkflowStatsTest extends Specification {
474475
def trace = Mock(TraceRecord)
475476

476477
when:
477-
stats.markCompleted(task, trace)
478+
stats.markCompleted(task, trace, TaskStatus.COMPLETED)
478479
then:
479480
task.failed >> true
480481
task.getHashLog() >> HASH
@@ -501,11 +502,12 @@ class WorkflowStatsTest extends Specification {
501502
!rec.errored
502503
}
503504

504-
def 'should mark aborted' () {
505+
def 'should mark aborted from running' () {
505506
given:
506507
def FAILED = 10
507508
def RETRIES = 2
508509
def IGNORED = 3
510+
def SUBMITTED = 0
509511
def RUNNING = 4
510512
def ABORTED = 5
511513
def SUCCEEDED = 6
@@ -520,6 +522,7 @@ class WorkflowStatsTest extends Specification {
520522
getConfig() >> Mock(TaskConfig) { getCpus() >> CPUS; getMemory() >> MEM } }
521523
and:
522524
def rec = new ProgressRecord(0, 'foo')
525+
rec.submitted = SUBMITTED
523526
rec.running = RUNNING
524527
rec.failed = FAILED
525528
rec.retries = RETRIES
@@ -531,6 +534,7 @@ class WorkflowStatsTest extends Specification {
531534
and:
532535
def stats = new WorkflowStats(
533536
records: [0:rec],
537+
submittedCount: SUBMITTED,
534538
runningCount: RUNNING,
535539
failedCount: FAILED,
536540
retriesCount: RETRIES,
@@ -543,17 +547,19 @@ class WorkflowStatsTest extends Specification {
543547
def trace = Mock(TraceRecord)
544548

545549
when:
546-
stats.markCompleted(task,trace)
550+
stats.markCompleted(task, trace, TaskStatus.RUNNING)
547551

548552
then:
549553
task.aborted >> true
550554
task.getHashLog() >> HASH
551555
and:
556+
stats.submittedCount == SUBMITTED
552557
stats.runningCount == RUNNING -1
553558
stats.loadCpus == LOAD_CPUS - CPUS
554559
stats.loadMemory == (LOAD_MEM - MEM).bytes
555560
stats.abortedCount == ABORTED +1
556561
and:
562+
rec.submitted == SUBMITTED
557563
rec.running == RUNNING -1
558564
rec.loadCpus == LOAD_CPUS - CPUS
559565
rec.loadMemory == (LOAD_MEM - MEM).bytes
@@ -565,6 +571,75 @@ class WorkflowStatsTest extends Specification {
565571
rec.succeeded == SUCCEEDED
566572
}
567573

574+
def 'should mark aborted from submitted' () {
575+
given:
576+
def FAILED = 10
577+
def RETRIES = 2
578+
def IGNORED = 3
579+
def SUBMITTED = 4
580+
def RUNNING = 0
581+
def ABORTED = 5
582+
def SUCCEEDED = 6
583+
def CPUS = 2
584+
def MEM = 4.GB
585+
def LOAD_CPUS = 0
586+
def LOAD_MEM = 0.GB
587+
def HASH = 'xyz'
588+
and:
589+
def task = Mock(TaskRun) {
590+
getProcessor() >> Mock(TaskProcessor) { getId() >> 0 }
591+
getConfig() >> Mock(TaskConfig) { getCpus() >> CPUS; getMemory() >> MEM } }
592+
and:
593+
def rec = new ProgressRecord(0, 'foo')
594+
rec.submitted = SUBMITTED
595+
rec.running = RUNNING
596+
rec.failed = FAILED
597+
rec.retries = RETRIES
598+
rec.ignored = IGNORED
599+
rec.aborted = ABORTED
600+
rec.succeeded = SUCCEEDED
601+
rec.loadCpus = LOAD_CPUS
602+
rec.loadMemory = LOAD_MEM.bytes
603+
and:
604+
def stats = new WorkflowStats(
605+
records: [0:rec],
606+
submittedCount: SUBMITTED,
607+
runningCount: RUNNING,
608+
failedCount: FAILED,
609+
retriesCount: RETRIES,
610+
ignoredCount: IGNORED,
611+
succeededCount: SUCCEEDED,
612+
abortedCount: ABORTED,
613+
loadCpus: LOAD_CPUS,
614+
loadMemory: LOAD_MEM.bytes)
615+
and:
616+
def trace = Mock(TraceRecord)
617+
618+
when:
619+
stats.markCompleted(task, trace, TaskStatus.SUBMITTED)
620+
621+
then:
622+
task.aborted >> true
623+
task.getHashLog() >> HASH
624+
and:
625+
stats.submittedCount == SUBMITTED -1
626+
stats.runningCount == RUNNING
627+
stats.loadCpus == LOAD_CPUS
628+
stats.loadMemory == LOAD_MEM.bytes
629+
stats.abortedCount == ABORTED +1
630+
and:
631+
rec.submitted == SUBMITTED -1
632+
rec.running == RUNNING
633+
rec.loadCpus == LOAD_CPUS
634+
rec.loadMemory == LOAD_MEM.bytes
635+
and:
636+
rec.aborted == ABORTED +1
637+
rec.failed == FAILED
638+
rec.retries == RETRIES
639+
rec.ignored == IGNORED
640+
rec.succeeded == SUCCEEDED
641+
}
642+
568643
def 'should mark succeeded' () {
569644
given:
570645
def FAILED = 10
@@ -607,7 +682,7 @@ class WorkflowStatsTest extends Specification {
607682
def trace = Mock(TraceRecord)
608683

609684
when:
610-
stats.markCompleted(task,trace)
685+
stats.markCompleted(task, trace, TaskStatus.COMPLETED)
611686

612687
then:
613688
task.aborted >> false

0 commit comments

Comments
 (0)