Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,14 @@ class TaskArrayCollector {
protected TaskArrayRun createTaskArray(List<TaskRun> tasks) {
// prepare child job launcher scripts
final handlers = tasks.collect( t -> executor.createTaskHandler(t).withArrayChild(true) )
for( TaskHandler handler : handlers ) {
handler.prepareLauncher()
// prepare launchers in parallel to avoid sequential blocking I/O on cloud storage
handlers.parallelStream().forEach { handler ->
try {
handler.prepareLauncher()
} catch (Exception e) {
log.error("Failed to prepare launcher for task ${handler.task.name}", e)
throw e
}
}

// create work directory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ package nextflow.cloud.google.batch
import static nextflow.cloud.google.batch.GoogleBatchScriptLauncher.*

import java.nio.file.Path
import java.util.concurrent.TimeUnit

import com.google.api.gax.rpc.DeadlineExceededException
import com.google.api.gax.rpc.UnavailableException
import com.google.cloud.storage.contrib.nio.CloudStoragePath
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand All @@ -34,11 +37,14 @@ import nextflow.executor.Executor
import nextflow.executor.TaskArrayExecutor
import nextflow.extension.FilesEx
import nextflow.fusion.FusionHelper
import nextflow.processor.ParallelPollingMonitor
import nextflow.processor.TaskHandler
import nextflow.processor.TaskMonitor
import nextflow.processor.TaskPollingMonitor
import nextflow.processor.TaskRun
import nextflow.util.Duration
import nextflow.util.RateUnit
import nextflow.util.ThreadPoolHelper
import nextflow.util.ThrottlingExecutor
import nextflow.util.Escape
import nextflow.util.ServiceName
import org.pf4j.ExtensionPoint
Expand All @@ -57,6 +63,16 @@ class GoogleBatchExecutor extends Executor implements ExtensionPoint, TaskArrayE
private Path remoteBinDir
private BatchLogging logging

/**
* Executor service to throttle job submission requests
*/
private ThrottlingExecutor submitter

/**
* Executor service to throttle job deletion requests
*/
private ThrottlingExecutor reaper

private final Set<String> deletedJobs = new HashSet<>()

BatchClient getClient() { return client }
Expand Down Expand Up @@ -116,20 +132,97 @@ class GoogleBatchExecutor extends Executor implements ExtensionPoint, TaskArrayE

@Override
protected TaskMonitor createTaskMonitor() {
TaskPollingMonitor.create(session, config, name, 1000, Duration.of('10 sec'))
// create the throttling executor services
submitter = createExecutorService('GoogleBatch-executor')
reaper = createExecutorService('GoogleBatch-reaper')

final pollInterval = config.getPollInterval(name, Duration.of('10 sec'))
final dumpInterval = config.getMonitorDumpInterval(name)
final capacity = config.getQueueSize(name, 1000)

final def params = [
name: name,
session: session,
config: config,
pollInterval: pollInterval,
dumpInterval: dumpInterval,
capacity: capacity
]

log.debug "Creating parallel monitor for executor '$name' > capacity=$capacity; pollInterval=$pollInterval; dumpInterval=$dumpInterval"
new ParallelPollingMonitor(submitter, params)
}

/**
* Creates a {@link ThrottlingExecutor} service to throttle
* API requests to the Google Batch service.
*
* @param name The executor service name
* @return A {@link ThrottlingExecutor} instance
*/
private ThrottlingExecutor createExecutorService(String name) {
final qs = 5_000
final limit = config.getExecConfigProp(name, 'submitRateLimit', '50/s') as String
final size = Runtime.runtime.availableProcessors() * 5

final opts = new ThrottlingExecutor.Options()
.retryOn { Throwable t ->
t instanceof UnavailableException ||
t instanceof DeadlineExceededException ||
t instanceof IOException ||
t.cause instanceof IOException
}
.onFailure { Throwable t -> session?.abort(t) }
.onRateLimitChange { RateUnit rate -> logRateLimitChange(rate) }
.withRateLimit(limit)
.withQueueSize(qs)
.withPoolSize(size)
.withKeepAlive(Duration.of('1 min'))
.withAutoThrottle(true)
.withMaxRetries(10)
.withPoolName(name)

ThrottlingExecutor.create(opts)
}

protected void logRateLimitChange(RateUnit rate) {
log.debug "New submission rate limit: $rate"
}

@Override
TaskHandler createTaskHandler(TaskRun task) {
return new GoogleBatchTaskHandler(task, this)
}

ThrottlingExecutor getReaper() { reaper }

@Override
void shutdown() {
// shutdown the submitter executor
def tasks = submitter.shutdownNow()
if( tasks ) log.warn "Execution interrupted -- cleaning up execution pool"
submitter.awaitTermination(5, TimeUnit.MINUTES)

// shutdown the reaper executor
reaper.shutdown()
final waitMsg = "[GOOGLE BATCH] Waiting jobs reaper to complete (%d jobs to be terminated)"
final exitMsg = "[GOOGLE BATCH] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated"
awaitCompletion(reaper, Duration.of('60min'), waitMsg, exitMsg)

// close batch client and logging
client.shutdown()
logging.close()
}

protected void awaitCompletion(ThrottlingExecutor executor, Duration duration, String waitMsg, String exitMsg) {
try {
ThreadPoolHelper.await(executor, duration, waitMsg, exitMsg)
}
catch( java.util.concurrent.TimeoutException e ) {
log.warn(e.message, e)
}
}

@Override
boolean isFusionEnabled() {
return FusionHelper.isFusionEnabled(session)
Expand Down