Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions snakemake_executor_plugin_slurm_jobstep/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@
import socket
import subprocess
import sys
from dataclasses import dataclass, field
from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
from snakemake_interface_executor_plugins.executors.real import RealExecutor
from snakemake_interface_executor_plugins.jobs import (
JobExecutorInterface,
)
from snakemake_interface_executor_plugins.settings import ExecMode, CommonSettings
from snakemake_interface_executor_plugins.settings import (
CommonSettings,
ExecMode,
ExecutorSettingsBase,
)
from snakemake_interface_common.exceptions import WorkflowError


Expand All @@ -38,6 +43,25 @@
)


@dataclass
class ExecutorSettings(ExecutorSettingsBase):
"""Settings for the SLURM jobstep executor plugin."""

pass_command_as_script: bool = field(
default=False,
metadata={
"help": (
"Pass to srun the command to be executed as a shell script "
"(fed through stdin) instead of wrapping it in the command line "
"call. Useful when a limit exists on SLURM command line length (ie. "
"max_submit_line_size)."
),
"env_var": False,
"required": False,
},
)


# Required:
# Implementation of your executor
class Executor(RealExecutor):
Expand All @@ -58,6 +82,7 @@ def run_job(self, job: JobExecutorInterface):
# snakemake_interface_executor_plugins.executors.base.SubmittedJobInfo.

jobsteps = dict()
srun_script = None
# TODO revisit special handling for group job levels via srun at a later stage
# if job.is_group():

Expand Down Expand Up @@ -118,14 +143,40 @@ def run_job(self, job: JobExecutorInterface):

call = "srun -n1 --cpu-bind=q "
call += f" {get_cpu_setting(job, self.gpu_job)} "
call += f" {self.format_job_exec(job)}"
if self.workflow.executor_settings.pass_command_as_script:
# format the job to execute with all the snakemake parameters
# into a script
srun_script = self.format_job_exec(job)
# the process will read the srun script from stdin
call += " sh -s"
else:
call += f" {self.format_job_exec(job)}"

self.logger.debug(f"This job is a group job: {job.is_group()}")
self.logger.debug(f"The call for this job is: {call}")
self.logger.debug(f"Job is running on host: {socket.gethostname()}")
if srun_script is not None:
self.logger.debug(f"The script for this job is: \n{srun_script}")
# this dict is to support the to be implemented feature of oversubscription in
# "ordinary" group jobs.
jobsteps[job] = subprocess.Popen(call, shell=True)
jobsteps[job] = subprocess.Popen(
call, shell=True, text=True, stdin=subprocess.PIPE
)
if srun_script is not None:
try:
# pass the srun bash script via stdin
jobsteps[job].stdin.write(srun_script)
jobsteps[job].stdin.close()
except BrokenPipeError:
# subprocess terminated before reading stdin
self.logger.error(
f"Failed to write script to stdin for job {job}. "
"Subprocess may have terminated prematurely."
)
self.report_job_error(SubmittedJobInfo(job))
raise WorkflowError(
f"Job {job} failed: subprocess terminated before reading script"
)

job_info = SubmittedJobInfo(job)
self.report_job_submission(job_info)
Expand Down
Loading