Skip to content

Commit 3a0e965

Browse files
committed
Merge branch 'job_files_fastapi_put' into arc
2 parents 35b71dd + 47f9617 commit 3a0e965

File tree

5 files changed

+171
-1
lines changed

5 files changed

+171
-1
lines changed

client/src/api/schema/schema.ts

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3060,7 +3060,8 @@ export interface paths {
30603060
* This API method is intended only for consumption by job runners, not end users.
30613061
*/
30623062
get: operations["index_api_jobs__job_id__files_get"];
3063-
put?: never;
3063+
/** Populate an output file. */
3064+
put: operations["populate_api_jobs__job_id__files_put"];
30643065
/**
30653066
* Populate an output file.
30663067
* @description Populate an output file (formal dataset, task split part, working directory file (such as those related to
@@ -31635,6 +31636,76 @@ export interface operations {
3163531636
};
3163631637
};
3163731638
};
31639+
populate_api_jobs__job_id__files_put: {
31640+
parameters: {
31641+
query: {
31642+
/** @description Path to file to create/replace. */
31643+
path: string;
31644+
/** @description A key used to authenticate this request as acting on behalf of a job runner for the specified job. */
31645+
job_key: string;
31646+
};
31647+
header?: {
31648+
/** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */
31649+
"run-as"?: string | null;
31650+
};
31651+
path: {
31652+
/** @description Encoded id string of the job. */
31653+
job_id: string;
31654+
};
31655+
cookie?: never;
31656+
};
31657+
requestBody?: never;
31658+
responses: {
31659+
/** @description Successful Response */
31660+
200: {
31661+
headers: {
31662+
[name: string]: unknown;
31663+
};
31664+
content: {
31665+
"application/json": unknown;
31666+
};
31667+
};
31668+
/** @description A new file has been created. */
31669+
201: {
31670+
headers: {
31671+
[name: string]: unknown;
31672+
};
31673+
content?: never;
31674+
};
31675+
/** @description An existing file has been replaced. */
31676+
204: {
31677+
headers: {
31678+
[name: string]: unknown;
31679+
};
31680+
content?: never;
31681+
};
31682+
/** @description Bad request. */
31683+
400: {
31684+
headers: {
31685+
[name: string]: unknown;
31686+
};
31687+
content?: never;
31688+
};
31689+
/** @description Request Error */
31690+
"4XX": {
31691+
headers: {
31692+
[name: string]: unknown;
31693+
};
31694+
content: {
31695+
"application/json": components["schemas"]["MessageExceptionModel"];
31696+
};
31697+
};
31698+
/** @description Server Error */
31699+
"5XX": {
31700+
headers: {
31701+
[name: string]: unknown;
31702+
};
31703+
content: {
31704+
"application/json": components["schemas"]["MessageExceptionModel"];
31705+
};
31706+
};
31707+
};
31708+
};
3163831709
create_api_jobs__job_id__files_post: {
3163931710
parameters: {
3164031711
query?: {

lib/galaxy/webapps/galaxy/api/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,9 @@ def __init__(self, request: Request):
242242
def base(self) -> str:
243243
return str(self.__request.base_url)
244244

245+
def stream(self) -> AsyncGenerator:
246+
return self.__request.stream()
247+
245248
@property
246249
def url_path(self) -> str:
247250
scope = self.__request.scope
@@ -250,6 +253,10 @@ def url_path(self) -> str:
250253
url = urljoin(url, root_path)
251254
return url
252255

256+
@property
257+
def url(self) -> str:
258+
return str(self.__request.url)
259+
253260
@property
254261
def host(self) -> str:
255262
return self.__request.base_url.netloc

lib/galaxy/webapps/galaxy/api/job_files.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
API for asynchronous job running mechanisms can use to fetch or put files related to running and queued jobs.
33
"""
44

5+
import asyncio
56
import logging
67
import os
78
import re
@@ -19,6 +20,7 @@
1920
Path,
2021
Query,
2122
Request,
23+
Response,
2224
UploadFile,
2325
)
2426
from fastapi.params import Depends
@@ -220,6 +222,63 @@ def index(
220222

221223
return GalaxyFileResponse(path)
222224

225+
# The ARC remote job runner (`lib.galaxy.jobs.runners.pulsar.PulsarARCJobRunner`) expects a `PUT` endpoint to stage
226+
# out result files back to Galaxy.
227+
@router.put(
228+
"/api/jobs/{job_id}/files",
229+
summary="Populate an output file.",
230+
responses={
231+
201: {"description": "A new file has been created."},
232+
204: {"description": "An existing file has been replaced."},
233+
400: {"description": "Bad request."},
234+
},
235+
)
236+
def populate(
237+
self,
238+
job_id: Annotated[str, Path(description="Encoded id string of the job.")],
239+
path: Annotated[str, Query(description="Path to file to create/replace.")],
240+
job_key: Annotated[
241+
str,
242+
Query(
243+
description=(
244+
"A key used to authenticate this request as acting on behalf of a job runner for the specified job."
245+
),
246+
),
247+
],
248+
trans: SessionRequestContext = DependsOnTrans,
249+
):
250+
path = unquote(path)
251+
252+
job = self.__authorize_job_access(trans, job_id, path=path, job_key=job_key)
253+
self.__check_job_can_write_to_path(trans, job, path)
254+
255+
destination_file_exists = os.path.exists(path)
256+
257+
# FastAPI can only read the file contents from the request body in an async context. To write the file without
258+
# using an async endpoint, the async code that reads the file from the body and writes it to disk will have to
259+
# run within the sync endpoint. Since the code that writes the data to disk is blocking
260+
# `destination_file.write(chunk)`, it has to run on its own event loop within the thread spawned to answer the
261+
# request to the sync endpoint.
262+
async def write():
263+
with open(path, "wb") as destination_file:
264+
async for chunk in trans.request.stream():
265+
destination_file.write(chunk)
266+
267+
target_dir = os.path.dirname(path)
268+
util.safe_makedirs(target_dir)
269+
event_loop = asyncio.new_event_loop()
270+
try:
271+
asyncio.set_event_loop(event_loop)
272+
event_loop.run_until_complete(write())
273+
finally:
274+
event_loop.close()
275+
276+
return (
277+
Response(status_code=201, headers={"Location": str(trans.request.url)})
278+
if not destination_file_exists
279+
else Response(status_code=204)
280+
)
281+
223282
@router.post(
224283
"/api/jobs/{job_id}/files",
225284
summary="Populate an output file.",

lib/galaxy/work/context.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import abc
22
from typing import (
33
Any,
4+
AsyncGenerator,
45
Dict,
56
List,
67
Optional,
@@ -87,11 +88,20 @@ class GalaxyAbstractRequest:
8788
def base(self) -> str:
8889
"""Base URL of the request."""
8990

91+
@abc.abstractmethod
92+
def stream(self) -> AsyncGenerator:
93+
"""Request body split in parts."""
94+
9095
@property
9196
@abc.abstractmethod
9297
def url_path(self) -> str:
9398
"""Base with optional prefix added."""
9499

100+
@property
101+
@abc.abstractmethod
102+
def url(self):
103+
"""URL of the request."""
104+
95105
@property
96106
@abc.abstractmethod
97107
def host(self) -> str:

test/integration/test_job_files.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,29 @@ def test_write_with_underscored_file_param(self):
296296
api_asserts.assert_status_code_is_ok(response)
297297
assert open(path).read() == "some initial text data"
298298

299+
def test_write_with_put_request(self):
300+
job, output_hda, working_directory = self.create_static_job_with_state("running")
301+
job_id, job_key = self._api_job_keys(job)
302+
path = self._app.object_store.get_filename(output_hda.dataset)
303+
assert path
304+
data = {"path": path, "job_key": job_key}
305+
306+
new_file_path = os.path.join(working_directory, "new_file.txt")
307+
put_url = self._api_url(f"jobs/{job_id}/files", use_key=False)
308+
response = requests.put(
309+
put_url,
310+
params={"path": new_file_path, "job_key": job_key},
311+
data=b"whole contents of the file",
312+
)
313+
assert response.status_code == 201
314+
assert open(new_file_path).read() == "whole contents of the file"
315+
316+
assert os.path.exists(path)
317+
put_url = self._api_url(f"jobs/{job_id}/files", use_key=False)
318+
response = requests.put(put_url, params=data, data=b"contents of a replacement file")
319+
assert response.status_code == 204
320+
assert open(path).read() == "contents of a replacement file"
321+
299322
def test_write_protection(self):
300323
job, _, _ = self.create_static_job_with_state("running")
301324
job_id, job_key = self._api_job_keys(job)

0 commit comments

Comments
 (0)