Skip to content

feat: Azure Batch eagerly terminates jobs after all tasks have been submitted#6159

Open
adamrtalbot wants to merge 32 commits intomasterfrom
5839_azure_batch_jobs_terminate_upon_completion
Open

feat: Azure Batch eagerly terminates jobs after all tasks have been submitted#6159
adamrtalbot wants to merge 32 commits intomasterfrom
5839_azure_batch_jobs_terminate_upon_completion

Conversation

@adamrtalbot
Copy link
Collaborator

@adamrtalbot adamrtalbot commented Jun 4, 2025

Summary

Fixes Azure Batch "job leak" issue where jobs remain in Active state even after task completion, causing quota exhaustion and preventing multiple pipelines from running simultaneously.

Problem: Jobs consume quota slots unnecessarily, blocking other workflows
Solution: Leverage Azure Batch's native auto-termination to release quota immediately when tasks complete

How Azure Batch Eager Job Termination Works

Problem Addressed

Azure Batch has a limitation where jobs remain in an "Active" state even after all their tasks complete. This causes:

  • Quota exhaustion: Active jobs count against Azure Batch service quotas
  • Pipeline blocking: Multiple Nextflow pipelines can't run simultaneously due to quota limits
  • Resource waste: Jobs consume quota slots unnecessarily

Solution Implementation

Job Auto-Termination Configuration

// In AzBatchOpts.groovy
@ConfigOption
@Description("When the workflow completes, set all jobs to terminate on task completion (default: true)")
final Boolean terminateJobsOnCompletion

Default behavior: terminateJobsOnCompletion = true (enabled by default)

Job Termination Mechanism

The service implements a two-phase termination approach:

Phase 1: Set Jobs to Auto-Terminate
protected void terminateJobs() {
    for( String jobId : allJobIds.values() ) {
        final job = apply(() -> client.getJob(jobId))
        final poolInfo = job.poolInfo
        
        final jobParameter = new BatchJobUpdateContent()
            .setOnAllTasksComplete(OnAllBatchTasksComplete.TERMINATE_JOB)  // Key setting
            .setPoolInfo(poolInfo)
        
        apply(() -> client.updateJob(jobId, jobParameter))
    }
}
Phase 2: Cleanup on Workflow Completion
@Override
void close() {
    // Terminate all jobs to prevent them from occupying quota
    if( config.batch().terminateJobsOnCompletion ) {
        terminateJobs()
    }
    
    // Delete all jobs (if configured)
    if( config.batch().deleteJobsOnCompletion ) {
        cleanupJobs()
    }
}

Azure Batch Native Feature Integration

How Auto-Termination Works

  • Uses Azure Batch's native OnAllBatchTasksComplete.TERMINATE_JOB setting
  • This tells Azure Batch: "When all tasks in this job finish, automatically terminate the job"
  • Jobs transition from "Active" → "Terminating" → "Completed"

Eager Termination Flow

  1. Job Creation: Jobs created normally with pool assignments
  2. Task Submission: Tasks submitted to jobs as usual
  3. Workflow Completion: When Nextflow workflow completes
  4. Batch Update: Nextflow updates all existing jobs with OnAllTasksComplete = TERMINATE_JOB
  5. Auto-Termination: Azure Batch automatically terminates jobs as their tasks complete
  6. Quota Release: Terminated jobs no longer consume quota

Key Benefits

Resource Management

  • Quota preservation: Jobs don't consume quota after completion
  • Multiple pipelines: Allows running multiple Nextflow workflows simultaneously
  • Clean resource usage: Prevents resource leaks in Azure Batch

Operational Improvements

  • Zero user impact: Completely transparent to workflow users
  • Backward compatible: Can be disabled if needed
  • Automatic cleanup: No manual intervention required

Configuration Options

Users can control the behavior:

azure {
    batch {
        terminateJobsOnCompletion = true   // Enable eager termination (default)
        deleteJobsOnCompletion = false    // Optionally delete jobs entirely
        deleteTasksOnCompletion = true    // Clean up individual tasks
    }
}

Technical Implementation Details

Job Lifecycle Management

  • Job Reuse: Same job ID reused for same Process+PoolId combination
  • Pool Independence: Each pool can have its own jobs
  • Batch Updates: Uses Azure Batch updateJob API for termination setting

