Skip to content

Commit 14e2841

Browse files
jorgee and pditommaso authored
Cid store quick wins (#5945)
Signed-off-by: jorgee <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
1 parent a5b5907 commit 14e2841

32 files changed

+373 -425 lines changed

modules/nextflow/src/main/groovy/nextflow/cli/CmdCid.groovy

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -77,6 +77,7 @@ class CmdCid extends CmdBase implements UsageAware {
7777
@Override
7878
void run() {
7979
if( !args ) {
80+
usage(List.of())
8081
return
8182
}
8283
// setup the plugins system and load the secrets provider

modules/nextflow/src/test/groovy/nextflow/cli/CmdCidTest.groovy

Lines changed: 22 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -27,9 +27,8 @@ import nextflow.data.cid.CidHistoryRecord
2727
import nextflow.data.cid.CidStoreFactory
2828
import nextflow.data.cid.model.Checksum
2929
import nextflow.data.cid.model.Parameter
30-
import nextflow.data.cid.model.TaskOutput
30+
import nextflow.data.cid.model.DataOutput
3131
import nextflow.data.cid.model.TaskRun
32-
import nextflow.data.cid.model.WorkflowOutput
3332
import nextflow.plugin.Plugins
3433
import org.junit.Rule
3534
import spock.lang.Specification
@@ -77,7 +76,7 @@ class CmdCidTest extends Specification {
7776
def launcher = Mock(Launcher){
7877
getOptions() >> new CliOptions(config: [configFile.toString()])
7978
}
80-
def recordEntry = "${CidHistoryRecord.TIMESTAMP_FMT.format(date)}\trun_name\t${uniqueId}\tcid://123456\tcid://456789".toString()
79+
def recordEntry = "${CidHistoryRecord.TIMESTAMP_FMT.format(date)}\trun_name\t${uniqueId}\tcid://123456".toString()
8180
historyFile.text = recordEntry
8281
when:
8382
def cidCmd = new CmdCid(launcher: launcher, args: ["log"])
@@ -136,10 +135,10 @@ class CmdCidTest extends Specification {
136135
def launcher = Mock(Launcher){
137136
getOptions() >> new CliOptions(config: [configFile.toString()])
138137
}
139-
def time = Instant.ofEpochMilli(123456789).toString()
138+
def time = Instant.ofEpochMilli(123456789)
140139
def encoder = new CidEncoder().withPrettyPrint(true)
141-
def entry = new WorkflowOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
142-
"cid://123987/file.bam", 1234, time, time, null)
140+
def entry = new DataOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
141+
"cid://123987/file.bam","cid://123987/", 1234, time, time, null)
143142
def jsonSer = encoder.encode(entry)
144143
def expectedOutput = jsonSer
145144
cidFile.text = jsonSer
@@ -208,22 +207,26 @@ class CmdCidTest extends Specification {
208207
Files.createDirectories(cidFile4.parent)
209208
Files.createDirectories(cidFile5.parent)
210209
def encoder = new CidEncoder()
211-
def time = Instant.ofEpochMilli(123456789).toString()
212-
def entry = new WorkflowOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
213-
"cid://123987/file.bam", 1234, time, time, null)
210+
def time = Instant.ofEpochMilli(123456789)
211+
def entry = new DataOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
212+
"cid://123987/file.bam", "cid://45678", 1234, time, time, null)
214213
cidFile.text = encoder.encode(entry)
215-
entry = new TaskOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
216-
"cid://123987", 1234, time, time, null)
214+
entry = new DataOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
215+
"cid://123987", "cid://123987", 1234, time, time, null)
217216
cidFile2.text = encoder.encode(entry)
218-
entry = new TaskRun("u345-2346-1stw2", "foo", new Checksum("abcde2345","nextflow","standard"),
217+
entry = new TaskRun("u345-2346-1stw2", "foo",
218+
new Checksum("abcde2345","nextflow","standard"),
219+
new Checksum("abfsc2375","nextflow","standard"),
219220
[new Parameter( "ValueInParam", "sample_id","ggal_gut"),
220221
new Parameter("FileInParam","reads",["cid://45678/output.txt"])],
221222
null, null, null, null, [:],[], null)
222223
cidFile3.text = encoder.encode(entry)
223-
entry = new TaskOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
224-
"cid://45678", 1234, time, time, null)
224+
entry = new DataOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
225+
"cid://45678", "cid://45678", 1234, time, time, null)
225226
cidFile4.text = encoder.encode(entry)
226-
entry = new TaskRun("u345-2346-1stw2", "bar", new Checksum("abfs2556","nextflow","standard"),
227+
entry = new TaskRun("u345-2346-1stw2", "bar",
228+
new Checksum("abfs2556","nextflow","standard"),
229+
new Checksum("abfsc2375","nextflow","standard"),
227230
null,null, null, null, null, [:],[], null)
228231
cidFile5.text = encoder.encode(entry)
229232
final network = """flowchart BT
@@ -275,14 +278,14 @@ class CmdCidTest extends Specification {
275278
getOptions() >> new CliOptions(config: [configFile.toString()])
276279
}
277280
def encoder = new CidEncoder().withPrettyPrint(true)
278-
def time = Instant.ofEpochMilli(123456789).toString()
279-
def entry = new WorkflowOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
280-
"cid://123987/file.bam", 1234, time, time, null)
281+
def time = Instant.ofEpochMilli(123456789)
282+
def entry = new DataOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
283+
"cid://123987/file.bam", "cid://123987/", 1234, time, time, null)
281284
def jsonSer = encoder.encode(entry)
282285
def expectedOutput = jsonSer
283286
cidFile.text = jsonSer
284287
when:
285-
def cidCmd = new CmdCid(launcher: launcher, args: ["show", "cid:///?type=WorkflowOutput"])
288+
def cidCmd = new CmdCid(launcher: launcher, args: ["show", "cid:///?type=DataOutput"])
286289
cidCmd.run()
287290
def stdout = capture
288291
.toString()

modules/nf-cid/src/main/nextflow/data/cid/CidHistoryFile.groovy

Lines changed: 18 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -16,20 +16,22 @@
1616
*/
1717
package nextflow.data.cid
1818

19-
import groovy.util.logging.Slf4j
20-
2119
import java.nio.channels.FileChannel
2220
import java.nio.channels.FileLock
2321
import java.nio.file.Files
2422
import java.nio.file.Path
2523
import java.nio.file.StandardOpenOption
2624

25+
import groovy.transform.CompileStatic
26+
import groovy.util.logging.Slf4j
27+
import nextflow.extension.FilesEx
2728
/**
2829
* File to store a history of the workflow executions and their corresponding CIDs
2930
*
3031
* @author Jorge Ejarque <[email protected]>
3132
*/
3233
@Slf4j
34+
@CompileStatic
3335
class CidHistoryFile implements CidHistoryLog {
3436

3537
Path path
@@ -38,13 +40,13 @@ class CidHistoryFile implements CidHistoryLog {
3840
this.path = file
3941
}
4042

41-
void write(String name, UUID key, String runCid, String resultsCid, Date date = null) {
43+
void write(String name, UUID key, String runCid, Date date = null) {
4244
assert key
4345

4446
withFileLock {
4547
def timestamp = date ?: new Date()
46-
log.trace("Writting record for $key in CID history file $this")
47-
path << new CidHistoryRecord(timestamp, name, key, runCid, resultsCid).toString() << '\n'
48+
log.trace("Writting record for $key in CID history file ${FilesEx.toUriString(this.path)}")
49+
path << new CidHistoryRecord(timestamp, name, key, runCid).toString() << '\n'
4850
}
4951
}
5052

@@ -55,18 +57,7 @@ class CidHistoryFile implements CidHistoryLog {
5557
withFileLock { updateRunCid0(sessionId, runCid) }
5658
}
5759
catch (Throwable e) {
58-
log.warn "Can't update CID history file: $this", e.message
59-
}
60-
}
61-
62-
void updateResultsCid(UUID sessionId, String resultsCid) {
63-
assert sessionId
64-
65-
try {
66-
withFileLock { updateResultsCid0(sessionId, resultsCid) }
67-
}
68-
catch (Throwable e) {
69-
log.warn "Can't update CID history file: $this", e.message
60+
log.warn "Can't update CID history file: ${FilesEx.toUriString(this.path)}", e.message
7061
}
7162
}
7263

@@ -76,7 +67,7 @@ class CidHistoryFile implements CidHistoryLog {
7667
withFileLock { this.path.eachLine {list.add(CidHistoryRecord.parse(it)) } }
7768
}
7869
catch (Throwable e) {
79-
log.warn "Can't read records from CID history file: $this", e.message
70+
log.warn "Can't read records from CID history file: ${FilesEx.toUriString(this.path)}", e.message
8071
}
8172
return list
8273
}
@@ -91,7 +82,7 @@ class CidHistoryFile implements CidHistoryLog {
9182
return current
9283
}
9384
}
94-
log.warn("Can't find session $id in CID history file $this")
85+
log.warn("Can't find session $id in CID history file ${FilesEx.toUriString(this.path)}")
9586
return null
9687
}
9788

@@ -100,43 +91,19 @@ class CidHistoryFile implements CidHistoryLog {
10091
assert id
10192
def newHistory = new StringBuilder()
10293

103-
this.path.readLines().each { line ->
104-
try {
105-
def current = line ? CidHistoryRecord.parse(line) : null
106-
if (current.sessionId == id) {
107-
log.trace("Updating record for $id in CID history file $this")
108-
final newRecord = new CidHistoryRecord(current.timestamp, current.runName, current.sessionId, runCid, current.resultsCid)
109-
newHistory << newRecord.toString() << '\n'
110-
} else {
111-
newHistory << line << '\n'
112-
}
113-
}
114-
catch (IllegalArgumentException e) {
115-
log.warn("Can't read CID history file: $this", e.message)
116-
}
117-
}
118-
119-
// rewrite the history content
120-
this.path.setText(newHistory.toString())
121-
}
122-
123-
private void updateResultsCid0(UUID id, String resultsCid) {
124-
assert id
125-
def newHistory = new StringBuilder()
126-
127-
this.path.readLines().each { line ->
94+
for( String line : this.path.readLines()) {
12895
try {
12996
def current = line ? CidHistoryRecord.parse(line) : null
13097
if (current.sessionId == id) {
131-
log.trace("Updating record for $id in CID history file $this")
132-
final newRecord = new CidHistoryRecord(current.timestamp, current.runName, current.sessionId, current.runCid, resultsCid)
98+
log.trace("Updating record for $id in CID history file ${FilesEx.toUriString(this.path)}")
99+
final newRecord = new CidHistoryRecord(current.timestamp, current.runName, current.sessionId, runCid)
133100
newHistory << newRecord.toString() << '\n'
134101
} else {
135102
newHistory << line << '\n'
136103
}
137104
}
138105
catch (IllegalArgumentException e) {
139-
log.warn("Can't read CID history file: $this", e.message)
106+
log.warn("Can't read CID history file: ${FilesEx.toUriString(this.path)}", e.message)
140107
}
141108
}
142109

@@ -161,11 +128,11 @@ class CidHistoryFile implements CidHistoryLog {
161128
try {
162129
fos = FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.CREATE)
163130
} catch (UnsupportedOperationException e){
164-
log.warn("File System Provider for ${this.path} do not support file locking. Continuing without lock...")
131+
log.warn("File System Provider for ${this.path} do not support file locking - Attemting without locking", e)
165132
return action.call()
166133
}
167134
if (!fos){
168-
throw new IllegalStateException("Can't create a file channel for ${this.path.toAbsolutePath()}")
135+
throw new IllegalStateException("Can't create a file channel for ${FilesEx.toUriString(this.path)}")
169136
}
170137
try {
171138
Throwable error
@@ -178,7 +145,7 @@ class CidHistoryFile implements CidHistoryLog {
178145
if (System.currentTimeMillis() - ts < 1_000)
179146
sleep rnd.nextInt(75)
180147
else {
181-
error = new IllegalStateException("Can't lock file: ${this.path.toAbsolutePath()} -- Nextflow needs to run in a file system that supports file locks")
148+
error = new IllegalStateException("Can't lock file: ${FilesEx.toUriString(this.path)} - Nextflow needs to run in a file system that supports file locks")
182149
break
183150
}
184151
}
@@ -200,4 +167,4 @@ class CidHistoryFile implements CidHistoryLog {
200167
file.delete()
201168
}
202169
}
203-
}
170+
}

modules/nf-cid/src/main/nextflow/data/cid/CidHistoryLog.groovy

Lines changed: 1 addition & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,7 @@ interface CidHistoryLog {
3030
* @param runCid Workflow run CID.
3131
* @param resultsCid Workflow results CID.
3232
*/
33-
void write(String name, UUID sessionId, String runCid, String resultsCid)
33+
void write(String name, UUID sessionId, String runCid)
3434

3535
/**
3636
* Updates the run CID for a given session ID.
@@ -40,14 +40,6 @@ interface CidHistoryLog {
4040
*/
4141
void updateRunCid(UUID sessionId, String runCid)
4242

43-
/**
44-
* Updates the results CID for a given session ID.
45-
*
46-
* @param sessionId Workflow session ID.
47-
* @param resultsCid Workflow results CID.
48-
*/
49-
void updateResultsCid(UUID sessionId, String resultsCid)
50-
5143
/**
5244
* Get the store records in the CidHistoryLog.
5345
*

modules/nf-cid/src/main/nextflow/data/cid/CidHistoryRecord.groovy

Lines changed: 3 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -35,14 +35,12 @@ class CidHistoryRecord {
3535
final String runName
3636
final UUID sessionId
3737
final String runCid
38-
final String resultsCid
3938

40-
CidHistoryRecord(Date timestamp, String name, UUID sessionId, String runCid, String resultsCid = null) {
39+
CidHistoryRecord(Date timestamp, String name, UUID sessionId, String runCid) {
4140
this.timestamp = timestamp
4241
this.runName = name
4342
this.sessionId = sessionId
4443
this.runCid = runCid
45-
this.resultsCid = resultsCid
4644
}
4745

4846
CidHistoryRecord(UUID sessionId, String name = null) {
@@ -58,7 +56,6 @@ class CidHistoryRecord {
5856
line << (runName ?: '-')
5957
line << (sessionId.toString())
6058
line << (runCid ?: '-')
61-
line << (resultsCid ?: '-')
6259
}
6360

6461
@Override
@@ -71,8 +68,8 @@ class CidHistoryRecord {
7168
if (cols.size() == 2)
7269
return new CidHistoryRecord(UUID.fromString(cols[0]))
7370

74-
if (cols.size() == 5) {
75-
return new CidHistoryRecord(TIMESTAMP_FMT.parse(cols[0]), cols[1], UUID.fromString(cols[2]), cols[3], cols[4])
71+
if (cols.size() == 4) {
72+
return new CidHistoryRecord(TIMESTAMP_FMT.parse(cols[0]), cols[1], UUID.fromString(cols[2]), cols[3])
7673
}
7774

7875
throw new IllegalArgumentException("Not a valid history entry: `$line`")

0 commit comments

Comments
 (0)