Skip to content

Commit fd46f65

Browse files
jagedn authored and abhi18av committed
refactor job status query logic
Signed-off-by: Jorge Aguilera <[email protected]>
1 parent 1b57a8f commit fd46f65

File tree

7 files changed

+101
-45
lines changed

7 files changed

+101
-45
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,3 +16,4 @@ oracle-nomad-cluster
1616
.settings
1717
/validation/nomad_temp/**
1818
/validation/nomad
19+
**/*.tsv

plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadExecutor.groovy

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -60,6 +60,9 @@ class NomadExecutor extends Executor implements ExtensionPoint {
6060

6161
@Override
6262
TaskHandler createTaskHandler(TaskRun task) {
63+
assert task
64+
assert task.workDir
65+
log.trace "[NOMAD] launching process > ${task.name} -- work folder: ${task.workDirStr}"
6366
return new NomadTaskHandler(task, this.config, service)
6467
}
6568

plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy

Lines changed: 10 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -114,52 +114,38 @@ class NomadService implements Closeable{
114114
}
115115

116116

117-
String getJobState(String jobId){
117+
TaskState getTaskState(String jobId){
118118
try {
119119
List<AllocationListStub> allocations = safeExecutor.apply {
120120
jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace,
121121
null, null, null, null, null, null,
122122
null, null)
123123
}
124-
AllocationListStub last = allocations?.sort {
124+
AllocationListStub last = allocations ? allocations.sort {
125125
it.modifyIndex
126-
}?.last()
127-
String currentState = last?.taskStates?.values()?.last()?.state
128-
log.debug "Task $jobId , state=$currentState"
129-
currentState ?: "Unknown"
126+
}?.last() : null
127+
TaskState currentState = last?.taskStates?.values()?.last()
128+
log.debug "Task $jobId , state=${currentState.state}"
129+
currentState
130130
}catch(Exception e){
131131
log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
132-
"dead"
132+
new TaskState(state: "unknow")
133133
}
134134
}
135135

136136

137137

138-
boolean checkIfRunning(String jobId){
138+
String getJobStatus(String jobId){
139139
try {
140140
Job job = safeExecutor.apply {
141141
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
142142
null, null, null, null, null, null, null)
143143
}
144144
log.debug "[NOMAD] checkIfRunning jobID=$job.ID; status=$job.status"
145-
job.status == "running"
145+
job.status
146146
}catch (Exception e){
147147
log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
148-
false
149-
}
150-
}
151-
152-
boolean checkIfDead(String jobId){
153-
try{
154-
Job job = safeExecutor.apply {
155-
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
156-
null, null, null, null, null, null, null)
157-
}
158-
log.debug "[NOMAD] checkIfDead jobID=$job.ID; status=$job.status"
159-
job.status == "dead"
160-
}catch (Exception e){
161-
log.debug("[NOMAD] Failed to get job ${jobId} -- Cause: ${e.message ?: e}", e)
162-
true
148+
"Unknow"
163149
}
164150
}
165151

plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadTaskHandler.groovy

Lines changed: 41 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ package nextflow.nomad.executor
1919

2020
import groovy.transform.CompileStatic
2121
import groovy.util.logging.Slf4j
22+
import io.nomadproject.client.model.TaskState
2223
import nextflow.exception.ProcessSubmitException
2324
import nextflow.exception.ProcessUnrecoverableException
2425
import nextflow.executor.BashWrapperBuilder
@@ -31,6 +32,7 @@ import nextflow.processor.TaskStatus
3132
import nextflow.trace.TraceRecord
3233
import nextflow.util.Escape
3334
import nextflow.SysEnv
35+
import org.threeten.bp.OffsetDateTime
3436

3537
import java.nio.file.Path
3638

@@ -51,7 +53,7 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
5153

5254
private String clientName = null
5355

54-
private String state
56+
private TaskState state
5557

5658
private long timestamp
5759

@@ -72,41 +74,46 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
7274