Error Handling

catch (HttpResponseException e) {
    if (e.response.statusCode == 409) {
        log.debug "Azure Batch job ${jobId} already terminated, skipping termination"
    } else {
        log.warn "Unable to terminate Azure Batch job ${jobId} - Status: ${e.response.statusCode}"
    }
}

Impact

This implementation provides an elegant solution to Azure Batch's job quota problem by leveraging Azure's native auto-termination feature. It ensures that jobs automatically terminate when their tasks complete, preventing quota exhaustion while maintaining full compatibility with existing workflows.

Related


Note

Implements eager Azure Batch job auto-termination after task submission with automatic job recreation on 409 conflicts; updates tests, logging, and config docs.

  • Azure Batch service (plugins/nf-azure/.../AzBatchService.groovy):
    • Eager auto-termination:
      • New setJobTermination(jobId) and setAutoTerminateIfEnabled(jobId, taskId); runTask now calls them after submission.
      • Removed end-of-workflow terminateJobs() usage from close().
    • Resilient task submission:
      • New submitTaskToJob(...) handles 409 on auto-terminated jobs by recreateJobForTask(...) and resubmits.
      • recreateJobForTask(...) synchronizes and updates allJobIds mapping.
    • Other:
      • Lower destFile(...) log level to trace; fix "creddentials" typo; minor comments/docs; preserve job constraints on create.
  • Tests:
    • Added coverage for eager auto-termination and job recreation concurrency; removed close-time termination expectation; misc updates.
  • Docs (docs/reference/config.md):
    • Clarify azure.batch.terminateJobsOnCompletion: set jobs to terminate on task completion (default true).

Written by Cursor Bugbot for commit 899f0a2. This will update automatically on new commits. Configure here.

@netlify
Copy link

netlify bot commented Jun 4, 2025

Deploy Preview for nextflow-docs-staging ready!

Name Link
🔨 Latest commit 02b2d34
🔍 Latest deploy log https://app.netlify.com/projects/nextflow-docs-staging/deploys/69a0cdfdf9e6d100070e3839
😎 Deploy Preview https://deploy-preview-6159--nextflow-docs-staging.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@adamrtalbot

This comment was marked as outdated.

@adamrtalbot
Copy link
Collaborator Author

Integration tests failing, looks unrelated.

@pditommaso

This comment was marked as outdated.

@adamrtalbot

This comment was marked as outdated.

@pditommaso pditommaso force-pushed the master branch 2 times, most recently from b4b321e to 069653d Compare June 4, 2025 18:54
@bentsherman bentsherman self-requested a review June 16, 2025 12:34
@adamrtalbot
Copy link
Collaborator Author

Related issue on Slack: https://nfcore.slack.com/archives/C02T98A23U7/p1753954588096009

Hi !
Unsure if I should post here or in #nextflow-plugins, as it is concerns help about a plugin.
Concerned plugin is nf-azure , where, in the current state jobs are only deleted after completion of the workflow with AzBatchService.cleanupJobs()
The problem is that a job with all completed tasks still count towards the job quota ; thus allowing only 3-4 pipelines to run instead of 50+ (with 2-3 tasks each)
This is why I want to periodically run a mid-workflow cleanup in the AzBatchService such as :

protected void cleanupCompletedJobsMidRun() {
        for (String jobId : allJobIds.values()) {
            try {
                def tasks = client.listTasks(jobId)
                if (tasks.every { it.state.toString() in ['COMPLETED'] }) {
                    log.trace "Deleting Azure job ${jobId} mid-run"
                    apply(() -> client.deleteJob(jobId))
                }
            }
            catch (Exception e) {
                log.debug "Skipping mid-run cleanup for ${jobId} - ${e.message ?: e}"
            }
        }
    }

My problem is that it implies a modification of the nf-azure plugin, which being a core plugin, must be handled differently that a custom plugin derived from nf-hello
Apart from the natural plugin packaging, my main help wanted is on the overriding of the proper nf-azure plugin in PluginsFacade , so I have my own executor in :

