diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py index 07a1121eb88..034dd43bf6e 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py @@ -1,6 +1,9 @@ """ Utilities for WMS """ import os +from pathlib import Path +from glob import glob +import subprocess import sys import json @@ -63,8 +66,6 @@ def createJobWrapper( if os.path.exists(jobWrapperFile): log.verbose("Removing existing Job Wrapper for", jobID) os.remove(jobWrapperFile) - with open(os.path.join(diracRoot, defaultWrapperLocation)) as fd: - wrapperTemplate = fd.read() if "LogLevel" in jobParams: logLevel = jobParams["LogLevel"] @@ -76,34 +77,43 @@ def createJobWrapper( pythonPath = os.path.realpath(sys.executable) log.debug("Real python path after resolving links is: ", pythonPath) - # Making real substitutions - sitePython = os.getcwd() - if rootLocation: - sitePython = rootLocation - wrapperTemplate = wrapperTemplate.replace("@SITEPYTHON@", sitePython) - - jobWrapperJsonFile = jobWrapperFile + ".json" - with open(jobWrapperJsonFile, "w", encoding="utf8") as jsonFile: - json.dump(arguments, jsonFile, ensure_ascii=False) - - with open(jobWrapperFile, "w") as wrapper: - wrapper.write(wrapperTemplate) - - if not rootLocation: - rootLocation = wrapperPath - - # The "real" location of the jobwrapper after it is started - jobWrapperDirect = os.path.join(rootLocation, f"Wrapper_{jobID}") - jobExeFile = os.path.join(wrapperPath, f"Job{jobID}") - jobFileContents = """#!/bin/sh -{} {} {} -o LogLevel={} -o /DIRAC/Security/UseServerCertificate=no {} -""".format( - pythonPath, - jobWrapperDirect, - extraOptions if extraOptions else "", - logLevel, - cfgPath if cfgPath else "", - ) + if "Executable" in jobParams and jobParams["Executable"] == "dirac-cwl-exec": + ret = __createCWLJobWrapper(jobID, wrapperPath, log) + if not ret["OK"]: + return ret + jobWrapperFile, jobWrapperJsonFile, jobExeFile, jobFileContents = ret["Value"] + else: + with open(os.path.join(diracRoot, defaultWrapperLocation)) as fd: + wrapperTemplate = fd.read() + + # Making real substitutions + sitePython = os.getcwd() + if rootLocation: + sitePython = rootLocation + wrapperTemplate = wrapperTemplate.replace("@SITEPYTHON@", sitePython) + + jobWrapperJsonFile = jobWrapperFile + ".json" + with open(jobWrapperJsonFile, "w", encoding="utf8") as jsonFile: + json.dump(arguments, jsonFile, ensure_ascii=False) + + with open(jobWrapperFile, "w") as wrapper: + wrapper.write(wrapperTemplate) + + if not rootLocation: + rootLocation = wrapperPath + + # The "real" location of the jobwrapper after it is started + jobWrapperDirect = os.path.join(rootLocation, f"Wrapper_{jobID}") + jobExeFile = os.path.join(wrapperPath, f"Job{jobID}") + jobFileContents = """#!/bin/sh + {} {} {} -o LogLevel={} -o /DIRAC/Security/UseServerCertificate=no {} + """.format( + pythonPath, + jobWrapperDirect, + extraOptions if extraOptions else "", + logLevel, + cfgPath if cfgPath else "", + ) with open(jobExeFile, "w") as jobFile: jobFile.write(jobFileContents) @@ -113,11 +123,40 @@ def createJobWrapper( "JobWrapperConfigPath": jobWrapperJsonFile, "JobWrapperPath": jobWrapperFile, } - if rootLocation != wrapperPath: + if rootLocation and rootLocation != wrapperPath: generatedFiles["JobExecutableRelocatedPath"] = os.path.join(rootLocation, os.path.basename(jobExeFile)) return S_OK(generatedFiles) +def __createCWLJobWrapper(jobID, wrapperPath, log): + # Get the new JobWrapper + protoPath = Path(wrapperPath) / f"proto{jobID}" + protoPath.unlink(missing_ok=True) + log.info("Cloning JobWrapper from repository https://github.com/DIRACGrid/dirac-cwl.git into", protoPath) + try: + subprocess.run(["git", "clone", "https://github.com/DIRACGrid/dirac-cwl.git", str(protoPath)], check=True) + except subprocess.CalledProcessError: + return S_ERROR("Failed to clone the JobWrapper repository") + wrapperFound = glob(os.path.join(str(protoPath), "**", "job_wrapper_template.py"), recursive=True) + if len(wrapperFound) < 1 or not Path(wrapperFound[0]).is_file(): + return S_ERROR("Could not find the JobWrapper in the cloned repository") + jobWrapperFile = wrapperFound[0] + + jobWrapperJsonFile = Path(wrapperPath) / f"InputSandbox{jobID}" / "job.json" + # Create the executable file + jobExeFile = os.path.join(wrapperPath, f"Job{jobID}") + jobFileContents = f"""#!/bin/bash +# Install pixi +curl -fsSL https://pixi.sh/install.sh | bash +pixi install --manifest-path {str(protoPath)} +# Get json +dirac-wms-job-get-input {jobID} -D {wrapperPath} +# Run JobWrapper +pixi run --manifest-path {str(protoPath)} python {jobWrapperFile} {jobWrapperJsonFile} +""" + return S_OK((jobWrapperFile, jobWrapperJsonFile, jobExeFile, jobFileContents)) + + def rescheduleJobs( jobIDs: list[int], source: str = "",