7375
@Override
7476
boolean checkIfRunning() {
75-
if(isActive()) {
76-
determineClientNode()
77+
if( !jobName ) throw new IllegalStateException("Missing Nomad Job name -- cannot check if running")
78+
if(isSubmitted()) {
79+
def state = taskState0()
80+
// include `terminated` state to allow the handler status to progress
81+
if( state && ( "running" == state.state || "terminated" == state.state)){
82+
status = TaskStatus.RUNNING
83+
determineClientNode()
84+
return true
85+
}
7786
}
78-
nomadService.checkIfRunning(this.jobName)
87+
return false
7988
}
8089

8190
@Override
8291
boolean checkIfCompleted() {
83-
if (!nomadService.checkIfDead(this.jobName)) {
84-
return false
85-
}
92+
if( !jobName ) throw new IllegalStateException("Missing Nomad Job name -- cannot check if running")
8693

87-
state = taskState0(this.jobName)
94+
def state = taskState0()
8895

89-
final isFinished = [
90-
"complete",
91-
"failed",
92-
"dead",
93-
"lost"].contains(state)
96+
final isFinished = state && state.finishedAt != null
9497

95-
log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=$state completed=$isFinished"
98+
log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=${state?.state} completed=$isFinished"
9699

97100
if (isFinished) {
98101
// finalize the task
99102
task.exitStatus = readExitFile()
100103
task.stdout = outputFile
101104
task.stderr = errorFile
102-
this.status = TaskStatus.COMPLETED
103-
if (state == "failed" || state == "lost" || state == "unknown")
105+
status = TaskStatus.COMPLETED
106+
if ( !state || state.failed ) {
104107
task.error = new ProcessUnrecoverableException()
108+
task.aborted = true
109+
}
105110

106111
if (shouldDelete()) {
107112
nomadService.jobPurge(this.jobName)
108113
}
109114

115+
updateTimestamps(state?.startedAt, state?.finishedAt)
116+
determineClientNode()
110117
return true
111118
}
112119

@@ -180,13 +187,13 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
180187
return ret
181188
}
182189

183-
protected String taskState0(String taskName) {
190+
protected TaskState taskState0() {
184191
final now = System.currentTimeMillis()
185192
final delta = now - timestamp;
186193
if (!status || delta >= 1_000) {
187194

188-
def newState = nomadService.getJobState(jobName)
189-
log.debug "[NOMAD] Check jobState: jobName=$jobName currentState=$state newState=$newState"
195+
def newState = nomadService.getTaskState(jobName)
196+
log.debug "[NOMAD] Check jobState: jobName=$jobName currentState=${state?.state} newState=${newState?.state}"
190197

191198
if (newState) {
192199
state = newState
@@ -229,4 +236,19 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
229236
return result
230237
}
231238

239+
void updateTimestamps(OffsetDateTime start, OffsetDateTime end){
240+
try {
241+
startTimeMillis = start.toInstant().toEpochMilli()
242+
completeTimeMillis = end.toInstant().toEpochMilli()
243+
} catch( Exception e ) {
244+
// Only update if startTimeMillis hasn't already been set.
245+
// If startTimeMillis _has_ been set, then both startTimeMillis
246+
// and completeTimeMillis will have been set with the normal
247+
// TaskHandler mechanism, so there's no need to reset them here.
248+
if (!startTimeMillis) {
249+
startTimeMillis = System.currentTimeMillis()
250+
completeTimeMillis = System.currentTimeMillis()
251+
}
252+
}
253+
}
232254
}

plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -65,7 +65,7 @@ class NomadServiceSpec extends Specification{
6565
mockWebServer.enqueue(new MockResponse()
6666
.addHeader("Content-Type", "application/json"));
6767

68-
def state = service.getJobState("theId")
68+
def state = service.getTaskState("theId")
6969
def recordedRequest = mockWebServer.takeRequest();
7070

7171
then:
@@ -214,7 +214,7 @@ class NomadServiceSpec extends Specification{
214214
""")
215215
.addHeader("Content-Type", "application/json"));
216216

217-
state = service.getJobState("theId")
217+
state = service.getTaskState("theId")
218218
recordedRequest = mockWebServer.takeRequest();
219219

220220
then:

validation/tower/main.nf

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,19 @@
1+
#!/usr/bin/env nextflow
2+
3+
process sayHello {
4+
container 'ubuntu:20.04'
5+
6+
input:
7+
val x
8+
output:
9+
stdout
10+
script:
11+
"""
12+
sleep 10
13+
echo '$x world!'
14+
"""
15+
}
16+
17+
workflow {
18+
Channel.of('Bonjour', 'Ciao', 'Hello', 'Hola') | sayHello | view
19+
}

validation/tower/nextflow.config

Lines changed: 25 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,25 @@
1+
plugins {
2+
id "nf-nomad@${System.getenv("NOMAD_PLUGIN_VERSION") ?: "latest"}"
3+
}
4+
5+
process {
6+
executor = "nomad"
7+
}
8+
9+
tower {
10+
enabled = true
11+
workspaceId = "276172789442513"
12+
}
13+
14+
nomad {
15+
16+
client {
17+
address = "http://localhost:4646"
18+
}
19+
20+
jobs {
21+
deleteOnCompletion = false
22+
volume = { type "host" name "scratchdir" }
23+
}
24+
25+
}

0 commit comments

Comments
 (0)