Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion diracx-api/src/diracx/api/jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

__all__ = ("create_sandbox", "download_sandbox")
__all__ = ("create_sandbox", "download_sandbox", "submit_jobs")

import hashlib
import logging
Expand All @@ -13,6 +13,8 @@

import httpx
import zstandard
from DIRACCommon.Core.Utilities.ClassAd.ClassAdLight import ClassAd
from typer import FileText

from diracx.client.aio import AsyncDiracClient
from diracx.client.models import SandboxInfo
Expand Down Expand Up @@ -123,3 +125,28 @@ async def download_sandbox(pfn: str, destination: Path, *, client: AsyncDiracCli
with tarfile_open(fh) as tf:
tf.extractall(path=destination, filter="data")
logger.debug("Extracted %s to %s", pfn, destination)


@with_client
async def submit_jobs(jdls: list[FileText], *, client: AsyncDiracClient):
# Create and upload InputSandboxes from JDLs
for i, jdl in enumerate(jdls):
original_jdl = jdl.read()

# Fix possible lack of brackets
if original_jdl.strip()[0] != "[":
original_jdl = f"[{original_jdl}]"

class_ad_job = ClassAd(original_jdl)
if class_ad_job.lookupAttribute("InputSandbox"):
isb = class_ad_job.getListFromExpression("InputSandbox")
sandboxes_pfn = await create_sandbox(
paths=[Path(file_path) for file_path in isb]
)
print(f"InputSandbox created: {sandboxes_pfn[13:]}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. We should use:

  • logging.info instead of print
  • or an optional callback function as a parameter of submit_jobs.

Option1 is probably best in this context.
Option2 could be interesting if we would want to have a progress bar for each processed job for instance (could be implemented later).

Copy link
Contributor Author

@Stellatsuu Stellatsuu Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would something like this be ok for option2?:

In CLI:

@app.async_command()
async def submit(jdls: list[FileText]):
    jdls_values = [jdl.read() for jdl in jdls]

    jobs = await submit_jobs(jdls_values, print_callback=print)
    print(
        f"--- Inserted {len(jobs)} jobs with ids: {', '.join(map(str, (job.job_id for job in jobs)))}"
    )

In API:

@with_client
async def submit_jobs(jdls: list[str], *, client: AsyncDiracClient, print_callback=None):
    # Create and upload InputSandboxes from JDLs
    for i, jdl in enumerate(jdls):
       if print_callback:
            print_callback(f"--- Processing job {i + 1}/{len(jdls)}...")
       ...
       
        if class_ad_job.lookupAttribute("InputSandbox"):
            ...
            isb_log=f"InputSandbox created for job {i + 1}/{len(jdls)}: {sandboxes_pfn[13:]}\n"
            logging.info(isb_log)
            if print_callback:
                print_callback(isb_log)

    ...
    jobs = await client.jobs.submit_jdl_jobs(list(jdls))
    return jobs

Which would return:

> dirac jobs submit test.jdl test2.jdl

--- Processing job 1/2...
InputSandbox created for job 1/2: /S3/demo-sandboxes/diracAdmin/admin/admin/sha256:d264668aeb631a84cb5d5ac1e5c01b1fa361923eb60b1209c9d833abcf0a5c6c.tar.zst

--- Processing job 2/2...
InputSandbox created for job 2/2: /S3/demo-sandboxes/diracAdmin/admin/admin/sha256:984bc7b95829313ca5a5df126662485fe510f2ae4bde648d996777dad055efd4.tar.zst

--- Inserted 2 jobs with ids: 1, 2

class_ad_job.set_expression("InputSandbox", {sandboxes_pfn})

jdls[i] = class_ad_job.asJDL()

jobs = await client.jobs.submit_jdl_jobs(list(jdls))
return jobs
6 changes: 3 additions & 3 deletions diracx-cli/src/diracx/cli/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from rich.table import Table
from typer import FileText, Option

from diracx.api.jobs import submit_jobs
from diracx.client.aio import AsyncDiracClient
from diracx.core.models import ScalarSearchOperator, SearchSpec, VectorSearchOperator
from diracx.core.preferences import OutputFormats, get_diracx_preferences
Expand Down Expand Up @@ -150,9 +151,8 @@ def display_rich(data, content_range: ContentRange) -> None:


@app.async_command()
async def submit(jdl: list[FileText]):
async with AsyncDiracClient() as api:
jobs = await api.jobs.submit_jdl_jobs([x.read() for x in jdl])
async def submit(jdls: list[FileText]):
jobs = await submit_jobs(jdls)
print(
f"Inserted {len(jobs)} jobs with ids: {','.join(map(str, (job.job_id for job in jobs)))}"
)
5 changes: 3 additions & 2 deletions diracx-logic/src/diracx/logic/jobs/sandboxes.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,15 @@ async def assign_sandbox_to_job(
job_id: int,
pfn: str,
sandbox_metadata_db: SandboxMetadataDB,
sandbox_type: Literal["input", "output"],
settings: SandboxStoreSettings,
):
"""Map the pfn as output sandbox to job."""
"""Map the pfn as input or output sandbox to job."""
short_pfn = pfn.split("|", 1)[-1]
await sandbox_metadata_db.assign_sandbox_to_jobs(
jobs_ids=[job_id],
pfn=short_pfn,
sb_type=SandboxType.Output,
sb_type=SandboxType(sandbox_type.capitalize()),
se_name=settings.se_name,
)

Expand Down
4 changes: 0 additions & 4 deletions diracx-logic/src/diracx/logic/jobs/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,6 @@ async def create_jdl_jobs(jobs: list[JobSubmissionSpec], job_db: JobDB, config:
)
)

# Fix possible lack of brackets
if original_jdl.strip()[0] != "[":
original_jdl = f"[{original_jdl}]"

original_jdls.append(
(
original_jdl,
Expand Down
9 changes: 6 additions & 3 deletions diracx-routers/src/diracx/routers/jobs/sandboxes.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,22 @@ async def get_job_sandbox(
return await get_job_sandbox_bl(job_id, sandbox_metadata_db, sandbox_type)


@router.patch("/{job_id}/sandbox/output")
@router.patch("/{job_id}/sandbox/{sandbox_type}")
async def assign_sandbox_to_job(
job_id: int,
pfn: Annotated[str, Body(max_length=256, pattern=SANDBOX_PFN_REGEX)],
sandbox_type: Literal["input", "output"],
sandbox_metadata_db: SandboxMetadataDB,
job_db: JobDB,
settings: SandboxStoreSettings,
check_permissions: CheckWMSPolicyCallable,
):
"""Map the pfn as output sandbox to job."""
"""Map the pfn as input or output sandbox to job."""
await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=[job_id])
try:
await assign_sandbox_to_job_bl(job_id, pfn, sandbox_metadata_db, settings)
await assign_sandbox_to_job_bl(
job_id, pfn, sandbox_metadata_db, sandbox_type, settings
)
except SandboxNotFoundError as e:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail="Sandbox not found"
Expand Down
Loading