Skip to content

Commit eae9f7d

Browse files
authored
[ci fast] Add agent output mode (NXF_AGENT_MODE)
Add AI agent-friendly output mode activated via NXF_AGENT_MODE=1. When enabled, Nextflow replaces the interactive ANSI progress display with minimal, structured tagged lines optimized for AI agent context windows: [PIPELINE] name version | profile=X [WORKDIR] /path/to/work [PROCESS hash] name (tag) [WARN] deduplicated warning [ERROR] name with exit/cmd/stderr/workdir [SUCCESS|FAILED] completed=N failed=N cached=N Key changes: - LogObserver interface as common abstraction for AnsiLogObserver and AgentLogObserver - Session holds a single logObserver field instead of two concrete fields - DefaultObserverFactory creates both observer types using the same pattern - AgentLogObserver filters hash-prefixed log lines to avoid duplicating [PROCESS] output - Workflow output (view operator) passes through to stdout in agent mode - Renamed env var from NXF_AGENT to NXF_AGENT_MODE - Added NXF_AGENT_MODE to environment variables documentation
1 parent 6e66aaa commit eae9f7d

File tree

17 files changed

+759
-27
lines changed

17 files changed

+759
-27
lines changed

docs/reference/env-vars.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ The following environment variables control the configuration of the Nextflow ru
1616

1717
## Nextflow settings
1818

19+
`NXF_AGENT_MODE`
20+
: :::{versionadded} 26.04.0
21+
:::
22+
: When `true`, enables agent output mode. In this mode, Nextflow replaces the interactive ANSI log with minimal, structured output optimized for AI agents and non-interactive environments. The output uses tagged lines such as `[PIPELINE]`, `[PROCESS]`, `[WARN]`, `[ERROR]`, and `[SUCCESS]`/`[FAILED]` written to standard error (default: `false`).
23+
1924
`NXF_ANSI_LOG`
2025
: Enables/disables ANSI console output (default `true` when ANSI terminal is detected).
2126

