diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 70d66fa9..4494c35d 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -25,6 +25,9 @@ jobs:
         pip install -r requirements.txt
         pip install -r requirements-dev.txt
         pip install -r requirements-connector-radical.txt
+        # Radical depends on PSI/J. However, the version we are
+        # testing provides that dependency
+        pip uninstall -y psij-python
     - name: Typecheck and stylecheck
       run: |
         make typecheck
@@ -37,6 +40,10 @@
     runs-on: ubuntu-latest
     steps:
     - uses: actions/checkout@v2
+    #- uses: mxschmitt/action-tmate@v3
+    #  with:
+    #    detached: true
+    #    limit-access-to-actor: true
    - name: Test with pytest
      uses: addnab/docker-run-action@v3
      with:
@@ -46,7 +53,7 @@
          cd /workspace
          echo "Running in ${PWD}"
          sudo apt update
-         sudo apt install -y openssh-server openssh-client
+         sudo apt install -y openssh-server openssh-client netcat-traditional
          mkdir -p "$HOME/.ssh"
          chmod 0755 "$HOME"
          chmod 0700 "$HOME/.ssh"
diff --git a/.gitignore b/.gitignore
index 4d8418e7..76944679 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,6 +17,7 @@ psi_j_python.egg-info/
 venv*
 .venv*
 build/
+.packages/
 docs/.web-build
 web-build/
-.packages/
\ No newline at end of file
+.packages/
diff --git a/.readthedocs.yaml b/.readthedocs.yaml
index 6415f34f..85674529 100644
--- a/.readthedocs.yaml
+++ b/.readthedocs.yaml
@@ -9,6 +9,9 @@ build:
   os: ubuntu-22.04
   tools:
     python: "3.11"
+  jobs:
+    post_install:
+      - pip uninstall -y psij-python
 
 python:
   install:
diff --git a/src/psij-descriptors/cobalt_descriptor.py b/src/psij-descriptors/cobalt_descriptor.py
index f6fdfe04..ea27c601 100644
--- a/src/psij-descriptors/cobalt_descriptor.py
+++ b/src/psij-descriptors/cobalt_descriptor.py
@@ -2,5 +2,5 @@
 from psij.descriptor import Descriptor
 
 
-__PSI_J_EXECUTORS__ = [Descriptor(name="cobalt", nice_name='Cobalt', version=Version("0.0.1"),
+__PSI_J_EXECUTORS__ = [Descriptor(name="cobalt", nice_name='Cobalt', version=Version("0.2.0"),
                                   cls='psij.executors.batch.cobalt.CobaltJobExecutor')]
diff --git a/src/psij-descriptors/core_descriptors.py b/src/psij-descriptors/core_descriptors.py
index b16c89a2..cf1183a1 100644
--- a/src/psij-descriptors/core_descriptors.py
+++ b/src/psij-descriptors/core_descriptors.py
@@ -2,15 +2,15 @@
 from psij.descriptor import Descriptor
 
 __PSI_J_EXECUTORS__ = [
-    Descriptor(name='local', nice_name='Local', version=Version('0.0.1'),
+    Descriptor(name='local', nice_name='Local', version=Version('0.2.0'),
                cls='psij.executors.local.LocalJobExecutor')
 ]
 
 __PSI_J_LAUNCHERS__ = [
-    Descriptor(name='single', version=Version('0.0.1'),
+    Descriptor(name='single', version=Version('0.2.0'),
                cls='psij.launchers.single.SingleLauncher'),
-    Descriptor(name='multiple', version=Version('0.0.1'),
+    Descriptor(name='multiple', version=Version('0.2.0'),
                cls='psij.launchers.multiple.MultipleLauncher'),
-    Descriptor(name='mpirun', version=Version('0.0.1'),
+    Descriptor(name='mpirun', version=Version('0.2.0'),
                cls='psij.launchers.mpirun.MPILauncher'),
 ]
diff --git a/src/psij-descriptors/lsf_descriptor.py b/src/psij-descriptors/lsf_descriptor.py
index 87e2b64c..68b19cfd 100644
--- a/src/psij-descriptors/lsf_descriptor.py
+++ b/src/psij-descriptors/lsf_descriptor.py
@@ -2,5 +2,5 @@
 from psij.descriptor import Descriptor
 
 
-__PSI_J_EXECUTORS__ = [Descriptor(name='lsf', nice_name='LSF', version=Version('0.0.1'),
+__PSI_J_EXECUTORS__ = [Descriptor(name='lsf', nice_name='LSF', version=Version('0.2.0'),
                                   cls='psij.executors.batch.lsf.LsfJobExecutor')]
diff --git a/src/psij-descriptors/pbs_descriptor.py b/src/psij-descriptors/pbs_descriptor.py
index 8121e7bc..6ee0dbdd 100644
--- a/src/psij-descriptors/pbs_descriptor.py
+++ b/src/psij-descriptors/pbs_descriptor.py
@@ -3,8 +3,8 @@
 
 __PSI_J_EXECUTORS__ = [Descriptor(name='pbs', nice_name='PBS Pro', aliases=['pbspro'],
-                                  version=Version('0.0.2'),
+                                  version=Version('0.2.0'),
                                   cls='psij.executors.batch.pbs.PBSJobExecutor'),
                        Descriptor(name='pbs_classic', nice_name='PBS Classic', aliases=['torque'],
-                                  version=Version('0.0.2'),
+                                  version=Version('0.2.0'),
                                   cls='psij.executors.batch.pbs_classic.PBSClassicJobExecutor')]
diff --git a/src/psij-descriptors/slurm_descriptor.py b/src/psij-descriptors/slurm_descriptor.py
index ff01129c..c9af34dc 100644
--- a/src/psij-descriptors/slurm_descriptor.py
+++ b/src/psij-descriptors/slurm_descriptor.py
@@ -2,5 +2,5 @@
 from psij.descriptor import Descriptor
 
 
-__PSI_J_EXECUTORS__ = [Descriptor(name='slurm', nice_name='Slurm', version=Version('0.0.1'),
+__PSI_J_EXECUTORS__ = [Descriptor(name='slurm', nice_name='Slurm', version=Version('0.2.0'),
                                   cls='psij.executors.batch.slurm.SlurmJobExecutor')]
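Note: the descriptor changes above are pure version-metadata bumps (0.0.x → 0.2.0). For orientation, a minimal, hedged sketch of how such descriptors are consumed at runtime; `JobExecutor.get_instance()` mirrors the `Launcher.get_instance(name, version_constraint, config)` signature visible later in this diff, and the constraint string syntax is an assumption:

```python
# Hypothetical usage sketch; resolves the 'local' descriptor declared in
# core_descriptors.py. The version_constraint value is an assumption.
from psij import Job, JobExecutor, JobSpec

executor = JobExecutor.get_instance('local', version_constraint='==0.2.0')
executor.submit(Job(JobSpec(executable='/bin/date')))
```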
""" super().__init__(url=url, config=self._get_config(config)) + self._queue_poll_thread = self._start_queue_poll_thread() assert config self.work_directory = config.work_directory / self.name - self._queue_poll_thread = self._start_queue_poll_thread() - - def _ensure_work_dir(self) -> None: self.work_directory.mkdir(parents=True, exist_ok=True) + cast(_FileCleaner, _FileCleaner.get_instance()).clean(self.work_directory) def _get_config(self, config: Optional[JobExecutorConfig]) -> BatchSchedulerExecutorConfig: if config is None: @@ -224,8 +225,6 @@ def _get_config(self, config: Optional[JobExecutorConfig]) -> BatchSchedulerExec def submit(self, job: Job) -> None: """See :func:`~psij.JobExecutor.submit`.""" logger.info('Job %s: submitting', job.id) - self._ensure_work_dir() - self._check_job(job) context = self._create_script_context(job) @@ -504,7 +503,10 @@ def _create_script_context(self, job: Job) -> Dict[str, object]: 'psij': { 'lib': FUNCTION_LIBRARY, 'launch_command': launch_command, - 'script_dir': str(self.work_directory) + 'script_dir': str(self.work_directory), + 'us_file': self._queue_poll_thread.status_updater.update_file_name, + 'us_port': self._queue_poll_thread.status_updater.update_port, + 'us_addrs': ', '.join(self._queue_poll_thread.status_updater.ips) } } assert job.spec is not None @@ -546,9 +548,11 @@ def _set_job_status(self, job: Job, status: JobStatus) -> None: # is_greater_than returns T/F if the states are comparable and None if not, so # we have to check explicitly for the boolean value rather than truthiness return - if status.state.final and job.native_id: - self._clean_submit_script(job) - self._read_aux_files(job, status) + if status.state.final: + self._queue_poll_thread.unregister_job(job) + if job.native_id: + self._clean_submit_script(job) + self._read_aux_files(job, status) super()._set_job_status(job, status) def _clean_submit_script(self, job: Job) -> None: @@ -588,9 +592,8 @@ def _read_aux_files(self, job: Job, status: JobStatus) -> None: # already present out = self._read_aux_file(job, '.out') if out: - launcher = self._get_launcher_from_job(job) - if launcher.is_launcher_failure(out): - status.message = launcher.get_launcher_failure_message(out) + if '_PSIJ_SCRIPT_DONE' not in out: + status.message = out logger.debug('Output from launcher: %s', status.message) else: self._delete_aux_file(job, '.out') @@ -657,24 +660,31 @@ def __init__(self, name: str, config: BatchSchedulerExecutorConfig, self.done = False self.executor = weakref.ref(executor, self._stop) # native_id -> job - self._jobs: Dict[str, List[Job]] = {} + self._jobs: Dict[str, Set[Job]] = {} # counts consecutive errors while invoking qstat or equivalent self._poll_error_count = 0 self._jobs_lock = RLock() + self.status_updater = cast(_StatusUpdater, _StatusUpdater.get_instance()) def run(self) -> None: - logger.debug('Executor %s: queue poll thread started', self.executor()) + logger.debug('Executor %s: queue poll thread started', self.executor) + time.sleep(self.config.initial_queue_polling_delay) while not self.done: self._poll() - time.sleep(self.config.queue_polling_interval) - - def _stop(self, exec: object) -> None: + start = time.time() + now = start + while now - start < self.config.queue_polling_interval: + time.sleep(1) + now = time.time() + logger.info('Thread %s exiting due to executor collection' % self) + + def _stop(self, executor: object) -> None: self.done = True def _poll(self) -> None: - exec = self.executor() - if exec is None: + executor = self.executor() + if 
executor is None: return with self._jobs_lock: if len(self._jobs) == 0: @@ -682,12 +692,12 @@ def _poll(self) -> None: jobs_copy = dict(self._jobs) logger.info('Polling for %s jobs', len(jobs_copy)) try: - out = exec._run_command(exec.get_status_command(jobs_copy.keys())) + out = executor._run_command(executor.get_status_command(jobs_copy.keys())) except subprocess.CalledProcessError as ex: out = ex.output exit_code = ex.returncode except Exception as ex: - self._handle_poll_error(exec, True, ex, + self._handle_poll_error(executor, True, ex, f'Failed to poll for job status: {traceback.format_exc()}') return else: @@ -695,27 +705,24 @@ def _poll(self) -> None: self._poll_error_count = 0 logger.debug('Output from status command: %s', out) try: - status_map = exec.parse_status_output(exit_code, out) + status_map = executor.parse_status_output(exit_code, out) except Exception as ex: - self._handle_poll_error(exec, False, ex, + self._handle_poll_error(executor, False, ex, f'Failed to poll for job status: {traceback.format_exc()}') return try: - for native_id, job_list in jobs_copy.items(): + for native_id, job_set in jobs_copy.items(): try: status = self._get_job_status(native_id, status_map) except Exception: status = JobStatus(JobState.FAILED, message='Failed to update job status: %s' % traceback.format_exc()) - for job in job_list: - exec._set_job_status(job, status) - if status.state.final: - with self._jobs_lock: - del self._jobs[native_id] + for job in job_set: + executor._set_job_status(job, status) except Exception as ex: msg = traceback.format_exc() - self._handle_poll_error(exec, True, ex, 'Error updating job statuses {}'.format(msg)) + self._handle_poll_error(executor, True, ex, f'Error updating job statuses {msg}') def _get_job_status(self, native_id: str, status_map: Dict[str, JobStatus]) -> JobStatus: if native_id in status_map: @@ -723,7 +730,7 @@ def _get_job_status(self, native_id: str, status_map: Dict[str, JobStatus]) -> J else: return JobStatus(JobState.COMPLETED) - def _handle_poll_error(self, exec: BatchSchedulerExecutor, immediate: bool, ex: Exception, + def _handle_poll_error(self, executor: BatchSchedulerExecutor, immediate: bool, ex: Exception, msg: str) -> None: logger.warning('Polling error: %s', msg) self._poll_error_count += 1 @@ -739,16 +746,36 @@ def _handle_poll_error(self, exec: BatchSchedulerExecutor, immediate: bool, ex: assert len(self._jobs) > 0 jobs_copy = dict(self._jobs) self._jobs.clear() - for job_list in jobs_copy.values(): - for job in job_list: - exec._set_job_status(job, JobStatus(JobState.FAILED, message=msg)) + for job_set in jobs_copy.values(): + for job in job_set: + self.unregister_job(job) + executor._set_job_status(job, JobStatus(JobState.FAILED, message=msg)) def register_job(self, job: Job) -> None: + executor = self.executor() + # This method is only called from the executor. It stands to reason that the + # executor cannot have been GC-ed. 
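Note: the poll thread now also exits when its executor is garbage-collected: the executor is held through `weakref.ref(executor, self._stop)`, so collection flips `done` and `run()` falls out of its loop. A minimal, self-contained sketch of that pattern (class names are illustrative, not from this diff):

```python
import weakref

class Owner:
    pass

class PollThread:
    def __init__(self, owner: Owner) -> None:
        self.done = False
        # The callback fires when the referent is collected.
        self.owner = weakref.ref(owner, self._stop)

    def _stop(self, ref: object) -> None:
        self.done = True

o = Owner()
t = PollThread(o)
del o          # drop the last strong reference
assert t.done  # _stop ran on collection (immediate under CPython refcounting)
```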
diff --git a/src/psij/executors/batch/cobalt/cobalt.mustache b/src/psij/executors/batch/cobalt/cobalt.mustache
index 90d9b73c..444de0ae 100644
--- a/src/psij/executors/batch/cobalt/cobalt.mustache
+++ b/src/psij/executors/batch/cobalt/cobalt.mustache
@@ -46,16 +46,27 @@ only results in empty files that are not cleaned up}}
 #COBALT -e /dev/null
 #COBALT -o /dev/null
 
-{{!like PBS, this is also cheap and there is not need to check setting}}
-PSIJ_NODEFILE="$COBALT_NODEFILE"
-export PSIJ_NODEFILE
-
 {{!redirect output here instead of through #COBALT directive since COBALT_JOB_ID is not available
 when the directives are evaluated; the reason for using the job id in the first place being
 the same as for the exit code file.}}
 exec &>> "{{psij.script_dir}}/$COBALT_JOBID.out"
+
+{{> batch_lib}}
+
+{{!like PBS, this is also cheap and there is no need to check setting}}
+PSIJ_NODEFILE="$COBALT_NODEFILE"
+export PSIJ_NODEFILE
+
+{{> stagein}}
+_psij_update_status ACTIVE
 
 {{#psij.launch_command}}{{.}} {{/psij.launch_command}}
+_PSIJ_JOB_EC=$?
+
+{{> stageout}}
+{{> cleanup}}
 
 {{!we redirect to a file tied to the native ID so that we can reach the file with attach().}}
-echo "$?" > "{{psij.script_dir}}/$COBALT_JOBID.ec"
+echo $_PSIJ_JOB_EC > "{{psij.script_dir}}/$COBALT_JOBID.ec"
+echo "_PSIJ_SCRIPT_DONE"
\ No newline at end of file
diff --git a/src/psij/executors/batch/common/batch_lib.mustache b/src/psij/executors/batch/common/batch_lib.mustache
new file mode 100644
index 00000000..0afc2a16
--- /dev/null
+++ b/src/psij/executors/batch/common/batch_lib.mustache
@@ -0,0 +1,138 @@
+_psij_update_status() {
+    STATUS="$1"
+
+    ADDRS={{psij.us_addrs}}
+    for ADDR in ${ADDRS//,/ }; do
+        echo "{{job.id}} $STATUS" >/dev/udp/$ADDR/{{psij.us_port}}
+    done
+}
+
+_psij_fail() {
+    [ "{{psij.debug}}" != "0" ] && _psij_update_status "LOG Failing: $2"
+    echo $2
+    exit $1
+}
+
+_psij_check_remote() {
+    SCHEME="$1"
+    HOSTPORT="$2"
+
+    if [ "$SCHEME" != "" ] && [ "$SCHEME" != "file" ]; then
+        _psij_fail 121 "$SCHEME staging is not supported"
+    fi
+    if [ "$HOSTPORT" != "" ] && [ "$HOSTPORT" != "localhost" ]; then
+        _psij_fail 121 "The host, if specified, must be \"localhost\". Got \"$HOSTPORT\"."
+    fi
+}
+
+_psij_do_stagein() {
+    SOURCE="$1"
+    TARGET="$2"
+    MODE="$3"
+    SCHEME="$4"
+    HOSTPORT="$5"
+
+    _psij_check_remote "$SCHEME" "$HOSTPORT" || exit $?
+
+    _psij_do_stage "$SOURCE" "$TARGET" "$MODE" 0
+}
+
+_psij_do_stage() {
+    SOURCE="$1"
+    TARGET="$2"
+    MODE="$3"
+    MISSING_OK="$4"
+
+    [ "{{psij.debug}}" != "0" ] && _psij_update_status "LOG Stage $SOURCE -> $TARGET, mode: $MODE, missingok: $MISSING_OK"
+
+    if [ ! -e "$SOURCE" ]; then
+        if [ "$MISSING_OK" == "0" ]; then
+            [ "{{psij.debug}}" != "0" ] && _psij_update_status "LOG Missing source file: $SOURCE"
+            _psij_fail 121 "Missing source file: $SOURCE"
+        else
+            [ "{{psij.debug}}" != "0" ] && _psij_update_status "LOG Skipping staging of missing file $SOURCE"
+            return 0
+        fi
+    fi
+
+    [ "{{psij.debug}}" != "0" ] && _psij_update_status "LOG Staging $SOURCE to $TARGET"
+
+    TARGET_DIR=`dirname "$TARGET"`
+
+    if [ "$TARGET_DIR" != "" ]; then
+        mkdir -p "$TARGET_DIR"
+    fi
+
+    if [ -d "$TARGET" ] && [ ! -d "$SOURCE" ]; then
+        _psij_fail 121 "Target is a directory: $TARGET"
+    fi
+
+    if [ "$MODE" == "1" ]; then
+        # copy
+        cp -r -T "$SOURCE" "$TARGET" || _psij_fail 121 "Failed to copy \"$SOURCE\" to \"$TARGET\""
+    elif [ "$MODE" == "2" ]; then
+        # link
+        {{!we want the same semantics as cp and mv, which is "overwrite if exists"}}
+        {{!we resolve the source since it may be a path relative to the job dir}}
+        rm -f "$TARGET"
+        SOURCE=`readlink -m $SOURCE`
+        ln -s "$SOURCE" "$TARGET" || _psij_fail 121 "Failed to link \"$SOURCE\" to \"$TARGET\""
+    elif [ "$MODE" == "3" ]; then
+        # move
+        mv -T -f "$SOURCE" "$TARGET" || _psij_fail 121 "Failed to move \"$SOURCE\" to \"$TARGET\""
+    fi
+}
+
+_FLAG_IF_PRESENT=1
+_FLAG_ON_SUCCESS=2
+_FLAG_ON_ERROR=4
+_FLAG_ON_CANCEL=8
+
+_psij_do_stageout() {
+    SOURCE="$1"
+    TARGET="$2"
+    MODE="$3"
+    FLAGS="$4"
+    FAILED="$5"
+    SCHEME="$6"
+    HOSTPORT="$7"
+
+    _psij_check_remote "$SCHEME" "$HOSTPORT"
+
+    [ "{{psij.debug}}" != "0" ] && _psij_update_status "LOG do_stageout $SOURCE -> $TARGET, mode: $MODE, flags: $FLAGS, failed: $FAILED"
+
+    if [ "$FAILED" == "0" ] && [ "$((FLAGS & _FLAG_ON_SUCCESS))" == "0" ]; then
+        return 0
+    fi
+    if [ "$FAILED" != "0" ] && [ "$((FLAGS & _FLAG_ON_ERROR))" == "0" ]; then
+        return 0
+    fi
+    _psij_do_stage "$SOURCE" "$TARGET" "$MODE" $((FLAGS & _FLAG_IF_PRESENT))
+}
+
+_psij_do_cleanup() {
+    TARGET="$1"
+    FLAGS="$2"
+    FAILED="$3"
+
+    if [ "$FAILED" == "0" ] && [ "$((FLAGS & _FLAG_ON_SUCCESS))" == "0" ]; then
+        return 0
+    fi
+    if [ "$FAILED" != "0" ] && [ "$((FLAGS & _FLAG_ON_ERROR))" == "0" ]; then
+        return 0
+    fi
+
+    TARGET=`readlink -m "$TARGET"`
+    DIR=`readlink -m "{{job.spec.directory}}"`
+
+    [ "{{psij.debug}}" != "0" ] && _psij_update_status "LOG Cleaning up $TARGET"
+
+    case "$TARGET" in
+        "$DIR"*)
+            rm -rf "$TARGET"
+            ;;
+        *)
+            _psij_fail 121 "Cannot clean $TARGET outside of job directory $DIR"
+            ;;
+    esac
+}
diff --git a/src/psij/executors/batch/common/cleanup.mustache b/src/psij/executors/batch/common/cleanup.mustache
new file mode 100644
index 00000000..1f4a1cf2
--- /dev/null
+++ b/src/psij/executors/batch/common/cleanup.mustache
@@ -0,0 +1,5 @@
+_psij_update_status CLEANUP
+
+{{#job.spec.cleanup}}
+_psij_do_cleanup {{.}} {{job.spec.cleanup_flags}} $_PSIJ_JOB_EC
+{{/job.spec.cleanup}}
diff --git a/src/psij/executors/batch/common/stagein.mustache b/src/psij/executors/batch/common/stagein.mustache
new file mode 100644
index 00000000..ef02b3d4
--- /dev/null
+++ b/src/psij/executors/batch/common/stagein.mustache
@@ -0,0 +1,5 @@
+_psij_update_status STAGE_IN
+{{#job.spec.stage_in}}
+_psij_do_stagein "{{source.path}}" "{{target}}" {{mode}} \
+    "{{{source.scheme}}}" "{{#source.hostname}}{{{.}}}{{#source.port}}:{{{.}}}{{/source.port}}{{/source.hostname}}"
+{{/job.spec.stage_in}}
diff --git a/src/psij/executors/batch/common/stageout.mustache b/src/psij/executors/batch/common/stageout.mustache
new file mode 100644
index 00000000..a747e13e
--- /dev/null
+++ b/src/psij/executors/batch/common/stageout.mustache
@@ -0,0 +1,5 @@
+_psij_update_status STAGE_OUT
+{{#job.spec.stage_out}}
+_psij_do_stageout "{{source}}" "{{target.path}}" {{mode}} {{flags}} $_PSIJ_JOB_EC \
+    "{{{target.scheme}}}" "{{#target.hostname}}{{{.}}}{{#target.port}}:{{{.}}}{{/target.port}}{{/target.hostname}}"
+{{/job.spec.stage_out}}
diff --git a/src/psij/executors/batch/lsf/lsf.mustache b/src/psij/executors/batch/lsf/lsf.mustache
index a8b60777..19c4cb80 100644
--- a/src/psij/executors/batch/lsf/lsf.mustache
+++ b/src/psij/executors/batch/lsf/lsf.mustache
@@ -75,15 +75,26 @@ only results in empty files that are not cleaned up}}
 #BSUB -e /dev/null
 #BSUB -o /dev/null
 
-PSIJ_NODEFILE="$LSB_HOSTS"
-export PSIJ_NODEFILE
-
 {{!redirect output here instead of through #BSUB directive since LSB_JOBID is not available
 when the directives are evaluated; the reason for using the job id in the first place being
 the same as for the exit code file.}}
 exec &>> "{{psij.script_dir}}/$LSB_JOBID.out"
+
+{{> batch_lib}}
+
+PSIJ_NODEFILE="$LSB_HOSTS"
+export PSIJ_NODEFILE
+
+{{> stagein}}
+_psij_update_status ACTIVE
 
 {{#psij.launch_command}}{{.}} {{/psij.launch_command}}
+_PSIJ_JOB_EC=$?
+
+{{> stageout}}
+{{> cleanup}}
 
 {{!we redirect to a file tied to the native ID so that we can reach the file with attach().}}
-echo "$?" > "{{psij.script_dir}}/$LSB_JOBID.ec"
+echo $_PSIJ_JOB_EC > "{{psij.script_dir}}/$LSB_JOBID.ec"
+echo "_PSIJ_SCRIPT_DONE"
\ No newline at end of file
diff --git a/src/psij/executors/batch/nqsv/nqsv.mustache b/src/psij/executors/batch/nqsv/nqsv.mustache
index a6184020..99031c47 100644
--- a/src/psij/executors/batch/nqsv/nqsv.mustache
+++ b/src/psij/executors/batch/nqsv/nqsv.mustache
@@ -72,13 +72,21 @@ do
 done < $PBS_NODEFILE
 export PSIJ_NODEFILE
 
+{{> batch_lib}}
+
+{{> stagein}}
+_psij_update_status ACTIVE
+
 {{#psij.launch_command}}{{.}} {{/psij.launch_command}}
-E=$?
+_PSIJ_JOB_EC=$?
+
+{{> stageout}}
+{{> cleanup}}
 
 {{!we redirect to a file tied to the native ID so that we can reach the file with attach().}}
 if [ "$J" = "0" ]; then
-    echo "$E" > "{{psij.script_dir}}/$ID.ec"
+    echo "$_PSIJ_JOB_EC" > "{{psij.script_dir}}/$ID.ec"
 fi
 
-exit $E
+exit $_PSIJ_JOB_EC
diff --git a/src/psij/executors/batch/pbs/pbs_classic.mustache b/src/psij/executors/batch/pbs/pbs_classic.mustache
index 90997b25..a987dfb2 100644
--- a/src/psij/executors/batch/pbs/pbs_classic.mustache
+++ b/src/psij/executors/batch/pbs/pbs_classic.mustache
@@ -54,17 +54,26 @@ only results in empty files that are not cleaned up}}
 export {{name}}={{value}}
 {{/env}}
 
+exec &>> "{{psij.script_dir}}/$PBS_JOBID.out"
+
+{{> batch_lib}}
+
 PSIJ_NODEFILE="$PBS_NODEFILE"
 export PSIJ_NODEFILE
-
 {{#job.spec.directory}}
 cd "{{.}}"
 {{/job.spec.directory}}
 
-exec &>> "{{psij.script_dir}}/$PBS_JOBID.out"
+{{> stagein}}
+_psij_update_status ACTIVE
 
 {{#psij.launch_command}}{{.}} {{/psij.launch_command}}
+_PSIJ_JOB_EC=$?
+
+{{> stageout}}
+{{> cleanup}}
 
 {{!we redirect to a file tied to the native ID so that we can reach the file with attach().}}
-echo "$?" > "{{psij.script_dir}}/$PBS_JOBID.ec"
+echo $_PSIJ_JOB_EC > "{{psij.script_dir}}/$PBS_JOBID.ec"
+echo "_PSIJ_SCRIPT_DONE"
\ No newline at end of file
diff --git a/src/psij/executors/batch/pbs/pbspro.mustache b/src/psij/executors/batch/pbs/pbspro.mustache
index 0617e2d0..a70b2b70 100644
--- a/src/psij/executors/batch/pbs/pbspro.mustache
+++ b/src/psij/executors/batch/pbs/pbspro.mustache
@@ -49,6 +49,8 @@ only results in empty files that are not cleaned up}}
 #PBS -e /dev/null
 #PBS -o /dev/null
 
+exec &>> "{{psij.script_dir}}/$PBS_JOBID.out"
+
 {{#job.spec.inherit_environment}}
 #PBS -V
 {{/job.spec.inherit_environment}}
@@ -57,17 +59,24 @@
 export {{name}}={{value}}
 {{/env}}
 
+{{> batch_lib}}
+
 PSIJ_NODEFILE="$PBS_NODEFILE"
 export PSIJ_NODEFILE
-
 {{#job.spec.directory}}
 cd "{{.}}"
 {{/job.spec.directory}}
 
-exec &>> "{{psij.script_dir}}/$PBS_JOBID.out"
+{{> stagein}}
+_psij_update_status ACTIVE
 
 {{#psij.launch_command}}{{.}} {{/psij.launch_command}}
+_PSIJ_JOB_EC=$?
+
+{{> stageout}}
+{{> cleanup}}
 
 {{!we redirect to a file tied to the native ID so that we can reach the file with attach().}}
-echo "$?" > "{{psij.script_dir}}/$PBS_JOBID.ec"
+echo $_PSIJ_JOB_EC > "{{psij.script_dir}}/$PBS_JOBID.ec"
+echo "_PSIJ_SCRIPT_DONE"
\ No newline at end of file
diff --git a/src/psij/executors/batch/script_generator.py b/src/psij/executors/batch/script_generator.py
index c33003ea..527bb12b 100644
--- a/src/psij/executors/batch/script_generator.py
+++ b/src/psij/executors/batch/script_generator.py
@@ -1,6 +1,7 @@
 import pathlib
 from abc import ABC
-from typing import Dict, Callable, IO
+from enum import Enum
+from typing import Dict, Callable, IO, Optional
 
 import pystache
 
@@ -8,6 +9,16 @@
 from .escape_functions import bash_escape
 
 
+class _Renderer(pystache.Renderer):  # type: ignore
+    def str_coerce(self, val: object) -> str:
+        if isinstance(val, Enum):
+            return str(val.value)
+        elif isinstance(val, bool):
+            return str(int(val))
+        else:
+            return super().str_coerce(val)  # type: ignore
+
+
 class SubmitScriptGenerator(ABC):
     """A base class representing a submit script generator.
 
@@ -16,7 +27,7 @@ class SubmitScriptGenerator(ABC):
     script specific to a certain batch scheduler.
     """
 
-    def __init__(self, config: JobExecutorConfig) -> None:
+    def __init__(self, config: Optional[JobExecutorConfig]) -> None:
         """
         Parameters
         ----------
@@ -56,7 +67,7 @@ class TemplatedScriptGenerator(SubmitScriptGenerator):
     implementation of the Mustache templating language (https://mustache.github.io/).
     """
 
-    def __init__(self, config: JobExecutorConfig, template_path: pathlib.Path,
+    def __init__(self, config: Optional[JobExecutorConfig], template_path: pathlib.Path,
                  escape: Callable[[object], str] = bash_escape) -> None:
         """
         Parameters
@@ -72,7 +83,8 @@ def __init__(self, config: JobExecutorConfig, template_path: pathlib.Path,
         super().__init__(config)
         with template_path.open('r') as template_file:
             self.template = pystache.parse(template_file.read())
-        self.renderer = pystache.Renderer(escape=escape)
+        common_dir = pathlib.Path(__file__).parent / 'common'
+        self.renderer = _Renderer(escape=escape, search_dirs=[str(common_dir)])
 
     def generate_submit_script(self, job: Job, context: Dict[str, object], out: IO[str]) -> None:
         """See :func:`~SubmitScriptGenerator.generate_submit_script`.
diff --git a/src/psij/executors/batch/slurm/slurm.mustache b/src/psij/executors/batch/slurm/slurm.mustache
index a45b1b55..8d943d40 100644
--- a/src/psij/executors/batch/slurm/slurm.mustache
+++ b/src/psij/executors/batch/slurm/slurm.mustache
@@ -92,6 +92,14 @@
 _PSIJ_PPN={{.}}
 {{/processes_per_node}}
 {{/job.spec.resources}}
 
+{{!redirect output here instead of through #SBATCH directive since SLURM_JOB_ID is not available
+when the directives are evaluated; the reason for using the job id in the first place being the
+same as for the exit code file.}}
+exec &>> "{{psij.script_dir}}/$SLURM_JOB_ID.out"
+
+
+{{> batch_lib}}
+
 _PSIJ_NC=`scontrol show hostnames | wc -l`
 
 {{!Unlike PBS, Slurm only lists the nodes once in the nodelist, so, to bring it to uniform PBS
@@ -110,14 +118,15 @@ else
 fi
 
 export PSIJ_NODEFILE
-
-
-{{!redirect output here instead of through #SBATCH directive since SLURM_JOB_ID is not available
-when the directives are evaluated; the reason for using the job id in the first place being the
-same as for the exit code file.}}
-exec &>> "{{psij.script_dir}}/$SLURM_JOB_ID.out"
+
+{{> stagein}}
+_psij_update_status ACTIVE
 
 {{#psij.launch_command}}{{.}} {{/psij.launch_command}}
+_PSIJ_JOB_EC=$?
+
+{{> stageout}}
+{{> cleanup}}
 
 {{!we redirect to a file tied to the native ID so that we can reach the file with attach().}}
-echo "$?" > "{{psij.script_dir}}/$SLURM_JOB_ID.ec"
+echo $_PSIJ_JOB_EC > "{{psij.script_dir}}/$SLURM_JOB_ID.ec"
+echo "_PSIJ_SCRIPT_DONE"
\ No newline at end of file
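Note: with the local executor below now funneling jobs through a generated script, state changes arrive via the status updater rather than being set eagerly at submit time. A hedged sketch of observing the new intermediate states; `set_job_status_callback` is the existing psij callback hook, and the exact state sequence depends on whether the spec requests staging:

```python
from psij import Job, JobExecutor, JobSpec

ex = JobExecutor.get_instance('local')
ex.set_job_status_callback(lambda job, status: print(job.id, status.state))

job = Job(JobSpec(executable='/bin/date'))
ex.submit(job)
job.wait()
# Expected to include STAGE_IN, ACTIVE, STAGE_OUT and CLEANUP (added in
# job_state.py later in this diff) on the way to COMPLETED.
```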
""" + self._status_updater.unregister_job(job) self._set_job_status(job, JobStatus(JobState.CANCELED)) self._reaper.cancel(job) def _process_done(self, p: _ProcessEntry) -> None: assert p.exit_code is not None + logger.debug('%s Process done. EC: %s', p.job.id, p.exit_code) message = None if p.exit_code == 0: state = JobState.COMPLETED @@ -324,13 +336,26 @@ def _process_done(self, p: _ProcessEntry) -> None: # the exit code of the launcher is the exit code of the job, we must use a different # mechanism to distinguish between job errors and launcher errors. So we delegate to # the launcher implementation to figure out if the error belongs to the job or not - if p.launcher and p.out and p.launcher.is_launcher_failure(p.out): - message = p.launcher.get_launcher_failure_message(p.out) + if p.out and '_PSIJ_SCRIPT_DONE' not in p.out: + message = p.out state = JobState.FAILED + if state.final: + self._clean_submit_file(p.job) + # We need to ensure that the status updater has processed all updates that + # have been sent up to this point + self._status_updater.flush() + self._status_updater.unregister_job(p.job) self._set_job_status(p.job, JobStatus(state, time=p.done_time, exit_code=p.exit_code, message=message)) + def _clean_submit_file(self, job: Job) -> None: + submit_file_path = self._work_dir / (job.id + '.job') + try: + submit_file_path.unlink() + except FileNotFoundError: + pass + def list(self) -> List[str]: """ Return a list of ids representing jobs that are running on the underlying implementation. @@ -364,6 +389,8 @@ def attach(self, job: Job, native_id: str) -> None: job.executor = self pid = int(native_id) + job._native_id = pid + self._status_updater.register_job(job, self) self._reaper.register(_AttachedProcessEntry(job, psutil.Process(pid), self)) # We assume that the native_id above is a PID that was obtained at some point using # list(). If so, the process is either still running or has completed. Either way, we must diff --git a/src/psij/executors/local/local.mustache b/src/psij/executors/local/local.mustache new file mode 100644 index 00000000..ecd1142e --- /dev/null +++ b/src/psij/executors/local/local.mustache @@ -0,0 +1,26 @@ +#!/bin/bash + +set -e + +{{> batch_lib}} + +{{#job.spec.directory}} +cd "{{.}}" +{{/job.spec.directory}} + +{{> stagein}} +_psij_update_status ACTIVE + +set +e +{{#job.spec.inherit_environment}}env \{{/job.spec.inherit_environment}}{{^job.spec.inherit_environment}}env --ignore-environment \{{/job.spec.inherit_environment}}{{#env}} +{{name}}="{{value}}" \{{/env}} +PSIJ_NODEFILE={{psij.nodefile}} \ +{{#psij.launch_command}}{{.}} {{/psij.launch_command}} +_PSIJ_JOB_EC="$?" +set -e + +{{> stageout}} +{{> cleanup}} + +echo "_PSIJ_SCRIPT_DONE" +exit "$_PSIJ_JOB_EC" \ No newline at end of file diff --git a/src/psij/job_launcher.py b/src/psij/job_launcher.py index c156cdc2..1e7aae3b 100644 --- a/src/psij/job_launcher.py +++ b/src/psij/job_launcher.py @@ -34,43 +34,6 @@ def get_launch_command(self, job: Job) -> List[str]: """ pass - @abstractmethod - def is_launcher_failure(self, output: str) -> bool: - """ - Determines whether the launcher invocation output contains a launcher failure or not. - - Parameters - ---------- - output - The output (combined stdout/stderr) from an invocation of the launcher command - - Returns - ------- - Returns `True` if the `output` parameter contains a string that represents a launncher - failure. 
- """ - pass - - @abstractmethod - def get_launcher_failure_message(self, output: str) -> str: - """ - Extracts the launcher error message from the output of this launcher's invocation. - - It is understood that the value of the `output` parameter is such that - :meth:`is_launcher_failure` returns `True` on it. - - Parameters - ---------- - output - The output (combined stdout/stderr) from an invocation of the launcher command. - - Returns - ------- - A string representing the part of the launcher output that describes the launcher - error. - """ - pass - @staticmethod def get_instance(name: str, version_constraint: Optional[str] = None, config: Optional[JobExecutorConfig] = None) -> 'Launcher': diff --git a/src/psij/job_spec.py b/src/psij/job_spec.py index e3585d0d..ff16c26d 100644 --- a/src/psij/job_spec.py +++ b/src/psij/job_spec.py @@ -3,19 +3,25 @@ # for some reason, Sphinx cannot find Path if imported directly # from pathlib import Path import pathlib -from typing import Dict, List, Optional, Union +from typing import Dict, List, Optional, Union, Set from typeguard import typechecked import psij.resource_spec import psij.job_attributes +from psij.staging import StageIn, StageOut, StageOutFlags def _to_path(arg: Union[str, pathlib.Path, None]) -> Optional[pathlib.Path]: + if arg is None: + return None + else: + return _to_path_strict(arg) + + +def _to_path_strict(arg: Union[str, pathlib.Path]) -> pathlib.Path: if isinstance(arg, pathlib.Path): return arg - elif arg is None: - return None else: assert isinstance(arg, str) return pathlib.Path(arg) @@ -33,6 +39,12 @@ def _to_env_dict(arg: Union[Dict[str, Union[str, int]], None]) -> Optional[Dict[ return ret +def _all_to_path(s: Optional[Set[Union[str, pathlib.Path]]]) -> Optional[Set[pathlib.Path]]: + if s is None: + return None + return {_to_path_strict(x) for x in s if x is not None} + + class JobSpec(object): """A class that describes the details of a job.""" @@ -55,7 +67,11 @@ def __init__(self, executable: Optional[str] = None, arguments: Optional[List[st attributes: Optional[psij.job_attributes.JobAttributes] = None, pre_launch: Union[str, pathlib.Path, None] = None, post_launch: Union[str, pathlib.Path, None] = None, - launcher: Optional[str] = None): + launcher: Optional[str] = None, + stage_in: Optional[Set[StageIn]] = None, + stage_out: Optional[Set[StageOut]] = None, + cleanup: Optional[Set[Union[str, pathlib.Path]]] = None, + cleanup_flags: StageOutFlags = StageOutFlags.ALWAYS): """ :param executable: An executable, such as "/bin/date". :param arguments: The argument list to be passed to the executable. Unlike with execve(), @@ -89,6 +105,14 @@ def __init__(self, executable: Optional[str] = None, arguments: Optional[List[st node as the pre-launch script. :param launcher: The name of a launcher to use, such as "mpirun", "srun", "single", etc. For a list of available launchers, see :ref:`Available Launchers `. + :param stage_in: Specifies a set of files to be staged in before the job is launched. + :param stage_out: Specifies a set of files to be staged out after the job terminates. + :param cleanup: Specifies a set of files to remove after the stage out process. + :param cleanup_flags: Specifies the conditions under which the files in `cleanup` should + be removed, such as when the job completes successfully. The flag + `StageOutFlags.IF_PRESENT` is ignored and no error condition is triggered if a file + specified by the `cleanup` argument is not present. + All constructor parameters are accessible as properties. 
diff --git a/src/psij/job_spec.py b/src/psij/job_spec.py
index e3585d0d..ff16c26d 100644
--- a/src/psij/job_spec.py
+++ b/src/psij/job_spec.py
@@ -3,19 +3,25 @@
 # for some reason, Sphinx cannot find Path if imported directly
 # from pathlib import Path
 import pathlib
-from typing import Dict, List, Optional, Union
+from typing import Dict, List, Optional, Union, Set
 
 from typeguard import typechecked
 
 import psij.resource_spec
 import psij.job_attributes
+from psij.staging import StageIn, StageOut, StageOutFlags
 
 
 def _to_path(arg: Union[str, pathlib.Path, None]) -> Optional[pathlib.Path]:
+    if arg is None:
+        return None
+    else:
+        return _to_path_strict(arg)
+
+
+def _to_path_strict(arg: Union[str, pathlib.Path]) -> pathlib.Path:
     if isinstance(arg, pathlib.Path):
         return arg
-    elif arg is None:
-        return None
     else:
         assert isinstance(arg, str)
         return pathlib.Path(arg)
@@ -33,6 +39,12 @@ def _to_env_dict(arg: Union[Dict[str, Union[str, int]], None]) -> Optional[Dict[
     return ret
 
 
+def _all_to_path(s: Optional[Set[Union[str, pathlib.Path]]]) -> Optional[Set[pathlib.Path]]:
+    if s is None:
+        return None
+    return {_to_path_strict(x) for x in s if x is not None}
+
+
 class JobSpec(object):
     """A class that describes the details of a job."""
 
@@ -55,7 +67,11 @@ def __init__(self, executable: Optional[str] = None, arguments: Optional[List[st
                  attributes: Optional[psij.job_attributes.JobAttributes] = None,
                  pre_launch: Union[str, pathlib.Path, None] = None,
                  post_launch: Union[str, pathlib.Path, None] = None,
-                 launcher: Optional[str] = None):
+                 launcher: Optional[str] = None,
+                 stage_in: Optional[Set[StageIn]] = None,
+                 stage_out: Optional[Set[StageOut]] = None,
+                 cleanup: Optional[Set[Union[str, pathlib.Path]]] = None,
+                 cleanup_flags: StageOutFlags = StageOutFlags.ALWAYS):
         """
         :param executable: An executable, such as "/bin/date".
         :param arguments: The argument list to be passed to the executable. Unlike with execve(),
@@ -89,6 +105,14 @@
             node as the pre-launch script.
         :param launcher: The name of a launcher to use, such as "mpirun", "srun", "single", etc.
             For a list of available launchers, see :ref:`Available Launchers `.
+        :param stage_in: Specifies a set of files to be staged in before the job is launched.
+        :param stage_out: Specifies a set of files to be staged out after the job terminates.
+        :param cleanup: Specifies a set of files to remove after the stage out process.
+        :param cleanup_flags: Specifies the conditions under which the files in `cleanup` should
+            be removed, such as when the job completes successfully. The flag
+            `StageOutFlags.IF_PRESENT` is ignored and no error condition is triggered if a file
+            specified by the `cleanup` argument is not present.
 
         All constructor parameters are accessible as properties.
@@ -147,6 +171,10 @@
         self._pre_launch = _to_path(pre_launch)
         self._post_launch = _to_path(post_launch)
         self.launcher = launcher
+        self.stage_in = stage_in
+        self.stage_out = stage_out
+        self._cleanup = _all_to_path(cleanup)
+        self.cleanup_flags = cleanup_flags
 
         # TODO: `resources` is of type `ResourceSpec`, not `ResourceSpecV1`. An
         #       connector trying to access `job.spec.resources.process_count`
@@ -238,6 +266,15 @@ def post_launch(self) -> Optional[pathlib.Path]:
     def post_launch(self, post_launch: Union[str, pathlib.Path, None]) -> None:
         self._post_launch = _to_path(post_launch)
 
+    @property
+    def cleanup(self) -> Optional[Set[pathlib.Path]]:
+        """An optional set of cleanup directives."""
+        return self._cleanup
+
+    @cleanup.setter
+    def cleanup(self, cleanup: Set[Union[str, pathlib.Path]]) -> None:
+        self._cleanup = _all_to_path(cleanup)
+
     def __eq__(self, o: object) -> bool:
         """
         Tests if this JobSpec is equal to another.
@@ -250,7 +287,8 @@ def __eq__(self, o: object) -> bool:
         for prop_name in ['name', 'executable', 'arguments', 'directory', 'inherit_environment',
                           'environment', 'stdin_path', 'stdout_path', 'stderr_path', 'resources',
-                          'attributes', 'pre_launch', 'post_launch', 'launcher']:
+                          'attributes', 'pre_launch', 'post_launch', 'launcher', 'stage_in',
+                          'stage_out', 'cleanup', 'cleanup_flags']:
             if getattr(self, prop_name) != getattr(o, prop_name):
                 return False
diff --git a/src/psij/job_state.py b/src/psij/job_state.py
index b028f456..701035e8 100644
--- a/src/psij/job_state.py
+++ b/src/psij/job_state.py
@@ -2,6 +2,9 @@
 from typing import Optional
 
 
+_NAME_MAP = {}
+
+
 class JobState(bytes, Enum):
     """
     An enumeration holding the possible job states.
@@ -16,6 +19,7 @@ def __new__(cls, index: int, order: int, name: str, final: bool) -> 'JobState':
         obj._order = order
         obj._name = name
         obj._final = final
+        _NAME_MAP[name] = obj
         return obj
 
     def __init__(self, *args: object) -> None:  # noqa: D107
@@ -34,19 +38,32 @@ def __init__(self, *args: object) -> None:  # noqa: D107
     This is the state of the job after being accepted by a backend for execution, but before
     the execution of the job begins.
     """
-    ACTIVE = (2, 2, 'ACTIVE', False)
+    STAGE_IN = (2, 2, 'STAGE_IN', False)
+    """
+    This state indicates that the job is staging files in, in preparation for execution.
+    """
+    ACTIVE = (3, 3, 'ACTIVE', False)
     """This state represents an actively running job."""
-    COMPLETED = (3, 3, 'COMPLETED', True)
+    STAGE_OUT = (4, 4, 'STAGE_OUT', False)
+    """
+    This state indicates that the executable has finished running and that files are being staged
+    out.
+    """
+    CLEANUP = (5, 5, 'CLEANUP', False)
+    """
+    This state indicates that cleanup is actively being done for this job.
+    """
+    COMPLETED = (6, 6, 'COMPLETED', True)
     """
     This state represents a job that has completed *successfully* (i.e., with a zero exit code).
 
     In other words, a job with the executable set to `/bin/false` cannot enter this state.
     """
-    FAILED = (4, 3, 'FAILED', True)
+    FAILED = (7, 6, 'FAILED', True)
     """
     Represents a job that has either completed unsuccessfully (with a non-zero exit code) or a
     job whose handling and/or execution by the backend has failed in some way.
     """
-    CANCELED = (5, 3, 'CANCELED', True)
+    CANCELED = (8, 6, 'CANCELED', True)
     """Represents a job that was canceled by a call to :func:`~psij.Job.cancel()`."""
 
     def is_greater_than(self, other: 'JobState') -> Optional[bool]:
@@ -112,6 +129,28 @@ def __hash__(self) -> int:
         """Returns a hash for this object."""
         return self._value_  # type: ignore
 
+    @staticmethod
+    def from_name(name: str) -> 'JobState':
+        """
+        Returns a `JobState` object corresponding to its string representation.
+
+        This method is such that `state == JobState.from_name(str(state))`.
+        """
+        return _NAME_MAP[name]
+
+
+_PREV_STATE = {
+    JobState.NEW: None,
+    JobState.QUEUED: JobState.NEW,
+    JobState.STAGE_IN: JobState.QUEUED,
+    JobState.ACTIVE: JobState.STAGE_IN,
+    JobState.STAGE_OUT: JobState.ACTIVE,
+    JobState.CLEANUP: JobState.STAGE_OUT,
+    JobState.COMPLETED: JobState.CLEANUP,
+    JobState.FAILED: None,
+    JobState.CANCELED: None
+}
+
 
 class JobStateOrder:
     """A class that can be used to reconstruct missing states."""
@@ -125,10 +164,4 @@ def prev(state: JobState) -> Optional[JobState]:
         previous state. For example, the FAILED state does not have a previous state, since
         it can be reached from multiple states.
         """
-        if state == JobState.COMPLETED:
-            return JobState.ACTIVE
-        if state == JobState.ACTIVE:
-            return JobState.QUEUED
-        if state == JobState.QUEUED:
-            return JobState.NEW
-        return None
+        return _PREV_STATE[state]
diff --git a/src/psij/launcher.py b/src/psij/launcher.py
index 1aab4a0c..29e855ea 100644
--- a/src/psij/launcher.py
+++ b/src/psij/launcher.py
@@ -36,44 +36,6 @@ def get_launch_command(self, job: Job) -> List[str]:
         """
         pass
 
-    @abstractmethod
-    def is_launcher_failure(self, output: str) -> bool:
-        """
-        Determines whether the launcher invocation output contains a launcher failure or not.
-
-        Parameters
-        ----------
-        output
-            The output (combined stdout/stderr) from an invocation of the launcher command
-
-        Returns
-        -------
-        Returns `True` if the output of the launcher indicates that it has exited with a
-        non-zero exit code due to an error occurring in the launcher.
-
-        """
-        pass
-
-    @abstractmethod
-    def get_launcher_failure_message(self, output: str) -> str:
-        """
-        Extracts the launcher error message from the output of this launcher's invocation.
-
-        It is understood that the output is such that
-        :func:`~psij.launcher.Launcher.is_launcher_failure` returns `True` on it.
-
-        Parameters
-        ----------
-        output
-            The output (combined stdout/stderr) from an invocation of the launcher command.
-
-        Returns
-        -------
-        A string representing the part of the launcher output that describes the launcher
-        error.
-        """
-        pass
-
     @staticmethod
     def get_instance(name: str, version_constraint: Optional[str] = None,
                      config: Optional[JobExecutorConfig] = None) -> 'Launcher':
""" return [] - - def is_launcher_failure(self, output: str) -> bool: - """See :func:`~psij.Launcher.is_launcher_failure`.""" - return output.split('\n')[-2] != '_PSI_J_LAUNCHER_DONE' - - def get_launcher_failure_message(self, output: str) -> str: - """See :func:`~psij.Launcher.get_launcher_failure_message`.""" - # If, according to the above, it is a launcher failure, then - # the magic line should not be present (aka, all of the output - # is the failure). - return output diff --git a/src/psij/launchers/scripts/aprun_launch.sh b/src/psij/launchers/scripts/aprun_launch.sh index 8134f575..bd586ade 100644 --- a/src/psij/launchers/scripts/aprun_launch.sh +++ b/src/psij/launchers/scripts/aprun_launch.sh @@ -16,5 +16,4 @@ log "Command done: $_PSI_J_EC" post_launch -echo "_PSI_J_LAUNCHER_DONE" exit $_PSI_J_EC diff --git a/src/psij/launchers/scripts/jsrun_launch.sh b/src/psij/launchers/scripts/jsrun_launch.sh index 73efe3d8..80fdcff4 100755 --- a/src/psij/launchers/scripts/jsrun_launch.sh +++ b/src/psij/launchers/scripts/jsrun_launch.sh @@ -16,5 +16,4 @@ log "Command done: $_PSI_J_EC" post_launch -echo "_PSI_J_LAUNCHER_DONE" exit $_PSI_J_EC diff --git a/src/psij/launchers/scripts/mpi_launch.sh b/src/psij/launchers/scripts/mpi_launch.sh index 7c01f89c..b23222db 100644 --- a/src/psij/launchers/scripts/mpi_launch.sh +++ b/src/psij/launchers/scripts/mpi_launch.sh @@ -42,5 +42,4 @@ log "Command done: $_PSI_J_EC" post_launch -echo "_PSI_J_LAUNCHER_DONE" exit $_PSI_J_EC diff --git a/src/psij/launchers/scripts/multi_launch.sh b/src/psij/launchers/scripts/multi_launch.sh index 9311ba0c..2c9f3213 100644 --- a/src/psij/launchers/scripts/multi_launch.sh +++ b/src/psij/launchers/scripts/multi_launch.sh @@ -29,5 +29,4 @@ log "All completed" post_launch -echo "_PSI_J_LAUNCHER_DONE" exit $_PSI_J_FAILED_EC diff --git a/src/psij/launchers/scripts/single_launch.sh b/src/psij/launchers/scripts/single_launch.sh index 112609b0..88e5d235 100644 --- a/src/psij/launchers/scripts/single_launch.sh +++ b/src/psij/launchers/scripts/single_launch.sh @@ -12,5 +12,4 @@ log "Command done: $_PSI_J_EC" post_launch -echo "_PSI_J_LAUNCHER_DONE" exit $_PSI_J_EC diff --git a/src/psij/launchers/scripts/srun_launch.sh b/src/psij/launchers/scripts/srun_launch.sh index d3808f6d..d0ac39ca 100644 --- a/src/psij/launchers/scripts/srun_launch.sh +++ b/src/psij/launchers/scripts/srun_launch.sh @@ -16,5 +16,4 @@ log "Command done: $_PSI_J_EC" post_launch -echo "_PSI_J_LAUNCHER_DONE" exit $_PSI_J_EC diff --git a/src/psij/staging.py b/src/psij/staging.py new file mode 100644 index 00000000..4042feae --- /dev/null +++ b/src/psij/staging.py @@ -0,0 +1,336 @@ +from urllib.parse import urlparse +from enum import Enum, Flag +from pathlib import Path +from typing import Optional, Union + + +class URI: + """A class representing a local or remote file.""" + + def __init__(self, urlstring: str) -> None: + """ + Parameters + ---------- + urlstring + A string representation of a URI, such as "http://example.com/file.txt" or "file.txt". + The precise format of an URI string is defined in + `RFC3986 `_. + """ + self.parts = urlparse(urlstring) + + # a __getattr__ solution may be simpler, but doesn't play well with IDEs and + # is not quite self-documenting + @property + def hostname(self) -> Optional[str]: + """ + Returns + ------- + Represents the hostname in this URI or `None` if no hostname was specified. 
+ """ + return self.parts.hostname + + @property + def port(self) -> Optional[int]: + """ + Returns + ------- + Returns the TCP port of this URI or None if a port was not specified. + """ + return self.parts.port + + @property + def scheme(self) -> str: + """ + Returns + ------- + Returns the URI scheme in this URI or the empty string if no scheme was specified. + """ + return self.parts.scheme + + @property + def netloc(self) -> str: + """ + Returns + ------- + Returns the network location, which may the host name, the port, and possibly login + information. If none of these are specified, the empty string is returned. + """ + return self.parts.netloc + + @property + def path(self) -> str: + """ + Returns + ------- + Returns the path in this URI or an empty string if no path was specified. + """ + return self.parts.path + + @property + def params(self) -> str: + """ + Returns + ------- + Returns the URI parameters or an empty string if there are no parameters. + """ + return self.parts.params + + @property + def query(self) -> str: + """ + Returns + ------- + Returns the URI query string or an empty string if no query string was specified. + """ + return self.parts.query + + @property + def fragment(self) -> str: + """ + Returns + ------- + Returns the fragment in this URI or the empty string if no fragment is specified. + """ + return self.parts.fragment + + @property + def username(self) -> Optional[str]: + """ + Returns + ------- + Returns the username in this URI if any, or None if there is no username specified. + """ + return self.parts.username + + @property + def password(self) -> Optional[str]: + """ + Returns + ------- + Returns the password specified in this URI or None if there is no password. + """ + return self.parts.password + + def __str__(self) -> str: + """Returns a string representation of this URI.""" + return self.parts.geturl() + + def __eq__(self, other: object) -> bool: + """ + Tests if the parameter `other` is equal to this `URI`. + + Returns `True` if `other` is a `URI` and if it represents the same + resource as this `URI`. + """ + if isinstance(other, URI): + return self.parts == other.parts + else: + return False + + def __hash__(self) -> int: + """Computes a hash of this object.""" + return hash(self.parts) + + +class StagingMode(Enum): + """ + Defines the possible modes in which the staging of a file can be done. + + JobExecutor implementations are not required to support all staging modes, but must default + to `COPY` if other modes are not implemented. Furthermore, modes different from `COPY` may only + make sense when staging is done locally. + """ + + COPY = 1 + """ + Copies the file to be staged by performing an operation that is equivalent to the familiar + `cp` command. + """ + LINK = 2 + """ + Creates a symbolic link instead of copying the contents of files. + """ + MOVE = 3 + """ + Moves a file instead of copying it. Moving a file can be nearly instantaneous if both the + source and the destination are on the same filesystem. However, the OS will likely have to + resort to copying the contents of the file and the removing the source file if the source and + destination are on different filesystems, so it is unlikely for this mode to be beneficial over + a `COPY`. + """ + + +class StageOutFlags(Flag): + """ + Specifies a set of flags that can be used to alter stage out behavior. + + The flags can be combined using the bitwise or operator (`|`). For example, + `IF_PRESENT | ON_ERROR`. 
+
+
+class StageOutFlags(Flag):
+    """
+    Specifies a set of flags that can be used to alter stage out behavior.
+
+    The flags can be combined using the bitwise or operator (`|`). For example,
+    `IF_PRESENT | ON_ERROR`. If none of the state conditions
+    (`ON_SUCCESS`, `ON_ERROR`, `ON_CANCEL`) are specified, it is assumed that the file should be
+    transferred in all cases, subject to the presence of the `IF_PRESENT` flag. That is,
+    `NONE` is equivalent to `ALWAYS` or `ON_SUCCESS | ON_ERROR | ON_CANCEL`, while
+    `IF_PRESENT` is equivalent to `IF_PRESENT | ALWAYS`.
+    """
+
+    NONE = 0
+    """
+    Indicates that no flags are set. This is equivalent to `ALWAYS`.
+    """
+    IF_PRESENT = 1
+    """
+    Indicates that a file should only be transferred if it exists. If the file does not exist,
+    the stageout operation continues with the next file. If this flag is not set for a given
+    file, its absence will result in a stageout error which will cause the job to fail.
+    """
+    ON_SUCCESS = 2
+    """
+    Indicates that a file should be transferred when the job succeeds (i.e., its exit code is
+    zero). If a job fails or is cancelled, and no other flags are set, the executor will not
+    attempt to stage out the file.
+    """
+    ON_ERROR = 4
+    """
+    Indicates that a stageout should be performed if the job has failed (i.e., its exit code is
+    non-zero).
+    """
+    ON_CANCEL = 8
+    """
+    Indicates that a file should be staged out if the job has been canceled.
+    """
+    ALWAYS = ON_SUCCESS | ON_ERROR | ON_CANCEL
+    """
+    Indicates that a file should be staged out irrespective of the status of the job.
+    """
+
+
+class StageIn:
+    """A class representing a stagein directive."""
+
+    def __init__(self, source: Union[URI, Path, str], target: Union[str, Path],
+                 mode: StagingMode = StagingMode.COPY) -> None:
+        """
+        Parameters
+        ----------
+        source
+            The source location of the stagein. If the source is a string or a
+            :class:`~pathlib.Path`, the location refers to a file on a filesystem accessible by
+            the process in which PSI/J is running. If the path is relative, it is interpreted to
+            be relative to the current working directory of the process in which PSI/J is
+            running and normalized to an absolute path. If the source is a :class:`.URI`, it may
+            refer to a remote location. Support for remote staging is not guaranteed and depends
+            on the implementation of the :class:`~psij.JobExecutor` that the job to which this
+            stagein directive belongs is submitted to.
+        target
+            The target location for the stagein, which can be either a string or a
+            :class:`~pathlib.Path`. If the path is relative, it is considered to be relative to
+            the job directory. That is, a job can access this file at the location specified by
+            `target` if it does not change its working directory from the one it starts in.
+        mode
+            A staging mode, which indicates how the staging is done. For details, see
+            :class:`.StagingMode`.
+        """
+        if isinstance(source, str):
+            source = URI(source)
+        if isinstance(source, Path):
+            source = URI(str(source))
+        if isinstance(target, str):
+            target = Path(target)
+        self.source = source
+        self.target = target
+        self.mode = mode
+
+    def __str__(self) -> str:
+        """Returns a string representation of this object."""
+        return 'StageIn[%s -> %s, %s]' % (self.source, self.target, self.mode)
+
+    def __eq__(self, other: object) -> bool:
+        """Compares `other` to this object."""
+        if isinstance(other, StageIn):
+            return (self.source == other.source and self.target == other.target
+                    and self.mode == other.mode)
+        else:
+            return False
+
+    def __hash__(self) -> int:
+        """Computes a hash of this object."""
+        return (hash(self.source) << 16) + (hash(self.target) << 8) + hash(self.mode)
+
+
+def _normalize_flags(flags: StageOutFlags) -> StageOutFlags:
+    if (flags & StageOutFlags.ALWAYS).value == 0:
+        return flags | StageOutFlags.ALWAYS
+    else:
+        return flags
+
+
+class StageOut:
+    """A class encapsulating a stageout directive."""
+
+    def __init__(self, source: Union[str, Path], target: Union[str, Path, URI],
+                 flags: StageOutFlags = StageOutFlags.ALWAYS,
+                 mode: StagingMode = StagingMode.COPY):
+        """
+        Parameters
+        ----------
+        source
+            The source location for the stageout, which can be either a string or a
+            :class:`~pathlib.Path`. If the path is relative, it is considered to be relative to
+            the job directory.
+        target
+            The target location of the stageout. If the target is a string or a
+            :class:`~pathlib.Path`, the location refers to a file on a filesystem accessible by
+            the process in which PSI/J is running. If the path is relative, it is interpreted to
+            be relative to the current working directory of the process in which PSI/J is
+            running and normalized to an absolute path. If the target is a :class:`.URI`, it may
+            refer to a remote location. Support for remote staging is not guaranteed and depends
+            on the implementation of the :class:`~psij.JobExecutor` that the job to which this
+            stageout directive belongs is submitted to.
+        flags
+            A set of flags specifying the conditions under which the stageout should occur. For
+            details, see :class:`.StageOutFlags`.
+        mode
+            A staging mode, which indicates how the staging is done. For details, see
+            :class:`.StagingMode`.
+        """
+        if isinstance(source, str):
+            source = Path(source)
+        if isinstance(target, str):
+            target = URI(target)
+        if isinstance(target, Path):
+            target = URI(str(target))
+
+        self.source = source
+        self.target = target
+        self.flags = flags
+        self.mode = mode
+
+    @property
+    def flags(self) -> StageOutFlags:
+        """
+        A set of flags specifying the conditions under which the stageout should occur.
+
+        For details, see :class:`.StageOutFlags`.
+        """
+        return self._flags
+
+    @flags.setter
+    def flags(self, flags: StageOutFlags) -> None:
+        self._flags = _normalize_flags(flags)
+
+    def __str__(self) -> str:
+        """Returns a string representation of this object."""
+        return 'StageOut[%s -> %s, %s, %s]' % (self.source, self.target, self.flags, self.mode)
+
+    def __eq__(self, other: object) -> bool:
+        """Compares `other` to this object."""
+        if isinstance(other, StageOut):
+            return (self.source == other.source and self.target == other.target
+                    and self.mode == other.mode and self.flags == other.flags)
+        else:
+            return False
+
+    def __hash__(self) -> int:
+        """Computes a hash of this object."""
+        return ((hash(self.source) << 24) + (hash(self.target) << 16) + (hash(self.mode) << 8)
+                + hash(self.flags))
+ """ + return self._flags + + @flags.setter + def flags(self, flags: StageOutFlags) -> None: + self._flags = _normalize_flags(flags) + + def __str__(self) -> str: + """Returns a string representation of this object.""" + return 'StageOut[%s -> %s, %s, %s]' % (self.source, self.target, self.flags, self.mode) + + def __eq__(self, other: object) -> bool: + """Compares `other` to this object.""" + if isinstance(other, StageOut): + return (self.source == other.source and self.target == other.target + and self.mode == other.mode and self.flags == other.flags) + else: + return False + + def __hash__(self) -> int: + """Computes a hash of this object.""" + return ((hash(self.source) << 24) + (hash(self.target) << 16) + (hash(self.mode) << 8) + + hash(self.flags)) diff --git a/src/psij/utils.py b/src/psij/utils.py index 0db4cde7..47357b35 100644 --- a/src/psij/utils.py +++ b/src/psij/utils.py @@ -1,6 +1,25 @@ +import atexit +import io +import logging import os +import queue +import random +import socket +import tempfile import threading -from typing import Type, Dict, Optional +import time +from datetime import datetime, timedelta +from pathlib import Path +from typing import Type, Dict, Optional, Tuple, Set, List + +import psutil + +from psij import JobExecutor, Job, JobState, JobStatus + +logger = logging.getLogger(__name__) + + +_MAX_FILE_AGE_DAYS = 30 class SingletonThread(threading.Thread): @@ -16,7 +35,7 @@ class SingletonThread(threading.Thread): the `run` method. """ - _instances: Dict[int, 'SingletonThread'] = {} + _instances: Dict[int, Dict[type, 'SingletonThread']] = {} _lock = threading.RLock() def __init__(self, name: Optional[str] = None, daemon: bool = False) -> None: @@ -42,8 +61,188 @@ def get_instance(cls: Type['SingletonThread']) -> 'SingletonThread': """ with cls._lock: my_pid = os.getpid() - if my_pid not in cls._instances: + if my_pid in cls._instances: + classes = cls._instances[my_pid] + else: + classes = {} + cls._instances[my_pid] = classes + if cls in classes: + return classes[cls] + else: instance = cls() - cls._instances[my_pid] = instance + classes[cls] = instance instance.start() - return cls._instances[my_pid] + return instance + + +class _StatusUpdater(SingletonThread): + # we are expecting short messages in the form + RECV_BUFSZ = 2048 + + def __init__(self) -> None: + super().__init__() + self.name = 'Status Update Thread' + self.daemon = True + self.work_directory = Path.home() / '.psij' + self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.socket.setblocking(True) + self.socket.settimeout(0.5) + self.socket.bind(('', 0)) + self.update_port = self.socket.getsockname()[1] + self.ips = self._get_ips() + logger.debug('Local IPs: %s' % self.ips) + logger.debug('Status updater port: %s' % self.update_port) + self._create_update_file() + logger.debug('Update file: %s' % self.update_file.name) + self.partial_file_data = '' + self.partial_net_data = '' + self._jobs: Dict[str, Tuple[Job, JobExecutor]] = {} + self._jobs_lock = threading.RLock() + self._sync_ids: Set[str] = set() + self._last_received = '' + + def _get_ips(self) -> List[str]: + addrs = psutil.net_if_addrs() + r = [] + for name, l in addrs.items(): + if name == 'lo': + continue + for a in l: + if a.family == socket.AddressFamily.AF_INET: + r.append(a.address) + return r + + def _create_update_file(self) -> None: + f = tempfile.NamedTemporaryFile(dir=self.work_directory, prefix='supd_', delete=False) + name = f.name + self.update_file_name = name + atexit.register(os.remove, name) + 
+        f.close()
+        self.update_file = open(name, 'r+b')
+        self.update_file.seek(0, io.SEEK_END)
+        self.update_file_pos = self.update_file.tell()
+
+    def register_job(self, job: Job, ex: JobExecutor) -> None:
+        with self._jobs_lock:
+            self._jobs[job.id] = (job, ex)
+
+    def unregister_job(self, job: Job) -> None:
+        with self._jobs_lock:
+            try:
+                del self._jobs[job.id]
+            except KeyError:
+                # There are cases when it's difficult to ensure that this method is only called
+                # once for each job. Instead, ignore errors here, since the ultimate goal is to
+                # remove the job from the _jobs dictionary.
+                pass
+
+    def step(self) -> None:
+        self._poll_file()
+        try:
+            data = self.socket.recv(_StatusUpdater.RECV_BUFSZ)
+            self._process_update_data(data)
+        except TimeoutError:
+            pass
+        except socket.timeout:
+            # before 3.10, this was a separate exception from TimeoutError
+            pass
+        except BlockingIOError:
+            pass
+
+    def _poll_file(self) -> None:
+        self.update_file.seek(0, io.SEEK_END)
+        pos = self.update_file.tell()
+        if pos > self.update_file_pos:
+            self.update_file.seek(self.update_file_pos, io.SEEK_SET)
+            n = pos - self.update_file_pos
+            self._process_update_data(self.update_file.read(n))
+            self.update_file_pos = pos
+
+    def run(self) -> None:
+        while True:
+            try:
+                self.step()
+            except Exception:
+                logger.exception('Exception in status updater thread. Ignoring.')
+
+    def flush(self) -> None:
+        # Ensures that, upon return from this call, all updates available before this call have
+        # been processed. To do so, we send a UDP packet to the socket to wake it up and wait
+        # until it is received. This does not guarantee that file-based updates are necessarily
+        # processed, since that depends on many factors. A downside is that, as implemented,
+        # this method can deadlock if the socket reads fail for unexpected reasons; this should
+        # probably be accounted for.
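+        # the random token makes the sync message unambiguous: once the receive
+        # loop records it in _sync_ids, every datagram queued before it is known
+        # to have been processed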
+        token = '_SYNC ' + str(random.getrandbits(128))
+        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        sock.sendto(bytes(token, 'utf-8'), ('127.0.0.1', self.update_port))
+        self._poll_file()
+        delay = 0.0001
+        while token not in self._sync_ids:
+            time.sleep(delay)
+            delay *= 2
+        self._sync_ids.remove(token)
+
+    def _process_update_data(self, data: bytes) -> None:
+        sdata = data.decode('utf-8')
+        if sdata == self._last_received:
+            # we send UDP packets to all IP addresses of the submit host, which may
+            # result in duplicates, so we drop consecutive messages that are identical
+            return
+        else:
+            self._last_received = sdata
+        lines = sdata.splitlines()
+        for line in lines:
+            if line.startswith('_SYNC '):
+                self._sync_ids.add(line)
+                continue
+            els = line.split()
+            if len(els) > 2 and els[1] == 'LOG':
+                logger.info('%s %s' % (els[0], ' '.join(els[2:])))
+                continue
+            if len(els) != 2:
+                logger.warning('Invalid status update message received: %s' % line)
+                continue
+            job_id = els[0]
+            state = JobState.from_name(els[1])
+            job = None
+            with self._jobs_lock:
+                try:
+                    (job, executor) = self._jobs[job_id]
+                except KeyError:
+                    pass
+            if job:
+                executor._set_job_status(job, JobStatus(state))
+
+
+class _FileCleaner(SingletonThread):
+    def __init__(self) -> None:
+        super().__init__()
+        self.name = 'File Cleaner'
+        self.daemon = True
+        self.queue: queue.SimpleQueue[Path] = queue.SimpleQueue()
+
+    def clean(self, path: Path) -> None:
+        self.queue.put(path)
+
+    def run(self) -> None:
+        while True:
+            try:
+                path = self.queue.get(block=True, timeout=1)
+                try:
+                    self._do_clean(path)
+                except Exception as ex:
+                    logger.warning('Cannot clean %s: %s' % (path, ex))
+            except queue.Empty:
+                pass
+
+    def _do_clean(self, path: Path) -> None:
+        now = datetime.now()
+        max_age = timedelta(days=_MAX_FILE_AGE_DAYS)
+        for child in path.iterdir():
+            m_time = datetime.fromtimestamp(child.lstat().st_mtime)
+            if now - m_time > max_age:
+                try:
+                    child.unlink()
+                except FileNotFoundError:
+                    # we try our best
+                    pass
diff --git a/tests/_test_tools.py b/tests/_test_tools.py
index f780081f..9d4f480d 100644
--- a/tests/_test_tools.py
+++ b/tests/_test_tools.py
@@ -4,7 +4,7 @@
 from contextlib import contextmanager
 from datetime import timedelta
 from pathlib import Path
-from typing import Optional, Union, Iterator
+from typing import Optional, Union, Iterator, IO
 
 from executor_test_params import ExecutorTestParams
 
@@ -42,8 +48,14 @@ def assert_completed(job: Job, status: Optional[JobStatus], attached: bool = Fal
     if status.state != JobState.COMPLETED:
         if not attached:
             assert job.spec is not None
-            stdout = _read_file(job.spec.stdout_path)
-            stderr = _read_file(job.spec.stderr_path)
+            try:
+                stdout = _read_file(job.spec.stdout_path)
+            except Exception:
+                stdout = ''
+            try:
+                stderr = _read_file(job.spec.stderr_path)
+            except Exception:
+                stderr = ''
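+            # reading stdout/stderr is best-effort (the files may not exist, e.g.,
+            # when the job failed before producing them); they are only used to
+            # enrich the assertion message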
+            raise AssertionError('Job not completed. Exit code: %s, Status message: %s, '
+                                 'stdout: %s, stderr: %s'
+                                 % (status.exit_code, status.message, stdout, stderr))
@@ -82,3 +88,60 @@ def _deploy(path: Union[Path, str]) -> Iterator[Path]:
         yield Path(df.name)
     finally:
         os.unlink(df.name)
+
+
+@contextmanager
+def _tempfile() -> Iterator[IO[str]]:
+    _make_test_dir()
+    test_dir = Path.home() / '.psij' / 'test'
+    with tempfile.NamedTemporaryFile(mode='w', dir=test_dir, delete=False) as f:
+        try:
+            yield f
+        finally:
+            try:
+                os.unlink(f.name)
+            except FileNotFoundError:
+                # some tests may remove the file themselves
+                pass
+
+
+@contextmanager
+def _temppath() -> Iterator[Path]:
+    _make_test_dir()
+    test_dir = Path.home() / '.psij' / 'test'
+    with tempfile.NamedTemporaryFile(mode='w', dir=test_dir, delete=False) as f:
+        try:
+            f.close()
+            yield Path(f.name)
+        finally:
+            try:
+                os.unlink(f.name)
+            except FileNotFoundError:
+                # some tests may remove the file themselves
+                pass
+
+
+@contextmanager
+def _tempdir(keep: bool = False) -> Iterator[Path]:
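+    # `keep` only applies when the body raises: the directory is then preserved
+    # for debugging; on success it is always removed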
+    _make_test_dir()
+    test_dir = Path.home() / '.psij' / 'test'
+    d = tempfile.mkdtemp(dir=test_dir)
+    try:
+        yield Path(d)
+        shutil.rmtree(d)
+    except Exception:
+        if not keep:
+            shutil.rmtree(d)
+        raise
+
+
+def _write_file(f: Union[Path, IO[str]], contents: str) -> None:
+    if isinstance(f, Path):
+        f = f.open('w')
+    try:
+        f.write(contents)
+    finally:
+        f.close()
diff --git a/tests/conftest.py b/tests/conftest.py
index 417e0bbe..94316251 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -8,11 +8,13 @@
 import re
 import secrets
 import shutil
+import signal
 import socket
 import subprocess
 import sys
 import threading
 import time
+import traceback
 from functools import partial
 from pathlib import Path
 from typing import Dict, List, Optional
@@ -80,6 +82,24 @@ def pytest_addoption(parser):
                          'is uploaded to the test aggregation server. ')
 
 
+def debug(sig, frame):
+    print('Dumping thread info')
+    with open('/tmp/python-dump.txt', 'w') as f:
+        try:
+            for thr in threading.enumerate():
+                f.write(str(thr))
+                f.write('\n')
+                traceback.print_stack(sys._current_frames()[thr.ident], file=f)
+                f.write('\n\n')
+        except Exception as ex:
+            logger.exception('Failed to dump thread info')
+            f.write(str(ex))
+
+
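+# to get a thread dump from a seemingly hung test run, send SIGUSR1 to the
+# pytest process (kill -USR1 <pid>); the stacks are written to
+# /tmp/python-dump.txt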
+signal.signal(signal.SIGUSR1, debug)
+print('SIGUSR1 handler installed.')
+
+
 def _get_executors(config: Dict[str, str]) -> List[str]:
     execs_str = config.getoption('executors')
     execs = execs_str.split(',')
diff --git a/tests/plugins1/_batch_test/test/launcher.sh b/tests/plugins1/_batch_test/test/launcher.sh
index 5d7011e6..13a188ed 100644
--- a/tests/plugins1/_batch_test/test/launcher.sh
+++ b/tests/plugins1/_batch_test/test/launcher.sh
@@ -51,5 +51,4 @@ log "All completed"
 
 post_launch
 
-echo "_PSI_J_LAUNCHER_DONE"
 exit $_PSI_J_FAILED_EC
diff --git a/tests/plugins1/_batch_test/test/test.mustache b/tests/plugins1/_batch_test/test/test.mustache
index 0759e0af..5fc00bfe 100644
--- a/tests/plugins1/_batch_test/test/test.mustache
+++ b/tests/plugins1/_batch_test/test/test.mustache
@@ -2,6 +2,8 @@
 
 exec &> "{{psij.script_dir}}/$PSIJ_BATCH_TEST_JOB_ID.out"
 
+{{> batch_lib}}
+
 {{#job.spec.directory}}
 cd "{{.}}"
 {{/job.spec.directory}}
@@ -43,8 +45,17 @@ done
 
 export PSIJ_NODEFILE
 
+{{> stagein}}
+_psij_update_status ACTIVE
+
 {{#job.spec.inherit_environment}}env \{{/job.spec.inherit_environment}}{{^job.spec.inherit_environment}}env --ignore-environment \{{/job.spec.inherit_environment}}{{#env}} {{name}}="{{value}}" \{{/env}}
 {{#psij.launch_command}}{{.}} {{/psij.launch_command}}
+_PSIJ_JOB_EC="$?"
+
+{{> stageout}}
+{{> cleanup}}
 
-echo "$?" > "{{psij.script_dir}}/$PSIJ_BATCH_TEST_JOB_ID.ec"
+echo $_PSIJ_JOB_EC > "{{psij.script_dir}}/$PSIJ_BATCH_TEST_JOB_ID.ec"
+echo "_PSIJ_SCRIPT_DONE"
\ No newline at end of file
diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py
index c0b53813..1d3f03f3 100644
--- a/tests/test_callbacks.py
+++ b/tests/test_callbacks.py
@@ -18,6 +18,7 @@ def __init__(self, arg: Any) -> None:
 
     def state_cb(self, job: psij.Job, status: psij.JobStatus) -> None:
         """State callback."""
+        print('status change: %s' % status)
        self._cb_states.append(status.state)
 
     def test_job_callbacks(self) -> None:
@@ -29,9 +30,14 @@ def test_job_callbacks(self) -> None:
         jex.submit(job)
         job.wait()
 
-        self.assertEqual(len(self._cb_states), 3)
+        print('States: %s' % self._cb_states)
+
+        self.assertEqual(len(self._cb_states), 6)
         self.assertIn(psij.JobState.QUEUED, self._cb_states)
+        self.assertIn(psij.JobState.STAGE_IN, self._cb_states)
         self.assertIn(psij.JobState.ACTIVE, self._cb_states)
+        self.assertIn(psij.JobState.STAGE_OUT, self._cb_states)
+        self.assertIn(psij.JobState.CLEANUP, self._cb_states)
         self.assertIn(psij.JobState.FAILED, self._cb_states)
 
         self._cb_states = list()
@@ -41,9 +47,12 @@ def test_job_callbacks(self) -> None:
         jex.submit(job)
         job.wait()
 
-        self.assertEqual(len(self._cb_states), 3)
+        self.assertEqual(len(self._cb_states), 6)
         self.assertIn(psij.JobState.QUEUED, self._cb_states)
+        self.assertIn(psij.JobState.STAGE_IN, self._cb_states)
         self.assertIn(psij.JobState.ACTIVE, self._cb_states)
+        self.assertIn(psij.JobState.STAGE_OUT, self._cb_states)
+        self.assertIn(psij.JobState.CLEANUP, self._cb_states)
         self.assertIn(psij.JobState.COMPLETED, self._cb_states)
 
     def test_job_executor_callbacks(self) -> None:
@@ -55,7 +64,12 @@ def test_job_executor_callbacks(self) -> None:
         jex.submit(job)
         job.wait()
 
-        self.assertEqual(len(self._cb_states), 3)
+        print('States: %s' % self._cb_states)
+
+        self.assertEqual(len(self._cb_states), 6)
         self.assertIn(psij.JobState.QUEUED, self._cb_states)
+        self.assertIn(psij.JobState.STAGE_IN, self._cb_states)
         self.assertIn(psij.JobState.ACTIVE, self._cb_states)
+        self.assertIn(psij.JobState.STAGE_OUT, self._cb_states)
+        self.assertIn(psij.JobState.CLEANUP, self._cb_states)
         self.assertIn(psij.JobState.COMPLETED, self._cb_states)
diff --git a/tests/test_staging.py b/tests/test_staging.py
new file mode 100644
index 00000000..76a16da4
--- /dev/null
+++ b/tests/test_staging.py
@@ -0,0 +1,227 @@
+from pathlib import Path
+
+from executor_test_params import ExecutorTestParams
+from _test_tools import _get_executor_instance, _get_timeout, assert_completed, _tempfile, \
+    _temppath, _tempdir, _write_file, _read_file
+from psij import Job, JobSpec, JobState
+from psij.staging import StageIn, StageOut, StagingMode, StageOutFlags
+import pytest
+
+
+@pytest.mark.parametrize('mode', [StagingMode.COPY, StagingMode.MOVE, StagingMode.LINK])
+def test_stagein(execparams: ExecutorTestParams, mode: StagingMode) -> None:
+    if execparams.executor != 'local' and execparams.launcher != 'single':
+        # The launcher should not affect the staging, so we test all launchers locally,
+        # but for the other executors, we only test with the single launcher
+        pytest.skip()
+    # The executors are not mandated to implement the staging modes, but they are
+    # meant to default to COPY if MOVE and LINK are not implemented, so we test
+    # that things function correctly, but not how that is done
+    with _temppath() as out_path, _temppath() as err_path, _tempdir() as dir:
+        with _tempfile() as f1, _tempfile() as f2:
+            _write_file(f1, 'ABCD')
+            _write_file(f2, 'EFGH')
+
+            job = Job(JobSpec('/bin/cat', ['in1.txt', 'subdir/in2.txt'],
+                              directory=dir, stdout_path=out_path, stderr_path=err_path,
+                              launcher=execparams.launcher))
+            assert job.spec is not None
+            job.spec.stage_in = {
+                StageIn(f1.name, 'in1.txt', mode=mode),
+                StageIn(f2.name, 'subdir/in2.txt', mode=mode),
+            }
+            ex = _get_executor_instance(execparams, job)
+            ex.submit(job)
+            status = job.wait(timeout=_get_timeout(execparams))
+            assert_completed(job, status)
+
+        assert _read_file(out_path) == 'ABCDEFGH'
+
+
+@pytest.mark.parametrize('mode', [StagingMode.COPY, StagingMode.MOVE, StagingMode.LINK])
+def test_dir_stagein(execparams: ExecutorTestParams, mode: StagingMode) -> None:
+    if execparams.executor != 'local' and execparams.launcher != 'single':
+        pytest.skip()
+    # The executors are not mandated to implement the staging modes, but they are
+    # meant to default to COPY if MOVE and LINK are not implemented, so we test
+    # that things function correctly, but not how that is done
+    with _temppath() as out_path, _temppath() as err_path, _tempdir() as dir:
+        with _tempdir() as in_dir:
+            sub_dir = in_dir / 'subdir'
+            sub_dir.mkdir()
+            f1 = sub_dir / 'in3.txt'
+            f2 = sub_dir / 'in4.txt'
+            _write_file(f1, 'IJKL')
+            _write_file(f2, 'MNOP')
+
+            job = Job(JobSpec('/bin/cat', ['indir/in3.txt', 'indir/in4.txt'],
+                              directory=dir, stdout_path=out_path, stderr_path=err_path,
+                              launcher=execparams.launcher))
+            assert job.spec is not None
+            job.spec.stage_in = {
+                StageIn(sub_dir, 'indir', mode=mode),
+            }
+            ex = _get_executor_instance(execparams, job)
+            ex.submit(job)
+            status = job.wait(timeout=_get_timeout(execparams))
+            assert_completed(job, status)
+
+        assert _read_file(out_path) == 'IJKLMNOP'
+
+
+@pytest.mark.parametrize('mode', [StagingMode.COPY, StagingMode.MOVE, StagingMode.LINK])
+def test_stageout(execparams: ExecutorTestParams, mode: StagingMode) -> None:
+    if execparams.executor != 'local' and execparams.launcher != 'single':
+        pytest.skip()
+    with _temppath() as out_path, _temppath() as err_path, _tempdir() as dir:
+        job = Job(JobSpec('/bin/echo', ['-n', 'CDEF'], directory=dir,
+                          stdout_path='out.txt', stderr_path=err_path,
+                          launcher=execparams.launcher))
+        assert job.spec is not None
+        job.spec.stage_out = {
+            StageOut('out.txt', out_path, mode=mode)
+        }
+        ex = _get_executor_instance(execparams, job)
+        ex.submit(job)
+        status = job.wait(timeout=_get_timeout(execparams))
+        assert_completed(job, status)
+
+        assert _read_file(out_path) == 'CDEF'
+
+
+def test_stageout_flags1(execparams: ExecutorTestParams) -> None:
+    if execparams.executor != 'local' and execparams.launcher != 'single':
+        pytest.skip()
+    with _temppath() as out1_path, _temppath() as out2_path, _temppath() as err_path, \
+            _tempdir() as dir:
+
+        out2_path.unlink()
+        job = Job(JobSpec('/bin/echo', ['-n', 'ABC123'],
+                          directory=dir, stdout_path='out1.txt', stderr_path=err_path,
+                          launcher=execparams.launcher))
+        assert job.spec is not None
+        job.spec.stage_out = {
+            StageOut('out1.txt', out1_path, flags=StageOutFlags.IF_PRESENT),
+            StageOut('out2.txt', out2_path, flags=StageOutFlags.IF_PRESENT)
+        }
+        ex = _get_executor_instance(execparams, job)
+        ex.submit(job)
+        status = job.wait(timeout=_get_timeout(execparams))
+        assert_completed(job, status)
+
+        assert _read_file(out1_path) == 'ABC123'
+        assert not out2_path.exists()
+
+
+def test_stageout_flags2(execparams: ExecutorTestParams) -> None:
+    if execparams.executor != 'local' and execparams.launcher != 'single':
+        pytest.skip()
+    with _temppath() as out_path, _temppath() as err_path, _tempdir() as dir:
+        job = Job(JobSpec('/bin/echo', ['-n', 'EFG456'],
+                          directory=dir, stdout_path='out.txt', stderr_path=err_path,
+                          launcher=execparams.launcher))
+        assert job.spec is not None
+        job.spec.stage_out = {
+            StageOut('out.txt', out_path, flags=StageOutFlags.ON_SUCCESS),
+        }
+        ex = _get_executor_instance(execparams, job)
+        ex.submit(job)
+        status = job.wait(timeout=_get_timeout(execparams))
+        assert_completed(job, status)
+
+        assert _read_file(out_path) == 'EFG456'
+
+    with _temppath() as out_path, _temppath() as err_path, _tempdir() as dir:
+        out_path.unlink()
+        job = Job(JobSpec('/bin/bash', ['-c', 'echo -n "ABC" > out.txt; exit 1'],
+                          directory=dir, stderr_path=err_path,
+                          launcher=execparams.launcher))
+        assert job.spec is not None
+        job.spec.stage_out = {
+            StageOut('out.txt', out_path, flags=StageOutFlags.ON_SUCCESS),
+        }
+        ex = _get_executor_instance(execparams, job)
+        ex.submit(job)
+        status = job.wait(timeout=_get_timeout(execparams))
+        assert status
+        assert status.state == JobState.FAILED
+        assert not out_path.exists()
+
+
+def test_stageout_flags3(execparams: ExecutorTestParams) -> None:
+    if execparams.executor != 'local' and execparams.launcher != 'single':
+        pytest.skip()
+    with _temppath() as out_path, _temppath() as err_path, _tempdir() as dir:
+        out_path.unlink()
+        job = Job(JobSpec('/bin/echo', ['-n', 'EFG456'],
+                          directory=dir, stdout_path='out.txt', stderr_path=err_path,
+                          launcher=execparams.launcher))
+        assert job.spec is not None
+        job.spec.stage_out = {
+            StageOut('out.txt', out_path, flags=StageOutFlags.ON_ERROR),
+        }
+        ex = _get_executor_instance(execparams, job)
+        ex.submit(job)
+        status = job.wait(timeout=_get_timeout(execparams))
+        assert_completed(job, status)
+
+        assert not out_path.exists()
+
+    with _temppath() as out_path, _temppath() as err_path, _tempdir() as dir:
+        job = Job(JobSpec('/bin/bash', ['-c', 'echo -n "ABC" > out.txt; exit 1'],
+                          directory=dir, stderr_path=err_path,
+                          launcher=execparams.launcher))
+        assert job.spec is not None
+        job.spec.stage_out = {
+            StageOut('out.txt', out_path, flags=StageOutFlags.ON_ERROR),
+        }
+        ex = _get_executor_instance(execparams, job)
+        ex.submit(job)
+        status = job.wait(timeout=_get_timeout(execparams))
+        assert status
+        assert status.state == JobState.FAILED
+        assert out_path.exists()
+        assert _read_file(out_path) == 'ABC'
+
+
+def test_cleanup(execparams: ExecutorTestParams) -> None:
+    if execparams.executor != 'local' and execparams.launcher != 'single':
+        pytest.skip()
+    with _temppath() as out_path, _temppath() as err_path, _tempdir() as dir:
+        job = Job(JobSpec('/bin/echo', ['-n', 'ABC'],
+                          directory=dir, stdout_path='out.txt', stderr_path=err_path,
+                          launcher=execparams.launcher))
+        assert job.spec is not None
+        job.spec.stage_out = {
+            StageOut('out.txt', out_path, flags=StageOutFlags.IF_PRESENT),
+        }
+        job.spec.cleanup = {Path('out.txt')}
+        ex = _get_executor_instance(execparams, job)
+        ex.submit(job)
+        status = job.wait(timeout=_get_timeout(execparams))
+        assert_completed(job, status)
+        assert out_path.exists()
+        assert not (dir / 'out.txt').exists()
+        assert _read_file(out_path) == 'ABC'
+
+
+def test_cleanup2(execparams: ExecutorTestParams) -> None:
+    if execparams.executor != 'local' and execparams.launcher != 'single':
+        pytest.skip()
+    with _temppath() as out_path, _temppath() as err_path, _tempdir() as dir:
+        job = Job(JobSpec('/bin/bash', ['-c', 'echo -n "ABC" > out.txt; exit 1'],
+                          directory=dir, stderr_path=err_path,
+                          launcher=execparams.launcher))
+        assert job.spec is not None
+        job.spec.stage_out = {
+            StageOut('out.txt', out_path, flags=StageOutFlags.IF_PRESENT),
+        }
+        job.spec.cleanup = {Path('out.txt')}
+        job.spec.cleanup_flags = StageOutFlags.ON_SUCCESS
+        ex = _get_executor_instance(execparams, job)
+        ex.submit(job)
+        status = job.wait(timeout=_get_timeout(execparams))
+        assert status is not None
+        assert status.state == JobState.FAILED
+        assert (dir / 'out.txt').exists()
+        assert _read_file(out_path) == 'ABC'
diff --git a/tests/user_guide/test_job_wait_active.py b/tests/user_guide/test_job_wait_active.py
index 839bfc2c..86fbade0 100644
--- a/tests/user_guide/test_job_wait_active.py
+++ b/tests/user_guide/test_job_wait_active.py
@@ -3,8 +3,9 @@
 
 def test_user_guide_job_wait_active() -> None:
     ex = JobExecutor.get_instance('local')
-    job = Job(JobSpec(executable='/bin/date'))
+    job = Job(JobSpec(executable='/bin/sleep', arguments=['1']))
     ex.submit(job)
     status = job.wait(target_states=[JobState.ACTIVE])
     assert status is not None
     assert status.state == JobState.ACTIVE
+    job.wait()  # prevent logging messages from showing up after test completes