From f0df4fb91bbfab8f7ee3b816e289c21b681864b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Thu, 10 Apr 2025 16:54:04 +0200 Subject: [PATCH 01/10] Migrate `JobFilesAPIController` to FastAPI (excluding TUS uploads) `FastAPIJobFiles` is the new, FastAPI version of `JobFilesAPIController`. The endpoints that have been migrated should exhibit exactly the same behavior as the old ones from `JobFilesAPIController`. Something to keep in mind is that while FastAPI has some extra built-in features that the legacy WSGI system did not have, such as answering HEAD requests, those do not work because of the way legacy WSGI endpoints are injected into the FastAPI app (using `app.mount("/", wsgi_handler)`), meaning that for example, HEAD requests are passed to the `wsgi_handler` sub-application. Endpoints dedicated to TUS uploads work in tandem with the WSGI middleware `TusMiddleware` from the `tuswsgi` package. As explained above, WSGI middlewares and endpoints are injected into the FastAPI app after FastAPI routes as a single sub-application `wsgi_handler` using `app.mount("/", wsgi_handler)`, meaning that requests are passed to the `wsgi_handler` sub-application (and thus to `TusMiddleware`) only if there was no FastAPI endpoint defined to handle them. Therefore, they cannot be migrated to FastAPI unless `TusMiddleware` is also migrated to ASGI. 
--- client/src/api/schema/schema.ts | 226 +++++++++++++ lib/galaxy/webapps/galaxy/api/job_files.py | 374 +++++++++++++-------- lib/galaxy/webapps/galaxy/buildapp.py | 18 - test/integration/test_job_files.py | 105 +++++- 4 files changed, 556 insertions(+), 167 deletions(-) diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index 67f4cd7e1821..30a63c85fde2 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -2997,6 +2997,44 @@ export interface paths { patch?: never; trace?: never; }; + "/api/jobs/{job_id}/files": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + /** + * Get a file required to staging a job. + * @description Get a file required to staging a job (proper datasets, extra inputs, task-split inputs, working directory + * files). + * + * This API method is intended only for consumption by job runners, not end users. + */ + get: operations["index_api_jobs__job_id__files_get"]; + put?: never; + /** + * Populate an output file. + * @description Populate an output file (formal dataset, task split part, working directory file (such as those related to + * metadata). This should be a multipart POST with a 'file' parameter containing the contents of the actual file to + * create. + * + * This API method is intended only for consumption by job runners, not end users. + */ + post: operations["create_api_jobs__job_id__files_post"]; + delete?: never; + options?: never; + /** + * Get a file required to staging a job. + * @description Get a file required to staging a job (proper datasets, extra inputs, task-split inputs, working directory + * files). + * + * This API method is intended only for consumption by job runners, not end users. 
+ */ + head: operations["index_api_jobs__job_id__files_get"]; + patch?: never; + trace?: never; + }; "/api/jobs/{job_id}/inputs": { parameters: { query?: never; @@ -6628,6 +6666,34 @@ export interface components { /** Name */ name?: unknown; }; + /** Body_create_api_jobs__job_id__files_post */ + Body_create_api_jobs__job_id__files_post: { + /** + * File + * Format: binary + */ + __file?: string; + /** File Path */ + __file_path?: string; + /** + * File + * Format: binary + * @description Contents of the file to create. + */ + file?: string; + /** + * Job Key + * @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. + */ + job_key: string; + /** + * Path + * @description Path to file to create. + */ + path: string; + /** Session Id */ + session_id?: string; + }; /** Body_create_form_api_libraries__library_id__contents_post */ Body_create_form_api_libraries__library_id__contents_post: { /** Create Type */ @@ -31164,6 +31230,166 @@ export interface operations { }; }; }; + index_api_jobs__job_id__files_get: { + parameters: { + query: { + /** @description Path to file. */ + path: string; + /** @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ + job_key: string; + }; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path: { + /** @description Encoded id string of the job. */ + job_id: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description Contents of file. */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/octet-stream": unknown; + }; + }; + /** @description File not found, path does not refer to a file, or input dataset(s) for job have been purged. 
*/ + 400: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; + create_api_jobs__job_id__files_post: { + parameters: { + query?: never; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path: { + /** @description Encoded id string of the job. */ + job_id: string; + }; + cookie?: never; + }; + requestBody: { + content: { + "multipart/form-data": components["schemas"]["Body_create_api_jobs__job_id__files_post"]; + }; + }; + responses: { + /** @description An okay message. */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; + index_api_jobs__job_id__files_get: { + parameters: { + query: { + /** @description Path to file. */ + path: string; + /** @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ + job_key: string; + }; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. 
*/ + "run-as"?: string | null; + }; + path: { + /** @description Encoded id string of the job. */ + job_id: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description Contents of file. */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/octet-stream": unknown; + }; + }; + /** @description File not found, path does not refer to a file, or input dataset(s) for job have been purged. */ + 400: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; get_inputs_api_jobs__job_id__inputs_get: { parameters: { query?: never; diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py index 31a4974376e7..70fb7b061920 100644 --- a/lib/galaxy/webapps/galaxy/api/job_files.py +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -1,11 +1,26 @@ -"""API for asynchronous job running mechanisms can use to fetch or put files -related to running and queued jobs. +""" +API for asynchronous job running mechanisms can use to fetch or put files related to running and queued jobs. 
""" import logging import os import re import shutil +from typing import ( + cast, + IO, + Optional, +) +from urllib.parse import unquote + +from fastapi import ( + File, + Form, + Path, + Query, + UploadFile, +) +from typing_extensions import Annotated from galaxy import ( exceptions, @@ -13,55 +28,90 @@ ) from galaxy.managers.context import ProvidesAppContext from galaxy.model import Job -from galaxy.web import ( - expose_api_anonymous_and_sessionless, - expose_api_raw_anonymous_and_sessionless, +from galaxy.web import expose_api_anonymous_and_sessionless +from galaxy.webapps.base.api import GalaxyFileResponse +from galaxy.webapps.galaxy.api import ( + DependsOnTrans, + Router, ) from . import BaseGalaxyAPIController +__all__ = ("FastAPIJobFiles", "JobFilesAPIController", "router") + log = logging.getLogger(__name__) -class JobFilesAPIController(BaseGalaxyAPIController): - """This job files controller allows remote job running mechanisms to - read and modify the current state of files for queued and running jobs. - It is certainly not meant to represent part of Galaxy's stable, user - facing API. - - Furthermore, even if a user key corresponds to the user running the job, - it should not be accepted for authorization - this API allows access to - low-level unfiltered files and such authorization would break Galaxy's - security model for tool execution. - """ +router = Router( + # keep the endpoint in the undocumented section of the API docs `/api/docs`, as all endpoints from `FastAPIJobFiles` + # are certainly not meant to represent part of Galaxy's stable, user facing API + tags=["undocumented"] +) - @expose_api_raw_anonymous_and_sessionless - def index(self, trans: ProvidesAppContext, job_id, **kwargs): - """ - GET /api/jobs/{job_id}/files - Get a file required to staging a job (proper datasets, extra inputs, - task-split inputs, working directory files). 
+@router.cbv +class FastAPIJobFiles: + """ + This job files controller allows remote job running mechanisms to read and modify the current state of files for + queued and running jobs. It is certainly not meant to represent part of Galaxy's stable, user facing API. - :type job_id: str - :param job_id: encoded id string of the job - :type path: str - :param path: Path to file. - :type job_key: str - :param job_key: A key used to authenticate this request as acting on - behalf or a job runner for the specified job. + Furthermore, even if a user key corresponds to the user running the job, it should not be accepted for authorization + - this API allows access to low-level unfiltered files and such authorization would break Galaxy's security model + for tool execution. + """ - ..note: - This API method is intended only for consumption by job runners, - not end users. + # FastAPI answers HEAD requests automatically for GET endpoints. However, because of the way legacy WSGI endpoints + # are injected into the FastAPI app (using `app.mount("/", wsgi_handler)`), the built-in support for `HEAD` requests + # breaks, because such requests are passed to the `wsgi_handler` sub-application. This means that the endpoint still + # needs to include some code to handle this behavior, as tests existing before the migration to FastAPI expect HEAD + # requests to work. + # + # @router.get( # use me when ALL endpoints have been migrated to FastAPI + @router.api_route( + "/api/jobs/{job_id}/files", + summary="Get a file required to staging a job.", + responses={ + 200: { + "description": "Contents of file.", + "content": {"application/json": None, "application/octet-stream": {"example": None}}, + }, + 400: { + "description": ( + "File not found, path does not refer to a file, or input dataset(s) for job have been purged." 
+ ) + }, + }, + methods=["GET", "HEAD"], # remove me when ALL endpoints have been migrated to FastAPI + ) + def index( + self, + job_id: Annotated[str, Path(description="Encoded id string of the job.")], + path: Annotated[ + str, + Query( + description="Path to file.", + ), + ], + job_key: Annotated[ + str, + Query( + description=( + "A key used to authenticate this request as acting on behalf or a job runner for the specified job." + ), + ), + ], + trans: ProvidesAppContext = DependsOnTrans, + ) -> GalaxyFileResponse: + """ + Get a file required to staging a job (proper datasets, extra inputs, task-split inputs, working directory + files). - :rtype: binary - :returns: contents of file + This API method is intended only for consumption by job runners, not end users. """ - job = self.__authorize_job_access(trans, job_id, **kwargs) - path = kwargs["path"] - try: - return open(path, "rb") - except FileNotFoundError: + path = unquote(path) + + job = self.__authorize_job_access(trans, job_id, path=path, job_key=job_key) + + if not os.path.exists(path): # We know that the job is not terminal, but users (or admin scripts) can purge input datasets. # Here we discriminate that case from truly unexpected bugs. # Not failing the job here, this is or should be handled by pulsar. @@ -70,55 +120,74 @@ def index(self, trans: ProvidesAppContext, job_id, **kwargs): # This looks like a galaxy dataset, check if any job input has been deleted. 
if any(jtid.dataset.dataset.purged for jtid in job.input_datasets): raise exceptions.ItemDeletionException("Input dataset(s) for job have been purged.") - else: - raise - @expose_api_anonymous_and_sessionless - def create(self, trans, job_id, payload, **kwargs): + return GalaxyFileResponse(path) + + @router.post( + "/api/jobs/{job_id}/files", + summary="Populate an output file.", + responses={ + 200: {"description": "An okay message.", "content": {"application/json": {"example": {"message": "ok"}}}}, + }, + ) + def create( + self, + job_id: Annotated[str, Path(description="Encoded id string of the job.")], + path: Annotated[str, Form(description="Path to file to create.")], + job_key: Annotated[ + str, + Form( + description=( + "A key used to authenticate this request as acting on behalf or a job runner for the specified job." + ) + ), + ], + file: UploadFile = File(None, description="Contents of the file to create."), + session_id: str = Form(None), + nginx_upload_module_file_path: str = Form( + None, + alias="__file_path", + validation_alias="__file_path", + # both `alias` and `validation_alias` are needed for body parameters, see + # https://github.com/fastapi/fastapi/issues/10286#issuecomment-1727642960 + ), + underscore_file: UploadFile = File( + None, + alias="__file", + validation_alias="__file", + # both `alias` and `validation_alias` are needed for body parameters, see + # https://github.com/fastapi/fastapi/issues/10286#issuecomment-1727642960 + ), + trans: ProvidesAppContext = DependsOnTrans, + ): """ - create( self, trans, job_id, payload, **kwargs ) - * POST /api/jobs/{job_id}/files - Populate an output file (formal dataset, task split part, working - directory file (such as those related to metadata)). This should be - a multipart post with a 'file' parameter containing the contents of - the actual file to create. 
- - :type job_id: str - :param job_id: encoded id string of the job - :type payload: dict - :param payload: dictionary structure containing:: - 'job_key' = Key authenticating - 'path' = Path to file to create. - - ..note: - This API method is intended only for consumption by job runners, - not end users. - - :rtype: dict - :returns: an okay message + Populate an output file (formal dataset, task split part, working directory file (such as those related to + metadata). This should be a multipart POST with a 'file' parameter containing the contents of the actual file to + create. + + This API method is intended only for consumption by job runners, not end users. """ - job = self.__authorize_job_access(trans, job_id, **payload) - path = payload.get("path") - if not path: - raise exceptions.RequestParameterInvalidException("'path' parameter not provided or empty.") + path = unquote(path) + + job = self.__authorize_job_access(trans, job_id, path=path, job_key=job_key) self.__check_job_can_write_to_path(trans, job, path) + input_file: Optional[IO[bytes]] = None + input_file_path: Optional[str] = None # Is this writing an unneeded file? Should this just copy in Python? 
- if "__file_path" in payload: - file_path = payload.get("__file_path") + if nginx_upload_module_file_path: + input_file_path = nginx_upload_module_file_path upload_store = trans.app.config.nginx_upload_job_files_store assert upload_store, ( "Request appears to have been processed by" " nginx_upload_module but Galaxy is not" " configured to recognize it" ) - assert file_path.startswith( + assert input_file_path.startswith( upload_store - ), f"Filename provided by nginx ({file_path}) is not in correct directory ({upload_store})" - input_file = open(file_path) - elif "session_id" in payload: + ), f"Filename provided by nginx ({input_file_path}) is not in correct directory ({upload_store})" + elif session_id: # code stolen from basic.py - session_id = payload["session_id"] upload_store = ( trans.app.config.tus_upload_store_job_files or trans.app.config.tus_upload_store @@ -127,76 +196,41 @@ def create(self, trans, job_id, payload, **kwargs): if re.match(r"^[\w-]+$", session_id) is None: raise ValueError("Invalid session id format.") local_filename = os.path.abspath(os.path.join(upload_store, session_id)) - input_file = open(local_filename) + input_file_path = local_filename + elif file: + input_file = file.file + elif underscore_file: + input_file = underscore_file.file else: - input_file = payload.get("file", payload.get("__file", None)).file + raise exceptions.RequestParameterMissingException("No file uploaded.") + target_dir = os.path.dirname(path) util.safe_makedirs(target_dir) - try: - if os.path.exists(path) and (path.endswith("tool_stdout") or path.endswith("tool_stderr")): - with open(path, "ab") as destination: - shutil.copyfileobj(open(input_file.name, "rb"), destination) + if os.path.exists(path) and (path.endswith("tool_stdout") or path.endswith("tool_stderr")): + with open(path, "ab") as destination: + if input_file_path: + with open(input_file_path, "rb") as input_file_handle: + shutil.copyfileobj(input_file_handle, destination) + else: + 
shutil.copyfileobj(cast(IO[bytes], input_file), destination) + else: + # prior to migrating to FastAPI, this operation was done more efficiently for all cases using + # `shutil.move(input_file_path, path)`, but FastAPI stores the uploaded file as + # `tempfile.SpooledTemporaryFile` + # (https://docs.python.org/3/library/tempfile.html#tempfile.SpooledTemporaryFile), so now there is not even + # a path where uploaded files can be accessed on disk + if input_file_path: + shutil.move(input_file_path, path) else: - shutil.move(input_file.name, path) - finally: - try: - input_file.close() - except OSError: - # Fails to close file if not using nginx upload because the - # tempfile has moved and Python wants to delete it. - pass - return {"message": "ok"} - - @expose_api_anonymous_and_sessionless - def tus_patch(self, trans, **kwds): - """ - Exposed as PATCH /api/job_files/resumable_upload. - - I think based on the docs, a separate tusd server is needed for job files if - also hosting one for use facing uploads. - - Setting up tusd for job files should just look like (I think): - - tusd -host localhost -port 1080 -upload-dir=/database/tmp - - See more discussion of checking upload access, but we shouldn't need the - API key and session stuff the user upload tusd server should be configured with. - - Also shouldn't need a hooks endpoint for this reason but if you want to add one - the target CLI entry would be -hooks-http=/api/job_files/tus_hooks - and the action is featured below. - - I would love to check the job state with __authorize_job_access on the first - POST but it seems like TusMiddleware doesn't default to coming in here for that - initial POST the way it does for the subsequent PATCHes. Ultimately, the upload - is still authorized before the write done with POST /api/jobs//files - so I think there is no route here to mess with user data - the worst of the security - issues that can be caused is filling up the sever with needless files that aren't - acted on. 
Since this endpoint is not meant for public consumption - all the job - files stuff and the TUS server should be blocked to public IPs anyway and restricted - to your Pulsar servers and similar targeting could be accomplished with a user account - and the user facing upload endpoints. - """ - return None - - @expose_api_anonymous_and_sessionless - def tus_hooks(self, trans, **kwds): - """No-op but if hook specified the way we do for user upload it would hit this action. - - Exposed as PATCH /api/job_files/tus_hooks and documented in the docstring for - tus_patch. - """ - pass + with open(path, "wb") as destination: + shutil.copyfileobj(cast(IO[bytes], input_file), destination) - def __authorize_job_access(self, trans, encoded_job_id, **kwargs): - for key in ["path", "job_key"]: - if key not in kwargs: - error_message = f"Job files action requires a valid '{key}'." - raise exceptions.ObjectAttributeMissingException(error_message) + return {"message": "ok"} + def __authorize_job_access(self, trans, encoded_job_id, path, job_key): job_id = trans.security.decode_id(encoded_job_id) - job_key = trans.security.encode_id(job_id, kind="jobs_files") - if not util.safe_str_cmp(str(kwargs["job_key"]), job_key): + job_key_from_job_id = trans.security.encode_id(job_id, kind="jobs_files") + if not util.safe_str_cmp(str(job_key), job_key_from_job_id): raise exceptions.ItemAccessibilityException("Invalid job_key supplied.") # Verify job is active. Don't update the contents of complete jobs. @@ -207,9 +241,9 @@ def __authorize_job_access(self, trans, encoded_job_id, **kwargs): return job def __check_job_can_write_to_path(self, trans, job, path): - """Verify an idealized job runner should actually be able to write to - the specified path - it must be a dataset output, a dataset "extra - file", or a some place in the working directory of this job. 
+ """ + Verify an idealized job runner should actually be able to write to the specified path - it must be a dataset + output, a dataset "extra file", or a some place in the working directory of this job. Would like similar checks for reading the unstructured nature of loc files make this very difficult. (See abandoned work here @@ -220,8 +254,8 @@ def __check_job_can_write_to_path(self, trans, job, path): raise exceptions.ItemAccessibilityException("Job is not authorized to write to supplied path.") def __is_output_dataset_path(self, job, path): - """Check if is an output path for this job or a file in the an - output's extra files path. + """ + Check if is an output path for this job or a file in the output's extra files path. """ da_lists = [job.output_datasets, job.output_library_datasets] for da_list in da_lists: @@ -240,3 +274,59 @@ def __in_working_directory(self, job, path, app): job, base_dir="job_work", dir_only=True, extra_dir=str(job.id) ) return util.in_directory(path, working_directory) + + +class JobFilesAPIController(BaseGalaxyAPIController): + """ + Legacy WSGI endpoints dedicated to TUS uploads. + + TUS upload endpoints work in tandem with the WSGI middleware `TusMiddleware` from the `tuswsgi` package. Both + WSGI middlewares and endpoints are injected into the FastAPI app after FastAPI routes as a single sub-application + `wsgi_handler` using `app.mount("/", wsgi_handler)`, meaning that requests are passed to the `wsgi_handler` + sub-application (and thus to `TusMiddleware`) only if there was no FastAPI endpoint defined to handle them. + + Therefore, these legacy WSGI endpoints cannot be migrated to FastAPI unless `TusMiddleware` is migrated to ASGI. + """ + + @expose_api_anonymous_and_sessionless + def tus_patch(self, trans, **kwds): + """ + Exposed as PATCH /api/job_files/resumable_upload. + + I think based on the docs, a separate tusd server is needed for job files if + also hosting one for use facing uploads. 
+ + Setting up tusd for job files should just look like (I think): + + `tusd -host localhost -port 1080 -upload-dir=/database/tmp` + + See more discussion of checking upload access, but we shouldn't need the + API key and session stuff the user upload tusd server should be configured with. + + Also shouldn't need a hooks endpoint for this reason but if you want to add one + the target CLI entry would be `-hooks-http=/api/job_files/tus_hooks` + and the action is featured below. + + I would love to check the job state with `__authorize_job_access` on the first + POST but it seems like `TusMiddleware` doesn't default to coming in here for that + initial POST the way it does for the subsequent PATCHes. Ultimately, the upload + is still authorized before the write done with POST `/api/jobs//files` + so I think there is no route here to mess with user data - the worst of the security + issues that can be caused is filling up the sever with needless files that aren't + acted on. Since this endpoint is not meant for public consumption - all the job + files stuff and the TUS server should be blocked to public IPs anyway and restricted + to your Pulsar servers and similar targeting could be accomplished with a user account + and the user facing upload endpoints. + """ + ... + return None + + @expose_api_anonymous_and_sessionless + def tus_hooks(self, trans, **kwds): + """ + No-op but if hook specified the way we do for user upload it would hit this action. + + Exposed as PATCH /api/job_files/tus_hooks and documented in the docstring for tus_patch. + """ + ... + return None diff --git a/lib/galaxy/webapps/galaxy/buildapp.py b/lib/galaxy/webapps/galaxy/buildapp.py index e603abf6a1ef..ad1962de7188 100644 --- a/lib/galaxy/webapps/galaxy/buildapp.py +++ b/lib/galaxy/webapps/galaxy/buildapp.py @@ -870,24 +870,6 @@ def populate_api_routes(webapp, app): conditions=dict(method=["GET"]), ) - # Job files controllers. Only for consumption by remote job runners. 
- webapp.mapper.resource( - "file", - "files", - controller="job_files", - name_prefix="job_", - path_prefix="/api/jobs/{job_id}", - parent_resources=dict(member_name="job", collection_name="jobs"), - ) - - webapp.mapper.connect( - "index", - "/api/jobs/{job_id}/files", - controller="job_files", - action="index", - conditions=dict(method=["HEAD"]), - ) - webapp.mapper.resource( "port", "ports", diff --git a/test/integration/test_job_files.py b/test/integration/test_job_files.py index 4f244419156b..1d6a450211dd 100644 --- a/test/integration/test_job_files.py +++ b/test/integration/test_job_files.py @@ -17,8 +17,14 @@ import io import os +import shutil import tempfile -from typing import Dict +from typing import ( + Any, + Dict, + IO, + Optional, +) import requests from sqlalchemy import select @@ -42,26 +48,39 @@ class TestJobFilesIntegration(integration_util.IntegrationTestCase): initialized = False dataset_populator: DatasetPopulator + input_hda: model.HistoryDatasetAssociation + input_hda_dict: Dict[str, Any] + _nginx_upload_job_files_store: str + @classmethod def handle_galaxy_config_kwds(cls, config): super().handle_galaxy_config_kwds(config) + config["job_config_file"] = SIMPLE_JOB_CONFIG_FILE config["object_store_store_by"] = "uuid" config["server_name"] = "files" + config["nginx_upload_job_files_store"] = tempfile.mkdtemp() + cls._nginx_upload_job_files_store = config["nginx_upload_job_files_store"] cls.initialized = False + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls._nginx_upload_job_files_store) + super().tearDownClass() + def setUp(self): super().setUp() - self.dataset_populator = DatasetPopulator(self.galaxy_interactor) - if not TestJobFilesIntegration.initialized: - history_id = self.dataset_populator.new_history() + cls = TestJobFilesIntegration + cls.dataset_populator = DatasetPopulator(self.galaxy_interactor) + if not cls.initialized: + history_id = cls.dataset_populator.new_history() sa_session = self.sa_session stmt = 
select(model.HistoryDatasetAssociation) assert len(sa_session.scalars(stmt).all()) == 0 - self.input_hda_dict = self.dataset_populator.new_dataset(history_id, content=TEST_INPUT_TEXT, wait=True) + cls.input_hda_dict = cls.dataset_populator.new_dataset(history_id, content=TEST_INPUT_TEXT, wait=True) assert len(sa_session.scalars(stmt).all()) == 1 - self.input_hda = sa_session.scalars(stmt).all()[0] - TestJobFilesIntegration.initialized = True + cls.input_hda = sa_session.scalars(stmt).all()[0] + cls.initialized = True def test_read_by_state(self): job, _, _ = self.create_static_job_with_state("running") @@ -160,6 +179,78 @@ def test_write_with_tus(self): api_asserts.assert_status_code_is_ok(response) assert open(path).read() == "some initial text data" + def test_write_with_nginx_upload_module(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + file: Optional[IO[bytes]] = None + try: + with open(os.path.join(self._app.config.nginx_upload_job_files_store, "nginx_upload"), "wb") as file: + file.write(b"some initial text data") + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=dict(**data, __file_path=file.name)) + + api_asserts.assert_status_code_is_ok(response) + assert not os.path.exists(file.name) + assert os.path.exists(path) + with open(path) as uploaded_file: + assert uploaded_file.read() == "some initial text data" + finally: + # remove `file.name` + try: + if file is not None: + os.remove(file.name) + except FileNotFoundError: + pass + + def test_write_with_session_id(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": 
path, "job_key": job_key} + + upload_store = ( + self._app.config.tus_upload_store_job_files + or self._app.config.tus_upload_store + or self._app.config.new_file_path + ) + upload_id = "35a7c8d3-e659-430e-8579-8d085e7e569d" + upload_path = os.path.join(upload_store, "35a7c8d3-e659-430e-8579-8d085e7e569d") + try: + with open(upload_path, "w") as upload_file: + upload_file.write("some initial text data") + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=dict(**data, session_id=upload_id)) + api_asserts.assert_status_code_is_ok(response) + assert not os.path.exists(upload_path) + assert os.path.exists(path) + with open(path) as uploaded_file: + assert uploaded_file.read() == "some initial text data" + finally: + # remove `upload_path` + try: + os.remove(upload_path) + except FileNotFoundError: + pass + + def test_write_with_underscored_file_param(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=data, files={"__file": io.StringIO("some initial text data")}) + api_asserts.assert_status_code_is_ok(response) + assert open(path).read() == "some initial text data" + def test_write_protection(self): job, _, _ = self.create_static_job_with_state("running") job_id, job_key = self._api_job_keys(job) From 4b8b2b9d80165cf05a50d5232c8437aff022bb1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Thu, 15 May 2025 14:30:46 +0200 Subject: [PATCH 02/10] Generate unique operation ids for `/api/jobs/{job_id}/files` Work around a bug in FastAPI (https://github.com/fastapi/fastapi/issues/13175) that assigns the same operation id to both request methods GET and HEAD of the endpoint 
`/api/jobs/{job_id}/files` when using the `@router.api_route()` decorator with `methods=["GET", "HEAD"]` as keyword argument. --- client/src/api/schema/schema.ts | 4 +-- lib/galaxy/webapps/galaxy/api/job_files.py | 38 +++++++++++++--------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index 30a63c85fde2..e130ae725592 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -3031,7 +3031,7 @@ export interface paths { * * This API method is intended only for consumption by job runners, not end users. */ - head: operations["index_api_jobs__job_id__files_get"]; + head: operations["index_api_jobs__job_id__files_head"]; patch?: never; trace?: never; }; @@ -31334,7 +31334,7 @@ export interface operations { }; }; }; - index_api_jobs__job_id__files_get: { + index_api_jobs__job_id__files_head: { parameters: { query: { /** @description Path to file. */ diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py index 70fb7b061920..6cfbd67226f0 100644 --- a/lib/galaxy/webapps/galaxy/api/job_files.py +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -65,23 +65,29 @@ class FastAPIJobFiles: # needs to include some code to handle this behavior, as tests existing before the migration to FastAPI expect HEAD # requests to work. # - # @router.get( # use me when ALL endpoints have been migrated to FastAPI - @router.api_route( - "/api/jobs/{job_id}/files", - summary="Get a file required to staging a job.", - responses={ - 200: { - "description": "Contents of file.", - "content": {"application/json": None, "application/octet-stream": {"example": None}}, - }, - 400: { - "description": ( - "File not found, path does not refer to a file, or input dataset(s) for job have been purged." 
- ) - }, - }, - methods=["GET", "HEAD"], # remove me when ALL endpoints have been migrated to FastAPI + @router.get( + # simplify me (remove `_args` and `_kwargs` defined using the walrus operator) when ALL endpoints have been + # migrated to FastAPI, this is a workaround for FastAPI bug https://github.com/fastapi/fastapi/issues/13175 + *(_args := ["/api/jobs/{job_id}/files"]), + **( + _kwargs := dict( + summary="Get a file required to staging a job.", + responses={ + 200: { + "description": "Contents of file.", + "content": {"application/json": None, "application/octet-stream": {"example": None}}, + }, + 400: { + "description": ( + "File not found, path does not refer to a file, or input dataset(s) for job have been purged." + ) + }, + }, + ) + ), ) + @router.head(*_args, **_kwargs) # type: ignore[name-defined] + # remove `@router.head(...)` when ALL endpoints have been migrated to FastAPI def index( self, job_id: Annotated[str, Path(description="Encoded id string of the job.")], From d17002d62b94dc364220401e8a2350ddc2f230bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Tue, 20 May 2025 11:46:51 +0200 Subject: [PATCH 03/10] Accept `path` and `job_key` both as query and form parameters for POST requests to `/api/jobs/{job_id}/files` Pulsar formats the `path` and `job_key` parameters as query parameters when submitting POST requests to `/api/jobs/{job_id}/files`. However, many Galaxy tests format them as form parameters. The only way to keep the endpoint working as it should (as it worked before the migration to FastAPI) is to accept both query and form parameters. 
--- client/src/api/schema/schema.ts | 17 +++--- lib/galaxy/webapps/galaxy/api/job_files.py | 61 ++++++++++++++++++---- 2 files changed, 63 insertions(+), 15 deletions(-) diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index e130ae725592..3fcc57611df2 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -6682,15 +6682,15 @@ export interface components { */ file?: string; /** - * Job Key + * Job Key Form * @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ - job_key: string; + job_key_form?: string | null; /** - * Path + * Path Form * @description Path to file to create. */ - path: string; + path_form?: string | null; /** Session Id */ session_id?: string; }; @@ -31288,7 +31288,12 @@ export interface operations { }; create_api_jobs__job_id__files_post: { parameters: { - query?: never; + query?: { + /** @description Path to file to create. */ + path?: string | null; + /** @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ + job_key?: string | null; + }; header?: { /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. 
*/ "run-as"?: string | null; @@ -31299,7 +31304,7 @@ export interface operations { }; cookie?: never; }; - requestBody: { + requestBody?: { content: { "multipart/form-data": components["schemas"]["Body_create_api_jobs__job_id__files_post"]; }; diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py index 6cfbd67226f0..954dd698dd48 100644 --- a/lib/galaxy/webapps/galaxy/api/job_files.py +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -18,8 +18,10 @@ Form, Path, Query, + Request, UploadFile, ) +from fastapi.params import Depends from typing_extensions import Annotated from galaxy import ( @@ -48,6 +50,54 @@ ) +def path_query_or_form( + request: Request, + path_query: Annotated[Optional[str], Query(alias="path", description="Path to file to create.")] = None, + path_form: Annotated[Optional[str], Form(alias="path", description="Path to file to create.")] = None, +): + """ + Accept `path` parameter both in query and form format. + + This method does not force the client to provide the parameter, it could simply not submit the parameter in either + format. To force the client to provide the parameter, coerce the output of the method to a string, i.e. + `path: str = Depends(path_query_or_form)` so that FastAPI responds with status code 500 when the parameter is not + provided. + """ + return path_query or path_form + + +def job_key_query_or_form( + request: Request, + job_key_query: Annotated[ + Optional[str], + Query( + alias="job_key", + description=( + "A key used to authenticate this request as acting on behalf or a job runner for the specified job." + ), + ), + ] = None, + job_key_form: Annotated[ + Optional[str], + Form( + alias="job_key", + description=( + "A key used to authenticate this request as acting on behalf or a job runner for the specified job." + ), + ), + ] = None, +): + """ + Accept `job_key` parameter both in query and form format. 
+ + This method does not force the client to provide the parameter, it could simply not submit the parameter in either + format. To force the client to provide the parameter, coerce the output of the method to a string, i.e. + `job_key: str = Depends(job_key_query_or_form)` so that FastAPI responds with status code 500 when the parameter is + not provided. + """ + return job_key_query or job_key_form + + @router.cbv class FastAPIJobFiles: """ @@ -139,15 +189,8 @@ def index( def create( self, job_id: Annotated[str, Path(description="Encoded id string of the job.")], - path: Annotated[str, Form(description="Path to file to create.")], - job_key: Annotated[ - str, - Form( - description=( - "A key used to authenticate this request as acting on behalf or a job runner for the specified job." - ) - ), - ], + path: Annotated[str, Depends(path_query_or_form)], + job_key: Annotated[str, Depends(job_key_query_or_form)], file: UploadFile = File(None, description="Contents of the file to create."), session_id: str = Form(None), nginx_upload_module_file_path: str = Form( From 071d2d5e45314722b011af739a3a2e27dff172f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Tue, 20 May 2025 15:56:51 +0200 Subject: [PATCH 04/10] Fix OpenAPI docs for `path` and `job_key` as form parameters for POST requests to `/api/jobs/{job_id}/files` FastAPI will not use the parameter aliases of form parameters in the OpenAPI docs, but the name of their Python variables. Therefore, the API docs show `path_form` and `job_key_form`. Rename them so that the API docs show the correct parameter names. 
--- client/src/api/schema/schema.ts | 8 ++++---- lib/galaxy/webapps/galaxy/api/job_files.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index 3fcc57611df2..0d87592004a1 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -6682,15 +6682,15 @@ export interface components { */ file?: string; /** - * Job Key Form + * Job Key * @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ - job_key_form?: string | null; + job_key?: string | null; /** - * Path Form + * Path * @description Path to file to create. */ - path_form?: string | null; + path?: string | null; /** Session Id */ session_id?: string; }; diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py index 954dd698dd48..a8d3e792cba9 100644 --- a/lib/galaxy/webapps/galaxy/api/job_files.py +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -53,7 +53,7 @@ def path_query_or_form( request: Request, path_query: Annotated[Optional[str], Query(alias="path", description="Path to file to create.")] = None, - path_form: Annotated[Optional[str], Form(alias="path", description="Path to file to create.")] = None, + path: Annotated[Optional[str], Form(alias="path", description="Path to file to create.")] = None, ): """ Accept `path` parameter both in query and form format. @@ -63,7 +63,7 @@ def path_query_or_form( `path: str = Depends(path_query_or_form)` so that FastAPI responds with status code 500 when the parameter is not provided. 
""" - return path_query or path_form + return path_query or path def job_key_query_or_form( @@ -77,7 +77,7 @@ def job_key_query_or_form( ), ), ] = None, - job_key_form: Annotated[ + job_key: Annotated[ Optional[str], Form( alias="job_key", @@ -95,7 +95,7 @@ def job_key_query_or_form( `job_key: str = Depends(job_key_query_or_form)` so that FastAPI responds with status code 500 when the parameter is not provided. """ - return job_key_query or job_key_form + return job_key_query or job_key @router.cbv From f55352466fa2a65c4bae315e0af1eefe497a3465 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Thu, 22 May 2025 11:31:47 +0200 Subject: [PATCH 05/10] Improve compliance of `FastAPIJobFiles` with the HTTP spec (`files` endpoint) This commit introduces the following differences in behavior: - GET and HEAD requests to `/api/jobs/{job_id}/files` return HTTP status code 400 when the given path does not refer to a file or the input datasets for the job have been purged and 404 when the given path does not exist. - PROPFIND requests to `/api/jobs/{job_id}/files` are answered with HTTP status code 501 (read motivation for this change below). - POST requests to `/api/jobs/{job_id}/files` are answered with HTTP status code 400 when no file is provided. The reason behind the code explicitly answering `PROPFIND` requests with status code 501 is an unfortunate interaction between the ARC remote job runner that is under development, the behavior of legacy API endpoints and how they are integrated within the FastAPI app. The ARC remote job runner (which will be implemented as `lib.galaxy.jobs.runners.pulsar.PulsarARCJobRunner`) expects this endpoint to return HTTP codes other than 404 when `PROPFIND` requests are issued. They are not part of the HTTP spec, but they are used in the WebDAV protocol. The correct answer to such requests is likely 501 (not implemented). 
FastAPI returns HTTP 405 (method not allowed) for `PROPFIND`, which maybe is not fully correct but tolerable because it is one less quirk to maintain. However, because of the way legacy WSGI endpoints are injected into the FastAPI app (using `app.mount("/", wsgi_handler)`), the built-in support for returning HTTP 405 for `PROPFIND` breaks, because such requests are passed to the `wsgi_handler` sub-application. This means that the endpoint still needs to include some code to handle this behavior. When ALL routes have been migrated to ASGI (no WSGI handler sub-application needed anymore), some lines of code can be removed, they are labeled using comments. --- client/src/api/schema/schema.ts | 47 ++++++++++++- lib/galaxy/webapps/galaxy/api/__init__.py | 4 ++ lib/galaxy/webapps/galaxy/api/job_files.py | 50 +++++++++++-- lib/galaxy/work/context.py | 5 ++ test/integration/test_job_files.py | 81 +++++++++++++++++----- 5 files changed, 165 insertions(+), 22 deletions(-) diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index 0d87592004a1..9e28da862c5c 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -31259,8 +31259,26 @@ export interface operations { "application/octet-stream": unknown; }; }; - /** @description File not found, path does not refer to a file, or input dataset(s) for job have been purged. */ + /** @description Path does not refer to a file, or input dataset(s) for job have been purged. */ 400: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description File not found. */ + 404: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description Input dataset(s) for job have been purged. */ + 500: { headers: { [name: string]: unknown; }; @@ -31319,6 +31337,13 @@ export interface operations { "application/json": unknown; }; }; + /** @description Bad request (including no file provided). 
*/ + 400: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; /** @description Request Error */ "4XX": { headers: { @@ -31368,8 +31393,26 @@ export interface operations { "application/octet-stream": unknown; }; }; - /** @description File not found, path does not refer to a file, or input dataset(s) for job have been purged. */ + /** @description Path does not refer to a file, or input dataset(s) for job have been purged. */ 400: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description File not found. */ + 404: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description Input dataset(s) for job have been purged. */ + 500: { headers: { [name: string]: unknown; }; diff --git a/lib/galaxy/webapps/galaxy/api/__init__.py b/lib/galaxy/webapps/galaxy/api/__init__.py index 51066fed89c1..19097dd34ae7 100644 --- a/lib/galaxy/webapps/galaxy/api/__init__.py +++ b/lib/galaxy/webapps/galaxy/api/__init__.py @@ -265,6 +265,10 @@ def environ(self) -> Environ: self.__environ = build_environ(self.__request.scope, None) # type: ignore[arg-type] return self.__environ + @property + def method(self): + return self.__request.method + @property def headers(self): return self.__request.headers diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py index a8d3e792cba9..b429d2179057 100644 --- a/lib/galaxy/webapps/galaxy/api/job_files.py +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -36,6 +36,7 @@ DependsOnTrans, Router, ) +from galaxy.work.context import SessionRequestContext from . 
import BaseGalaxyAPIController __all__ = ("FastAPIJobFiles", "JobFilesAPIController", "router") @@ -128,16 +129,48 @@ class FastAPIJobFiles: "content": {"application/json": None, "application/octet-stream": {"example": None}}, }, 400: { - "description": ( - "File not found, path does not refer to a file, or input dataset(s) for job have been purged." - ) + "description": "Path does not refer to a file, or input dataset(s) for job have been purged.", + "content": { + "application/json": { + "example": { + "detail": ( + "Path does not refer to a file, or input dataset(s) for job have been purged." + ) + }, + } + }, }, + 404: { + "description": "File not found.", + "content": { + "application/json": { + "example": {"detail": "File not found."}, + } + }, + }, + 500: {"description": "Input dataset(s) for job have been purged."}, }, ) ), ) @router.head(*_args, **_kwargs) # type: ignore[name-defined] # remove `@router.head(...)` when ALL endpoints have been migrated to FastAPI + @router.api_route( + *_args, # type: ignore[name-defined] + **{key: value for key, value in _kwargs.items() if key != "responses"}, # type: ignore[name-defined] + responses={501: {"description": "Not implemented."}}, + methods=["PROPFIND"], + include_in_schema=False, + ) + # remove `@router.api_route(..., methods=["PROPFIND"])` when ALL endpoints have been migrated to FastAPI + # The ARC remote job runner (`lib.galaxy.jobs.runners.pulsar.PulsarARCJobRunner`) expects this to return HTTP codes + # other than 404 when `PROPFIND` requests are issued. They are not part of the HTTP spec, but they are used in the + # WebDAV protocol. The correct answer to such requests is likely 501 (not implemented). FastAPI returns HTTP 405 + # (method not allowed) for `PROPFIND`, which maybe is not fully correct but tolerable because it is one less quirk + # to maintain. 
However, because of the way legacy WSGI endpoints are injected into the FastAPI app (using + # `app.mount("/", wsgi_handler)`), the built-in support for returning HTTP 405 for `PROPFIND` breaks, because such + # requests are passed to the `wsgi_handler` sub-application. This means that the endpoint still needs to include + # some code to handle this behavior. def index( self, job_id: Annotated[str, Path(description="Encoded id string of the job.")], @@ -155,7 +188,7 @@ def index( ), ), ], - trans: ProvidesAppContext = DependsOnTrans, + trans: SessionRequestContext = DependsOnTrans, ) -> GalaxyFileResponse: """ Get a file required to staging a job (proper datasets, extra inputs, task-split inputs, working directory @@ -163,6 +196,11 @@ def index( This API method is intended only for consumption by job runners, not end users. """ + # PROPFIND is not implemented, but the endpoint needs to return a non-404 error code for it + # remove me when ALL endpoints have been migrated to FastAPI + if trans.request.method == "PROPFIND": + raise exceptions.NotImplemented() + path = unquote(path) job = self.__authorize_job_access(trans, job_id, path=path, job_key=job_key) @@ -176,6 +214,9 @@ def index( # This looks like a galaxy dataset, check if any job input has been deleted. 
if any(jtid.dataset.dataset.purged for jtid in job.input_datasets): raise exceptions.ItemDeletionException("Input dataset(s) for job have been purged.") + raise exceptions.ObjectNotFound("File not found.") + elif not os.path.isfile(path): + raise exceptions.RequestParameterInvalidException("Path does not refer to a file.") return GalaxyFileResponse(path) @@ -184,6 +225,7 @@ def index( summary="Populate an output file.", responses={ 200: {"description": "An okay message.", "content": {"application/json": {"example": {"message": "ok"}}}}, + 400: {"description": "Bad request (including no file provided)."}, }, ) def create( diff --git a/lib/galaxy/work/context.py b/lib/galaxy/work/context.py index e723e069109c..6584f2d9783f 100644 --- a/lib/galaxy/work/context.py +++ b/lib/galaxy/work/context.py @@ -97,6 +97,11 @@ def url_path(self) -> str: def host(self) -> str: """The host address.""" + @property + @abc.abstractmethod + def method(self) -> str: + """The request's HTTP method.""" + @property @abc.abstractmethod def is_secure(self) -> bool: diff --git a/test/integration/test_job_files.py b/test/integration/test_job_files.py index 1d6a450211dd..ba9cbac168c0 100644 --- a/test/integration/test_job_files.py +++ b/test/integration/test_job_files.py @@ -48,6 +48,7 @@ class TestJobFilesIntegration(integration_util.IntegrationTestCase): initialized = False dataset_populator: DatasetPopulator + hist_id: int # cannot use `history_id` as name, it collides with a pytest fixture input_hda: model.HistoryDatasetAssociation input_hda_dict: Dict[str, Any] _nginx_upload_job_files_store: str @@ -61,7 +62,6 @@ def handle_galaxy_config_kwds(cls, config): config["server_name"] = "files" config["nginx_upload_job_files_store"] = tempfile.mkdtemp() cls._nginx_upload_job_files_store = config["nginx_upload_job_files_store"] - cls.initialized = False @classmethod def tearDownClass(cls): @@ -70,17 +70,14 @@ def tearDownClass(cls): def setUp(self): super().setUp() - cls = TestJobFilesIntegration - 
cls.dataset_populator = DatasetPopulator(self.galaxy_interactor) - if not cls.initialized: - history_id = cls.dataset_populator.new_history() - sa_session = self.sa_session - stmt = select(model.HistoryDatasetAssociation) - assert len(sa_session.scalars(stmt).all()) == 0 - cls.input_hda_dict = cls.dataset_populator.new_dataset(history_id, content=TEST_INPUT_TEXT, wait=True) - assert len(sa_session.scalars(stmt).all()) == 1 - cls.input_hda = sa_session.scalars(stmt).all()[0] - cls.initialized = True + self.dataset_populator = DatasetPopulator(self.galaxy_interactor) + history_id_encoded = self.dataset_populator.new_history() + self.hist_id = self._app.security.decode_id(history_id_encoded) + self.input_hda_dict = self.dataset_populator.new_dataset(history_id_encoded, content=TEST_INPUT_TEXT, wait=True) + sa_session = self.sa_session + stmt = select(model.HistoryDatasetAssociation).where(model.HistoryDatasetAssociation.history_id == self.hist_id) + assert len(sa_session.scalars(stmt).all()) == 1 + self.input_hda = sa_session.scalars(stmt).first() def test_read_by_state(self): job, _, _ = self.create_static_job_with_state("running") @@ -115,9 +112,57 @@ def test_read_fails_if_input_file_purged(self): self.input_hda_dict["history_id"], content_id=self.input_hda_dict["id"], purge=True, wait_for_purge=True ) assert delete_response.status_code == 200 - head_response = requests.get(get_url, params=data) + response = requests.get(get_url, params=data) + assert response.status_code == 400 + assert response.json()["err_msg"] == "Input dataset(s) for job have been purged." 
+ + def test_read_missing_file(self): + job, _, _ = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + data = {"path": self.input_hda.get_file_name() + "_missing", "job_key": job_key} + get_url = self._api_url(f"jobs/{job_id}/files", use_key=True) + + head_response = requests.head(get_url, params=data) + assert head_response.status_code == 404 + + response = requests.get(get_url, params=data) + assert response.status_code == 404 + + def test_read_folder(self): + job, _, _ = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + data = {"path": os.path.dirname(self.input_hda.get_file_name()), "job_key": job_key} + get_url = self._api_url(f"jobs/{job_id}/files", use_key=True) + + head_response = requests.head(get_url, params=data) assert head_response.status_code == 400 - assert head_response.json()["err_msg"] == "Input dataset(s) for job have been purged." + + response = requests.get(get_url, params=data) + assert response.status_code == 400 + + def test_write_no_file(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=data) + assert response.status_code == 400 + + def test_propfind(self): + # remove this test when ALL Galaxy endpoints have been migrated to FastAPI; it will then be FastAPI's + # responsibility to return a status code other than 404 + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + propfind_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = 
requests.request("PROPFIND", propfind_url, params=data) + assert response.status_code == 501 def test_write_by_state(self): job, output_hda, working_directory = self.create_static_job_with_state("running") @@ -269,9 +314,13 @@ def sa_session(self): def create_static_job_with_state(self, state): """Create a job with unknown handler so its state won't change.""" sa_session = self.sa_session - hda = sa_session.scalars(select(model.HistoryDatasetAssociation)).all()[0] + stmt_hda = select(model.HistoryDatasetAssociation).where( + model.HistoryDatasetAssociation.history_id == self.hist_id + ) + hda = sa_session.scalars(stmt_hda).first() assert hda - history = sa_session.scalars(select(model.History)).all()[0] + stmt_history = select(model.History).where(model.History.id == self.hist_id) + history = sa_session.scalars(stmt_history).first() assert history user = sa_session.scalars(select(model.User)).all()[0] assert user From 47f9617f9b1352991b5944d3c40a937dbce4683a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Thu, 22 May 2025 10:12:50 +0200 Subject: [PATCH 06/10] Add PUT request support at `/api/jobs/{job_id}/files` to `FastAPIJobFiles` This path already supports GET, HEAD and POST requests; add support for PUT requests. There is a significant difference in behavior between POST and PUT requests: - POST requests take `path` and `job_key` both as query parameters or as body parameters belonging to a multipart request. PUT requests take them only as query parameters (just like GET and HEAD). - POST requests submit a file as one of the fields of the multipart request, whereas the submitted file is the whole body of the request for PUT requests. - POST requests can append to the `tool_stdout` and `tool_stderr`, PUT requests can only create new files or overwrite whole files. - POST requests support resumable uploads but PUT requests do not. 
- POST requests take the form parameters `__file_path` (path of a file uploaded via the nginx upload module) and `__file` but PUT requests do not. --- client/src/api/schema/schema.ts | 73 +++++++++++++++++++++- lib/galaxy/webapps/galaxy/api/__init__.py | 7 +++ lib/galaxy/webapps/galaxy/api/job_files.py | 60 ++++++++++++++++++ lib/galaxy/work/context.py | 10 +++ test/integration/test_job_files.py | 23 +++++++ 5 files changed, 172 insertions(+), 1 deletion(-) diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index 0d87592004a1..24a916f24b3e 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -3012,7 +3012,8 @@ export interface paths { * This API method is intended only for consumption by job runners, not end users. */ get: operations["index_api_jobs__job_id__files_get"]; - put?: never; + /** Populate an output file. */ + put: operations["populate_api_jobs__job_id__files_put"]; /** * Populate an output file. * @description Populate an output file (formal dataset, task split part, working directory file (such as those related to @@ -31286,6 +31287,76 @@ export interface operations { }; }; }; + populate_api_jobs__job_id__files_put: { + parameters: { + query: { + /** @description Path to file to create/replace. */ + path: string; + /** @description A key used to authenticate this request as acting on behalf of a job runner for the specified job. */ + job_key: string; + }; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path: { + /** @description Encoded id string of the job. 
*/ + job_id: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description Successful Response */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description A new file has been created. */ + 201: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description An existing file has been replaced. */ + 204: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Bad request. */ + 400: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; create_api_jobs__job_id__files_post: { parameters: { query?: { diff --git a/lib/galaxy/webapps/galaxy/api/__init__.py b/lib/galaxy/webapps/galaxy/api/__init__.py index 51066fed89c1..0836e9e2bcda 100644 --- a/lib/galaxy/webapps/galaxy/api/__init__.py +++ b/lib/galaxy/webapps/galaxy/api/__init__.py @@ -242,6 +242,9 @@ def __init__(self, request: Request): def base(self) -> str: return str(self.__request.base_url) + def stream(self) -> AsyncGenerator: + return self.__request.stream() + @property def url_path(self) -> str: scope = self.__request.scope @@ -250,6 +253,10 @@ def url_path(self) -> str: url = urljoin(url, root_path) return url + @property + def url(self) -> str: + return str(self.__request.url) + @property def host(self) -> str: return self.__request.base_url.netloc diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py index a8d3e792cba9..9fcfd385338a 100644 --- a/lib/galaxy/webapps/galaxy/api/job_files.py +++ 
b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -2,6 +2,7 @@ API for asynchronous job running mechanisms can use to fetch or put files related to running and queued jobs. """ +import asyncio import logging import os import re @@ -19,6 +20,7 @@ Path, Query, Request, + Response, UploadFile, ) from fastapi.params import Depends @@ -36,6 +38,7 @@ DependsOnTrans, Router, ) +from galaxy.work.context import SessionRequestContext from . import BaseGalaxyAPIController __all__ = ("FastAPIJobFiles", "JobFilesAPIController", "router") @@ -179,6 +182,63 @@ def index( return GalaxyFileResponse(path) + # The ARC remote job runner (`lib.galaxy.jobs.runners.pulsar.PulsarARCJobRunner`) expects a `PUT` endpoint to stage + # out result files back to Galaxy. + @router.put( + "/api/jobs/{job_id}/files", + summary="Populate an output file.", + responses={ + 201: {"description": "A new file has been created."}, + 204: {"description": "An existing file has been replaced."}, + 400: {"description": "Bad request."}, + }, + ) + def populate( + self, + job_id: Annotated[str, Path(description="Encoded id string of the job.")], + path: Annotated[str, Query(description="Path to file to create/replace.")], + job_key: Annotated[ + str, + Query( + description=( + "A key used to authenticate this request as acting on behalf of a job runner for the specified job." + ), + ), + ], + trans: SessionRequestContext = DependsOnTrans, + ): + path = unquote(path) + + job = self.__authorize_job_access(trans, job_id, path=path, job_key=job_key) + self.__check_job_can_write_to_path(trans, job, path) + + destination_file_exists = os.path.exists(path) + + # FastAPI can only read the file contents from the request body in an async context. To write the file without + # using an async endpoint, the async code that reads the file from the body and writes it to disk will have to + # run within the sync endpoint. 
Since the code that writes the data to disk is blocking + # `destination_file.write(chunk)`, it has to run on its own event loop within the thread spawned to answer the + # request to the sync endpoint. + async def write(): + with open(path, "wb") as destination_file: + async for chunk in trans.request.stream(): + destination_file.write(chunk) + + target_dir = os.path.dirname(path) + util.safe_makedirs(target_dir) + event_loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(event_loop) + event_loop.run_until_complete(write()) + finally: + event_loop.close() + + return ( + Response(status_code=201, headers={"Location": str(trans.request.url)}) + if not destination_file_exists + else Response(status_code=204) + ) + @router.post( "/api/jobs/{job_id}/files", summary="Populate an output file.", diff --git a/lib/galaxy/work/context.py b/lib/galaxy/work/context.py index e723e069109c..1e9c54fd0e5d 100644 --- a/lib/galaxy/work/context.py +++ b/lib/galaxy/work/context.py @@ -1,6 +1,7 @@ import abc from typing import ( Any, + AsyncGenerator, Dict, List, Optional, @@ -87,11 +88,20 @@ class GalaxyAbstractRequest: def base(self) -> str: """Base URL of the request.""" + @abc.abstractmethod + def stream(self) -> AsyncGenerator: + """Request body split in parts.""" + @property @abc.abstractmethod def url_path(self) -> str: """Base with optional prefix added.""" + @property + @abc.abstractmethod + def url(self): + """URL of the request.""" + @property @abc.abstractmethod def host(self) -> str: diff --git a/test/integration/test_job_files.py b/test/integration/test_job_files.py index 1d6a450211dd..795c61a6d433 100644 --- a/test/integration/test_job_files.py +++ b/test/integration/test_job_files.py @@ -251,6 +251,29 @@ def test_write_with_underscored_file_param(self): api_asserts.assert_status_code_is_ok(response) assert open(path).read() == "some initial text data" + def test_write_with_put_request(self): + job, output_hda, working_directory = 
self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + new_file_path = os.path.join(working_directory, "new_file.txt") + put_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.put( + put_url, + params={"path": new_file_path, "job_key": job_key}, + data=b"whole contents of the file", + ) + assert response.status_code == 201 + assert open(new_file_path).read() == "whole contents of the file" + + assert os.path.exists(path) + put_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.put(put_url, params=data, data=b"contents of a replacement file") + assert response.status_code == 204 + assert open(path).read() == "contents of a replacement file" + def test_write_protection(self): job, _, _ = self.create_static_job_with_state("running") job_id, job_key = self._api_job_keys(job) From f0e5816ec583c857279edf03ee1198541c81ecb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Tue, 18 Mar 2025 11:58:29 +0100 Subject: [PATCH 07/10] Pulsar-based ARC Job Runner Define a new Pulsar-based job runner to integrate Galaxy with the Advanced Resource Connector (ARC) middleware. 
--- lib/galaxy/jobs/runners/pulsar.py | 39 +++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/lib/galaxy/jobs/runners/pulsar.py b/lib/galaxy/jobs/runners/pulsar.py index 4dbe3632a2bd..26c03b53b70f 100644 --- a/lib/galaxy/jobs/runners/pulsar.py +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -63,6 +63,7 @@ log = logging.getLogger(__name__) __all__ = ( + "PulsarARCJobRunner", "PulsarLegacyJobRunner", "PulsarRESTJobRunner", "PulsarMQJobRunner", @@ -1061,6 +1062,44 @@ def _populate_parameter_defaults(self, job_destination): pulsar_app_config["staging_directory"] = params.get("jobs_directory") +ARC_DESTINATION_DEFAULTS: Dict[str, Any] = { + **COEXECUTION_DESTINATION_DEFAULTS, + "default_file_action": "json_transfer", +} + + +class PulsarARCJobRunner(PulsarCoexecutionJobRunner): + runner_name = "PulsarARCJobRunner" + + destination_defaults = ARC_DESTINATION_DEFAULTS + + use_mq = False + poll = True + + def get_client_from_state(self, job_state): + client = super().get_client_from_state(job_state) + client._arc_job_id = job_state.job_id # used by the client to get the job state + return client + + def queue_job(self, job_wrapper): + """ + Inject user's own ARC endpoint and OIDC token (if defined) as destination parameters. 
+ """ + destination_arc_url = job_wrapper.job_destination.params.get("arc_url") + destination_oidc_token = job_wrapper.job_destination.params.get("oidc_token") + user_arc_url = job_wrapper.get_job().user.extra_preferences.get("distributed_arc_compute|remote_arc_resources") + user_oidc_token = job_wrapper.get_job().user.extra_preferences.get("distributed_arc_compute|remote_arc_token") + + job_wrapper.job_destination.params.update( + { + "arc_url": user_arc_url or destination_arc_url, + "oidc_token": user_oidc_token or destination_oidc_token, + } + ) + + return super().queue_job(job_wrapper) + + KUBERNETES_DESTINATION_DEFAULTS: Dict[str, Any] = {"k8s_enabled": True, **COEXECUTION_DESTINATION_DEFAULTS} From 6b6577527e941571707befe4c58f4ba337f51f8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Tue, 1 Jul 2025 17:48:23 +0200 Subject: [PATCH 08/10] Let Pulsar job runners choose a client manager Add method to inject kwargs into the `build_client_manager()` function. 
--- lib/galaxy/jobs/runners/pulsar.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/lib/galaxy/jobs/runners/pulsar.py b/lib/galaxy/jobs/runners/pulsar.py index 26c03b53b70f..34334e8d6a30 100644 --- a/lib/galaxy/jobs/runners/pulsar.py +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -248,8 +248,15 @@ def _init_client_manager(self): for kwd in self.runner_params.keys(): if kwd.startswith("amqp_") or kwd.startswith("transport_"): client_manager_kwargs[kwd] = self.runner_params[kwd] + + client_manager_kwargs.update(self._init_client_manager_extend_kwargs(**client_manager_kwargs)) + self.client_manager = build_client_manager(**client_manager_kwargs) + def _init_client_manager_extend_kwargs(self, **kwargs): + """Override this method to consider additional (or alter) keyword arguments when building the client manager.""" + return kwargs + def __init_pulsar_app(self, conf, pulsar_conf_path): if conf is None and pulsar_conf_path is None and not self.default_build_pulsar_app: self.pulsar_app = None @@ -1099,6 +1106,11 @@ def queue_job(self, job_wrapper): return super().queue_job(job_wrapper) + def _init_client_manager_extend_kwargs(self, **kwargs): + kwargs = super()._init_client_manager_extend_kwargs(**kwargs) + kwargs["arc_enabled"] = True + return kwargs + KUBERNETES_DESTINATION_DEFAULTS: Dict[str, Any] = {"k8s_enabled": True, **COEXECUTION_DESTINATION_DEFAULTS} From 583a405e634a8b0487aebb9c5e1778540d0ed0ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Mon, 21 Jul 2025 15:28:10 +0200 Subject: [PATCH 09/10] Improve configurability and OIDC support for the ARC Pulsar job runner Rewrite the method `PulsarARCJobRunner.queue_job()` so that it: - Obtains the ARC endpoint URL either from the user's preferences or the destination parameters. - Requests an OIDC access token for the user running the job. 
- Decides which OIDC provider to get the token from if multiple are available, based on the user's preferences and the destination parameters. To let users configure their own settings, admins have to set the destination parameter "arc_user_preferences_key". Galaxy will then read the options "arc_url" and "arc_oidc_provider" under that key from the user extra preferences. Both are optional; if the user does not configure a value, the destination default will be used. If no destination default exists and the user account is associated with exactly one OIDC provider, then Galaxy will use that provider. --- lib/galaxy/jobs/runners/pulsar.py | 66 ++++++++++++++++++++++++++----- 1 file changed, 56 insertions(+), 10 deletions(-) diff --git a/lib/galaxy/jobs/runners/pulsar.py b/lib/galaxy/jobs/runners/pulsar.py index 34334e8d6a30..15c6acb2a99e 100644 --- a/lib/galaxy/jobs/runners/pulsar.py +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -40,6 +40,7 @@ from sqlalchemy import select from galaxy import model +from galaxy.authnz.util import provider_name_to_backend from galaxy.job_execution.compute_environment import ( ComputeEnvironment, dataset_path_to_extra_path, @@ -1090,19 +1091,64 @@ def get_client_from_state(self, job_state): def queue_job(self, job_wrapper): """ - Inject user's own ARC endpoint and OIDC token (if defined) as destination parameters. + Queue a job to run it using the Pulsar ARC client. + + ARC supports authentication via either x509 certificates or OIDC tokens. Since Galaxy only supports the latter + (through OIDC providers), the Pulsar ARC client implementation is designed to work with OIDC. Thus, to run jobs, + the Pulsar ARC client needs an ARC endpoint URL and an OIDC access token. Those are passed as destination + parameters. + + OIDC tokens are, for obvious reasons, not meant to be part of the job configuration file nor of TPV + configuration files; they have to be obtained before the job is queued. 
For admins, it may also be interesting + to have a mechanism to inject an ARC endpoint URL from the user preferences, so that users can configure their + own ARC endpoint URLs. + + Therefore, this method provides a framework to: + - Obtain an ARC endpoint URL from the user's preferences (if enabled). + - Obtain an OIDC access token for the user running the job. + - Decide which OIDC provider to obtain the token from if multiple are available. + + To let users configure their own settings, admins have to set the destination parameter + "arc_user_preferences_key". Galaxy will then read the options "arc_url" and "arc_oidc_provider" under that key + from the user extra preferences. Both are optional; if the user does not configure any, the destination defaults + will be used. If no destination default exists and the user account is associated with exactly one OIDC + provider, then Galaxy will use that provider. """ - destination_arc_url = job_wrapper.job_destination.params.get("arc_url") - destination_oidc_token = job_wrapper.job_destination.params.get("oidc_token") - user_arc_url = job_wrapper.get_job().user.extra_preferences.get("distributed_arc_compute|remote_arc_resources") - user_oidc_token = job_wrapper.get_job().user.extra_preferences.get("distributed_arc_compute|remote_arc_token") + job = job_wrapper.get_job() + user = job.user - job_wrapper.job_destination.params.update( - { - "arc_url": user_arc_url or destination_arc_url, - "oidc_token": user_oidc_token or destination_oidc_token, - } + extra_user_preferences_key = job_wrapper.job_destination.params.get("arc_user_preferences_key") + # for example, "distributed_compute_arc" + + user_arc_url = ( + user.extra_preferences.get(f"{extra_user_preferences_key}|arc_url") if extra_user_preferences_key else None ) + user_arc_oidc_provider = ( + user.extra_preferences.get(f"{extra_user_preferences_key}|arc_oidc_provider") + if extra_user_preferences_key + else None + ) + destination_arc_url = 
job_wrapper.job_destination.params.get("arc_url") + destination_oidc_provider = job_wrapper.job_destination.params.get("arc_oidc_provider") + arc_url = user_arc_url or destination_arc_url + arc_oidc_provider = user_arc_oidc_provider or destination_oidc_provider + if arc_oidc_provider is None: + user_oidc_providers = [auth.provider for auth in user.custos_auth + user.social_auth] + if len(user_oidc_providers) > 1: + raise Exception( + f"Multiple identity providers are linked to your user account '{user.username}', please select one " + f"in your user preferences to launch ARC jobs." + ) + elif len(user_oidc_providers) == 0: + raise Exception( + f"No identity provider is linked to your user account '{user.username}', please log in using an " + f"identity provider to launch ARC jobs." + ) + arc_oidc_provider = user_oidc_providers[0] + arc_oidc_provider_backend = provider_name_to_backend(arc_oidc_provider) + arc_oidc_token = user.get_oidc_tokens(arc_oidc_provider_backend)["access"] + + job_wrapper.job_destination.params.update({"arc_url": arc_url, "arc_oidc_token": arc_oidc_token}) return super().queue_job(job_wrapper) From b2c75b22377bb08fc9bafdc7ff6433bd0d33221e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Manuel=20Dom=C3=ADnguez?= Date: Thu, 31 Jul 2025 11:10:37 +0200 Subject: [PATCH 10/10] Add integration tests for the Pulsar ARC job runner These tests verify that the ARC endpoint URL and OIDC provider selection logic works correctly when queuing jobs. They do not test actual job execution, as that is covered by Pulsar's own tests. 
--- .../job_resource_rules/test_pulsar_arc.py | 50 ++++ test/integration/oidc/test_auth_oidc.py | 65 ++-- test/integration/test_pulsar_arc.py | 283 ++++++++++++++++++ 3 files changed, 366 insertions(+), 32 deletions(-) create mode 100644 test/integration/job_resource_rules/test_pulsar_arc.py create mode 100644 test/integration/test_pulsar_arc.py diff --git a/test/integration/job_resource_rules/test_pulsar_arc.py b/test/integration/job_resource_rules/test_pulsar_arc.py new file mode 100644 index 000000000000..ff9e79d22532 --- /dev/null +++ b/test/integration/job_resource_rules/test_pulsar_arc.py @@ -0,0 +1,50 @@ +"""Dynamic job rule for the Pulsar ARC (Advanced Resource Connector) job runner integration tests. + +This file contains a dynamic job rule for the Pulsar ARC job runner integration tests in +`test/integration/test_pulsar_arc.py`. The rule allows selecting a job destination based on a dynamic reference that can +be read and updated during the tests using `test_pulsar_arc_get_job_destination()` and +`test_pulsar_arc_set_job_destination()` respectively. +""" + +from typing import Optional + +from galaxy.app import UniverseApplication +from galaxy.jobs import JobMappingException # type: ignore[attr-defined] +from galaxy.jobs import ( + JobDestination, +) +from galaxy.model import ( + Job, + User as GalaxyUser, +) +from galaxy.tools import Tool as GalaxyTool + +__all__ = ("test_pulsar_arc", "test_pulsar_arc_get_job_destination", "test_pulsar_arc_set_job_destination") + + +test_pulsar_arc_job_destination_ref: list[JobDestination] = [] + + +def test_pulsar_arc_get_job_destination() -> Optional[JobDestination]: + """ + Return the last stored job destination (if any). + """ + return test_pulsar_arc_job_destination_ref[0] if test_pulsar_arc_job_destination_ref else None + + +def test_pulsar_arc_set_job_destination(job_destination: JobDestination) -> None: + """ + Store a job destination, overwriting any previous one. 
+ """ + test_pulsar_arc_job_destination_ref[:] = [job_destination] + + +def test_pulsar_arc(app: UniverseApplication, job: Job, tool: GalaxyTool, user: GalaxyUser) -> JobDestination: + """ + Dynamic job rule that maps jobs to the stored job destination. + """ + job_destination = test_pulsar_arc_get_job_destination() + if not job_destination: + raise JobMappingException("No job destination set for dynamic job rule.") + + return job_destination diff --git a/test/integration/oidc/test_auth_oidc.py b/test/integration/oidc/test_auth_oidc.py index bf0992db54be..f16f41d0f7e1 100644 --- a/test/integration/oidc/test_auth_oidc.py +++ b/test/integration/oidc/test_auth_oidc.py @@ -93,6 +93,9 @@ class BaseKeycloakIntegrationTestCase(integration_util.IntegrationTestCase): backend_config_file: ClassVar[str] saved_oauthlib_insecure_transport: ClassVar[bool] + REGEX_KEYCLOAK_LOGIN_ACTION = re.compile(r"action=\"(.*)\"\s+") + REGEX_GALAXY_CSRF_TOKEN = re.compile(r"session_csrf_token\": \"(.*)\"") + @classmethod def setUpClass(cls): # By default, the oidc callback must be done over a secure transport, so @@ -158,40 +161,38 @@ def handle_galaxy_oidc_config_kwds(cls, config): def _get_interactor(self, api_key=None, allow_anonymous=False) -> "ApiTestInteractor": return super()._get_interactor(api_key=None, allow_anonymous=True) + def _login_via_keycloak(self, username, password, expected_codes=None, save_cookies=False, session=None): + if expected_codes is None: + expected_codes = [200, 404] + session = session or requests.Session() + response = session.get(f"{self.url}authnz/keycloak/login") + provider_url = response.json()["redirect_uri"] + response = session.get(provider_url, verify=False) + matches = self.REGEX_KEYCLOAK_LOGIN_ACTION.search(response.text) + assert matches + auth_url = html.unescape(str(matches.groups(1)[0])) + response = session.post(auth_url, data={"username": username, "password": password}, verify=False) + assert response.status_code in expected_codes, response + 
if save_cookies: + self.galaxy_interactor.cookies = session.cookies + return session, response + + def _get_keycloak_access_token( + self, client_id="gxyclient", username=KEYCLOAK_TEST_USERNAME, password=KEYCLOAK_TEST_PASSWORD, scopes=None + ): + data = { + "client_id": client_id, + "client_secret": "dummyclientsecret", + "grant_type": "password", + "username": username, + "password": password, + "scope": scopes or [], + } + response = requests.post(f"{KEYCLOAK_URL}/protocol/openid-connect/token", data=data, verify=False) + return response.json()["access_token"] + class TestGalaxyOIDCLoginIntegration(AbstractTestCases.BaseKeycloakIntegrationTestCase): - REGEX_KEYCLOAK_LOGIN_ACTION = re.compile(r"action=\"(.*)\"\s+") - REGEX_GALAXY_CSRF_TOKEN = re.compile(r"session_csrf_token\": \"(.*)\"") - - def _login_via_keycloak(self, username, password, expected_codes=None, save_cookies=False, session=None): - if expected_codes is None: - expected_codes = [200, 404] - session = session or requests.Session() - response = session.get(f"{self.url}authnz/keycloak/login") - provider_url = response.json()["redirect_uri"] - response = session.get(provider_url, verify=False) - matches = self.REGEX_KEYCLOAK_LOGIN_ACTION.search(response.text) - assert matches - auth_url = html.unescape(str(matches.groups(1)[0])) - response = session.post(auth_url, data={"username": username, "password": password}, verify=False) - assert response.status_code in expected_codes, response - if save_cookies: - self.galaxy_interactor.cookies = session.cookies - return session, response - - def _get_keycloak_access_token( - self, client_id="gxyclient", username=KEYCLOAK_TEST_USERNAME, password=KEYCLOAK_TEST_PASSWORD, scopes=None - ): - data = { - "client_id": client_id, - "client_secret": "dummyclientsecret", - "grant_type": "password", - "username": username, - "password": password, - "scope": scopes or [], - } - response = requests.post(f"{KEYCLOAK_URL}/protocol/openid-connect/token", data=data, 
verify=False) - return response.json()["access_token"] def test_oidc_login_new_user(self): _, response = self._login_via_keycloak(KEYCLOAK_TEST_USERNAME, KEYCLOAK_TEST_PASSWORD, save_cookies=True) diff --git a/test/integration/test_pulsar_arc.py b/test/integration/test_pulsar_arc.py new file mode 100644 index 000000000000..811f8faa93ad --- /dev/null +++ b/test/integration/test_pulsar_arc.py @@ -0,0 +1,283 @@ +"""Integration tests for the Pulsar ARC (Advanced Resource Connector) job runner. + +These tests verify that the ARC endpoint URL and OIDC provider selection logic works correctly when queuing jobs. They +do not test actual job execution, as that is covered by Pulsar's own tests. +""" + +import json +import string +import tempfile +import threading +from functools import lru_cache +from typing import Optional +from unittest.mock import patch + +from sqlalchemy.orm import object_session + +from galaxy.jobs import ( + JobDestination, + JobWrapper, +) +from galaxy.jobs.runners.pulsar import PulsarARCJobRunner +from galaxy.tool_util.verify.interactor import GalaxyInteractorApi +from galaxy_test.base.api import ApiTestInteractor +from galaxy_test.base.api_util import get_admin_api_key +from galaxy_test.base.env import target_url_parts +from galaxy_test.base.populators import DatasetPopulator +from .job_resource_rules.test_pulsar_arc import test_pulsar_arc_set_job_destination as set_job_destination +from .oidc.test_auth_oidc import ( + AbstractTestCases as OIDCAbstractTestCases, + KEYCLOAK_TEST_PASSWORD, + KEYCLOAK_TEST_USERNAME, +) + +JOB_CONFIG_FILE = """ +execution: + default: arc + environments: + arc: + runner: dynamic + url: ${galaxy_url} + type: python + function: test_pulsar_arc + rules_module: integration.job_resource_rules +runners: + arc_runner: + load: galaxy.jobs.runners.pulsar:PulsarARCJobRunner +""" + + +def job_config(template_str: str, **vars_) -> str: + """ + Create a temporary job configuration file from the provided template string. 
+ """ + job_conf_template = string.Template(template_str) + job_conf_str = job_conf_template.substitute(**vars_) + with tempfile.NamedTemporaryFile(suffix="_arc_integration_job_conf.yml", mode="w", delete=False) as job_conf: + job_conf.write(job_conf_str) + return job_conf.name + + +job_wrapper_ref: list[JobWrapper] = [] # keeps a reference to the job wrapper for which a job was last queued +job_wrapper_event = threading.Event() # synchronization event to signal that a job has been queued + + +def set_job_wrapper(job_wrapper: JobWrapper) -> None: + """ + Store a job wrapper reference. + """ + job_wrapper_ref[:] = [job_wrapper] + + +def get_job_wrapper() -> Optional[JobWrapper]: + """ + Return the last stored job wrapper reference (if any). + """ + return job_wrapper_ref[0] if job_wrapper_ref else None + + +def queue_job(self, job_wrapper: JobWrapper) -> None: + """ + Override the `queue_job()` method of the parent class of the Pulsar ARC job runner. + + Fails job queueing and tracks the job wrapper representing the job which should have been queued. Used to test that + the logic overriding the job destination parameters works correctly. + """ + set_job_wrapper(job_wrapper) + job_wrapper_event.set() + raise Exception("Job queueing failed for testing purposes. This is expected.") + + +@patch.object(PulsarARCJobRunner.__mro__[1], "queue_job", new=queue_job) +class TestArcPulsarIntegration(OIDCAbstractTestCases.BaseKeycloakIntegrationTestCase): + """ + Integration test verifying the logic that selects an ARC endpoint URL and an OIDC provider. + """ + + dataset_populator: DatasetPopulator + framework_tool_and_types = True + + _user_api_key: Optional[str] = None + + @classmethod + def handle_galaxy_config_kwds(cls, config): + """ + Inject job configuration file tailored for this test case in the Galaxy configuration. 
+ """ + super().handle_galaxy_config_kwds(config) + host, port, url = target_url_parts() + config["job_config_file"] = job_config(JOB_CONFIG_FILE, galaxy_url=url) + + # login just once, even if the method is called multiple times + @lru_cache(maxsize=1) # noqa: B019 (bounded cache size) + def _login_via_keycloak(self, *args, **kwargs): + """ + Override parent login method to log-in via Keycloak just once and to override the default Galaxy interactor. + + Normally, one would log in within the `setUpClass()` method and call it a day, but since `_login_via_keycloak()` + is not a class method, this workaround is needed. + """ + session, response = super()._login_via_keycloak( + KEYCLOAK_TEST_USERNAME, KEYCLOAK_TEST_PASSWORD, save_cookies=True + ) + api_interactor = GalaxyInteractorApi( + galaxy_url=self.url, + master_api_key=get_admin_api_key(), + test_user="gxyuser@galaxy.org", + # email for `KEYCLOAK_TEST_USERNAME`, defined in test/integration/oidc/galaxy-realm-export.json + ) + self._user_api_key = api_interactor.api_key + return session, response + + def setUp(self): + """ + Log-in via Keycloak (just once), override Galaxy interactor and initialize a dataset populator. + """ + super().setUp() + self._login_via_keycloak(KEYCLOAK_TEST_USERNAME, KEYCLOAK_TEST_PASSWORD, save_cookies=True) # happens just once + self._galaxy_interactor = ApiTestInteractor(self, api_key=self._user_api_key) + self.dataset_populator = DatasetPopulator(self.galaxy_interactor) + self._job_wrapper = None + + def tearDown(self): + self._job_wrapper = None + + def run_job(self) -> Optional[JobWrapper]: + """ + Run a simple job and return the job wrapper. 
+ """ + with self.dataset_populator.test_history() as history_id: + hda = self.dataset_populator.new_dataset(history_id, content="abc") + self.dataset_populator.run_tool( + tool_id="cat", + inputs={ + "input1": {"src": "hda", "id": hda["id"]}, + }, + history_id=history_id, + ) + self.dataset_populator.wait_for_history(history_id, timeout=20) + job_wrapper_event.wait(timeout=1) + job_wrapper = get_job_wrapper() + return job_wrapper + + def test_queue_job_url_and_oidc_provider_selection(self): + """ + Verify that the ARC endpoint URL and OIDC provider selection logic works correctly when queuing jobs. + """ + set_job_destination( + JobDestination( + id="arc", + name="arc", + runner="arc_runner", + ) + ) + job_wrapper = self.run_job() + assert job_wrapper is not None, "No job wrapper created" + assert job_wrapper.job_destination.params["arc_url"] is None, "No ARC URL expected" + assert isinstance( + job_wrapper.job_destination.params["arc_oidc_token"], str + ), f"Unexpected type {type(job_wrapper.job_destination.params['arc_oidc_token'])} for OIDC token" + assert len(job_wrapper.job_destination.params["arc_oidc_token"]) > 0, "Invalid OIDC token" + + set_job_destination( + JobDestination( + id="arc", + name="arc", + runner="arc_runner", + params={ + "arc_url": "https://arc.example.com", + }, + ) + ) + job_wrapper = self.run_job() + assert job_wrapper is not None, "No job wrapper created" + assert job_wrapper.job_destination.params["arc_url"] == "https://arc.example.com", "Unexpected ARC URL" + assert isinstance( + job_wrapper.job_destination.params["arc_oidc_token"], str + ), f"Unexpected type {type(job_wrapper.job_destination.params['arc_oidc_token'])} for OIDC token" + assert len(job_wrapper.job_destination.params["arc_oidc_token"]) > 0, "Invalid OIDC token" + + set_job_destination( + JobDestination( + id="arc", + name="arc", + runner="arc_runner", + params={ + "arc_oidc_provider": "does_not_exist", + }, + ) + ) + job_wrapper = self.run_job() + assert job_wrapper is 
not None, "No job wrapper created" + assert job_wrapper.job_destination.params["arc_url"] is None, "No ARC URL expected" + assert job_wrapper.job_destination.params["arc_oidc_token"] is None, "No OIDC token expected" + + set_job_destination( + JobDestination( + id="arc", + name="arc", + runner="arc_runner", + params={ + "arc_user_preferences_key": "does_not_exist", + }, + ) + ) + job_wrapper = self.run_job() + assert job_wrapper is not None, "No job wrapper created" + assert job_wrapper.job_destination.params["arc_url"] is None, "No ARC URL expected" + assert isinstance( + job_wrapper.job_destination.params["arc_oidc_token"], str + ), f"Unexpected type {type(job_wrapper.job_destination.params['arc_oidc_token'])} for OIDC token" + assert len(job_wrapper.job_destination.params["arc_oidc_token"]) > 0, "Invalid OIDC token" + + # test parameters defined via extra user preferences + arc_user_preferences_key = "arc_user_preferences" + user = job_wrapper.get_job().user + assert user is not None, "No user associated with job" + extra_user_preferences = user.extra_preferences + extra_user_preferences[f"{arc_user_preferences_key}|arc_url"] = "https://arc-from-user-prefs.example.com" + extra_user_preferences[f"{arc_user_preferences_key}|arc_oidc_provider"] = "keycloak" + user.preferences["extra_user_preferences"] = json.dumps(extra_user_preferences) + session = object_session(user) + assert session is not None, "No database session associated with user" + session.commit() + + set_job_destination( + JobDestination( + id="arc", + name="arc", + runner="arc_runner", + params={ + "arc_user_preferences_key": "does_not_exist", + "arc_url": "https://arc.example.com", + "arc_oidc_provider": "does_not_exist", + }, + ) + ) + job_wrapper = self.run_job() + assert job_wrapper is not None, "No job wrapper created" + assert job_wrapper.job_destination.params["arc_url"] == "https://arc.example.com", "Unexpected ARC URL" + assert job_wrapper.job_destination.params["arc_oidc_token"] is None, 
"No OIDC token expected" + + set_job_destination( + JobDestination( + id="arc", + name="arc", + runner="arc_runner", + params={ + "arc_user_preferences_key": arc_user_preferences_key, + "arc_url": "https://arc.example.com", + "arc_oidc_provider": "does_not_exist", + }, + ) + ) + job_wrapper = self.run_job() + assert job_wrapper is not None, "No job wrapper created" + assert ( + job_wrapper.job_destination.params["arc_url"] + == extra_user_preferences[f"{arc_user_preferences_key}|arc_url"] + ), "Unexpected ARC URL" + assert isinstance( + job_wrapper.job_destination.params["arc_oidc_token"], str + ), f"Unexpected type {type(job_wrapper.job_destination.params['arc_oidc_token'])} for OIDC token" + assert len(job_wrapper.job_destination.params["arc_oidc_token"]) > 0, "Invalid OIDC token"