diff --git a/.github/workflows/testpsijflux.yml b/.github/workflows/testpsijflux.yml new file mode 100644 index 0000000000..8eadb3fd38 --- /dev/null +++ b/.github/workflows/testpsijflux.yml @@ -0,0 +1,38 @@ +name: PSI/J-Flux + +on: + push: + branches: + - master + pull_request: + +jobs: + test: + runs-on: ubuntu-latest + permissions: + packages: read + + container: + image: fluxrm/flux-sched:focal-v0.28.0 + options: "--platform=linux/amd64 --user root -it --init" + + steps: + - name: Make Space + run: | + rm -rf /usr/share/dotnet + rm -rf /opt/ghc + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + run: | + apt-get update && apt-get install -y python3-venv && apt-get install less + export PATH=$PWD/bin:$PATH + ln -s /usr/bin/python3 /usr/bin/python + python -m pip install --upgrade pip && pip install -e ".[test]" && python -c 'import pydra; print(pydra.__version__)' + pip install -e "git+https://github.com/ExaWorks/psij-python.git@main#egg=psij-python" + - name: Run pytest + run: | + export PATH=$PWD/bin:$PATH + flux start python -V + flux start pytest --psij=flux --color=yes -vs --cov pydra --cov-config .coveragerc --cov-report xml:cov.xml --doctest-modules pydra/ diff --git a/pydra/conftest.py b/pydra/conftest.py index 66a1d200fc..aa51422d58 100644 --- a/pydra/conftest.py +++ b/pydra/conftest.py @@ -11,7 +11,7 @@ def pytest_addoption(parser): "--psij", action="store", help="run with psij subtype plugin", - choices=["local", "slurm"], + choices=["local", "slurm", "flux"], ) diff --git a/pydra/engine/workers.py b/pydra/engine/workers.py index eaa40beb0a..9cd00f5b62 100644 --- a/pydra/engine/workers.py +++ b/pydra/engine/workers.py @@ -924,6 +924,15 @@ def __init__(self, **kwargs): logger.debug("Initialize PsijWorker") self.psij = psij + # Check if the provided subtype is valid + valid_subtypes = ["local", "slurm", "flux"] + if subtype not in valid_subtypes: + raise ValueError( + f"Invalid 'subtype' provided. Available options: {', '.join(valid_subtypes)}" + ) + + self.subtype = subtype + def run_el(self, interface, rerun=False, **kwargs): """Run a task.""" return self.exec_psij(interface, rerun=rerun) @@ -995,7 +1004,7 @@ async def exec_psij(self, runnable, rerun=False): with open(file_path, "wb") as file: pickle.dump(runnable._run, file) func_path = absolute_path / "run_pickled.py" - spec = self.make_spec("python", [func_path, file_path]) + spec = self.make_spec("python", [str(func_path), str(file_path)]) else: # it could be tuple that includes pickle files with tasks and inputs cache_dir = runnable[-1].cache_dir file_path_1 = cache_dir / "taskmain.pkl" @@ -1009,9 +1018,9 @@ async def exec_psij(self, runnable, rerun=False): spec = self.make_spec( "python", [ - func_path, - file_path_1, - file_path_2, + str(func_path), + str(file_path_1), + str(file_path_2), ], ) @@ -1053,6 +1062,13 @@ class PsijSlurmWorker(PsijWorker): plugin_name = f"psij-{subtype}" +class PsijFluxWorker(PsijWorker): + """A worker to execute tasks using PSI/J using SLURM.""" + + subtype = "flux" + plugin_name = f"psij-{subtype}" + + WORKERS = { w.plugin_name: w for w in ( @@ -1063,5 +1079,6 @@ class PsijSlurmWorker(PsijWorker): SGEWorker, PsijLocalWorker, PsijSlurmWorker, + PsijFluxWorker, ) }