protected List<PluginSpec> defaultPluginsConf(Map config) {
        // retrieve the list from the env var
        final commaSepList = env.get('NXF_PLUGINS_DEFAULT')
        if( commaSepList && commaSepList !in ['true','false'] ) {
            // if the plugin id in the list does *not* contain the @version suffix, it picks the version
            // specified in the defaults list. Otherwise parse the provider id@version string to the corresponding spec
            return commaSepList
                    .tokenize(',')
                    .collect( it-> defaultPlugins.hasPlugin(it) ? defaultPlugins.getPlugin(it) : PluginSpec.parse(it) )
        }

        final plugins = new ArrayList<PluginSpec>()
        final workDir = config.workDir as String
        final bucketDir = config.bucketDir as String
        final executor = Bolts.navigate(config, 'process.executor')

        if( executor == 'awsbatch' || workDir?.startsWith('s3://') || bucketDir?.startsWith('s3://') || env.containsKey('NXF_ENABLE_AWS_SES') )
            plugins << defaultPlugins.getPlugin('nf-amazon')

        if( executor == 'google-lifesciences' || executor == 'google-batch' || workDir?.startsWith('gs://') || bucketDir?.startsWith('gs://')  )
            plugins << defaultPlugins.getPlugin('nf-google')

        if( executor == 'azurebatch' || workDir?.startsWith('az://') || bucketDir?.startsWith('az://') )
            plugins << defaultPlugins.getPlugin('nf-azure')

...
            
        return plugins
    }

Is this problem easily solvable, or should we focus on balancing the load over multiple batch accounts instead of relying on only one ?

@ghislaindemael
Copy link

ghislaindemael commented Aug 6, 2025

Hi !
Posting my comment here as indicated in Slack.
Our 'use case' is relatively simple, we just run big pipelines (aka with many jobs), and with completed jobs being deleted only when all tasks are completed (or cancelled), we reach our quota relatively quickly, allowing us to have only a few pipelines running with 2-3 active jobs instead of dozens.

Should we not care about resuming a workflow, since task outputs are stored outside of the workdir, a simple flag to delete the job once the last task has finished it's lifecycle would be all we need.

The downside is that to "resume" a workflow, me must find a way to tell Nextflow that task outputs already exist and are stored at X location, as well as supply them. Which leads to the fact that the easiest solution would be to relaunch the whole workflow should we have an error arise somewhere during processing.

@adamrtalbot

This comment was marked as outdated.

@adamrtalbot
Copy link
Collaborator Author

since task outputs are stored outside of the workdir

@ghislaindemael I'm not sure I understand this; Task outputs have to be in the working directory. Even if they're published to a new location, they are copied out after the task is completed by Nextflow itself.

@ghislaindemael
Copy link

@adamrtalbot

My error, indeed I meant as we publish the results outside of the workdir (e.g. in Blob Storage), we can query them from here and thus delete them from the Batch VMs to remove the load and free up jobs for the quota.

@adamrtalbot
Copy link
Collaborator Author

we can query them from here and thus delete them from the Batch VMs to remove the load and free up jobs for the quota.

To clarify the flow here:

  1. A job is created on Azure Batch by Nextflow and assigned to a Node Pool
  2. One or more tasks are added to the Job
  3. Each task is assigned to a node
  4. The task starts
    • It downloads the input files from Azure Blob storage to the local node
    • It creates the output files on the local node
    • It uploads the output files back to Azure Blob storage at the working directory
  5. The task completes
    • The output files on the local node are deleted to clear space for future tasks
    • (Optional): The output files are copied from the working directory to the publishing directory by Nextflow
  6. When the pipeline completes, Nextflow will terminate the job created in 1, preventing any new tasks being added to the job and clearing quota

This PR strictly refers to 1 and 9 and does not interact with any files. If you are having issues with file storage, running out of space, etc., this would be an different issue.

@luanjot
Copy link

luanjot commented Aug 12, 2025

@adamrtalbot we are also looking for a solution for this. We have executions that have hundreds of jobs sometimes, so, even in proper executions without errors, we are limited to 2 or 3 parallel executions per batch account. In our case, at any given moment, we would have something like:

Run1:
200 jobs marked as finished, e.g. [100%] X of X ✔
2 jobs ongoing with pending tasks
20 jobs not started

