Skip to content

Commit 71d0744

Browse files
committed
add psij worker
1 parent 91c5a15 commit 71d0744

File tree

1 file changed

+41
-0
lines changed

1 file changed

+41
-0
lines changed

pydra/engine/workers.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,11 +891,52 @@ def close(self):
891891
"""Finalize the internal pool of tasks."""
892892
pass
893893

894+
class PsijWorker(Worker):
895+
def __init__(self, **kwargs):
896+
"""Initialize worker."""
897+
try:
898+
import psij
899+
except ImportError:
900+
logger.critical("Please install psij.")
901+
raise
902+
logger.debug("Initialize PsijWorker")
894903

904+
def run_el(self, interface, rerun=False, **kwargs):
905+
"""Run a task."""
906+
return self.exec_psij(interface, rerun=rerun)
907+
908+
def make_spec(self, cmd=None, arg=None, cache_dir=None):
909+
spec = self.psij.JobSpec()
910+
spec.executable = cmd
911+
spec.arguments = arg
912+
spec.stdout_path = 'demo.stdout'
913+
spec.stderr_path = 'demo.stderr'
914+
915+
return spec
916+
917+
def make_job(self, spec, attributes):
918+
job = self.psij.Job()
919+
job.spec = spec
920+
return job
921+
922+
async def exec_psij(self, runnable, rerun=False):
923+
import psij
924+
self.psij = psij
925+
jex = psij.JobExecutor.get_instance('local')
926+
spec = self.make_spec(runnable.inputs.executable, runnable.inputs.args, runnable.cache_dir)
927+
job = self.make_job(spec, None)
928+
jex.submit(job)
929+
return
930+
931+
def close(self):
932+
"""Finalize the internal pool of tasks."""
933+
pass
934+
895935
WORKERS = {
896936
"serial": SerialWorker,
897937
"cf": ConcurrentFuturesWorker,
898938
"slurm": SlurmWorker,
899939
"dask": DaskWorker,
900940
"sge": SGEWorker,
941+
"psij": PsijWorker,
901942
}

0 commit comments

Comments
 (0)