1717
1818package nextflow.data.cid
1919
20+ import static nextflow.data.cid.fs.CidPath.*
21+
22+ import java.nio.file.Files
23+ import java.nio.file.Path
24+ import java.nio.file.attribute.BasicFileAttributes
25+
26+ import groovy.transform.CompileStatic
2027import groovy.util.logging.Slf4j
28+ import nextflow.Session
2129import nextflow.data.cid.model.Checksum
2230import nextflow.data.cid.model.DataPath
2331import nextflow.data.cid.model.Parameter
24- import nextflow.data.cid.model.WorkflowResults
32+ import nextflow.data.cid.model.TaskOutput
2533import nextflow.data.cid.model.Workflow
34+ import nextflow.data.cid.model.WorkflowOutput
35+ import nextflow.data.cid.model.WorkflowResults
2636import nextflow.data.cid.model.WorkflowRun
37+ import nextflow.data.cid.serde.CidEncoder
2738import nextflow.file.FileHelper
2839import nextflow.file.FileHolder
40+ import nextflow.processor.TaskHandler
41+ import nextflow.processor.TaskRun
2942import nextflow.script.ScriptMeta
3043import nextflow.script.params.DefaultInParam
3144import nextflow.script.params.FileInParam
32- import nextflow.script.params.InParam
33- import nextflow.util.PathNormalizer
34- import nextflow.util.TestOnly
35-
36- import java.nio.file.Files
37- import java.nio.file.Path
38- import java.nio.file.attribute.BasicFileAttributes
39-
40- import groovy.json.JsonOutput
41- import groovy.transform.CompileStatic
42- import nextflow.Session
43- import nextflow.data.cid.model.DataType
44- import nextflow.data.cid.model.Output
45- import nextflow.processor.TaskHandler
46- import nextflow.processor.TaskRun
4745import nextflow.script.params.FileOutParam
46+ import nextflow.script.params.InParam
4847import nextflow.trace.TraceObserver
4948import nextflow.trace.TraceRecord
5049import nextflow.util.CacheHelper
51-
52- import static nextflow.data.cid.fs.CidPath.CID_PROT
53-
50+ import nextflow.util.PathNormalizer
51+ import nextflow.util.TestOnly
5452/**
5553 * Observer to write the generated workflow metadata in a CID store.
5654 *
@@ -65,6 +63,7 @@ class CidObserver implements TraceObserver {
6563 private Session session
6664 private WorkflowResults workflowResults
6765 private Map<String ,String > outputsStoreDirCid = new HashMap<String ,String > (10 )
66+ private CidEncoder encoder = new CidEncoder ()
6867
6968 CidObserver (Session session , CidStore store ){
7069 this . session = session
@@ -83,18 +82,17 @@ class CidObserver implements TraceObserver {
8382 void onFlowBegin () {
8483 this . executionHash = storeWorkflowRun()
8584 workflowResults = new WorkflowResults (
86- DataType.WorkflowResults ,
8785 " $CID_PROT ${ executionHash} " ,
88- new ArrayList<Parameter > ())
86+ new ArrayList<String > ())
8987 this . store. getHistoryLog(). updateRunCid(session. uniqueId, " ${ CID_PROT}${ this.executionHash} " )
9088 }
9189
9290 @Override
9391 void onFlowComplete (){
9492 if (this . workflowResults){
95- final content = JsonOutput . prettyPrint( JsonOutput . toJson( workflowResults) )
96- final wfResultsHash = CacheHelper . hasher(content ). hash(). toString()
97- this . store. save(wfResultsHash, content )
93+ final json = encoder . encode( workflowResults)
94+ final wfResultsHash = CacheHelper . hasher(json ). hash(). toString()
95+ this . store. save(wfResultsHash, workflowResults )
9896 this . store. getHistoryLog(). updateResultsCid(session. uniqueId, " ${ CID_PROT}${ wfResultsHash} " )
9997 }
10098 }
@@ -121,23 +119,21 @@ class CidObserver implements TraceObserver {
121119 }
122120 }
123121 final workflow = new Workflow (
124- DataType.Workflow ,
125122 mainScript,
126123 otherScripts,
127124 session. workflowMetadata. repository,
128125 session. workflowMetadata. commitId
129126 )
130127 final value = new WorkflowRun (
131- DataType.WorkflowRun ,
132128 workflow,
133129 session. uniqueId. toString(),
134130 session. runName,
135131 getNormalizedParams(session. params, normalizer)
136132 )
137133
138- final content = JsonOutput . prettyPrint( JsonOutput . toJson( value) )
139- final executionHash = CacheHelper . hasher(content ). hash(). toString()
140- store. save(executionHash, content )
134+ final json = encoder . encode( value)
135+ final executionHash = CacheHelper . hasher(json ). hash(). toString()
136+ store. save(executionHash, value )
141137 return executionHash
142138 }
143139
@@ -184,7 +180,6 @@ class CidObserver implements TraceObserver {
184180 final codeChecksum = new Checksum (CacheHelper . hasher(session. stubRun ? task. stubSource: task. source). hash(). toString(),
185181 " nextflow" , CacheHelper.HashMode . DEFAULT (). toString(). toLowerCase())
186182 final value = new nextflow.data.cid.model.TaskRun (
187- DataType.TaskRun ,
188183 session. uniqueId. toString(),
189184 task. getName(),
190185 codeChecksum,
@@ -203,7 +198,7 @@ class CidObserver implements TraceObserver {
203198
204199 // store in the underlying persistence
205200 final key = task. hash. toString()
206- store. save(key, JsonOutput . prettyPrint( JsonOutput . toJson( value)) )
201+ store. save(key, value)
207202 return key
208203 }
209204
@@ -215,15 +210,14 @@ class CidObserver implements TraceObserver {
215210 final key = cid. toString()
216211 final checksum = new Checksum ( CacheHelper . hasher(path). hash(). toString(),
217212 " nextflow" , CacheHelper.HashMode . DEFAULT (). toString(). toLowerCase() )
218- final value = new Output (
219- DataType.TaskOutput ,
213+ final value = new TaskOutput (
220214 path. toUriString(),
221215 checksum,
222216 " $CID_PROT $task . hash " ,
223217 attrs. size(),
224218 attrs. creationTime(). toMillis(),
225219 attrs. lastModifiedTime(). toMillis())
226- store. save(key, JsonOutput . prettyPrint( JsonOutput . toJson( value)) )
220+ store. save(key, value)
227221 } catch (Throwable e) {
228222 log. warn(" Exception storing CID output $path for task ${ task.name} . ${ e.getLocalizedMessage()} " )
229223 }
@@ -273,20 +267,20 @@ class CidObserver implements TraceObserver {
273267 CacheHelper.HashMode . DEFAULT (). toString(). toLowerCase()
274268 )
275269 final rel = getWorkflowRelative(destination)
276- final key = " $executionHash /${ rel} "
270+ final key = " $executionHash /${ rel} " as String
277271 final sourceReference = getSourceReference(source)
278272 final attrs = readAttributes(destination)
279- final value = new Output (
280- DataType.WorkflowOutput ,
273+ final value = new WorkflowOutput (
281274 destination. toUriString(),
282275 checksum,
283276 sourceReference,
284277 attrs. size(),
285278 attrs. creationTime(). toMillis(),
286279 attrs. lastModifiedTime(). toMillis())
287- store. save(key, JsonOutput . prettyPrint(JsonOutput . toJson(value)))
288- workflowResults. outputs. add(" ${ CID_PROT}${ key} " )
289- } catch (Throwable e) {
280+ store. save(key, value)
281+ workflowResults. outputs. add(" ${ CID_PROT}${ key} " . toString())
282+ }
283+ catch (Throwable e) {
290284 log. warn(" Exception storing CID output $destination for workflow ${ executionHash} ." , e)
291285 }
292286 }
@@ -315,16 +309,15 @@ class CidObserver implements TraceObserver {
315309 final rel = getWorkflowRelative(destination)
316310 final key = " $executionHash /${ rel} "
317311 final attrs = readAttributes(destination)
318- final value = new Output (
319- DataType.WorkflowOutput ,
312+ final value = new WorkflowOutput (
320313 destination. toUriString(),
321314 checksum,
322315 " ${ CID_PROT}${ executionHash} " . toString(),
323316 attrs. size(),
324317 attrs. creationTime(). toMillis(),
325318 attrs. lastModifiedTime(). toMillis())
326- store. save(key, JsonOutput . prettyPrint( JsonOutput . toJson( value)) )
327- workflowResults. outputs. add(" ${ CID_PROT}${ key} " )
319+ store. save(key, value)
320+ workflowResults. outputs. add(" ${ CID_PROT}${ key} " as String )
328321 }catch (Throwable e) {
329322 log. warn(" Exception storing CID output $destination for workflow ${ executionHash} . ${ e.getLocalizedMessage()} " )
330323 }
0 commit comments