Run2:
57 jobs marked as finished, e.g. [100%] X of X ✔
4 jobs ongoing with pending tasks
200 jobs not started

So, from Azure's perspective, we have 200+2+57+4=263 jobs ongoing. As the runs progress, we have more and more jobs open and we reach the limit very quickly.

We are seeing if we can modify / extend the nf-azure plugin to handle this by adding some sort of cron job that deletes the jobs marked as finished (with a ✔), but you seem to say that this might cause issues when resuming the tasks? Why is that? It seems to me that the job names are different when it resumes the execution, no?

@adamrtalbot
Copy link
Collaborator Author

but you seem to say that this might cause issues when resuming the tasks?

Not resume, but retry with an errorStrategy: https://www.nextflow.io/docs/latest/reference/process.html#errorstrategy

Here is the flow that may cause issues:

  1. A job is created for a process
  2. 5 tasks are submitted to the job
  3. Nextflow decides no more tasks will be submitted and closes the Job (sets to terminateOnCompletion)
  4. 4 tasks successfully complete, 1 task fails
  5. All tasks have completed, Azure Batch terminates the job
  6. The failed task tries to retry
  7. It gets submitted to a terminated job!

@luanjot
Copy link

luanjot commented Aug 12, 2025

So using a cron job that validates when all tasks have completed successfully would work, right? In this case, the tasks have already completed. Or can we add it to the bit that runs and "decides" that all the tasks have been completed?

@adamrtalbot
Copy link
Collaborator Author

So using a cron job that validates when all tasks have completed successfully would work, right? In this case, the tasks have already completed. Or can we add it to the bit that runs and "decides" that all the tasks have been completed?

You might have the same issue, in that you terminate a job before you can resubmit a task.

@adamrtalbot
Copy link
Collaborator Author

Note none of this will help you if you just have too many active jobs. A job needs to be active to run a task, so if you just have a lot of work to do this wont help.

Really, the issue is with the terrible design by Azure but given they just fired most of their genomics staff I doubt they will bother to help 🤷 .

@luanjot
Copy link

luanjot commented Aug 12, 2025

I have too many active jobs because nextflow does not close them, not because they are actually active.

Any job that has been marked by nextflow with a ✔ is, in my opinion, finished and will not be re-used for anything never again, however, nextflow does not close it until the full execution of the run is finished. This is the behaviour that I think is incorrect. If the run takes 2 days to run, the first task that finished 47 hours ago is still marked as "active" in batch because Nextflow does not close it even though it will never be used again. I think it is Nextflow that is not using the Batch account properly.

@adamrtalbot
Copy link
Collaborator Author

Any job that has been marked by nextflow with a ✔ is, in my opinion, finished and will not be re-used for anything never again, however, nextflow does not close it until the full execution of the run is finished. This is the behaviour that I think is incorrect. If the run takes 2 days to run, the first task that finished 47 hours ago is still marked as "active" in batch because Nextflow does not close it even though it will never be used again. I think it is Nextflow that is not using the Batch account properly.

Right - so with especially long running pipelines you have many jobs in active state which do not do anything.

Unfortunately, there isn't a way of Nextflow knowing the future and determining if another task will be submitted to the job which makes it tricky to know when to close a job.

Here's an alternative implementation (which has the added benefit of making @pditommaso happy because it wont use the trace observer!):

  1. Create Job
  2. Submit task
  3. IMMEDIATELY set Job to terminate onAllBatchTasksComplete
  4. Add a try/catch so if you try to submit a Task to a Job in completed state, a new Job will be created, allowing more jobs to start.

This should eagerly terminate jobs while still allowing users to submit all tasks as normal.

@luanjot
Copy link

luanjot commented Aug 12, 2025

Unfortunately, there isn't a way of Nextflow knowing the future and determining if another task will be submitted to the job which makes it tricky to know when to close a job.

Then how does it "decide" to add the ✔?

@pditommaso
Copy link
Member

The check is shown when no more tasks for that process need to be executed ie. the process execution is complete

@adamrtalbot
Copy link
Collaborator Author

The check is shown when no more tasks for that process need to be executed ie. the process execution is complete

Excellent, can we use that logic to terminate the Azure Job?

@pditommaso
Copy link
Member

