Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/reference/env-vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ The following environment variables control the configuration of the Nextflow ru

## Nextflow settings

`NXF_AGENT_MODE`
: :::{versionadded} 26.04.0
:::
: When `true`, enables agent output mode. In this mode, Nextflow replaces the interactive ANSI log with minimal, structured output optimized for AI agents and non-interactive environments. The output uses tagged lines such as `[PIPELINE]`, `[PROCESS]`, `[WARN]`, `[ERROR]`, and `[SUCCESS]`/`[FAILED]` written to standard error (default: `false`).

`NXF_ANSI_LOG`
: Enables/disables ANSI console output (default `true` when ANSI terminal is detected).

Expand Down
14 changes: 8 additions & 6 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ import nextflow.script.ScriptRunner
import nextflow.script.WorkflowMetadata
import nextflow.script.dsl.ProcessConfigBuilder
import nextflow.spack.SpackConfig
import nextflow.trace.AnsiLogObserver
import nextflow.trace.LogObserver
import nextflow.trace.TraceObserver
import nextflow.trace.TraceObserverFactory
import nextflow.trace.TraceObserverFactoryV2
Expand Down Expand Up @@ -311,9 +311,11 @@ class Session implements ISession {

boolean ansiLog

boolean agentLog

boolean disableJobsCancellation

AnsiLogObserver ansiLogObserver
LogObserver logObserver

FilePorter getFilePorter() { filePorter }

Expand Down Expand Up @@ -832,7 +834,7 @@ class Session implements ISession {
shutdown0()
notifyError(null)
// force termination
ansiLogObserver?.forceTermination()
logObserver?.forceTermination()
executorFactory?.signalExecutors()
processesBarrier.forceTermination()
monitorsBarrier.forceTermination()
Expand Down Expand Up @@ -1278,16 +1280,16 @@ class Session implements ISession {
}

void printConsole(String str, boolean newLine=false) {
if( ansiLogObserver )
ansiLogObserver.appendInfo(str)
if( logObserver )
logObserver.appendInfo(str)
else if( newLine )
System.out.println(str)
else
System.out.print(str)
}

void printConsole(Path file) {
ansiLogObserver ? ansiLogObserver.appendInfo(file.text) : Files.copy(file, System.out)
logObserver ? logObserver.appendInfo(file.text) : Files.copy(file, System.out)
}

private volatile ThreadPoolManager finalizePoolManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ class CliOptions {
if( noColor ) {
return ansiLog = false
}

// Disable ANSI log in agent mode for plain, parseable output
if( SysEnv.isAgentMode() ) {
return ansiLog = false
}

return Ansi.isEnabled()
}

Expand Down
13 changes: 12 additions & 1 deletion modules/nextflow/src/main/groovy/nextflow/cli/CmdRun.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,8 @@ class CmdRun extends CmdBase implements HubOptions {
runner.setPreview(this.preview, previewAction)
runner.session.profile = profile
runner.session.commandLine = launcher.cliString
runner.session.ansiLog = launcher.options.ansiLog
runner.session.ansiLog = launcher.options.ansiLog && !SysEnv.isAgentMode()
runner.session.agentLog = SysEnv.isAgentMode()
runner.session.debug = launcher.options.remoteDebug
runner.session.disableJobsCancellation = getDisableJobsCancellation()

Expand Down Expand Up @@ -435,6 +436,11 @@ class CmdRun extends CmdBase implements HubOptions {
}

protected void printBanner() {
// Suppress banner in agent mode for minimal output
if( SysEnv.isAgentMode() ) {
return
}

if( launcher.options.ansiLog ){
// Plain header for verbose log
log.debug "N E X T F L O W ~ version ${BuildInfo.version}"
Expand Down Expand Up @@ -499,6 +505,11 @@ class CmdRun extends CmdBase implements HubOptions {
}

protected void printLaunchInfo(String repo, String head, String revision) {
// Agent mode output is handled by AgentLogObserver
if( SysEnv.isAgentMode() ) {
return
}

if( launcher.options.ansiLog ){
log.debug "${head} [$runName] - revision: ${revision}"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.trace

import java.util.concurrent.ConcurrentHashMap

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.processor.TaskRun
import nextflow.trace.event.TaskEvent
import nextflow.util.LoggerHelper

/**
* AI agent-friendly log observer that outputs minimal, structured information
* to standard error, optimized for AI context windows.
*
* Activated via environment variable {@code NXF_AGENT_MODE=1}.
*
* Output format:
* - {@code [PIPELINE] name version | profile=X}
* - {@code [WORKDIR] /path/to/work}
* - {@code [PROCESS hash] name (tag)}
* - {@code [WARN] warning message} (deduplicated)
* - {@code [ERROR] name} with exit/cmd/stderr/workdir
* - {@code [SUCCESS|FAILED] completed=N failed=N cached=N}
*
* @author Edmund Miller <edmund.miller@utdallas.edu>
*/
@Slf4j
@CompileStatic
class AgentLogObserver implements TraceObserverV2, LogObserver {

private Session session
private WorkflowStatsObserver statsObserver
private final Set<String> seenWarnings = ConcurrentHashMap.newKeySet()
private volatile boolean started = false
private volatile boolean completed = false

/**
* Set the workflow stats observer for retrieving task statistics
*/
void setStatsObserver(WorkflowStatsObserver observer) {
this.statsObserver = observer
}

/**
* Print a line to standard output (agent format)
*/
protected void println(String line) {
System.err.println(line)
}

// -- TraceObserverV2 lifecycle methods --

@Override
void onFlowCreate(Session session) {
this.session = session
}

@Override
void onFlowBegin() {
if( started )
return
started = true

// Print pipeline info
def manifest = session.manifest
def pipelineName = manifest?.name ?: session.scriptName ?: 'unknown'
def version = manifest?.version ?: ''
def profile = session.profile ?: 'standard'

def info = "[PIPELINE] ${pipelineName}"
if( version )
info += " ${version}"
info += " | profile=${profile}"
println(info)

// Print work directory
def workDir = session.workDir?.toUriString() ?: session.workDir?.toString()
if( workDir )
println("[WORKDIR] ${workDir}")
}

@Override
void onFlowComplete() {
if( completed )
return
completed = true
printSummary()
}

@Override
void onFlowError(TaskEvent event) {
// Error is already reported by onTaskComplete for failed tasks
}

@Override
void onTaskSubmit(TaskEvent event) {
def task = event.handler?.task
if( task )
println("[PROCESS ${task.hashLog}] ${task.name}")
}

@Override
void onTaskComplete(TaskEvent event) {
def handler = event.handler
def task = handler?.task
if( task?.isFailed() ) {
printTaskError(task)
}
}

@Override
void onTaskCached(TaskEvent event) {
// Not reported in agent mode
}

/**
* Append a warning message (deduplicated)
*/
void appendWarning(String message) {
if( message == null )
return
// Normalize and deduplicate
def normalized = message.trim().replaceAll(/\s+/, ' ')
if( seenWarnings.add(normalized) ) {
println("[WARN] ${normalized}")
}
}

/**
* Append an error message
*/
void appendError(String message) {
if( message )
println("[ERROR] ${message}")
}

/**
* Append info message to stdout.
* Hash-prefixed task log lines (e.g. {@code [ab/123456] Submitted process > ...})
* are filtered out because {@link #onTaskSubmit} already emits a {@code [PROCESS]} line.
*/
void appendInfo(String message) {
if( message && !LoggerHelper.isHashLogPrefix(message) )
System.out.print(message)
}

/**
* Print task error with full diagnostic context
*/
protected void printTaskError(TaskRun task) {
def name = task.getName()
println("[ERROR] ${name}")

// Exit status
def exitStatus = task.getExitStatus()
if( exitStatus != null && exitStatus != Integer.MAX_VALUE ) {
println("exit: ${exitStatus}")
}

// Command/script (first line or truncated)
def script = task.getScript()?.toString()?.trim()
if( script ) {
// Truncate long commands
def cmd = script.length() > 200 ? script.substring(0, 200) + '...' : script
cmd = cmd.replaceAll(/\n/, ' ').replaceAll(/\s+/, ' ')
println("cmd: ${cmd}")
}

// Stderr
def stderr = task.getStderr()
if( stderr ) {
def lines = stderr.readLines()
if( lines.size() > 10 ) {
lines = lines[-10..-1]
}
println("stderr: ${lines.join(' | ')}")
}

// Stdout (only if relevant)
def stdout = task.getStdout()
if( stdout && !stderr ) {
def lines = stdout.readLines()
if( lines.size() > 5 ) {
lines = lines[-5..-1]
}
println("stdout: ${lines.join(' | ')}")
}

// Work directory
def workDir = task.getWorkDir()
if( workDir ) {
println("workdir: ${workDir.toUriString()}")
}
}

/**
* Print final summary line
*/
protected void printSummary() {
def stats = statsObserver?.getStats()
def succeeded = stats?.succeededCount ?: 0
def failed = stats?.failedCount ?: 0
def cached = stats?.cachedCount ?: 0
def completed = succeeded + failed

def status = failed > 0 ? 'FAILED' : 'SUCCESS'
println("\n[${status}] completed=${completed} failed=${failed} cached=${cached}")
}

/**
* Force termination - called on abort
*/
void forceTermination() {
if( !completed ) {
completed = true
printSummary()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.fusesource.jansi.AnsiConsole
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@CompileStatic
class AnsiLogObserver implements TraceObserverV2 {
class AnsiLogObserver implements TraceObserverV2, LogObserver {

static final private String NEWLINE = '\n'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class DefaultObserverFactory implements TraceObserverFactoryV2 {

final result = new ArrayList<TraceObserverV2>(5)
createAnsiLogObserver(result)
createAgentLogObserver(result)
createGraphObserver(result)
createReportObserver(result)
createTimelineObserver(result)
Expand All @@ -48,8 +49,18 @@ class DefaultObserverFactory implements TraceObserverFactoryV2 {

protected void createAnsiLogObserver(Collection<TraceObserverV2> result) {
if( session.ansiLog ) {
session.ansiLogObserver = new AnsiLogObserver()
result << session.ansiLogObserver
def observer = new AnsiLogObserver()
session.logObserver = observer
result << observer
}
}

protected void createAgentLogObserver(Collection<TraceObserverV2> result) {
if( session.agentLog ) {
def observer = new AgentLogObserver()
observer.setStatsObserver(session.statsObserver)
session.logObserver = observer
result << observer
}
}

Expand Down
Loading
Loading