@@ -892,10 +892,69 @@ def close(self):
892
892
pass
893
893
894
894
895
+ class PsijWorker (Worker ):
896
+ def __init__ (self , ** kwargs ):
897
+ """Initialize worker."""
898
+ try :
899
+ import psij
900
+ except ImportError :
901
+ logger .critical ("Please install psij." )
902
+ raise
903
+ logger .debug ("Initialize PsijWorker" )
904
+
905
+ def run_el (self , interface , rerun = False , ** kwargs ):
906
+ """Run a task."""
907
+ return self .exec_psij (interface , rerun = rerun )
908
+
909
+ def make_spec (self , cmd = None , arg = None ):
910
+ spec = self .psij .JobSpec ()
911
+ spec .executable = cmd
912
+ spec .arguments = arg
913
+ spec .stdout_path = '/pydra/pydra/engine/demo.stdout'
914
+ spec .stderr_path = '/pydra/pydra/engine/demo.stderr'
915
+
916
+ return spec
917
+
918
+ def make_job (self , spec , attributes ):
919
+ job = self .psij .Job ()
920
+ job .spec = spec
921
+ return job
922
+
923
+ async def exec_psij (self , runnable , rerun = False ):
924
+ import psij
925
+ import pickle
926
+ self .psij = psij
927
+ jex = psij .JobExecutor .get_instance ('slurm' )
928
+
929
+ if isinstance (runnable , TaskBase ):
930
+ with open ('/pydra/pydra/engine/my_function.pkl' , 'wb' ) as file :
931
+ pickle .dump (runnable ._run , file )
932
+ spec = self .make_spec ("python3.9" , ["/pydra/pydra/engine/run_pickled_function.py" ])
933
+ else : # it could be tuple that includes pickle files with tasks and inputs
934
+ ind , task_main_pkl , task_orig = runnable
935
+ with open ('/pydra/pydra/engine/my_function.pkl' , 'wb' ) as file :
936
+ pickle .dump (load_and_run , file )
937
+ with open ('/pydra/pydra/engine/taskmain.pkl' , 'wb' ) as file :
938
+ pickle .dump (task_main_pkl , file )
939
+ with open ('/pydra/pydra/engine/ind.pkl' , 'wb' ) as file :
940
+ pickle .dump (ind , file )
941
+ spec = self .make_spec ("python3.9" , ["/pydra/pydra/engine/run_pickled_function_2.py" ])
942
+
943
+ job = self .make_job (spec , None )
944
+ jex .submit (job )
945
+ job .wait ()
946
+
947
+ return
948
+
949
+ def close (self ):
950
+ """Finalize the internal pool of tasks."""
951
+ pass
952
+
895
953
WORKERS = {
896
954
"serial" : SerialWorker ,
897
955
"cf" : ConcurrentFuturesWorker ,
898
956
"slurm" : SlurmWorker ,
899
957
"dask" : DaskWorker ,
900
958
"sge" : SGEWorker ,
959
+ "psij" : PsijWorker ,
901
960
}
0 commit comments