It could be done with a TraceObserver(V2). If i'm not wrong you already made a pr for that

@adamrtalbot adamrtalbot requested a review from a team as a code owner August 12, 2025 12:59
@adamrtalbot
Copy link
Collaborator Author

Hi @ghislaindemael

Sorry for the update, but we hit an edge case where if Nextflow submits a task to a job, but - due to compute limitations as all nodes are used - while this job stays Active, Azure still terminates the job when onAllBatchTasksComplete gets fired. This causes the pipeline to halt without any error thrown.

I'm not sure I follow this.

  1. Nextflow creates jobs and adds tasks
  2. The Azure nodes are occupied to the Azure tasks wait in the queue
  3. The waiting tasks complete?
  4. Because of the lag, there is a gap where the job is terminated
  5. Eventually, new tasks are required to be submitted and Nextflow tries to submit them to the job
  6. At this point, Nextflow should identify the Azure Batch job is terminated and create a new one <- Would I be correct in saying this silently fails?
  7. At pipeline termination, Nextflow terminates the remaining jobs, clearing out the quota.

So the overall objective is achieved, but Nextflow isn't handling the Job re-creation correctly (step 6). Is my understanding correct?

…etion

Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com>
@adamrtalbot
Copy link
Collaborator Author

Also note:

Our solution is, like you did at the start, to listen with a TraceObserver to onProcessTerminate, fired when all of the tasks are submitted and completed.

This will not help users such as @luanjot who have too many active jobs during a pipeline, so the solution should involve both.

Prevent tasks from being orphaned when jobs auto-terminate while tasks
are waiting for compute resources. This fixes an issue where pipelines
could silently die without error when resource-constrained pools caused
tasks to remain in Active state.

Problem:
- Task submitted to job, enters Active state (waiting for resources)
- Other tasks complete, triggering onAllTasksComplete = TERMINATE_JOB
- Azure terminates job while task still in Active state
- Per Azure docs: "remaining active Tasks will not be scheduled"
- Task never runs, pipeline silently fails

Solution:
Only set auto-terminate when job has running tasks. If all tasks are
in Active state (waiting for resources), defer auto-terminate to avoid
orphaning them. This ensures tasks get a chance to start running before
the job can be terminated.

Changes:
- Modified setAutoTerminateIfEnabled() to check task states
- Only sets onAllTasksComplete=TERMINATE_JOB if tasks are running
- Defers termination if tasks are still waiting for compute resources
- Updated tests to verify new behavior

Related to #5839

Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com>
Ensure all Azure Batch jobs are set to auto-terminate when the workflow
completes, even if eager termination was deferred earlier. This provides
a safety net for jobs where eager termination was skipped due to tasks
waiting in Active state for compute resources.

Without this fallback, jobs where eager termination was deferred would
remain active indefinitely, consuming quota unnecessarily. This change
guarantees that jobs will eventually terminate and free up quota.

Changes:
- Added terminateAllJobs() method to set onAllTasksComplete for all jobs
- Called from close() when terminateJobsOnCompletion is enabled
- Catches any jobs where eager termination was deferred
- Ensures quota is freed even if Nextflow dies mid-execution

Related to #5839

Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com>
Add TraceObserver that sets jobs to auto-terminate when processes complete,
providing an additional layer of job cleanup during workflow execution.

This complements the existing termination mechanisms:
1. Smart eager termination (per-task, when safe)
2. Process-level termination (this observer, when process completes)
3. Fallback termination (on workflow close)

The TraceObserver fires when a process completes (all tasks submitted and
finished), ensuring jobs are terminated promptly without waiting for the
entire workflow to finish. This is particularly useful for long-running
workflows with many processes, as it frees up quota progressively.

Implementation:
- AzBatchProcessObserver: Implements TraceObserver.onProcessTerminate
- AzBatchProcessObserverFactory: Creates observer instances
- Service registration via META-INF/services for automatic discovery

Related to #5839

Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com>
The previous implementation checked if tasks were in "Running" state
immediately after submission, but tasks need time to transition from
Active → Running. This caused the eager termination to always defer,
making it ineffective.

Changed the logic to check if the job has any tasks (count > 0) instead
of checking task state. Since setAutoTerminateIfEnabled() is called AFTER
task submission, the task will already be in the job, making this check
reliable and timing-independent.

