Skip to content

Commit 8b82f66

Browse files
authored
Overwrite the default traceFileObserver (#41)
1 parent d90c523 commit 8b82f66

File tree

3 files changed

+217
-130
lines changed

3 files changed

+217
-130
lines changed

plugins/nf-snowflake/src/main/nextflow/snowflake/SnowflakeObserver.groovy

Lines changed: 0 additions & 125 deletions
This file was deleted.

plugins/nf-snowflake/src/main/nextflow/snowflake/SnowflakeObserverFactory.groovy

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,42 @@ package nextflow.snowflake
22

33
import groovy.transform.CompileStatic
44
import groovy.util.logging.Slf4j
5-
import nextflow.trace.TraceObserver
6-
import nextflow.trace.TraceObserverFactory
5+
import nextflow.plugin.Priority
6+
import nextflow.trace.TraceObserverV2
7+
import nextflow.trace.TraceObserverFactoryV2
78
import nextflow.Session
9+
import nextflow.snowflake.observers.SnowflakeTraceFileObserver
810

11+
/**
 * Observer factory contributed by the Snowflake plugin.
 *
 * The factory is annotated with a priority of 100 so that it is loaded before
 * DefaultObserverFactory. When tracing is enabled it supplies a
 * {@link SnowflakeTraceFileObserver}, which buffers trace output and uploads it
 * in one go instead of streaming it, and then switches the {@code trace.enabled}
 * flag off in the session configuration so the stock trace observer is not
 * created as well.
 *
 * @author Hongye Yu
 */
@Slf4j
@CompileStatic
@Priority(100) // Load before DefaultObserverFactory (default priority is 0)
class SnowflakeObserverFactory implements TraceObserverFactoryV2 {

    /**
     * Creates the observers for the given session.
     *
     * @param session the running session; its {@code trace} configuration map is
     *                read and, when tracing is enabled, modified in place
     * @return a list holding the Snowflake trace observer when {@code trace.enabled}
     *         is truthy, otherwise an empty list
     */
    @Override
    Collection<TraceObserverV2> create(Session session) {
        final List<TraceObserverV2> observers = new ArrayList<TraceObserverV2>()
        final Map traceSettings = session.config.trace as Map

        // Nothing to contribute unless tracing was requested
        if (!traceSettings || !traceSettings.enabled)
            return observers

        log.info "Creating Snowflake trace file observer with buffered uploads"
        observers.add(new SnowflakeTraceFileObserver(session, traceSettings))

        // Flip the flag in the shared config map so the default observer stays off
        traceSettings.enabled = false
        log.debug "Disabled default TraceFileObserver"

        return observers
    }
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package nextflow.snowflake.observers
2+
3+
import groovy.transform.CompileStatic
4+
import groovy.util.logging.Slf4j
5+
import nextflow.Session
6+
import nextflow.file.FileHelper
7+
import nextflow.trace.TraceHelper
8+
import nextflow.trace.TraceObserverV2
9+
import nextflow.trace.TraceRecord
10+
import nextflow.trace.event.TaskEvent
11+
12+
import java.nio.file.Files
13+
import java.nio.file.Path
14+
import java.util.concurrent.ConcurrentHashMap
15+
16+
/**
 * Snowflake-aware trace file observer that keeps every trace line in memory and
 * writes the whole file in a single pass when the workflow completes, instead of
 * holding an output stream open for the duration of the run (the stated goal is
 * to avoid long-running stream timeouts on Snowflake stages).
 *
 * The line buffer is a synchronized list and the in-flight record map is a
 * {@link ConcurrentHashMap}, so task callbacks may add to them concurrently.
 *
 * @author Hongye Yu
 */
@Slf4j
@CompileStatic
class SnowflakeTraceFileObserver implements TraceObserverV2 {

    static final String DEF_FILE_NAME = 'trace.txt'
    static final String DEF_SEPARATOR = '\t'

    private Path tracePath
    private String separator = DEF_SEPARATOR
    private List<String> fields
    private List<String> formats
    private boolean useRawNumbers
    private boolean overwrite

    // Records of submitted/started tasks keyed by task id; entries are removed on completion.
    // Nothing in this class reads the map back.
    private Map<Object, TraceRecord> current = new ConcurrentHashMap<>()

    // Buffer to store trace lines in memory (header first, then one line per task)
    private List<String> traceLines = Collections.synchronizedList(new ArrayList<String>())

    /**
     * @param session the current session (not used by this constructor)
     * @param config  the trace configuration map; the keys read are {@code file},
     *                {@code overwrite}, {@code sep}, {@code raw} and {@code fields}
     */
    SnowflakeTraceFileObserver(Session session, Map config) {
        this.tracePath = config.file ? FileHelper.asPath(config.file as String) : (DEF_FILE_NAME as Path)
        this.overwrite = config.overwrite as Boolean ?: false
        this.separator = config.sep as String ?: DEF_SEPARATOR
        this.useRawNumbers = config.raw as Boolean ?: false

        // Parse fields configuration
        setFieldsAndFormats(config.fields as String)
    }

    /**
     * Parses a comma-separated list of {@code field} or {@code field:format} entries
     * into the parallel {@code fields} / {@code formats} lists.
     *
     * Blank entries (for example the middle one in {@code "task_id, ,name"}) are
     * skipped; previously they caused a NullPointerException. When the spec is
     * null, empty, or contains no usable entry, the built-in default field list is
     * used with no formats.
     *
     * @param fieldsConfig the raw {@code trace.fields} value, may be null
     */
    protected void setFieldsAndFormats(String fieldsConfig) {
        final List<String> names = new ArrayList<String>()
        final List<String> patterns = new ArrayList<String>()

        if (fieldsConfig) {
            for (String item : fieldsConfig.tokenize(',')) {
                final List<String> parts = item.trim().tokenize(':')
                final String name = parts ? parts[0].trim() : ''
                if (!name)
                    continue
                names.add(name)
                patterns.add(parts.size() > 1 ? parts[1].trim() : null)
            }
        }

        if (names.isEmpty()) {
            // Default fields if none specified
            names.addAll(['task_id', 'hash', 'native_id', 'name', 'status', 'exit', 'submit', 'duration',
                          'realtime', '%cpu', 'peak_rss', 'peak_vmem', 'rchar', 'wchar'])
            patterns.addAll(Collections.nCopies(names.size(), (String) null))
        }

        this.fields = names
        this.formats = patterns
    }

    /**
     * Prepares the output location and buffers the header line.
     *
     * @throws IllegalArgumentException when the trace file already exists and
     *         {@code overwrite} is not enabled
     */
    @Override
    void onFlowCreate(Session session) {
        log.debug "Flow create -- trace file: $tracePath"

        // Create parent directories
        final Path parent = tracePath.parent
        if (parent)
            Files.createDirectories(parent)

        // Fail early rather than at the end of the run when the file cannot be replaced
        if (Files.exists(tracePath) && !overwrite) {
            throw new IllegalArgumentException("Trace file already exists: $tracePath -- enable trace `overwrite` option to replace existing file")
        }

        // Add header line to buffer
        traceLines.add(fields.join(separator))

        log.info "Created trace file observer (buffered mode): $tracePath"
    }

    /** Remembers the record of a task that has just been submitted. */
    @Override
    void onTaskSubmit(TaskEvent event) {
        final trace = event.trace
        if (trace)
            current.put(trace.taskId, trace)
    }

    /** Replaces the remembered record with the one reported when the task starts. */
    @Override
    void onTaskStart(TaskEvent event) {
        final trace = event.trace
        if (trace)
            current.put(trace.taskId, trace)
    }

    /** Drops the in-flight record and buffers the rendered line of the finished task. */
    @Override
    void onTaskComplete(TaskEvent event) {
        final trace = event.trace
        if (!trace)
            return

        current.remove(trace.taskId)

        // Add rendered trace line to buffer
        traceLines.add(render(trace))
    }

    /** Buffers the rendered line of a task whose result came from the cache. */
    @Override
    void onTaskCached(TaskEvent event) {
        final trace = event.trace
        if (!trace)
            return

        // Add rendered trace line to buffer
        traceLines.add(render(trace))
    }

    /**
     * Renders one record as a single line: each configured field is looked up on
     * the record, rendered with its format (if any) and joined with the separator.
     *
     * @param trace the record to render
     * @return the separator-joined line, without a trailing newline
     */
    protected String render(TraceRecord trace) {
        final List<String> cells = new ArrayList<String>(fields.size())
        for (int i = 0; i < fields.size(); i++) {
            cells.add(renderValue(trace.get(fields[i]), formats[i]))
        }
        return cells.join(separator)
    }

    /**
     * Converts a single field value to its textual cell.
     *
     * A null value becomes an empty string. A number with a format is passed to
     * {@link String#format}; if formatting fails a warning is logged and the plain
     * {@code toString()} rendering is used. Text that contains the separator, a
     * line break or a double quote is wrapped in double quotes with embedded
     * quotes doubled, so the row stays parseable.
     *
     * NOTE(review): {@code fmt} is handed to {@code String.format}, so it has to be
     * a {@code java.util.Formatter} pattern. If users configure Nextflow-style
     * format names here they would be emitted literally -- confirm the intended
     * format syntax.
     *
     * @param value the raw field value, may be null
     * @param fmt   the optional format for numeric values, may be null
     * @return the rendered cell, never null
     */
    protected String renderValue(value, String fmt) {
        if (value == null)
            return ''

        final boolean numeric = value instanceof Number

        if (fmt && numeric) {
            try {
                return String.format(fmt, value)
            } catch (Exception e) {
                log.warn "Error formatting value $value with format $fmt: ${e.message}"
            }
        }

        if (useRawNumbers && numeric)
            return value.toString()

        final String text = value.toString()
        return needsQuoting(text) ? '"' + text.replace('"', '""') + '"' : text
    }

    /** Tells whether a cell must be quoted to keep the delimited row well-formed. */
    private boolean needsQuoting(String text) {
        return text.contains('\n') || text.contains('\r') || text.contains('"') || text.contains(separator)
    }

    /**
     * Writes all buffered lines to the trace file. Any failure is logged and
     * rethrown to the caller.
     */
    @Override
    void onFlowComplete() {
        // Copy first: iterating a synchronizedList directly is not safe without
        // holding its lock, whereas the copy constructor uses the synchronized toArray()
        final List<String> snapshot = new ArrayList<String>(traceLines)
        log.debug "Flow complete -- writing buffered trace file with ${snapshot.size()} lines"

        try {
            // Write all buffered lines at once using TraceHelper
            // NOTE(review): the third argument of TraceHelper.newFileWriter looks like a
            // file-type label used in its error message (e.g. 'Trace'), not a charset --
            // confirm against the Nextflow version in use before changing 'UTF-8'.
            def writer = TraceHelper.newFileWriter(tracePath, overwrite, 'UTF-8')
            try {
                for (String line : snapshot) {
                    writer.println(line)
                }
                writer.flush()
                log.info "Successfully wrote trace file: $tracePath"
            } finally {
                writer.close()
            }
        } catch (Exception e) {
            log.error("Failed to write trace file: ${e.message}", e)
            throw e
        }
    }
}

0 commit comments

Comments
 (0)