Skip to content

Commit 5318812

Browse files
committed
Add support for tower_workflow_id
1 parent 1809a04 commit 5318812

File tree

5 files changed

+58
-41
lines changed

5 files changed

+58
-41
lines changed

modules/nextflow/src/main/groovy/nextflow/Session.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ class Session implements ISession {
298298
uniqueId = UUID.fromString(config.resume as String)
299299
}
300300
else {
301-
uniqueId = UUID.randomUUID()
301+
uniqueId = systemEnv.get('NXF_UUID') ? UUID.fromString(systemEnv.get('NXF_UUID')) : UUID.randomUUID()
302302
}
303303
log.debug "Session uuid: $uniqueId"
304304

modules/nextflow/src/main/groovy/nextflow/script/ScriptRunner.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,9 @@ class ScriptRunner {
236236
void verifyAndTrackHistory(String cli, String name) {
237237
assert cli, 'Missing launch command line'
238238

239+
final ignore = System.getenv('NXF_IGNORE_RESUME_HISTORY') as Boolean
239240
// -- when resume, make sure the session id exists in the executions history
240-
if( session.resumeMode && !HistoryFile.DEFAULT.checkExistsById(session.uniqueId.toString())) {
241+
if( session.resumeMode && !ignore && !HistoryFile.DEFAULT.checkExistsById(session.uniqueId.toString()) ) {
241242
throw new AbortOperationException("Can't find a run with the specified id: ${session.uniqueId} -- Execution can't be resumed")
242243
}
243244

modules/nextflow/src/main/groovy/nextflow/script/WorkflowMetadata.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ class WorkflowMetadata {
202202
*/
203203
Manifest manifest
204204

205-
final private Session session
205+
private Session session
206206

207207
final private List<Closure> onCompleteActions = []
208208

@@ -462,4 +462,4 @@ class WorkflowMetadata {
462462
session.statsObserver.getStats()
463463
}
464464

465-
}
465+
}

modules/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class TowerClient implements TraceObserver {
9797

9898
private ResourcesAggregator aggregator
9999

100-
private Map<String,String> env = System.getenv()
100+
protected Map<String,String> env = System.getenv()
101101

102102
private LinkedBlockingQueue<ProcessEvent> events = new LinkedBlockingQueue()
103103

@@ -119,6 +119,8 @@ class TowerClient implements TraceObserver {
119119

120120
private int backOffBase
121121

122+
private boolean towerLaunch
123+
122124
/**
123125
* Constructor that consumes a URL and creates
124126
* a basic HTTP client.
@@ -137,16 +139,14 @@ class TowerClient implements TraceObserver {
137139
this.generator = TowerJsonGenerator.create(Collections.EMPTY_MAP)
138140
}
139141

140-
protected String getLaunchId() {
141-
env.get('TOWER_LAUNCH_ID')
142-
}
143-
144142
boolean enableMetrics() { true }
145143

146144
String getEndpoint() { endpoint }
147145

148146
String getWorkflowId() { workflowId }
149147

148+
boolean getTowerLaunch() { towerLaunch }
149+
150150
void setAliveInterval(Duration d) {
151151
this.aliveInterval = d
152152
}
@@ -245,7 +245,8 @@ class TowerClient implements TraceObserver {
245245
result.runName = session.runName
246246
result.projectName = session.workflowMetadata.projectName
247247
result.repository = session.workflowMetadata.repository
248-
result.launchId = launchId
248+
result.workflowId = env.get('TOWER_WORKFLOW_ID')
249+
this.towerLaunch = result.workflowId != null
249250
return result
250251
}
251252

@@ -412,7 +413,7 @@ class TowerClient implements TraceObserver {
412413
def result = new LinkedHashMap(5)
413414
result.workflow = workflow
414415
result.processNames = new ArrayList(processNames)
415-
result.launchId = launchId
416+
result.towerLaunch = towerLaunch
416417
return result
417418
}
418419

modules/nf-tower/src/test/io/seqera/tower/plugin/TowerClientTest.groovy

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
package io.seqera.tower.plugin
1313

14+
import spock.lang.Specification
15+
1416
import java.nio.file.Files
1517
import java.time.Instant
1618
import java.time.OffsetDateTime
@@ -22,14 +24,11 @@ import nextflow.cloud.types.PriceModel
2224
import nextflow.container.ContainerConfig
2325
import nextflow.exception.AbortOperationException
2426
import nextflow.script.ScriptBinding
25-
import nextflow.script.ScriptFile
2627
import nextflow.script.WorkflowMetadata
2728
import nextflow.trace.TraceRecord
2829
import nextflow.trace.WorkflowStats
2930
import nextflow.trace.WorkflowStatsObserver
3031
import nextflow.util.SimpleHttpClient
31-
import org.junit.Ignore
32-
import spock.lang.Specification
3332
/**
3433
*
3534
* @author Paolo Di Tommaso <[email protected]>
@@ -162,14 +161,6 @@ class TowerClientTest extends Specification {
162161

163162
}
164163

165-
def 'should get launchId from env' () {
166-
when:
167-
def observer = new TowerClient(env:[TOWER_LAUNCH_ID:'foo-123'])
168-
169-
then:
170-
observer.launchId == 'foo-123'
171-
}
172-
173164
def 'should post task records' () {
174165
given:
175166
def URL = 'http://foo.com'
@@ -240,35 +231,56 @@ class TowerClientTest extends Specification {
240231
}
241232

242233

243-
@Ignore
244234
def 'should create workflow json' () {
245235

246236
given:
237+
def sessionId = UUID.randomUUID()
247238
def dir = Files.createTempDirectory('test')
248-
def client = Mock(SimpleHttpClient)
249-
TowerClient tower = Spy(TowerClient, constructorArgs: [ [httpClient: client] ])
239+
def http = Mock(SimpleHttpClient)
240+
Map args = [httpClient: http, env: ENV]
241+
TowerClient client = Spy(TowerClient, constructorArgs: [ args ])
250242
and:
251243
def session = Mock(Session)
244+
session.getUniqueId() >> sessionId
245+
session.getRunName() >> 'foo'
252246
session.config >> [:]
253247
session.containerConfig >> new ContainerConfig()
254248
session.getParams() >> new ScriptBinding.ParamsMap([foo:'Hello', bar:'World'])
255249

256-
def meta = new WorkflowMetadata(session, Mock(ScriptFile))
250+
def meta = new WorkflowMetadata(
251+
session: session,
252+
projectName: 'the-project-name',
253+
repository: 'git://repo.com/foo' )
257254
session.getWorkflowMetadata() >> meta
258255
session.getStatsObserver() >> Mock(WorkflowStatsObserver) { getStats() >> new WorkflowStats() }
259256

260257
when:
261-
def req = tower.makeBeginReq(session)
258+
def req1 = client.makeCreateReq(session)
262259
then:
263-
tower.getWorkflowId() >> '12345'
264-
tower.getLaunchId() >> 'x123'
260+
req1.sessionId == sessionId.toString()
261+
req1.runName == 'foo'
262+
req1.projectName == 'the-project-name'
263+
req1.repository == 'git://repo.com/foo'
264+
req1.workflowId == WORKFLOW_ID
265+
266+
when:
267+
def req = client.makeBeginReq(session)
268+
then:
269+
client.getWorkflowId() >> '12345'
265270
and:
266271
req.workflow.id == '12345'
267272
req.workflow.params == [foo:'Hello', bar:'World']
268-
req.launchId == 'x123'
273+
and:
274+
req.towerLaunch == TOWER_LAUNCH
275+
269276
cleanup:
270277
dir?.deleteDir()
271278

279+
where:
280+
ENV | WORKFLOW_ID | TOWER_LAUNCH
281+
[:] | null | false
282+
[TOWER_WORKFLOW_ID: '1234'] | '1234' | true
283+
272284
}
273285

274286
def 'should convert map' () {
@@ -298,7 +310,7 @@ class TowerClientTest extends Specification {
298310
def 'should create init request' () {
299311
given:
300312
def uuid = UUID.randomUUID()
301-
def tower = new TowerClient(env: [TOWER_LAUNCH_ID: 'x123'])
313+
def client = new TowerClient(env: [TOWER_WORKFLOW_ID: 'x123'])
302314
def meta = Mock(WorkflowMetadata) {
303315
getProjectName() >> 'the-project-name'
304316
getRepository() >> 'git://repo.com/foo'
@@ -310,13 +322,15 @@ class TowerClientTest extends Specification {
310322
}
311323

312324
when:
313-
def req = tower.makeCreateReq(session)
325+
def req = client.makeCreateReq(session)
314326
then:
315327
req.sessionId == uuid.toString()
316328
req.runName == 'foo_bar'
317329
req.projectName == 'the-project-name'
318330
req.repository == 'git://repo.com/foo'
319-
req.launchId == 'x123'
331+
req.workflowId == 'x123'
332+
and:
333+
client.towerLaunch
320334
}
321335

322336
def 'should post create request' () {
@@ -332,19 +346,20 @@ class TowerClientTest extends Specification {
332346
getWorkflowMetadata() >> meta
333347
}
334348

335-
TowerClient tower = Spy(TowerClient, constructorArgs: ['https://tower.nf'])
349+
TowerClient client = Spy(TowerClient, constructorArgs: ['https://tower.nf'])
336350

337351
when:
338-
tower.onFlowCreate(session)
352+
client.onFlowCreate(session)
339353
then:
340-
1 * tower.getAccessToken() >> 'secret'
341-
1 * tower.makeCreateReq(session) >> [runName: 'foo']
342-
1 * tower.sendHttpMessage('https://tower.nf/trace/create', [runName: 'foo'], 'POST') >> new TowerClient.Response(200, '{"workflowId":"xyz123"}')
354+
1 * client.getAccessToken() >> 'secret'
355+
1 * client.makeCreateReq(session) >> [runName: 'foo']
356+
1 * client.sendHttpMessage('https://tower.nf/trace/create', [runName: 'foo'], 'POST') >> new TowerClient.Response(200, '{"workflowId":"xyz123"}')
343357
and:
344-
tower.runName == 'foo_bar'
345-
tower.runId == uuid.toString()
358+
client.runName == 'foo_bar'
359+
client.runId == uuid.toString()
346360
and:
347-
tower.workflowId == 'xyz123'
361+
client.workflowId == 'xyz123'
362+
!client.towerLaunch
348363

349364
}
350365

0 commit comments

Comments
 (0)