Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 70 additions & 31 deletions src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
""" Utilities for WMS
"""
import os
from pathlib import Path
from glob import glob
import subprocess
import sys
import json

Expand Down Expand Up @@ -63,8 +66,6 @@ def createJobWrapper(
if os.path.exists(jobWrapperFile):
log.verbose("Removing existing Job Wrapper for", jobID)
os.remove(jobWrapperFile)
with open(os.path.join(diracRoot, defaultWrapperLocation)) as fd:
wrapperTemplate = fd.read()

if "LogLevel" in jobParams:
logLevel = jobParams["LogLevel"]
Expand All @@ -76,34 +77,43 @@ def createJobWrapper(
pythonPath = os.path.realpath(sys.executable)
log.debug("Real python path after resolving links is: ", pythonPath)

# Making real substitutions
sitePython = os.getcwd()
if rootLocation:
sitePython = rootLocation
wrapperTemplate = wrapperTemplate.replace("@SITEPYTHON@", sitePython)

jobWrapperJsonFile = jobWrapperFile + ".json"
with open(jobWrapperJsonFile, "w", encoding="utf8") as jsonFile:
json.dump(arguments, jsonFile, ensure_ascii=False)

with open(jobWrapperFile, "w") as wrapper:
wrapper.write(wrapperTemplate)

if not rootLocation:
rootLocation = wrapperPath

# The "real" location of the jobwrapper after it is started
jobWrapperDirect = os.path.join(rootLocation, f"Wrapper_{jobID}")
jobExeFile = os.path.join(wrapperPath, f"Job{jobID}")
jobFileContents = """#!/bin/sh
{} {} {} -o LogLevel={} -o /DIRAC/Security/UseServerCertificate=no {}
""".format(
pythonPath,
jobWrapperDirect,
extraOptions if extraOptions else "",
logLevel,
cfgPath if cfgPath else "",
)
if "Executable" in jobParams and jobParams["Executable"] == "dirac-cwl-exec":
ret = __createCWLJobWrapper(jobID, wrapperPath, log)
if not ret["OK"]:
return ret
jobWrapperFile, jobWrapperJsonFile, jobExeFile, jobFileContents = ret["Value"]
else:
with open(os.path.join(diracRoot, defaultWrapperLocation)) as fd:
wrapperTemplate = fd.read()

# Making real substitutions
sitePython = os.getcwd()
if rootLocation:
sitePython = rootLocation
wrapperTemplate = wrapperTemplate.replace("@SITEPYTHON@", sitePython)

jobWrapperJsonFile = jobWrapperFile + ".json"
with open(jobWrapperJsonFile, "w", encoding="utf8") as jsonFile:
json.dump(arguments, jsonFile, ensure_ascii=False)

with open(jobWrapperFile, "w") as wrapper:
wrapper.write(wrapperTemplate)

if not rootLocation:
rootLocation = wrapperPath

# The "real" location of the jobwrapper after it is started
jobWrapperDirect = os.path.join(rootLocation, f"Wrapper_{jobID}")
jobExeFile = os.path.join(wrapperPath, f"Job{jobID}")
jobFileContents = """#!/bin/sh
{} {} {} -o LogLevel={} -o /DIRAC/Security/UseServerCertificate=no {}
""".format(
pythonPath,
jobWrapperDirect,
extraOptions if extraOptions else "",
logLevel,
cfgPath if cfgPath else "",
)

with open(jobExeFile, "w") as jobFile:
jobFile.write(jobFileContents)
Expand All @@ -113,11 +123,40 @@ def createJobWrapper(
"JobWrapperConfigPath": jobWrapperJsonFile,
"JobWrapperPath": jobWrapperFile,
}
if rootLocation != wrapperPath:
if rootLocation and rootLocation != wrapperPath:
generatedFiles["JobExecutableRelocatedPath"] = os.path.join(rootLocation, os.path.basename(jobExeFile))
return S_OK(generatedFiles)


def __createCWLJobWrapper(jobID, wrapperPath, log):
# Get the new JobWrapper
protoPath = Path(wrapperPath) / f"proto{jobID}"
protoPath.unlink(missing_ok=True)
log.info("Cloning JobWrapper from repository https://github.com/DIRACGrid/dirac-cwl.git into", protoPath)
try:
subprocess.run(["git", "clone", "https://github.com/DIRACGrid/dirac-cwl.git", str(protoPath)], check=True)
except subprocess.CalledProcessError:
return S_ERROR("Failed to clone the JobWrapper repository")
wrapperFound = glob(os.path.join(str(protoPath), "**", "job_wrapper_template.py"), recursive=True)
if len(wrapperFound) < 1 or not Path(wrapperFound[0]).is_file():
return S_ERROR("Could not find the JobWrapper in the cloned repository")
jobWrapperFile = wrapperFound[0]

jobWrapperJsonFile = Path(wrapperPath) / f"InputSandbox{jobID}" / "job.json"
# Create the executable file
jobExeFile = os.path.join(wrapperPath, f"Job{jobID}")
jobFileContents = f"""#!/bin/bash
# Install pixi
curl -fsSL https://pixi.sh/install.sh | bash
pixi install --manifest-path {str(protoPath)}
# Get json
dirac-wms-job-get-input {jobID} -D {wrapperPath}
# Run JobWrapper
pixi run --manifest-path {str(protoPath)} python {jobWrapperFile} {jobWrapperJsonFile}
"""
return S_OK((jobWrapperFile, jobWrapperJsonFile, jobExeFile, jobFileContents))


def rescheduleJobs(
jobIDs: list[int],
source: str = "",
Expand Down
Loading