Skip to content

Commit 3e89c0e

Browse files
committed
Initial staging commit
1 parent aeecc27 commit 3e89c0e

File tree

10 files changed

+745
-236
lines changed

10 files changed

+745
-236
lines changed

src/psij/exceptions.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,21 @@ def __init__(self, message: str, exception: Optional[Exception] = None,
6262
conditions such an error would persist across subsequent re-tries until correct credentials
6363
are used.
6464
"""
65+
66+
class CompositeException(Exception):
67+
def __init__(self, ex: Exception) -> None:
68+
self.exceptions = [ex]
69+
70+
def add_exception(self, ex: Exception) -> None:
71+
self.exceptions.append(ex)
72+
73+
74+
class LauncherException(Exception):
75+
def __init__(self, message: str) -> None:
76+
super().__init__('Launcher failure: %s' % message)
77+
78+
79+
class JobException(Exception):
80+
def __init__(self, exit_code: int) -> None:
81+
super().__init__('Job exited with exit code %s' % exit_code)
82+
self.exit_code = exit_code

src/psij/executors/batch/batch_scheduler_executor.py

Lines changed: 114 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
1+
import atexit
2+
import fcntl
3+
import io
14
import logging
25
import os
6+
import tempfile
7+
8+
import psutil
9+
import socket
310
import subprocess
411
import time
512
import traceback
@@ -14,6 +21,7 @@
1421
from psij import JobExecutor, JobExecutorConfig, Launcher, Job, SubmitException, \
1522
JobStatus, JobState
1623
from psij.executors.batch.template_function_library import ALL as FUNCTION_LIBRARY
24+
from psij.utils import SingletonThread
1725

1826
UNKNOWN_ERROR = 'PSIJ: Unknown error'
1927

@@ -265,6 +273,8 @@ def cancel(self, job: Job) -> None:
265273
except SubmitException:
266274
# re-raise
267275
raise
276+
finally:
277+
self._status_update_thread.unregister_job(job)
268278

269279
def attach(self, job: Job, native_id: str) -> None:
270280
"""Attaches a job to a native job.
@@ -531,9 +541,11 @@ def _set_job_status(self, job: Job, status: JobStatus) -> None:
531541
# is_greater_than returns T/F if the states are comparable and None if not, so
532542
# we have to check explicitly for the boolean value rather than truthiness
533543
return
534-
if status.state.final and job.native_id:
535-
self._clean_submit_script(job)
536-
self._read_aux_files(job, status)
544+
if status.state.final:
545+
self._status_update_thread.unregister_job(job)
546+
if job.native_id:
547+
self._clean_submit_script(job)
548+
self._read_aux_files(job, status)
537549
super()._set_job_status(job, status)
538550

539551
def _clean_submit_script(self, job: Job) -> None:
@@ -638,13 +650,19 @@ def __init__(self, name: str, config: BatchSchedulerExecutorConfig,
638650
# counts consecutive errors while invoking qstat or equivalent
639651
self._poll_error_count = 0
640652
self._jobs_lock = RLock()
653+
self._status_updater = _StatusUpdater(config, executor)
641654

642655
def run(self) -> None:
643656
logger.debug('Executor %s: queue poll thread started', self.executor)
644657
time.sleep(self.config.initial_queue_polling_delay)
645658
while True:
646659
self._poll()
647-
time.sleep(self.config.queue_polling_interval)
660+
start = time.time()
661+
now = start
662+
while now - start < self.config.queue_polling_interval:
663+
self._status_updater.step()
664+
time.sleep(1)
665+
now = time.time()
648666

649667
def _poll(self) -> None:
650668
with self._jobs_lock:
@@ -686,6 +704,8 @@ def _poll(self) -> None:
686704
if status.state.final:
687705
with self._jobs_lock:
688706
del self._jobs[native_id]
707+
for job in job_list:
708+
self._status_updater.unregister_job(job)
689709
except Exception as ex:
690710
msg = traceback.format_exc()
691711
self._handle_poll_error(True, ex, 'Error updating job statuses {}'.format(msg))
@@ -713,9 +733,11 @@ def _handle_poll_error(self, immediate: bool, ex: Exception, msg: str) -> None:
713733
self._jobs.clear()
714734
for job_list in jobs_copy.values():
715735
for job in job_list:
736+
self._status_updater.unregister_job(job)
716737
self.executor._set_job_status(job, JobStatus(JobState.FAILED, message=msg))
717738

718739
def register_job(self, job: Job) -> None:
740+
self._status_updater.register_job(job)
719741
assert job.native_id
720742
logger.info('Job %s: registering', job.id)
721743
with self._jobs_lock:
@@ -724,3 +746,91 @@ def register_job(self, job: Job) -> None:
724746
self._jobs[native_id] = [job]
725747
else:
726748
self._jobs[job.native_id].append(job)
749+
750+
class _StatusUpdater:
751+
# we are expecting short messages in the form <jobid> <status>
752+
RECV_BUFSZ = 2048
753+
754+
def __init__(self, config: BatchSchedulerExecutorConfig,
755+
executor: BatchSchedulerExecutor) -> None:
756+
self.config = config
757+
self.executor = executor
758+
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
759+
self.socket.setblocking(False)
760+
self.socket.bind(('', 0))
761+
self.port = self.socket.getsockname()[1]
762+
self.ips = self._get_ips()
763+
print('IPS: %s' % self.ips)
764+
print('Port: %s' % self.port)
765+
self._create_update_file()
766+
print('Update file: %s' % self.update_file.name)
767+
self.partial_file_data = ''
768+
self.partial_net_data = ''
769+
self._jobs = {}
770+
self._jobs_lock = RLock()
771+
772+
def _get_ips(self) -> List[str]:
773+
addrs = psutil.net_if_addrs()
774+
r = []
775+
for name, l in addrs.items():
776+
if name == 'lo':
777+
continue
778+
for a in l:
779+
if a.family == socket.AddressFamily.AF_INET:
780+
r.append(a.address)
781+
return r
782+
783+
def _create_update_file(self) -> None:
784+
f = tempfile.NamedTemporaryFile(dir=self.config.work_directory, prefix='supd_',
785+
delete=False)
786+
name = f.name
787+
atexit.register(os.remove, name)
788+
f.close()
789+
self.update_file = open(name, 'r+b')
790+
self.update_file.seek(0, io.SEEK_END)
791+
self.update_file_pos = self.update_file.tell()
792+
793+
def register_job(self, job: Job) -> None:
794+
with self._jobs_lock:
795+
self._jobs[job.id] = job
796+
797+
def unregister_job(self, job: Job) -> None:
798+
with self._jobs_lock:
799+
del self._jobs[job.id]
800+
801+
def step(self) -> None:
802+
self.update_file.seek(0, io.SEEK_END)
803+
pos = self.update_file.tell()
804+
if pos > self.update_file_pos:
805+
self.update_file.seek(self.update_file_pos, io.SEEK_SET)
806+
n = pos - self.update_file_pos
807+
self._process_update_data(self.update_file.read(n))
808+
self.update_file_pos = pos
809+
else:
810+
try:
811+
data = self.socket.recv(_StatusUpdater.RECV_BUFSZ)
812+
self._process_update_data(data)
813+
except socket.error as e:
814+
pass
815+
816+
def _process_update_data(self, data: bytes) -> None:
817+
sdata = data.decode('utf-8')
818+
lines = sdata.splitlines()
819+
for line in lines:
820+
print('Status update line: %s' % line)
821+
els = line.split()
822+
if len(els) != 2:
823+
logger.warning('Invalid status update message received: %s' % line)
824+
continue
825+
job_id = els[0]
826+
state = JobState.from_name(els[1])
827+
job = None
828+
with self._jobs_lock:
829+
try:
830+
job = self._jobs[job_id]
831+
except KeyError:
832+
logger.warning('Received status updated for inexistent job with id %s' % job_id)
833+
if job:
834+
self.executor._set_job_status(job, JobStatus(state))
835+
836+
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
_UPDATE_MODE="none"
2+
3+
update_status() {
4+
STATUS="$1"
5+
if [ "$_UPDATE_MODE" == "none" ]; then
6+
if which nc >/dev/null; then
7+
$_UPDATE_MODE="nc"
8+
else
9+
$_UPDATE_MODE="file"
10+
fi
11+
fi
12+
13+
if [ "$_UPDATE_MODE" == "nc" ]; then
14+
for ADDR in "{{psij.us_addrs}}"; do
15+
echo "{{psij.job_id}} $STATUS" | nc -q0 -w0 -4 -u $ADDR {{psij.us_port}}
16+
done
17+
else
18+
echo "{{psij.job_id}} $STATUS" >> {{psij.us_file}}
19+
fi
20+
}
21+
22+
do_stagein() {
23+
do_stage "$@" 0
24+
}
25+
26+
do_stage() {
27+
SOURCE="$1"
28+
TARGET="$2"
29+
MODE="$3"
30+
MISSING_OK="$4"
31+
32+
if [ ! -a "$SOURCE" ] && [ "$MISSING_OK" == "0" ]; then
33+
echo "Missing source file: $SOURCE"
34+
exit 1
35+
fi
36+
37+
if [ "$MODE" == "1" ]; then
38+
# copy
39+
cp -r -T "$SOURCE" "$TARGET"
40+
elif [ "$MODE" == "2" ]; tben
41+
# link
42+
ln -s "$SOURCE" "$TARGET"
43+
elif [ "$MODE" == "3" ]; then
44+
# move
45+
mv -T -f "$SOURCE" "$TARGET"
46+
fi
47+
}
48+
49+
_FLAG_IF_PRESENT=1
50+
_FLAG_ON_SUCCESS=2
51+
_FLAG_ON_ERROR=4
52+
_FLAG_ON_CANCEL=8
53+
54+
do_stageout() {
55+
SOURCE="$1"
56+
TARGET="$2"
57+
MODE="$3"
58+
FLAGS="$4"
59+
FAILED="$5"
60+
61+
if [ "$FAILED" == "0" ] && $((FLAGS & _FLAG_ON_SUCCESS)) ; then
62+
do_stage "$SOURCE" "$TARGET" "$MODE" $((FLAGS & _FLAG_IF_PRESENT))
63+
elif [ "$FAILED" != "0" ] && $((FLAGS & _FLAG_ON_ERROR)) ; then
64+
do_stage "$SOURCE" "$TARGET" "$MODE" $((FLAGS & _FLAG_IF_PRESENT))
65+
fi
66+
}
67+
68+
do_cleanup() {
69+
TARGET="$1"
70+
71+
case "$TARGET" in
72+
"{{job.spec.directory}}"*)
73+
echo "rm -rf $TARGET" >>~/cleanup.log
74+
;;
75+
*)
76+
echo "Cannot clean $TARGET outside of job directory {{job.spec.directory}}"
77+
exit 1
78+
;;
79+
esac
80+
}

src/psij/executors/batch/script_generator.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ def __init__(self, config: JobExecutorConfig, template_path: pathlib.Path,
7272
super().__init__(config)
7373
with template_path.open('r') as template_file:
7474
self.template = pystache.parse(template_file.read())
75-
self.renderer = pystache.Renderer(escape=escape)
75+
common_dir = pathlib.Path(__file__).parent / 'common'
76+
self.renderer = pystache.Renderer(escape=escape, search_dirs=[str(common_dir)])
7677

7778
def generate_submit_script(self, job: Job, context: Dict[str, object], out: IO[str]) -> None:
7879
"""See :func:`~SubmitScriptGenerator.generate_submit_script`.

0 commit comments

Comments
 (0)