Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugins/nf-seqera/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ dependencies {
compileOnly project(':nextflow')
compileOnly 'org.slf4j:slf4j-api:2.0.17'
compileOnly 'org.pf4j:pf4j:3.12.0'
api 'io.seqera:sched-client:0.19.0-SNAPSHOT'
api 'io.seqera:sched-client:0.20.1'

testImplementation(testFixtures(project(":nextflow")))
testImplementation "org.apache.groovy:groovy:4.0.29"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,12 @@ class SeqeraTaskHandler extends TaskHandler implements FusionAwareTask {
if( accelerator.type )
resourceReq.acceleratorName(accelerator.type)
}
// build machine requirement merging config settings with task arch and disk
// build machine requirement merging config settings with task arch, disk, and snapshot settings
final machineReq = MapperUtil.toMachineRequirement(
executor.getSeqeraConfig().machineRequirement,
task.getContainerPlatform(),
task.config.getDisk()
task.config.getDisk(),
fusionConfig().snapshotsEnabled()
)
// validate container - Seqera executor requires all processes to specify a container image
final container = task.getContainer()
Expand Down
12 changes: 8 additions & 4 deletions plugins/nf-seqera/src/main/io/seqera/util/MapperUtil.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.seqera.sched.api.schema.v1a1.MachineRequirement
import io.seqera.sched.api.schema.v1a1.PriceModel as SchedPriceModel
import io.seqera.sched.api.schema.v1a1.ProvisioningModel
import nextflow.cloud.types.PriceModel
import nextflow.fusion.FusionConfig
import nextflow.util.MemoryUnit

/**
Expand Down Expand Up @@ -80,35 +81,38 @@ class MapperUtil {
* @return the MachineRequirement API object, or null if no settings
*/
static MachineRequirement toMachineRequirement(MachineRequirementOpts opts, String taskArch) {
return toMachineRequirement(opts, taskArch, null)
return toMachineRequirement(opts, taskArch, null, false)
}

/**
* Maps MachineRequirementOpts to MachineRequirement API object, merging with task arch and disk.
* Maps MachineRequirementOpts to MachineRequirement API object, merging with task arch, disk, and snapshots.
* Task arch overrides config arch if specified.
*
* @param opts the config options (can be null)
* @param taskArch the task container platform/arch (can be null)
* @param diskSize the disk size from task config (can be null)
* @param snapshotEnabled whether Fusion snapshots are enabled
* @return the MachineRequirement API object, or null if no settings
*/
static MachineRequirement toMachineRequirement(MachineRequirementOpts opts, String taskArch, MemoryUnit diskSize) {
static MachineRequirement toMachineRequirement(MachineRequirementOpts opts, String taskArch, MemoryUnit diskSize, boolean snapshotEnabled) {
final arch = taskArch ?: opts?.arch
final provisioning = opts?.provisioning
final maxSpotAttempts = opts?.maxSpotAttempts
?: (snapshotEnabled ? FusionConfig.DEFAULT_SNAPSHOT_MAX_SPOT_ATTEMPTS : null)
final machineFamilies = opts?.machineFamilies
// task disk overrides config disk
final effectiveDiskSize = diskSize ?: opts?.diskSize
final diskReq = toDiskRequirement(effectiveDiskSize, opts)
// return null if no settings
if (!arch && !provisioning && !maxSpotAttempts && !machineFamilies && !diskReq)
if (!arch && !provisioning && !maxSpotAttempts && !machineFamilies && !diskReq && !snapshotEnabled)
return null
new MachineRequirement()
.arch(arch)
.provisioning(toProvisioningModel(provisioning))
.maxSpotAttempts(maxSpotAttempts)
.machineFamilies(machineFamilies)
.disk(diskReq)
.snapshotEnabled(snapshotEnabled ? Boolean.TRUE : null)
}

/**
Expand Down
59 changes: 54 additions & 5 deletions plugins/nf-seqera/src/test/io/seqera/util/MapperUtilTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.seqera.sched.api.schema.v1a1.DiskAllocation
import io.seqera.sched.api.schema.v1a1.PriceModel as SchedPriceModel
import io.seqera.sched.api.schema.v1a1.ProvisioningModel
import nextflow.cloud.types.PriceModel
import nextflow.fusion.FusionConfig
import nextflow.util.MemoryUnit
import spock.lang.Specification

Expand Down Expand Up @@ -182,7 +183,8 @@ class MapperUtilTest extends Specification {
def result = MapperUtil.toMachineRequirement(
new MachineRequirementOpts([arch: 'x86_64']),
null,
MemoryUnit.of('200 GB')
MemoryUnit.of('200 GB'),
false
)

then:
Expand All @@ -193,7 +195,7 @@ class MapperUtilTest extends Specification {

def 'should return machine requirement with only disk' () {
when:
def result = MapperUtil.toMachineRequirement(null, null, MemoryUnit.of('100 GB'))
def result = MapperUtil.toMachineRequirement(null, null, MemoryUnit.of('100 GB'), false)

then:
result != null
Expand All @@ -204,7 +206,7 @@ class MapperUtilTest extends Specification {

def 'should return null when no arch, no opts, and no disk' () {
expect:
MapperUtil.toMachineRequirement(null, null, null) == null
MapperUtil.toMachineRequirement(null, null, null, false) == null
}

// tests for custom disk configuration options
Expand Down Expand Up @@ -295,7 +297,7 @@ class MapperUtilTest extends Specification {
])

when:
def result = MapperUtil.toMachineRequirement(opts, null, MemoryUnit.of('500 GB'))
def result = MapperUtil.toMachineRequirement(opts, null, MemoryUnit.of('500 GB'), false)

then:
result.arch == 'arm64'
Expand Down Expand Up @@ -367,7 +369,7 @@ class MapperUtilTest extends Specification {
])

when:
def result = MapperUtil.toMachineRequirement(opts, null, MemoryUnit.of('200 GB'))
def result = MapperUtil.toMachineRequirement(opts, null, MemoryUnit.of('200 GB'), false)

then:
result.arch == 'x86_64'
Expand Down Expand Up @@ -457,4 +459,51 @@ class MapperUtilTest extends Specification {
e.message.contains('diskEncrypted')
}

// tests for snapshot maxSpotAttempts defaulting

def 'should return machine requirement with only snapshot enabled' () {
when:
def result = MapperUtil.toMachineRequirement(null, null, null, true)

then:
result != null
result.snapshotEnabled == true
result.maxSpotAttempts == FusionConfig.DEFAULT_SNAPSHOT_MAX_SPOT_ATTEMPTS
}

def 'should use explicit maxSpotAttempts when snapshot enabled' () {
when:
def result = MapperUtil.toMachineRequirement(new MachineRequirementOpts([maxSpotAttempts: 2]), null, null, true)

then:
result.snapshotEnabled == true
result.maxSpotAttempts == 2
}

def 'should not default maxSpotAttempts when snapshot disabled' () {
when:
def result = MapperUtil.toMachineRequirement(new MachineRequirementOpts([arch: 'x86_64']), null, null, false)

then:
result.snapshotEnabled == null
result.maxSpotAttempts == null
}

def 'should combine snapshot with other machine requirement settings' () {
when:
def result = MapperUtil.toMachineRequirement(
new MachineRequirementOpts([arch: 'arm64', provisioning: 'spot']),
null,
MemoryUnit.of('100 GB'),
true
)

then:
result.arch == 'arm64'
result.provisioning == ProvisioningModel.SPOT
result.disk.sizeGiB == 100
result.snapshotEnabled == true
result.maxSpotAttempts == FusionConfig.DEFAULT_SNAPSHOT_MAX_SPOT_ATTEMPTS
}

}
Loading