This ensures eager per-task termination actually works as intended, with
the TraceObserver and close() fallbacks providing additional safety.

Related to #5839

Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com>
@ghislaindemael
Copy link

Hi @ghislaindemael

Sorry for the update, but we hit an edge case where if Nextflow submits a task to a job, but - due to compute limitations as all nodes are used - while this job stays Active, Azure still terminates the job when onAllBatchTasksComplete gets fired. This causes the pipeline to halt without any error thrown.

I'm not sure I follow this.

  1. Nextflow creates jobs and adds tasks
  2. The Azure nodes are occupied to the Azure tasks wait in the queue
  3. The waiting tasks complete?
  4. Because of the lag, there is a gap where the job is terminated
  5. Eventually, new tasks are required to be submitted and Nextflow tries to submit them to the job
  6. At this point, Nextflow should identify the Azure Batch job is terminated and create a new one <- Would I be correct in saying this silently fails?
  7. At pipeline termination, Nextflow terminates the remaining jobs, clearing out the quota.

So the overall objective is achieved, but Nextflow isn't handling the Job re-creation correctly (step 6). Is my understanding correct?

In our case :

  1. Nextflow submits a Job and jobs inside (for example 9 out of the 10 it should) and marks it to be terminated on tasks complete
  2. The 9 tasks complete
  3. Nextflow submits a 10th task but since all nodes are taken, it stays "Active"
  4. The onAllBatchTasksComplete cleaner, that runs every X minutes, looks at the job, ignores the 10th task for an unknown reason, and seeing 9 completed tasks, terminates the job.
  5. The TaskStarter (or process that starts tasks) sees that the job is completed so will ignore the fact that there's an active task inside
  6. Nextflow stalls as it waits for the 10th task to be ran, which'll never happen.

@ghislaindemael
Copy link

Update @adamrtalbot

In my humble opinion, I would disable the auto-termination and just go with the termination with onProcessComplete.

This as it triggers only we are sure the job isn't needed anymore and we can't prevent we might hit a race condition where the job is picked up by onAllBatchTasksComplete cleaner but Nextflow seeing the job as Active still inserts a new task inside before job is set to "Terminating" and the cleaner doesn't recheck.

This leads to an 'orphaned' task, that will not be deleted by Azure until job is deleted. As according to docs the maxWallClockTime only applied when tasks starts to run. Which won't happen in this case.

@adamrtalbot
Copy link
Collaborator Author

Nextflow submits a Job and jobs inside (for example 9 out of the 10 it should) and marks it to be terminated on tasks complete
The 9 tasks complete
Nextflow submits a 10th task but since all nodes are taken, it stays "Active"
The onAllBatchTasksComplete cleaner, that runs every X minutes, looks at the job, ignores the 10th task for an unknown reason, and seeing 9 completed tasks, terminates the job.
The TaskStarter (or process that starts tasks) sees that the job is completed so will ignore the fact that there's an active task inside
Nextflow stalls as it waits for the 10th task to be ran, which'll never happen.

OK interesting, I didn't realise Azure Batch considers an 'active' task completed when it's actually pending. What a terrible system 🤦 In which case, we can only switch it when a task is in running status. This should be straightforward to set up a reproducible example.

In my humble opinion, I would disable the auto-termination and just go with the termination with onProcessComplete.

This as it triggers only we are sure the job isn't needed anymore and we can't prevent we might hit a race condition where the job is picked up by onAllBatchTasksComplete cleaner but Nextflow seeing the job as Active still inserts a new task inside before job is set to "Terminating" and the cleaner doesn't recheck.

This leads to an 'orphaned' task, that will not be deleted by Azure until job is deleted. As according to docs the maxWallClockTime only applied when tasks starts to run. Which won't happen in this case.

@pditommaso, how do you feel about using the TraceObserverV2 this way now?

@ghislaindemael
Copy link

I may allow myself to think the Azure engineers thought about this case, but you can't guarantee some race conditions not to happen on systems of these sizes.
I want to say it is what we observe, which might not be fully what they intended. But is having a full maxExistenceClockTime field really useful for some extreme edge cases ?

