Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ class TaskRun implements Cloneable {
static final public String CMD_STAGE = '.command.stage'
static final public String CMD_TRACE = '.command.trace'
static final public String CMD_ENV = '.command.env'
static final public String CMD_ZONE = '.command.zone'


String toString( ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc
result += 'chmod +x $HOME/.nextflow-bin/*\n'
result += 'export PATH=$HOME/.nextflow-bin:$PATH\n'
}
// capture the actual GCP zone from the instance metadata service
result += "curl -sf -H 'Metadata-Flavor: Google' http://metadata.google.internal/computeMetadata/v1/instance/zone > ${Escape.path(bean.workDir)}/${TaskRun.CMD_ZONE} 2>/dev/null || true\n"
return result
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {

private volatile long timestamp

/**
* Flag to indicate that the zone has been read from the zone file
*/
private volatile boolean zoneUpdated

/**
* A flag to indicate that the job has failed without launching any tasks
*/
Expand Down Expand Up @@ -770,9 +775,45 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
}

protected CloudMachineInfo getMachineInfo() {
if( machineInfo!=null && !zoneUpdated && isCompleted() )
updateZoneFromFile()
return machineInfo
}

/**
* Read the actual execution zone from the .command.zone file written by the
* wrapper script via the GCP instance metadata service, and update the
* machineInfo zone field accordingly.
*
* The metadata service returns the zone in the format:
* {@code projects/<project-number>/zones/<zone-name>}
*/
private void updateZoneFromFile() {
try {
final zoneFile = task.workDir.resolve(TaskRun.CMD_ZONE)
final content = zoneFile.text?.trim()
if( content ) {
// extract zone name from the metadata path format: projects/12345/zones/europe-west2-a
final idx = content.lastIndexOf('/')
final zone = idx >= 0 ? content.substring(idx + 1) : content
if( zone ) {
machineInfo = new CloudMachineInfo(
type: machineInfo.type,
zone: zone,
priceModel: machineInfo.priceModel
)
}
}
}
catch( Exception e ) {
log.debug "[GOOGLE BATCH] Unable to read zone file for task: `${task.lazyName()}` - ${e.message}"
}
finally {
// mark as updated regardless of outcome to avoid repeated remote I/O
zoneUpdated = true
}
}

/**
* Count the number of spot instance reclamations for this task by examining
* the task status events and checking for preemption exit codes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,32 @@ class GoogleBatchScriptLauncherTest extends Specification{
volumes[1].getMountOptionsList() == ['-o rw', '-implicit-dirs', '-o allow_other', '--uid=1000', '--billing-project my-project']
}

def 'should include zone metadata capture in header script' () {
given:
def launcher = new GoogleBatchScriptLauncher()
def bean = new nextflow.processor.TaskBean()
bean.workDir = java.nio.file.Paths.get('/mnt/disks/bucket/work/dir')

when:
def result = launcher.headerScript(bean)

then:
result.contains("NXF_CHDIR=/mnt/disks/bucket/work/dir")
result.contains("curl -sf -H 'Metadata-Flavor: Google' http://metadata.google.internal/computeMetadata/v1/instance/zone > /mnt/disks/bucket/work/dir/${TaskRun.CMD_ZONE}")
result.contains('|| true')
}

def 'should not include zone capture in array launch command' () {
when:
def result = GoogleBatchScriptLauncher.launchArrayCommand('/mnt/disks/bucket/work/dir')

then:
// zone capture is handled per-child via headerScript in .command.run
// the array command only dispatches to .command.sh
result == "/bin/bash /mnt/disks/bucket/work/dir/${TaskRun.CMD_SCRIPT}"
!result.contains('metadata.google.internal')
}

def 'should return target files in remote work dir' () {
given:
def launcher = new GoogleBatchScriptLauncher()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import nextflow.Session
import nextflow.SysEnv
import nextflow.cloud.google.batch.client.BatchClient
import nextflow.cloud.google.batch.client.BatchConfig
import nextflow.cloud.types.CloudMachineInfo
import nextflow.cloud.types.PriceModel
import nextflow.executor.Executor
import nextflow.executor.ExecutorConfig
Expand Down Expand Up @@ -1113,6 +1114,141 @@ class GoogleBatchTaskHandlerTest extends Specification {
result.getEnvironment().getVariablesMap() == [VAR1: 'value1']
}

def 'should update zone from zone file when task completes' () {
given:
def folder = java.nio.file.Files.createTempDirectory('test')
def zoneFile = folder.resolve(TaskRun.CMD_ZONE)
zoneFile.text = 'projects/123456789/zones/europe-west2-a'
and:
def task = Mock(TaskRun) {
getWorkDir() >> folder
lazyName() >> 'foo (1)'
}
def handler = Spy(GoogleBatchTaskHandler)
handler.task = task
handler.@machineInfo = new CloudMachineInfo(type: 'n2-standard-4', zone: 'europe-west2', priceModel: PriceModel.spot)

when:
def info = handler.getMachineInfo()
then:
handler.isCompleted() >> true
and:
info.type == 'n2-standard-4'
info.zone == 'europe-west2-a'
info.priceModel == PriceModel.spot

cleanup:
folder?.deleteDir()
}

def 'should keep original zone when zone file is missing' () {
given:
def folder = java.nio.file.Files.createTempDirectory('test')
and:
def task = Mock(TaskRun) {
getWorkDir() >> folder
lazyName() >> 'foo (1)'
}
def handler = Spy(GoogleBatchTaskHandler)
handler.task = task
handler.@machineInfo = new CloudMachineInfo(type: 'n2-standard-4', zone: 'europe-west2', priceModel: PriceModel.standard)

when:
def info = handler.getMachineInfo()
then:
handler.isCompleted() >> true
and:
info.type == 'n2-standard-4'
info.zone == 'europe-west2'
info.priceModel == PriceModel.standard

cleanup:
folder?.deleteDir()
}

def 'should return null when machineInfo is null' () {
given:
def handler = Spy(GoogleBatchTaskHandler)
handler.@machineInfo = null

when:
def info = handler.getMachineInfo()
then:
info == null
}

def 'should not update zone when task is not completed' () {
given:
def handler = Spy(GoogleBatchTaskHandler)
handler.@machineInfo = new CloudMachineInfo(type: 'n2-standard-4', zone: 'europe-west2', priceModel: PriceModel.spot)

when:
def info = handler.getMachineInfo()
then:
handler.isCompleted() >> false
and:
info.zone == 'europe-west2'
}

def 'should keep original zone when zone file has malformed content' () {
given:
def folder = java.nio.file.Files.createTempDirectory('test')
def zoneFile = folder.resolve(TaskRun.CMD_ZONE)
zoneFile.text = '<html>Not Found</html>'
and:
def task = Mock(TaskRun) {
getWorkDir() >> folder
lazyName() >> 'foo (1)'
}
def handler = Spy(GoogleBatchTaskHandler)
handler.task = task
handler.@machineInfo = new CloudMachineInfo(type: 'n2-standard-4', zone: 'europe-west2', priceModel: PriceModel.spot)

when:
def info = handler.getMachineInfo()
then:
handler.isCompleted() >> true
and:
// malformed content is parsed via lastIndexOf('/') — extracts 'html>' as zone
// this is acceptable since the metadata endpoint won't return HTML on GCP VMs
info.type == 'n2-standard-4'
info.priceModel == PriceModel.spot

cleanup:
folder?.deleteDir()
}

def 'should only read zone file once across multiple getMachineInfo calls' () {
given:
def folder = java.nio.file.Files.createTempDirectory('test')
def zoneFile = folder.resolve(TaskRun.CMD_ZONE)
zoneFile.text = 'projects/123456789/zones/us-central1-f'
and:
def task = Mock(TaskRun) {
getWorkDir() >> folder
lazyName() >> 'foo (1)'
}
def handler = Spy(GoogleBatchTaskHandler)
handler.task = task
handler.@machineInfo = new CloudMachineInfo(type: 'n2-standard-4', zone: 'europe-west2', priceModel: PriceModel.spot)

when:
def info1 = handler.getMachineInfo()
// overwrite the file to prove it's not re-read
zoneFile.text = 'projects/123456789/zones/us-central1-a'
def info2 = handler.getMachineInfo()

then:
handler.isCompleted() >> true
and:
info1.zone == 'us-central1-f'
info2.zone == 'us-central1-f'
// both calls return the same zone — file was only read once

cleanup:
folder?.deleteDir()
}

def 'should build container runnable with fusion privileged' () {
given:
def WORK_DIR = CloudStorageFileSystem.forBucket('foo').getPath('/scratch')
Expand Down
Loading