modules/nextflow/src/main/groovy/nextflow/Session.groovy

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ import nextflow.script.ScriptRunner
6969
import nextflow.script.WorkflowMetadata
7070
import nextflow.script.dsl.ProcessConfigBuilder
7171
import nextflow.spack.SpackConfig
72-
import nextflow.trace.AnsiLogObserver
72+
import nextflow.trace.LogObserver
7373
import nextflow.trace.TraceObserver
7474
import nextflow.trace.TraceObserverFactory
7575
import nextflow.trace.TraceObserverFactoryV2
@@ -311,9 +311,11 @@ class Session implements ISession {
311311

312312
boolean ansiLog
313313

314+
boolean agentLog
315+
314316
boolean disableJobsCancellation
315317

316-
AnsiLogObserver ansiLogObserver
318+
LogObserver logObserver
317319

318320
FilePorter getFilePorter() { filePorter }
319321

@@ -832,7 +834,7 @@ class Session implements ISession {
832834
shutdown0()
833835
notifyError(null)
834836
// force termination
835-
ansiLogObserver?.forceTermination()
837+
logObserver?.forceTermination()
836838
executorFactory?.signalExecutors()
837839
processesBarrier.forceTermination()
838840
monitorsBarrier.forceTermination()
@@ -1278,16 +1280,16 @@ class Session implements ISession {
12781280
}
12791281

12801282
void printConsole(String str, boolean newLine=false) {
1281-
if( ansiLogObserver )
1282-
ansiLogObserver.appendInfo(str)
1283+
if( logObserver )
1284+
logObserver.appendInfo(str)
12831285
else if( newLine )
12841286
System.out.println(str)
12851287
else
12861288
System.out.print(str)
12871289
}
12881290

12891291
void printConsole(Path file) {
1290-
ansiLogObserver ? ansiLogObserver.appendInfo(file.text) : Files.copy(file, System.out)
1292+
logObserver ? logObserver.appendInfo(file.text) : Files.copy(file, System.out)
12911293
}
12921294

12931295
private volatile ThreadPoolManager finalizePoolManager

modules/nextflow/src/main/groovy/nextflow/cli/CliOptions.groovy

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ class CliOptions {
118118
if( noColor ) {
119119
return ansiLog = false
120120
}
121+
122+
// Disable ANSI log in agent mode for plain, parseable output
123+
if( SysEnv.isAgentMode() ) {
124+
return ansiLog = false
125+
}
126+
121127
return Ansi.isEnabled()
122128
}
123129

modules/nextflow/src/main/groovy/nextflow/cli/CmdRun.groovy

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,8 @@ class CmdRun extends CmdBase implements HubOptions {
404404
runner.setPreview(this.preview, previewAction)
405405
runner.session.profile = profile
406406
runner.session.commandLine = launcher.cliString
407-
runner.session.ansiLog = launcher.options.ansiLog
407+
runner.session.ansiLog = launcher.options.ansiLog && !SysEnv.isAgentMode()
408+
runner.session.agentLog = SysEnv.isAgentMode()
408409
runner.session.debug = launcher.options.remoteDebug
409410
runner.session.disableJobsCancellation = getDisableJobsCancellation()
410411

@@ -435,6 +436,11 @@ class CmdRun extends CmdBase implements HubOptions {
435436
}
436437

437438
protected void printBanner() {
439+
// Suppress banner in agent mode for minimal output
440+
if( SysEnv.isAgentMode() ) {
441+
return
442+
}
443+
438444
if( launcher.options.ansiLog ){
439445
// Plain header for verbose log
440446
log.debug "N E X T F L O W ~ version ${BuildInfo.version}"
@@ -499,6 +505,11 @@ class CmdRun extends CmdBase implements HubOptions {
499505
}
500506

501507
protected void printLaunchInfo(String repo, String head, String revision) {
508+
// Agent mode output is handled by AgentLogObserver
509+
if( SysEnv.isAgentMode() ) {
510+
return
511+
}
512+
502513
if( launcher.options.ansiLog ){
503514
log.debug "${head} [$runName] - revision: ${revision}"
504515

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/*
2+
* Copyright 2013-2026, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package nextflow.trace
18+
19+
import java.util.concurrent.ConcurrentHashMap
20+
21+
import groovy.transform.CompileStatic
22+
import groovy.util.logging.Slf4j
23+
import nextflow.Session
24+
import nextflow.processor.TaskRun
25+
import nextflow.trace.event.TaskEvent
26+
import nextflow.util.LoggerHelper
27+
28+
/**
29+
* AI agent-friendly log observer that outputs minimal, structured information
30+
* to standard error, optimized for AI context windows.
31+
*
32+
* Activated via environment variable {@code NXF_AGENT_MODE=1}.
33+
*
34+
* Output format:
35+
* - {@code [PIPELINE] name version | profile=X}
36+
* - {@code [WORKDIR] /path/to/work}
37+
* - {@code [PROCESS hash] name (tag)}
38+
* - {@code [WARN] warning message} (deduplicated)
39+
* - {@code [ERROR] name} with exit/cmd/stderr/workdir
40+
* - {@code [SUCCESS|FAILED] completed=N failed=N cached=N}
41+
*
42+
* @author Edmund Miller <edmund.miller@utdallas.edu>
43+
*/
44+
@Slf4j
45+
@CompileStatic
46+
class AgentLogObserver implements TraceObserverV2, LogObserver {
47+
48+
private Session session
49+
private WorkflowStatsObserver statsObserver
50+
private final Set<String> seenWarnings = ConcurrentHashMap.newKeySet()
51+
private volatile boolean started = false
52+
private volatile boolean completed = false
53+
54+
/**
55+
* Set the workflow stats observer for retrieving task statistics
56+
*/
57+
void setStatsObserver(WorkflowStatsObserver observer) {
58+
this.statsObserver = observer
59+
}
60+
61+
/**
62+
* Print a line to standard output (agent format)
63+
*/
64+
protected void println(String line) {
65+
System.err.println(line)
66+
}
67+
68+
// -- TraceObserverV2 lifecycle methods --
69+
70+
@Override
71+
void onFlowCreate(Session session) {
72+
this.session = session
73+
}
74+
75+
@Override
76+
void onFlowBegin() {
77+
if( started )
78+
return
79+
started = true
80+
81+
// Print pipeline info
82+
def manifest = session.manifest
83+
def pipelineName = manifest?.name ?: session.scriptName ?: 'unknown'
84+
def version = manifest?.version ?: ''
85+
def profile = session.profile ?: 'standard'
86+
87+
def info = "[PIPELINE] ${pipelineName}"
88+
if( version )
89+
info += " ${version}"
90+
info += " | profile=${profile}"
91+
println(info)
92+
93+
// Print work directory
94+
def workDir = session.workDir?.toUriString() ?: session.workDir?.toString()
95+
if( workDir )
96+
println("[WORKDIR] ${workDir}")
97+
}
98+
99+
@Override
100+
void onFlowComplete() {
101+
if( completed )
102+
return
103+
completed = true
104+
printSummary()
105+
}
106+
107+
@Override
108+
void onFlowError(TaskEvent event) {
109+
// Error is already reported by onTaskComplete for failed tasks
110+
}
111+
112+
@Override
113+
void onTaskSubmit(TaskEvent event) {
114+
def task = event.handler?.task
115+
if( task )
116+
println("[PROCESS ${task.hashLog}] ${task.name}")
117+
}
118+
119+
@Override
120+
void onTaskComplete(TaskEvent event) {
121+
def handler = event.handler
122+
def task = handler?.task
123+
if( task?.isFailed() ) {
124+
printTaskError(task)
125+
}
126+
}
127+
128+
@Override
129+
void onTaskCached(TaskEvent event) {
130+
// Not reported in agent mode
131+
}
132+
133+
/**
134+
* Append a warning message (deduplicated)
135+
*/
136+
void appendWarning(String message) {
137+
if( message == null )
138+
return
139+
// Normalize and deduplicate
140+
def normalized = message.trim().replaceAll(/\s+/, ' ')
141+
if( seenWarnings.add(normalized) ) {
142+
println("[WARN] ${normalized}")
143+
}
144+
}
145+
146+
/**
147+
* Append an error message
148+
*/
149+
void appendError(String message) {
150+
if( message )
151+
println("[ERROR] ${message}")
152+
}
153+
154+
/**
155+
* Append info message to stdout.
156+
* Hash-prefixed task log lines (e.g. {@code [ab/123456] Submitted process > ...})
157+
* are filtered out because {@link #onTaskSubmit} already emits a {@code [PROCESS]} line.
158+
*/
159+
void appendInfo(String message) {
160+
if( message && !LoggerHelper.isHashLogPrefix(message) )
161+
System.out.print(message)
162+
}
163+
164+
/**
165+
* Print task error with full diagnostic context
166+
*/
167+
protected void printTaskError(TaskRun task) {
168+
def name = task.getName()
169+
println("[ERROR] ${name}")
170+
171+
// Exit status
172+
def exitStatus = task.getExitStatus()
173+
if( exitStatus != null && exitStatus != Integer.MAX_VALUE ) {
174+
println("exit: ${exitStatus}")
175+
}
176+
177+
// Command/script (first line or truncated)
178+
def script = task.getScript()?.toString()?.trim()
179+
if( script ) {
180+
// Truncate long commands
181+
def cmd = script.length() > 200 ? script.substring(0, 200) + '...' : script
182+
cmd = cmd.replaceAll(/\n/, ' ').replaceAll(/\s+/, ' ')
183+
println("cmd: ${cmd}")
184+
}
185+
186+
// Stderr
187+
def stderr = task.getStderr()
188+
if( stderr ) {
189+
def lines = stderr.readLines()
190+
if( lines.size() > 10 ) {
191+
lines = lines[-10..-1]
192+
}
193+
println("stderr: ${lines.join(' | ')}")
194+
}
195+
196+
// Stdout (only if relevant)
197+
def stdout = task.getStdout()
198+
if( stdout && !stderr ) {
199+
def lines = stdout.readLines()
200+
if( lines.size() > 5 ) {
201+
lines = lines[-5..-1]
202+
}
203+
println("stdout: ${lines.join(' | ')}")
204+
}
205+
206+
// Work directory
207+
def workDir = task.getWorkDir()
208+
if( workDir ) {
209+
println("workdir: ${workDir.toUriString()}")
210+
}
211+
}
212+
213+
/**
214+
* Print final summary line
215+
*/
216+
protected void printSummary() {
217+
def stats = statsObserver?.getStats()
218+
def succeeded = stats?.succeededCount ?: 0
219+
def failed = stats?.failedCount ?: 0
220+
def cached = stats?.cachedCount ?: 0
221+
def completed = succeeded + failed
222+
223+
def status = failed > 0 ? 'FAILED' : 'SUCCESS'
224+
println("\n[${status}] completed=${completed} failed=${failed} cached=${cached}")
225+
}
226+
227+
/**
228+
* Force termination - called on abort
229+
*/
230+
void forceTermination() {
231+
if( !completed ) {
232+
completed = true
233+
printSummary()
234+
}
235+
}
236+
}

modules/nextflow/src/main/groovy/nextflow/trace/AnsiLogObserver.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import org.fusesource.jansi.AnsiConsole
3939
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
4040
*/
4141
@CompileStatic
42-
class AnsiLogObserver implements TraceObserverV2 {
42+
class AnsiLogObserver implements TraceObserverV2, LogObserver {
4343

4444
static final private String NEWLINE = '\n'
4545

modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class DefaultObserverFactory implements TraceObserverFactoryV2 {
3939

4040
final result = new ArrayList<TraceObserverV2>(5)
4141
createAnsiLogObserver(result)
42+
createAgentLogObserver(result)
4243
createGraphObserver(result)
4344
createReportObserver(result)
4445
createTimelineObserver(result)
@@ -48,8 +49,18 @@ class DefaultObserverFactory implements TraceObserverFactoryV2 {
4849

4950
protected void createAnsiLogObserver(Collection<TraceObserverV2> result) {
5051
if( session.ansiLog ) {
51-
session.ansiLogObserver = new AnsiLogObserver()
52-
result << session.ansiLogObserver
52+
def observer = new AnsiLogObserver()
53+
session.logObserver = observer
54+
result << observer
55+
}
56+
}
57+
58+
protected void createAgentLogObserver(Collection<TraceObserverV2> result) {
59+
if( session.agentLog ) {
60+
def observer = new AgentLogObserver()
61+
observer.setStatsObserver(session.statsObserver)
62+
session.logObserver = observer
63+
result << observer
5364
}
5465
}
5566

0 commit comments

Comments
 (0)