…jobs_terminate_upon_completion

Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com>
…ookup, and observer tests

- Use ConcurrentHashMap for allJobIds to ensure thread safety
- Extract getJobIdsForProcessor() to encapsulate job lookup logic
- Simplify setAutoTerminateIfEnabled() by removing unused taskId param
- Use iterator().hasNext() instead of size() for efficient task check
- Improve config docs for terminateJobsOnCompletion
- Add unit tests for AzBatchProcessObserver

Generated by Claude Code

Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com>
Test pipeline with two scenarios to verify onAllTasksComplete behavior:
- fast_tasks: 50 instant tasks to test submission/completion race
- slow_tasks: 20 long-running tasks to verify queued tasks are not terminated

Generated by Claude Code

Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com>
This reverts commit ae52e23.

Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com>
@adamrtalbot
Copy link
Collaborator Author

@ghislaindemael I've been trying to recreate your issue and can't recreate it, I think there is something different occurring. As far as I can tell, Azure will never terminate a job when there are pending tasks in there, and won't let you submit a task to a job that is being terminated. It has to be a scenario where:

  • The job is not in truly active or terminated state
  • Another task is submitted
  • The task submission does not trigger nextflow to create another job to replace the terminated one

I think we should move ahead with releasing to edge and getting more real world feedback.

@adamrtalbot
Copy link
Collaborator Author

adamrtalbot commented Feb 13, 2026

Here's my mini-example which failed to reproduce your error:

/*
 * Test: Azure Batch job termination race condition
 *
 * Each Nextflow process maps to one Azure Batch job; each task instance
 * maps to an Azure Batch task within that job.
 *
 * When terminateJobsOnCompletion is enabled, Nextflow sets
 * onAllTasksComplete=TERMINATE_JOB on the Azure Batch job after each
 * task submission. If all submitted tasks complete before the next task
 * is submitted, Azure terminates the job. The next submission then hits
 * a 409 (Conflict) and must recreate the job.
 *
 * This pipeline reproduces that race by:
 *   1. Submitting the first two tasks immediately
 *   2. Delaying the final task long enough for the first two to complete
 *      and Azure to terminate the job
 *   3. Verifying the delayed task still succeeds (via job recreation)
 *
 * Usage:
 *   NXF_DEBUG=2 ./launch.sh run tests/azure-terminate-race.nf \
 *     -c tests/azure-terminate-race.config \
 *     -w az://<container>/work
 *
 * Expected log output when the race triggers:
 *   "Job ... is in completed/terminating state, creating a new job"
 */

params.num_tasks = 3

/*
 * Warm up the pool so compute nodes are allocated before the
 * race_test tasks are submitted.
 */
process warmup {
    cpus 1

    output:
    val 'ready', emit: signal

    script:
    """
    echo "Pool warmed up at \$(date)"
    """
}

/*
 * Trivially fast Azure Batch tasks. All instances share the same job.
 */
process race_test {
    cpus 1
    tag "${id}"

    input:
    val id

    output:
    stdout

    script:
    """
    echo "race_test task ${id} completed at \$(date)"
    """
}

workflow {
    // Warm up the pool, then emit task IDs.
    // The final task is delayed so the earlier tasks complete and
    // Azure terminates the job before it arrives.
    warmup()

    delayed_ids = warmup.out.signal
        .flatMap { (1..params.num_tasks).collect { it } }
        .map { id ->
            if( id == params.num_tasks )
                Thread.sleep(60_000) // you need to play around with timings!
            return id
        }
        .view()

    race_test(delayed_ids)
}

Running this test gracefully handled the job termination:

Feb-13 14:23:45.182 [Task submitter] DEBUG n.c.azure.batch.AzBatchTaskHandler - [AZURE BATCH] Submitting task race_test (3) - work-dir=az://container/4f/4d475093d2c313ab36282effc8a64f
Feb-13 14:23:45.821 [Task submitter] DEBUG n.cloud.azure.batch.AzBatchService - Job job-786518c3af3f44ef5032-race_test is in completed/terminating state, creating a new job for task nf-4f4d475093d2c313ab36282effc8a64f
Feb-13 14:23:45.822 [Task submitter] DEBUG n.cloud.azure.batch.AzBatchService - [AZURE BATCH] created job for race_test with pool terminate-race-pool
Feb-13 14:23:46.229 [Task submitter] DEBUG n.cloud.azure.batch.AzBatchService - Setting job job-19f91f25bea9b49cf8d3-race_test to auto-terminate

