Skip to content

Commit de4bab1

Browse files
pditommasoclaude
andcommitted
Add DiskAllocation support to Seqera executor [ci fast]
- Add diskAllocation config option (task/node) to MachineRequirementOpts - Update MapperUtil to map allocation and use correct volumeType API - Add validation: node allocation only supports disk size - Bump sched-client to 0.16.0-SNAPSHOT Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
1 parent 3499d2d commit de4bab1

File tree

5 files changed

+238
-14
lines changed

5 files changed

+238
-14
lines changed

plugins/nf-seqera/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.6.0
1+
0.7.0

plugins/nf-seqera/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ dependencies {
4646
compileOnly project(':nextflow')
4747
compileOnly 'org.slf4j:slf4j-api:2.0.17'
4848
compileOnly 'org.pf4j:pf4j:3.12.0'
49-
api 'io.seqera:sched-client:0.14.0-SNAPSHOT'
49+
api 'io.seqera:sched-client:0.16.0-SNAPSHOT'
5050

5151
testImplementation(testFixtures(project(":nextflow")))
5252
testImplementation "org.apache.groovy:groovy:4.0.29"

plugins/nf-seqera/src/main/io/seqera/config/MachineRequirementOpts.groovy

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,14 @@ class MachineRequirementOpts implements ConfigScope {
8282
""")
8383
final Boolean diskEncrypted
8484

85+
@ConfigOption
86+
@Description("""
87+
The disk allocation strategy: `task` or `node`.
88+
- `task`: Per-task EBS volume created at task launch (default)
89+
- `node`: Per-node instance storage attached at cluster level
90+
""")
91+
final String diskAllocation
92+
8593
/* required by config scope -- do not remove */
8694
MachineRequirementOpts() {}
8795

@@ -94,6 +102,7 @@ class MachineRequirementOpts implements ConfigScope {
94102
this.diskThroughputMiBps = opts.diskThroughputMiBps as Integer
95103
this.diskIops = opts.diskIops as Integer
96104
this.diskEncrypted = opts.diskEncrypted as Boolean
105+
this.diskAllocation = opts.diskAllocation as String
97106
}
98107

99108
String getArch() {
@@ -127,4 +136,8 @@ class MachineRequirementOpts implements ConfigScope {
127136
Boolean getDiskEncrypted() {
128137
return diskEncrypted
129138
}
139+
140+
String getDiskAllocation() {
141+
return diskAllocation
142+
}
130143
}

plugins/nf-seqera/src/main/io/seqera/util/MapperUtil.groovy

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package io.seqera.util
1919

2020
import groovy.transform.CompileStatic
2121
import io.seqera.config.MachineRequirementOpts
22+
import io.seqera.sched.api.schema.v1a1.DiskAllocation
2223
import io.seqera.sched.api.schema.v1a1.DiskRequirement
2324
import io.seqera.sched.api.schema.v1a1.MachineRequirement
2425
import io.seqera.sched.api.schema.v1a1.PriceModel as SchedPriceModel
@@ -111,14 +112,29 @@ class MapperUtil {
111112
* Uses config options if provided, otherwise defaults to Fusion recommended settings:
112113
* EBS gp3 volume with 325 MiB/s throughput.
113114
*
115+
* For 'node' allocation, only sizeGiB and mountPath are applicable.
116+
* For 'task' allocation (default), all EBS options can be specified.
117+
*
114118
* @param diskSize the disk size (can be null)
115119
* @param opts the machine requirement options with disk settings (can be null)
116120
* @return the DiskRequirement API object, or null if diskSize is null or zero
117121
*/
118122
static DiskRequirement toDiskRequirement(MemoryUnit diskSize, MachineRequirementOpts opts=null) {
119123
if (!diskSize || diskSize.toGiga() <= 0)
120124
return null
121-
// Use config values or Fusion recommended defaults
125+
126+
final allocation = toDiskAllocation(opts?.diskAllocation)
127+
128+
// For 'node' allocation, only size and mountPath are valid
129+
if (allocation == DiskAllocation.NODE) {
130+
validateNodeAllocationOpts(opts)
131+
final DiskRequirement req = new DiskRequirement()
132+
req.sizeGiB(diskSize.toGiga() as Integer)
133+
req.allocation(allocation)
134+
return req
135+
}
136+
137+
// For 'task' allocation (default), apply EBS-specific options
122138
final type = opts?.diskType ?: DEFAULT_DISK_TYPE
123139
// Validate disk type is supported
124140
if (!SUPPORTED_DISK_TYPES.contains(type)) {
@@ -128,10 +144,11 @@ class MapperUtil {
128144
final iops = opts?.diskIops
129145
final encrypted = opts?.diskEncrypted ?: false
130146

131-
def req = new DiskRequirement()
132-
.sizeGiB(diskSize.toGiga() as Integer)
133-
.type(type)
134-
.encrypted(encrypted)
147+
final DiskRequirement req = new DiskRequirement()
148+
req.sizeGiB(diskSize.toGiga() as Integer)
149+
req.volumeType(type)
150+
req.encrypted(encrypted)
151+
req.allocation(allocation)
135152
// Only set throughput for gp3 volumes
136153
if (type == DEFAULT_DISK_TYPE) {
137154
req.throughputMiBps(throughput)
@@ -143,6 +160,44 @@ class MapperUtil {
143160
return req
144161
}
145162

163+
/**
164+
* Validates that no EBS-specific options are set when using 'node' allocation.
165+
* Node allocation uses instance storage, not EBS volumes.
166+
*
167+
* @param opts the machine requirement options
168+
* @throws IllegalArgumentException if EBS-specific options are set with node allocation
169+
*/
170+
private static void validateNodeAllocationOpts(MachineRequirementOpts opts) {
171+
if (!opts)
172+
return
173+
final List<String> invalidOpts = []
174+
if (opts.diskType)
175+
invalidOpts.add('diskType')
176+
if (opts.diskThroughputMiBps)
177+
invalidOpts.add('diskThroughputMiBps')
178+
if (opts.diskIops)
179+
invalidOpts.add('diskIops')
180+
if (opts.diskEncrypted)
181+
invalidOpts.add('diskEncrypted')
182+
183+
if (invalidOpts) {
184+
throw new IllegalArgumentException(
185+
"The following options are not valid with 'node' disk allocation: ${invalidOpts.join(', ')}. " +
186+
"Node allocation uses instance storage; only disk size is applicable."
187+
)
188+
}
189+
}
190+
191+
/**
192+
* Maps a disk allocation string to DiskAllocation enum.
193+
*
194+
* @param value the disk allocation string (task, node)
195+
* @return the DiskAllocation enum value, or null if value is null
196+
*/
197+
static DiskAllocation toDiskAllocation(String value) {
198+
value ? DiskAllocation.fromValue(value) : null
199+
}
200+
146201
/**
147202
* Maps a provisioning string to ProvisioningModel enum.
148203
*

plugins/nf-seqera/src/test/io/seqera/util/MapperUtilTest.groovy

Lines changed: 163 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package io.seqera.util
1919

2020
import io.seqera.config.MachineRequirementOpts
21+
import io.seqera.sched.api.schema.v1a1.DiskAllocation
2122
import io.seqera.sched.api.schema.v1a1.PriceModel as SchedPriceModel
2223
import io.seqera.sched.api.schema.v1a1.ProvisioningModel
2324
import nextflow.cloud.types.PriceModel
@@ -157,18 +158,23 @@ class MapperUtilTest extends Specification {
157158
when:
158159
def result = MapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'))
159160

160-
then:
161+
then: 'disk size is set'
161162
result.sizeGiB == 100
162-
result.type == MapperUtil.DEFAULT_DISK_TYPE
163+
and: 'allocation defaults to null (task allocation on API side)'
164+
result.allocation == null
165+
and: 'EBS defaults are applied for task allocation'
166+
result.volumeType == MapperUtil.DEFAULT_DISK_TYPE
163167
result.throughputMiBps == MapperUtil.DEFAULT_DISK_THROUGHPUT_MIBPS
168+
result.encrypted == false
169+
result.iops == null
164170
}
165171

166172
def 'should map disk size in different units' () {
167173
expect:
168174
MapperUtil.toDiskRequirement(MemoryUnit.of('1 TB')).sizeGiB == 1024
169175
MapperUtil.toDiskRequirement(MemoryUnit.of('50 GB')).sizeGiB == 50
170176
and: 'defaults are applied'
171-
MapperUtil.toDiskRequirement(MemoryUnit.of('1 TB')).type == MapperUtil.DEFAULT_DISK_TYPE
177+
MapperUtil.toDiskRequirement(MemoryUnit.of('1 TB')).volumeType == MapperUtil.DEFAULT_DISK_TYPE
172178
MapperUtil.toDiskRequirement(MemoryUnit.of('1 TB')).throughputMiBps == MapperUtil.DEFAULT_DISK_THROUGHPUT_MIBPS
173179
}
174180

@@ -226,7 +232,7 @@ class MapperUtilTest extends Specification {
226232

227233
then:
228234
result.sizeGiB == 100
229-
result.type == 'ebs/io1'
235+
result.volumeType == 'ebs/io1'
230236
result.iops == 10000
231237
result.throughputMiBps == null // throughput only for gp3
232238
}
@@ -239,7 +245,7 @@ class MapperUtilTest extends Specification {
239245
def result = MapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), opts)
240246

241247
then:
242-
result.type == MapperUtil.DEFAULT_DISK_TYPE
248+
result.volumeType == MapperUtil.DEFAULT_DISK_TYPE
243249
result.throughputMiBps == 500
244250
}
245251

@@ -268,7 +274,7 @@ class MapperUtilTest extends Specification {
268274

269275
then:
270276
result.sizeGiB == 200
271-
result.type == 'ebs/gp3'
277+
result.volumeType == 'ebs/gp3'
272278
result.throughputMiBps == 600
273279
result.iops == 8000
274280
result.encrypted == true
@@ -289,10 +295,160 @@ class MapperUtilTest extends Specification {
289295
then:
290296
result.arch == 'arm64'
291297
result.disk.sizeGiB == 500
292-
result.disk.type == 'ebs/io2'
298+
result.disk.volumeType == 'ebs/io2'
293299
result.disk.iops == 15000
294300
result.disk.encrypted == true
295301
result.disk.throughputMiBps == null // io2 doesn't use throughput
296302
}
297303

304+
// tests for disk allocation mapping
305+
306+
def 'should map disk allocation' () {
307+
expect:
308+
MapperUtil.toDiskAllocation(null) == null
309+
MapperUtil.toDiskAllocation('task') == DiskAllocation.TASK
310+
MapperUtil.toDiskAllocation('node') == DiskAllocation.NODE
311+
}
312+
313+
def 'should throw exception for invalid disk allocation' () {
314+
when:
315+
MapperUtil.toDiskAllocation('invalid')
316+
317+
then:
318+
thrown(IllegalArgumentException)
319+
}
320+
321+
def 'should use task disk allocation from config' () {
322+
given:
323+
def opts = new MachineRequirementOpts([diskAllocation: 'task'])
324+
325+
when:
326+
def result = MapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), opts)
327+
328+
then:
329+
result.allocation == DiskAllocation.TASK
330+
}
331+
332+
def 'should use node disk allocation from config' () {
333+
given:
334+
def opts = new MachineRequirementOpts([diskAllocation: 'node'])
335+
336+
when:
337+
def result = MapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), opts)
338+
339+
then:
340+
result.allocation == DiskAllocation.NODE
341+
result.sizeGiB == 100
342+
result.volumeType == null // node allocation doesn't set EBS options
343+
result.throughputMiBps == null
344+
result.iops == null
345+
result.encrypted == null
346+
}
347+
348+
def 'should default to null disk allocation when not specified' () {
349+
when:
350+
def result = MapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'))
351+
352+
then:
353+
result.allocation == null
354+
}
355+
356+
def 'should include disk allocation in machine requirement' () {
357+
given:
358+
def opts = new MachineRequirementOpts([
359+
arch: 'x86_64',
360+
diskAllocation: 'node'
361+
])
362+
363+
when:
364+
def result = MapperUtil.toMachineRequirement(opts, null, MemoryUnit.of('200 GB'))
365+
366+
then:
367+
result.arch == 'x86_64'
368+
result.disk.sizeGiB == 200
369+
result.disk.allocation == DiskAllocation.NODE
370+
}
371+
372+
// tests for node allocation validation
373+
374+
def 'should throw error when diskType is set with node allocation' () {
375+
given:
376+
def opts = new MachineRequirementOpts([
377+
diskAllocation: 'node',
378+
diskType: 'ebs/gp3'
379+
])
380+
381+
when:
382+
MapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), opts)
383+
384+
then:
385+
def e = thrown(IllegalArgumentException)
386+
e.message.contains('diskType')
387+
e.message.contains('node')
388+
}
389+
390+
def 'should throw error when diskIops is set with node allocation' () {
391+
given:
392+
def opts = new MachineRequirementOpts([
393+
diskAllocation: 'node',
394+
diskIops: 10000
395+
])
396+
397+
when:
398+
MapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), opts)
399+
400+
then:
401+
def e = thrown(IllegalArgumentException)
402+
e.message.contains('diskIops')
403+
}
404+
405+
def 'should throw error when diskThroughputMiBps is set with node allocation' () {
406+
given:
407+
def opts = new MachineRequirementOpts([
408+
diskAllocation: 'node',
409+
diskThroughputMiBps: 500
410+
])
411+
412+
when:
413+
MapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), opts)
414+
415+
then:
416+
def e = thrown(IllegalArgumentException)
417+
e.message.contains('diskThroughputMiBps')
418+
}
419+
420+
def 'should throw error when diskEncrypted is set with node allocation' () {
421+
given:
422+
def opts = new MachineRequirementOpts([
423+
diskAllocation: 'node',
424+
diskEncrypted: true
425+
])
426+
427+
when:
428+
MapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), opts)
429+
430+
then:
431+
def e = thrown(IllegalArgumentException)
432+
e.message.contains('diskEncrypted')
433+
}
434+
435+
def 'should report all invalid options with node allocation' () {
436+
given:
437+
def opts = new MachineRequirementOpts([
438+
diskAllocation: 'node',
439+
diskType: 'ebs/io1',
440+
diskIops: 10000,
441+
diskEncrypted: true
442+
])
443+
444+
when:
445+
MapperUtil.toDiskRequirement(MemoryUnit.of('100 GB'), opts)
446+
447+
then:
448+
def e = thrown(IllegalArgumentException)
449+
e.message.contains('diskType')
450+
e.message.contains('diskIops')
451+
e.message.contains('diskEncrypted')
452+
}
453+
298454
}

0 commit comments

Comments
 (0)