Skip to content

Commit 0bbb215

Browse files
authored
Merge pull request #393 from aldbr/main_FIX_sb-assignment-edge-cases
fix(db): assigning non existing sandbox or multiple times the same sandbox
2 parents 4db5094 + e03d727 commit 0bbb215

File tree

4 files changed

+97
-9
lines changed

4 files changed

+97
-9
lines changed

diracx-core/src/diracx/core/exceptions.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ def __init__(self, pfn: str, se_name: str, detail: str | None = None):
5656
)
5757

5858

59+
class SandboxAlreadyAssignedError(Exception):
60+
def __init__(self, pfn: str, se_name: str, detail: str | None = None):
61+
self.pfn: str = pfn
62+
self.se_name: str = se_name
63+
super().__init__(
64+
f"Sandbox with {pfn} and {se_name} already assigned"
65+
+ (" ({detail})" if detail else "")
66+
)
67+
68+
5969
class JobError(Exception):
6070
def __init__(self, job_id, detail: str | None = None):
6171
self.job_id: int = job_id

diracx-db/src/diracx/db/sql/sandbox_metadata/db.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from sqlalchemy import Executable, delete, insert, literal, select, update
66
from sqlalchemy.exc import IntegrityError, NoResultFound
77

8-
from diracx.core.exceptions import SandboxNotFoundError
8+
from diracx.core.exceptions import SandboxAlreadyAssignedError, SandboxNotFoundError
99
from diracx.core.models import SandboxInfo, SandboxType, UserInfo
1010
from diracx.db.sql.utils import BaseSQLDB, utcnow
1111

@@ -135,10 +135,18 @@ async def assign_sandbox_to_jobs(
135135
stmt = insert(SBEntityMapping).from_select(
136136
["SBId", "EntityId", "Type"], select_sb_id
137137
)
138-
await self.conn.execute(stmt)
138+
try:
139+
await self.conn.execute(stmt)
140+
except IntegrityError as e:
141+
raise SandboxAlreadyAssignedError(pfn, se_name) from e
139142

140143
stmt = update(SandBoxes).where(SandBoxes.SEPFN == pfn).values(Assigned=True)
141144
result = await self.conn.execute(stmt)
145+
if result.rowcount == 0:
146+
# If the update didn't affect any row, the sandbox doesn't exist
147+
# It means the previous insert didn't have any effect
148+
raise SandboxNotFoundError(pfn, se_name)
149+
142150
assert result.rowcount == 1
143151

144152
async def unassign_sandboxes_to_jobs(self, jobs_ids: list[int]) -> None:

diracx-routers/src/diracx/routers/jobs/sandboxes.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from pydantic_settings import SettingsConfigDict
1414
from pyparsing import Any
1515

16-
from diracx.core.exceptions import SandboxNotFoundError
16+
from diracx.core.exceptions import SandboxAlreadyAssignedError, SandboxNotFoundError
1717
from diracx.core.models import (
1818
SandboxInfo,
1919
SandboxType,
@@ -267,12 +267,21 @@ async def assign_sandbox_to_job(
267267
"""Map the pfn as output sandbox to job."""
268268
await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=[job_id])
269269
short_pfn = pfn.split("|", 1)[-1]
270-
await sandbox_metadata_db.assign_sandbox_to_jobs(
271-
jobs_ids=[job_id],
272-
pfn=short_pfn,
273-
sb_type=SandboxType.Output,
274-
se_name=settings.se_name,
275-
)
270+
try:
271+
await sandbox_metadata_db.assign_sandbox_to_jobs(
272+
jobs_ids=[job_id],
273+
pfn=short_pfn,
274+
sb_type=SandboxType.Output,
275+
se_name=settings.se_name,
276+
)
277+
except SandboxNotFoundError as e:
278+
raise HTTPException(
279+
status_code=HTTPStatus.BAD_REQUEST, detail="Sandbox not found"
280+
) from e
281+
except (SandboxAlreadyAssignedError, AssertionError) as e:
282+
raise HTTPException(
283+
status_code=HTTPStatus.BAD_REQUEST, detail="Sandbox already assigned"
284+
) from e
276285

277286

278287
@router.delete("/{job_id}/sandbox")

diracx-routers/tests/jobs/test_sandboxes.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,3 +249,64 @@ def test_get_empty_job_sandboxes(normal_user_client: TestClient):
249249
r = normal_user_client.get(f"/api/jobs/{job_id}/sandbox")
250250
assert r.status_code == 200
251251
assert r.json() == {"Input": [None], "Output": [None]}
252+
253+
254+
def test_assign_nonexisting_sb_to_job(normal_user_client: TestClient):
255+
"""Test that we cannot assign a non-existing sandbox to a job."""
256+
# Submit a job:
257+
job_definitions = [TEST_JDL]
258+
r = normal_user_client.post("/api/jobs/jdl", json=job_definitions)
259+
assert r.status_code == 200, r.json()
260+
assert len(r.json()) == len(job_definitions)
261+
job_id = r.json()[0]["JobID"]
262+
263+
# Malformed request:
264+
r = normal_user_client.patch(
265+
f"/api/jobs/{job_id}/sandbox/output",
266+
json="/S3/pathto/vo/vo_group/user/sha256:55967b0c430058c3105472b1edae6c8987c65bcf01ef58f10a3f5e93948782d8.tar.bz2",
267+
)
268+
assert r.status_code == 400
269+
270+
271+
def test_assign_sb_to_job_twice(normal_user_client: TestClient):
272+
"""Test that we cannot assign a sandbox to a job twice."""
273+
data = secrets.token_bytes(512)
274+
checksum = hashlib.sha256(data).hexdigest()
275+
276+
# Upload Sandbox:
277+
r = normal_user_client.post(
278+
"/api/jobs/sandbox",
279+
json={
280+
"checksum_algorithm": "sha256",
281+
"checksum": checksum,
282+
"size": len(data),
283+
"format": "tar.bz2",
284+
},
285+
)
286+
287+
assert r.status_code == 200, r.text
288+
upload_info = r.json()
289+
assert upload_info["url"]
290+
sandbox_pfn = upload_info["pfn"]
291+
assert sandbox_pfn.startswith("SB:SandboxSE|/S3/")
292+
293+
# Submit a job:
294+
job_definitions = [TEST_JDL]
295+
r = normal_user_client.post("/api/jobs/jdl", json=job_definitions)
296+
assert r.status_code == 200, r.json()
297+
assert len(r.json()) == len(job_definitions)
298+
job_id = r.json()[0]["JobID"]
299+
300+
# Assign sandbox to the job: first attempt should be successful
301+
r = normal_user_client.patch(
302+
f"/api/jobs/{job_id}/sandbox/output",
303+
json=sandbox_pfn,
304+
)
305+
assert r.status_code == 200
306+
307+
# Assign sandbox to the job: second attempt should fail
308+
r = normal_user_client.patch(
309+
f"/api/jobs/{job_id}/sandbox/output",
310+
json=sandbox_pfn,
311+
)
312+
assert r.status_code == 400

0 commit comments

Comments
 (0)