Signed-off-by: Ben Sherman <bentshermann@gmail.com>
@bentsherman
Copy link
Member

Took some time to try to understand the changes here.

The trace observer makes sense and seems like a straightforward solution. But I'm concerned about the changes to AzBatchService.

If I understand correctly, a task could be submitted to a job that is already set to auto-terminate, so you added some logic to re-create the job and submit the task. But if you only set the job to auto-terminate after the process has terminated (all tasks completed), then this should never happen. So I would not expect this logic to be needed at all.

@adamrtalbot
Copy link
Collaborator Author

Let me walk through the scenario:

Each process is mapped to a job, and each job contains several tasks which are instances of a Nextflow process.
There is a maximum number of active jobs in an Azure Batch account, which creates this problem. It would make sense for Nextflow to be as eager as possible about terminating jobs. Azure Batch includes a number of native mechanisms to terminate jobs.

A typical pipeline may involve hundreds of jobs, and several pipelines may be running concurrently. In the current implementation, a pipeline can hold essentially all of its processes in an active state until shutdown, where it then actively terminates jobs during cleanup.

Problematic scenarios

  1. When a user is running many concurrent pipelines, they may hit the job limit due to actual work being processed,causing all pipelines to fail when trying to submit a new job. This is the situation @luanjot encountered. In this case the pipeline exits prematurely with no good recovery path.
  2. An ungraceful exit from Nextflow. We see this internally quite a bit when pushing Nextflow to its limits. When this happens, Nextflow never enters the shutdown phase and jobs are never terminated.

The two fixes

  • The trace observer has Nextflow terminate jobs after all tasks have been submitted. This should resolve both issues however it's not clear to me whether the trace observer fires eagerly throughout pipeline execution or only at pipeline termination. If it's the latter, both scenarios remain unresolved.
  • Switching jobs to terminate when all tasks are complete uses native Azure Batch mechanisms, removing Nextflow from the picture entirely. This resolves scenarios 1 and 2, but adds overhead for Nextflow. The job resubmission logic also gives Nextflow some defence against an external system terminating jobs prematurely.

The key question then is: does the trace observer fire early enough to resolve both scenarios? If yes, we can drop the second fix and shift focus to #5575. If not, we need both sides of this.

@adamrtalbot
Copy link
Collaborator Author

a task could be submitted to a job that is already set to auto-terminate,

A slight clarification on this. Setting jobs to auto-terminate is fine; that's what it's designed to do. The job will remain in an active state until all the tasks have completed. The only time this is a problem is when all tasks within a job complete, the job terminates, and the next task is submitted.

@bentsherman
Copy link
Member

The only time this is a problem is when all tasks within a job complete, the job terminates, and the next task is submitted.

This part I don't understand. If a job is terminated only after all of its tasks have completed, then by definition it should never receive another task

Overall, I think we should focus on #5575 in order to at least stop the bleeding, then perhaps we could add the trace observer. Not sure that the job re-creation is worthwhile

@adamrtalbot
Copy link
Collaborator Author

This part I don't understand. If a job is terminated only after all of its tasks have completed, then by definition it should never receive another task

Nextflow creates Job A and adds task A, B and C to it.

They complete, Job A autocompletes.

Nextflow then creates task D and tries to add it to Job A.

@bentsherman
Copy link
Member

But why would it do that? Where is task D coming from and why is it being sent to Job A?

Is it a different process that happens to use the same job?

@pditommaso pditommaso force-pushed the master branch 2 times, most recently from d9fa5cd to d752bc2 Compare February 28, 2026 13:10
@adamrtalbot
Copy link
Collaborator Author

adamrtalbot commented Feb 28, 2026

But why would it do that? Where is task D coming from and why is it being sent to Job A?

Is it a different process that happens to use the same job?

Well in the context of a pipeline, the process not be submitted until a preceding task was completed. I think my example above recreates this problem.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Nextflow eagerly terminates Azure Batch jobs during execution

6 participants