diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index a61861be3eaf..c0403966ad5c 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -3045,6 +3045,45 @@ export interface paths { patch?: never; trace?: never; }; + "/api/jobs/{job_id}/files": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + /** + * Get a file required to staging a job. + * @description Get a file required to staging a job (proper datasets, extra inputs, task-split inputs, working directory + * files). + * + * This API method is intended only for consumption by job runners, not end users. + */ + get: operations["index_api_jobs__job_id__files_get"]; + /** Populate an output file. */ + put: operations["populate_api_jobs__job_id__files_put"]; + /** + * Populate an output file. + * @description Populate an output file (formal dataset, task split part, working directory file (such as those related to + * metadata). This should be a multipart POST with a 'file' parameter containing the contents of the actual file to + * create. + * + * This API method is intended only for consumption by job runners, not end users. + */ + post: operations["create_api_jobs__job_id__files_post"]; + delete?: never; + options?: never; + /** + * Get a file required to staging a job. + * @description Get a file required to staging a job (proper datasets, extra inputs, task-split inputs, working directory + * files). + * + * This API method is intended only for consumption by job runners, not end users. + */ + head: operations["index_api_jobs__job_id__files_head"]; + patch?: never; + trace?: never; + }; "/api/jobs/{job_id}/inputs": { parameters: { query?: never; @@ -6680,6 +6719,34 @@ export interface components { /** Name */ name?: unknown; }; + /** Body_create_api_jobs__job_id__files_post */ + Body_create_api_jobs__job_id__files_post: { + /** + * File + * Format: binary + */ + __file?: string; + /** File Path */ + __file_path?: string; + /** + * File + * Format: binary + * @description Contents of the file to create. + */ + file?: string; + /** + * Job Key + * @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. + */ + job_key?: string | null; + /** + * Path + * @description Path to file to create. + */ + path?: string | null; + /** Session Id */ + session_id?: string; + }; /** Body_create_form_api_libraries__library_id__contents_post */ Body_create_form_api_libraries__library_id__contents_post: { /** Create Type */ @@ -31495,6 +31562,284 @@ export interface operations { }; }; }; + index_api_jobs__job_id__files_get: { + parameters: { + query: { + /** @description Path to file. */ + path: string; + /** @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ + job_key: string; + }; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path: { + /** @description Encoded id string of the job. */ + job_id: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description Contents of file. */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/octet-stream": unknown; + }; + }; + /** @description Path does not refer to a file, or input dataset(s) for job have been purged. 
*/ + 400: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description File not found. */ + 404: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description Input dataset(s) for job have been purged. */ + 500: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; + populate_api_jobs__job_id__files_put: { + parameters: { + query: { + /** @description Path to file to create/replace. */ + path: string; + /** @description A key used to authenticate this request as acting on behalf of a job runner for the specified job. */ + job_key: string; + }; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path: { + /** @description Encoded id string of the job. */ + job_id: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description Successful Response */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description A new file has been created. */ + 201: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description An existing file has been replaced. */ + 204: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Bad request. */ + 400: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; + create_api_jobs__job_id__files_post: { + parameters: { + query?: { + /** @description Path to file to create. */ + path?: string | null; + /** @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ + job_key?: string | null; + }; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path: { + /** @description Encoded id string of the job. */ + job_id: string; + }; + cookie?: never; + }; + requestBody?: { + content: { + "multipart/form-data": components["schemas"]["Body_create_api_jobs__job_id__files_post"]; + }; + }; + responses: { + /** @description An okay message. */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description Bad request (including no file provided). 
*/ + 400: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; + index_api_jobs__job_id__files_head: { + parameters: { + query: { + /** @description Path to file. */ + path: string; + /** @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ + job_key: string; + }; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path: { + /** @description Encoded id string of the job. */ + job_id: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description Contents of file. */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/octet-stream": unknown; + }; + }; + /** @description Path does not refer to a file, or input dataset(s) for job have been purged. */ + 400: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description File not found. */ + 404: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description Input dataset(s) for job have been purged. */ + 500: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; get_inputs_api_jobs__job_id__inputs_get: { parameters: { query?: never; diff --git a/lib/galaxy/jobs/runners/pulsar.py b/lib/galaxy/jobs/runners/pulsar.py index 4dbe3632a2bd..15c6acb2a99e 100644 --- a/lib/galaxy/jobs/runners/pulsar.py +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -40,6 +40,7 @@ from sqlalchemy import select from galaxy import model +from galaxy.authnz.util import provider_name_to_backend from galaxy.job_execution.compute_environment import ( ComputeEnvironment, dataset_path_to_extra_path, @@ -63,6 +64,7 @@ log = logging.getLogger(__name__) __all__ = ( + "PulsarARCJobRunner", "PulsarLegacyJobRunner", "PulsarRESTJobRunner", "PulsarMQJobRunner", @@ -247,8 +249,15 @@ def _init_client_manager(self): for kwd in self.runner_params.keys(): if kwd.startswith("amqp_") or kwd.startswith("transport_"): client_manager_kwargs[kwd] = self.runner_params[kwd] + + client_manager_kwargs.update(self._init_client_manager_extend_kwargs(**client_manager_kwargs)) + self.client_manager = build_client_manager(**client_manager_kwargs) + def _init_client_manager_extend_kwargs(self, **kwargs): + """Override this method to consider additional (or alter) keyword arguments when building the client manager.""" + return kwargs + def __init_pulsar_app(self, conf, pulsar_conf_path): if conf is None and pulsar_conf_path is None and not self.default_build_pulsar_app: self.pulsar_app = None @@ -1061,6 +1070,94 @@ def 
_populate_parameter_defaults(self, job_destination): pulsar_app_config["staging_directory"] = params.get("jobs_directory") +ARC_DESTINATION_DEFAULTS: Dict[str, Any] = { + **COEXECUTION_DESTINATION_DEFAULTS, + "default_file_action": "json_transfer", +} + + +class PulsarARCJobRunner(PulsarCoexecutionJobRunner): + runner_name = "PulsarARCJobRunner" + + destination_defaults = ARC_DESTINATION_DEFAULTS + + use_mq = False + poll = True + + def get_client_from_state(self, job_state): + client = super().get_client_from_state(job_state) + client._arc_job_id = job_state.job_id # used by the client to get the job state + return client + + def queue_job(self, job_wrapper): + """ + Queue a job to run it using the Pulsar ARC client. + + ARC supports authentication via either x509 certificates or OIDC tokens. Since Galaxy only supports the latter + (through OIDC providers), the Pulsar ARC client implementation is designed to work with OIDC. Thus, to run jobs, + the Pulsar ARC client needs an ARC endpoint URL and an OIDC access token. Those are passed as destination + parameters. + + OIDC tokens are, for obvious reasons, not meant to be part of the job configuration file nor of TPV + configuration files; they have to be obtained before the job is queued. For admins, it may also be interesting + to have a mechanism to inject an ARC endpoint URL from the user preferences, so that users can configure their + own ARC endpoint URLs. + + Therefore, this method provides a framework to: + - Obtain an ARC endpoint URL from the user's preferences (if enabled). + - Obtain an OIDC access token for the user running the job. + - Decide which OIDC provider to obtain the token from if multiple are available. + + To let users configure their own settings, admins have to set the destination parameter + "arc_user_preferences_key". Galaxy will then read the options "arc_url" and "arc_oidc_provider" under that key + from the user extra preferences. Both are optional; if the user does not configure any, the destination defaults + will be used. If no destination default exists and the user account is associated with exactly one OIDC + provider, then Galaxy will use that provider. + """ + job = job_wrapper.get_job() + user = job.user + + extra_user_preferences_key = job_wrapper.job_destination.params.get("arc_user_preferences_key") + # for example, "distributed_compute_arc" + + user_arc_url = ( + user.extra_preferences.get(f"{extra_user_preferences_key}|arc_url") if extra_user_preferences_key else None + ) + user_arc_oidc_provider = ( + user.extra_preferences.get(f"{extra_user_preferences_key}|arc_oidc_provider") + if extra_user_preferences_key + else None + ) + destination_arc_url = job_wrapper.job_destination.params.get("arc_url") + destination_oidc_provider = job_wrapper.job_destination.params.get("arc_oidc_provider") + arc_url = user_arc_url or destination_arc_url + arc_oidc_provider = user_arc_oidc_provider or destination_oidc_provider + if arc_oidc_provider is None: + user_oidc_providers = [auth.provider for auth in user.custos_auth + user.social_auth] + if len(user_oidc_providers) > 1: + raise Exception( + f"Multiple identity providers are linked to your user account '{user.username}', please select one " + f"in your user preferences to launch ARC jobs." + ) + elif len(user_oidc_providers) == 0: + raise Exception( + f"No identity provider is linked to your user account '{user.username}', please log in using an " + f"identity provider to launch ARC jobs." 
+ ) + arc_oidc_provider = user_oidc_providers[0] + arc_oidc_provider_backend = provider_name_to_backend(arc_oidc_provider) + arc_oidc_token = user.get_oidc_tokens(arc_oidc_provider_backend)["access"] + + job_wrapper.job_destination.params.update({"arc_url": arc_url, "arc_oidc_token": arc_oidc_token}) + + return super().queue_job(job_wrapper) + + def _init_client_manager_extend_kwargs(self, **kwargs): + kwargs = super()._init_client_manager_extend_kwargs(**kwargs) + kwargs["arc_enabled"] = True + return kwargs + + KUBERNETES_DESTINATION_DEFAULTS: Dict[str, Any] = {"k8s_enabled": True, **COEXECUTION_DESTINATION_DEFAULTS} diff --git a/lib/galaxy/webapps/galaxy/api/__init__.py b/lib/galaxy/webapps/galaxy/api/__init__.py index 51066fed89c1..5be6fff2405a 100644 --- a/lib/galaxy/webapps/galaxy/api/__init__.py +++ b/lib/galaxy/webapps/galaxy/api/__init__.py @@ -242,6 +242,9 @@ def __init__(self, request: Request): def base(self) -> str: return str(self.__request.base_url) + def stream(self) -> AsyncGenerator: + return self.__request.stream() + @property def url_path(self) -> str: scope = self.__request.scope @@ -250,6 +253,10 @@ def url_path(self) -> str: url = urljoin(url, root_path) return url + @property + def url(self) -> str: + return str(self.__request.url) + @property def host(self) -> str: return self.__request.base_url.netloc @@ -265,6 +272,10 @@ def environ(self) -> Environ: self.__environ = build_environ(self.__request.scope, None) # type: ignore[arg-type] return self.__environ + @property + def method(self): + return self.__request.method + @property def headers(self): return self.__request.headers diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py index 31a4974376e7..5f8614dc6fce 100644 --- a/lib/galaxy/webapps/galaxy/api/job_files.py +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -1,11 +1,30 @@ -"""API for asynchronous job running mechanisms can use to fetch or put files -related to running and queued jobs. +""" +API for asynchronous job running mechanisms can use to fetch or put files related to running and queued jobs. """ +import asyncio import logging import os import re import shutil +from typing import ( + cast, + IO, + Optional, +) +from urllib.parse import unquote + +from fastapi import ( + File, + Form, + Path, + Query, + Request, + Response, + UploadFile, +) +from fastapi.params import Depends +from typing_extensions import Annotated from galaxy import ( exceptions, @@ -13,55 +32,182 @@ ) from galaxy.managers.context import ProvidesAppContext from galaxy.model import Job -from galaxy.web import ( - expose_api_anonymous_and_sessionless, - expose_api_raw_anonymous_and_sessionless, +from galaxy.web import expose_api_anonymous_and_sessionless +from galaxy.webapps.base.api import GalaxyFileResponse +from galaxy.webapps.galaxy.api import ( + DependsOnTrans, + Router, ) +from galaxy.work.context import SessionRequestContext from . import BaseGalaxyAPIController +__all__ = ("FastAPIJobFiles", "JobFilesAPIController", "router") + log = logging.getLogger(__name__) -class JobFilesAPIController(BaseGalaxyAPIController): - """This job files controller allows remote job running mechanisms to - read and modify the current state of files for queued and running jobs. - It is certainly not meant to represent part of Galaxy's stable, user - facing API. 
- - Furthermore, even if a user key corresponds to the user running the job, - it should not be accepted for authorization - this API allows access to - low-level unfiltered files and such authorization would break Galaxy's - security model for tool execution. +router = Router( + # keep the endpoint in the undocumented section of the API docs `/api/docs`, as all endpoints from `FastAPIJobFiles` + # are certainly not meant to represent part of Galaxy's stable, user facing API + tags=["undocumented"] +) + + +def path_query_or_form( + request: Request, + path_query: Annotated[Optional[str], Query(alias="path", description="Path to file to create.")] = None, + path: Annotated[Optional[str], Form(alias="path", description="Path to file to create.")] = None, +): """ + Accept `path` parameter both in query and form format. - @expose_api_raw_anonymous_and_sessionless - def index(self, trans: ProvidesAppContext, job_id, **kwargs): - """ - GET /api/jobs/{job_id}/files + This method does not force the client to provide the parameter, it could simply not submit the parameter in either + format. To force the client to provide the parameter, coerce the output of the method to a string, i.e. + `path: str = Depends(path_query_or_form)` so that FastAPI responds with status code 500 when the parameter is not + provided. + """ + return path_query or path + + +def job_key_query_or_form( + request: Request, + job_key_query: Annotated[ + Optional[str], + Query( + alias="job_key", + description=( + "A key used to authenticate this request as acting on behalf or a job runner for the specified job." + ), + ), + ] = None, + job_key: Annotated[ + Optional[str], + Form( + alias="job_key", + description=( + "A key used to authenticate this request as acting on behalf or a job runner for the specified job." + ), + ), + ] = None, +): + """ + Accept `job_key` parameter both in query and form format. - Get a file required to staging a job (proper datasets, extra inputs, - task-split inputs, working directory files). + This method does not force the client to provide the parameter, it could simply not submit the parameter in either + format. To force the client to provide the parameter, coerce the output of the method to a string, i.e. + `job_key: str = Depends(job_key_query_or_form)` so that FastAPI responds with status code 500 when the parameter is + not provided. + """ + return job_key_query or job_key + + +@router.cbv +class FastAPIJobFiles: + """ + This job files controller allows remote job running mechanisms to read and modify the current state of files for + queued and running jobs. It is certainly not meant to represent part of Galaxy's stable, user facing API. - :type job_id: str - :param job_id: encoded id string of the job - :type path: str - :param path: Path to file. - :type job_key: str - :param job_key: A key used to authenticate this request as acting on - behalf or a job runner for the specified job. + Furthermore, even if a user key corresponds to the user running the job, it should not be accepted for authorization + - this API allows access to low-level unfiltered files and such authorization would break Galaxy's security model + for tool execution. + """ - ..note: - This API method is intended only for consumption by job runners, - not end users. + # FastAPI answers HEAD requests automatically for GET endpoints. 
However, because of the way legacy WSGI endpoints + # are injected into the FastAPI app (using `app.mount("/", wsgi_handler)`), the built-in support for `HEAD` requests + # breaks, because such requests are passed to the `wsgi_handler` sub-application. This means that the endpoint still + # needs to include some code to handle this behavior, as tests existing before the migration to FastAPI expect HEAD + # requests to work. + # + @router.get( + # simplify me (remove `_args` and `_kwargs` defined using the walrus operator) when ALL endpoints have been + # migrated to FastAPI, this is a workaround for FastAPI bug https://github.com/fastapi/fastapi/issues/13175 + *(_args := ["/api/jobs/{job_id}/files"]), + **( + _kwargs := dict( + summary="Get a file required to staging a job.", + responses={ + 200: { + "description": "Contents of file.", + "content": {"application/json": None, "application/octet-stream": {"example": None}}, + }, + 400: { + "description": "Path does not refer to a file, or input dataset(s) for job have been purged.", + "content": { + "application/json": { + "example": { + "detail": ( + "Path does not refer to a file, or input dataset(s) for job have been purged." + ) + }, + } + }, + }, + 404: { + "description": "File not found.", + "content": { + "application/json": { + "example": {"detail": "File not found."}, + } + }, + }, + 500: {"description": "Input dataset(s) for job have been purged."}, + }, + ) + ), + ) + @router.head(*_args, **_kwargs) # type: ignore[name-defined] + # remove `@router.head(...)` when ALL endpoints have been migrated to FastAPI + @router.api_route( + *_args, # type: ignore[name-defined] + **{key: value for key, value in _kwargs.items() if key != "responses"}, # type: ignore[name-defined] + responses={501: {"description": "Not implemented."}}, + methods=["PROPFIND"], + include_in_schema=False, + ) + # remove `@router.api_route(..., methods=["PROPFIND"])` when ALL endpoints have been migrated to FastAPI + # The ARC remote job runner (`lib.galaxy.jobs.runners.pulsar.PulsarARCJobRunner`) expects this to return HTTP codes + # other than 404 when `PROPFIND` requests are issued. They are not part of the HTTP spec, but they are used in the + # WebDAV protocol. The correct answer to such requests is likely 501 (not implemented). FastAPI returns HTTP 405 + # (method not allowed) for `PROPFIND`, which maybe is not fully correct but tolerable because it is one less quirk + # to maintain. However, because of the way legacy WSGI endpoints are injected into the FastAPI app (using + # `app.mount("/", wsgi_handler)`), the built-in support for returning HTTP 405 for `PROPFIND` breaks, because such + # requests are passed to the `wsgi_handler` sub-application. This means that the endpoint still needs to include + # some code to handle this behavior. + def index( + self, + job_id: Annotated[str, Path(description="Encoded id string of the job.")], + path: Annotated[ + str, + Query( + description="Path to file.", + ), + ], + job_key: Annotated[ + str, + Query( + description=( + "A key used to authenticate this request as acting on behalf or a job runner for the specified job." + ), + ), + ], + trans: SessionRequestContext = DependsOnTrans, + ) -> GalaxyFileResponse: + """ + Get a file required to staging a job (proper datasets, extra inputs, task-split inputs, working directory + files). - :rtype: binary - :returns: contents of file + This API method is intended only for consumption by job runners, not end users. 
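        For example, a job runner might fetch a staged-in file roughly like this (URL, job id, path and key are
        hypothetical placeholders)::

            import requests

            response = requests.get(
                "https://galaxy.example.org/api/jobs/<encoded_job_id>/files",
                params={"path": "/jobs_directory/000/123/inputs/dataset_1.dat", "job_key": "<job_key>"},
            )
            response.raise_for_status()
            staged_file_contents = response.content  # raw bytes, served as application/octet-stream

        A HEAD request against the same URL and parameters can be used to check that the file exists without
        transferring its contents.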
""" - job = self.__authorize_job_access(trans, job_id, **kwargs) - path = kwargs["path"] - try: - return open(path, "rb") - except FileNotFoundError: + # PROPFIND is not implemented, but the endpoint needs to return a non-404 error code for it + # remove me when ALL endpoints have been migrated to FastAPI + if trans.request.method == "PROPFIND": + raise exceptions.NotImplemented() + + path = unquote(path) + + job = self.__authorize_job_access(trans, job_id, path=path, job_key=job_key) + + if not os.path.exists(path): # We know that the job is not terminal, but users (or admin scripts) can purge input datasets. # Here we discriminate that case from truly unexpected bugs. # Not failing the job here, this is or should be handled by pulsar. @@ -70,55 +216,128 @@ def index(self, trans: ProvidesAppContext, job_id, **kwargs): # This looks like a galaxy dataset, check if any job input has been deleted. if any(jtid.dataset.dataset.purged for jtid in job.input_datasets): raise exceptions.ItemDeletionException("Input dataset(s) for job have been purged.") - else: - raise + raise exceptions.ObjectNotFound("File not found.") + elif not os.path.isfile(path): + raise exceptions.RequestParameterInvalidException("Path does not refer to a file.") + + return GalaxyFileResponse(path) + + # The ARC remote job runner (`lib.galaxy.jobs.runners.pulsar.PulsarARCJobRunner`) expects a `PUT` endpoint to stage + # out result files back to Galaxy. + @router.put( + "/api/jobs/{job_id}/files", + summary="Populate an output file.", + responses={ + 201: {"description": "A new file has been created."}, + 204: {"description": "An existing file has been replaced."}, + 400: {"description": "Bad request."}, + }, + ) + def populate( + self, + job_id: Annotated[str, Path(description="Encoded id string of the job.")], + path: Annotated[str, Query(description="Path to file to create/replace.")], + job_key: Annotated[ + str, + Query( + description=( + "A key used to authenticate this request as acting on behalf of a job runner for the specified job." + ), + ), + ], + trans: SessionRequestContext = DependsOnTrans, + ): + path = unquote(path) + + job = self.__authorize_job_access(trans, job_id, path=path, job_key=job_key) + self.__check_job_can_write_to_path(trans, job, path) - @expose_api_anonymous_and_sessionless - def create(self, trans, job_id, payload, **kwargs): + destination_file_exists = os.path.exists(path) + + # FastAPI can only read the file contents from the request body in an async context. To write the file without + # using an async endpoint, the async code that reads the file from the body and writes it to disk will have to + # run within the sync endpoint. Since the code that writes the data to disk is blocking + # `destination_file.write(chunk)`, it has to run on its own event loop within the thread spawned to answer the + # request to the sync endpoint. 
+ async def write(): + with open(path, "wb") as destination_file: + async for chunk in trans.request.stream(): + destination_file.write(chunk) + + target_dir = os.path.dirname(path) + util.safe_makedirs(target_dir) + event_loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(event_loop) + event_loop.run_until_complete(write()) + finally: + event_loop.close() + + return ( + Response(status_code=201, headers={"Location": str(trans.request.url)}) + if not destination_file_exists + else Response(status_code=204) + ) + + @router.post( + "/api/jobs/{job_id}/files", + summary="Populate an output file.", + responses={ + 200: {"description": "An okay message.", "content": {"application/json": {"example": {"message": "ok"}}}}, + 400: {"description": "Bad request (including no file provided)."}, + }, + ) + def create( + self, + job_id: Annotated[str, Path(description="Encoded id string of the job.")], + path: Annotated[str, Depends(path_query_or_form)], + job_key: Annotated[str, Depends(job_key_query_or_form)], + file: UploadFile = File(None, description="Contents of the file to create."), + session_id: str = Form(None), + nginx_upload_module_file_path: str = Form( + None, + alias="__file_path", + validation_alias="__file_path", + # both `alias` and `validation_alias` are needed for body parameters, see + # https://github.com/fastapi/fastapi/issues/10286#issuecomment-1727642960 + ), + underscore_file: UploadFile = File( + None, + alias="__file", + validation_alias="__file", + # both `alias` and `validation_alias` are needed for body parameters, see + # https://github.com/fastapi/fastapi/issues/10286#issuecomment-1727642960 + ), + trans: ProvidesAppContext = DependsOnTrans, + ): """ - create( self, trans, job_id, payload, **kwargs ) - * POST /api/jobs/{job_id}/files - Populate an output file (formal dataset, task split part, working - directory file (such as those related to metadata)). This should be - a multipart post with a 'file' parameter containing the contents of - the actual file to create. - - :type job_id: str - :param job_id: encoded id string of the job - :type payload: dict - :param payload: dictionary structure containing:: - 'job_key' = Key authenticating - 'path' = Path to file to create. - - ..note: - This API method is intended only for consumption by job runners, - not end users. - - :rtype: dict - :returns: an okay message + Populate an output file (formal dataset, task split part, working directory file (such as those related to + metadata). This should be a multipart POST with a 'file' parameter containing the contents of the actual file to + create. + + This API method is intended only for consumption by job runners, not end users. """ - job = self.__authorize_job_access(trans, job_id, **payload) - path = payload.get("path") - if not path: - raise exceptions.RequestParameterInvalidException("'path' parameter not provided or empty.") + path = unquote(path) + + job = self.__authorize_job_access(trans, job_id, path=path, job_key=job_key) self.__check_job_can_write_to_path(trans, job, path) + input_file: Optional[IO[bytes]] = None + input_file_path: Optional[str] = None # Is this writing an unneeded file? Should this just copy in Python? 
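        # A hedged illustration (not part of this endpoint) of how a job runner might exercise the direct multipart
        # upload branch handled below; URL, job id, path and key are hypothetical placeholders:
        #
        #     import requests
        #
        #     requests.post(
        #         "https://galaxy.example.org/api/jobs/<encoded_job_id>/files",
        #         data={"path": "/jobs_directory/000/123/outputs/dataset_1.dat", "job_key": "<job_key>"},
        #         files={"file": open("local_result.dat", "rb")},
        #     )
        #
        # The other branches cover files already staged on disk by the nginx upload module (`__file_path`) or by a
        # TUS upload session (`session_id`).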
- if "__file_path" in payload: - file_path = payload.get("__file_path") + if nginx_upload_module_file_path: + input_file_path = nginx_upload_module_file_path upload_store = trans.app.config.nginx_upload_job_files_store assert upload_store, ( "Request appears to have been processed by" " nginx_upload_module but Galaxy is not" " configured to recognize it" ) - assert file_path.startswith( + assert input_file_path.startswith( upload_store - ), f"Filename provided by nginx ({file_path}) is not in correct directory ({upload_store})" - input_file = open(file_path) - elif "session_id" in payload: + ), f"Filename provided by nginx ({input_file_path}) is not in correct directory ({upload_store})" + elif session_id: # code stolen from basic.py - session_id = payload["session_id"] upload_store = ( trans.app.config.tus_upload_store_job_files or trans.app.config.tus_upload_store @@ -127,76 +346,41 @@ def create(self, trans, job_id, payload, **kwargs): if re.match(r"^[\w-]+$", session_id) is None: raise ValueError("Invalid session id format.") local_filename = os.path.abspath(os.path.join(upload_store, session_id)) - input_file = open(local_filename) + input_file_path = local_filename + elif file: + input_file = file.file + elif underscore_file: + input_file = underscore_file.file else: - input_file = payload.get("file", payload.get("__file", None)).file + raise exceptions.RequestParameterMissingException("No file uploaded.") + target_dir = os.path.dirname(path) util.safe_makedirs(target_dir) - try: - if os.path.exists(path) and (path.endswith("tool_stdout") or path.endswith("tool_stderr")): - with open(path, "ab") as destination: - shutil.copyfileobj(open(input_file.name, "rb"), destination) + if os.path.exists(path) and (path.endswith("tool_stdout") or path.endswith("tool_stderr")): + with open(path, "ab") as destination: + if input_file_path: + with open(input_file_path, "rb") as input_file_handle: + shutil.copyfileobj(input_file_handle, destination) + else: + shutil.copyfileobj(cast(IO[bytes], input_file), destination) + else: + # prior to migrating to FastAPI, this operation was done more efficiently for all cases using + # `shutil.move(input_file_path, path)`, but FastAPI stores the uploaded file as + # `tempfile.SpooledTemporaryFile` + # (https://docs.python.org/3/library/tempfile.html#tempfile.SpooledTemporaryFile), so now there is not even + # a path where uploaded files can be accessed on disk + if input_file_path: + shutil.move(input_file_path, path) else: - shutil.move(input_file.name, path) - finally: - try: - input_file.close() - except OSError: - # Fails to close file if not using nginx upload because the - # tempfile has moved and Python wants to delete it. - pass - return {"message": "ok"} + with open(path, "wb") as destination: + shutil.copyfileobj(cast(IO[bytes], input_file), destination) - @expose_api_anonymous_and_sessionless - def tus_patch(self, trans, **kwds): - """ - Exposed as PATCH /api/job_files/resumable_upload. - - I think based on the docs, a separate tusd server is needed for job files if - also hosting one for use facing uploads. - - Setting up tusd for job files should just look like (I think): - - tusd -host localhost -port 1080 -upload-dir=/database/tmp - - See more discussion of checking upload access, but we shouldn't need the - API key and session stuff the user upload tusd server should be configured with. 
- - Also shouldn't need a hooks endpoint for this reason but if you want to add one - the target CLI entry would be -hooks-http=/api/job_files/tus_hooks - and the action is featured below. - - I would love to check the job state with __authorize_job_access on the first - POST but it seems like TusMiddleware doesn't default to coming in here for that - initial POST the way it does for the subsequent PATCHes. Ultimately, the upload - is still authorized before the write done with POST /api/jobs//files - so I think there is no route here to mess with user data - the worst of the security - issues that can be caused is filling up the sever with needless files that aren't - acted on. Since this endpoint is not meant for public consumption - all the job - files stuff and the TUS server should be blocked to public IPs anyway and restricted - to your Pulsar servers and similar targeting could be accomplished with a user account - and the user facing upload endpoints. - """ - return None - - @expose_api_anonymous_and_sessionless - def tus_hooks(self, trans, **kwds): - """No-op but if hook specified the way we do for user upload it would hit this action. - - Exposed as PATCH /api/job_files/tus_hooks and documented in the docstring for - tus_patch. - """ - pass - - def __authorize_job_access(self, trans, encoded_job_id, **kwargs): - for key in ["path", "job_key"]: - if key not in kwargs: - error_message = f"Job files action requires a valid '{key}'." - raise exceptions.ObjectAttributeMissingException(error_message) + return {"message": "ok"} + def __authorize_job_access(self, trans, encoded_job_id, path, job_key): job_id = trans.security.decode_id(encoded_job_id) - job_key = trans.security.encode_id(job_id, kind="jobs_files") - if not util.safe_str_cmp(str(kwargs["job_key"]), job_key): + job_key_from_job_id = trans.security.encode_id(job_id, kind="jobs_files") + if not util.safe_str_cmp(str(job_key), job_key_from_job_id): raise exceptions.ItemAccessibilityException("Invalid job_key supplied.") # Verify job is active. Don't update the contents of complete jobs. @@ -207,9 +391,9 @@ def __authorize_job_access(self, trans, encoded_job_id, **kwargs): return job def __check_job_can_write_to_path(self, trans, job, path): - """Verify an idealized job runner should actually be able to write to - the specified path - it must be a dataset output, a dataset "extra - file", or a some place in the working directory of this job. + """ + Verify an idealized job runner should actually be able to write to the specified path - it must be a dataset + output, a dataset "extra file", or a some place in the working directory of this job. Would like similar checks for reading the unstructured nature of loc files make this very difficult. (See abandoned work here @@ -220,8 +404,8 @@ def __check_job_can_write_to_path(self, trans, job, path): raise exceptions.ItemAccessibilityException("Job is not authorized to write to supplied path.") def __is_output_dataset_path(self, job, path): - """Check if is an output path for this job or a file in the an - output's extra files path. + """ + Check if is an output path for this job or a file in the output's extra files path. 
""" da_lists = [job.output_datasets, job.output_library_datasets] for da_list in da_lists: @@ -240,3 +424,59 @@ def __in_working_directory(self, job, path, app): job, base_dir="job_work", dir_only=True, extra_dir=str(job.id) ) return util.in_directory(path, working_directory) + + +class JobFilesAPIController(BaseGalaxyAPIController): + """ + Legacy WSGI endpoints dedicated to TUS uploads. + + TUS upload endpoints work in tandem with the WSGI middleware `TusMiddleware` from the `tuswsgi` package. Both + WSGI middlewares and endpoints are injected into the FastAPI app after FastAPI routes as a single sub-application + `wsgi_handler` using `app.mount("/", wsgi_handler)`, meaning that requests are passed to the `wsgi_handler` + sub-application (and thus to `TusMiddleware`) only if there was no FastAPI endpoint defined to handle them. + + Therefore, these legacy WSGI endpoints cannot be migrated to FastAPI unless `TusMiddleware` is migrated to ASGI. + """ + + @expose_api_anonymous_and_sessionless + def tus_patch(self, trans, **kwds): + """ + Exposed as PATCH /api/job_files/resumable_upload. + + I think based on the docs, a separate tusd server is needed for job files if + also hosting one for use facing uploads. + + Setting up tusd for job files should just look like (I think): + + `tusd -host localhost -port 1080 -upload-dir=/database/tmp` + + See more discussion of checking upload access, but we shouldn't need the + API key and session stuff the user upload tusd server should be configured with. + + Also shouldn't need a hooks endpoint for this reason but if you want to add one + the target CLI entry would be `-hooks-http=/api/job_files/tus_hooks` + and the action is featured below. + + I would love to check the job state with `__authorize_job_access` on the first + POST but it seems like `TusMiddleware` doesn't default to coming in here for that + initial POST the way it does for the subsequent PATCHes. Ultimately, the upload + is still authorized before the write done with POST `/api/jobs//files` + so I think there is no route here to mess with user data - the worst of the security + issues that can be caused is filling up the sever with needless files that aren't + acted on. Since this endpoint is not meant for public consumption - all the job + files stuff and the TUS server should be blocked to public IPs anyway and restricted + to your Pulsar servers and similar targeting could be accomplished with a user account + and the user facing upload endpoints. + """ + ... + return None + + @expose_api_anonymous_and_sessionless + def tus_hooks(self, trans, **kwds): + """ + No-op but if hook specified the way we do for user upload it would hit this action. + + Exposed as PATCH /api/job_files/tus_hooks and documented in the docstring for tus_patch. + """ + ... + return None diff --git a/lib/galaxy/webapps/galaxy/buildapp.py b/lib/galaxy/webapps/galaxy/buildapp.py index 5f8cf044d6b8..4fd9b77d30c9 100644 --- a/lib/galaxy/webapps/galaxy/buildapp.py +++ b/lib/galaxy/webapps/galaxy/buildapp.py @@ -872,24 +872,6 @@ def populate_api_routes(webapp, app): conditions=dict(method=["GET"]), ) - # Job files controllers. Only for consumption by remote job runners. 
- webapp.mapper.resource( - "file", - "files", - controller="job_files", - name_prefix="job_", - path_prefix="/api/jobs/{job_id}", - parent_resources=dict(member_name="job", collection_name="jobs"), - ) - - webapp.mapper.connect( - "index", - "/api/jobs/{job_id}/files", - controller="job_files", - action="index", - conditions=dict(method=["HEAD"]), - ) - webapp.mapper.resource( "port", "ports", diff --git a/lib/galaxy/work/context.py b/lib/galaxy/work/context.py index e723e069109c..8d0b9f86b912 100644 --- a/lib/galaxy/work/context.py +++ b/lib/galaxy/work/context.py @@ -1,6 +1,7 @@ import abc from typing import ( Any, + AsyncGenerator, Dict, List, Optional, @@ -87,16 +88,30 @@ class GalaxyAbstractRequest: def base(self) -> str: """Base URL of the request.""" + @abc.abstractmethod + def stream(self) -> AsyncGenerator: + """Request body split in parts.""" + @property @abc.abstractmethod def url_path(self) -> str: """Base with optional prefix added.""" + @property + @abc.abstractmethod + def url(self): + """URL of the request.""" + @property @abc.abstractmethod def host(self) -> str: """The host address.""" + @property + @abc.abstractmethod + def method(self) -> str: + """The request's HTTP method.""" + @property @abc.abstractmethod def is_secure(self) -> bool: diff --git a/test/integration/job_resource_rules/test_pulsar_arc.py b/test/integration/job_resource_rules/test_pulsar_arc.py new file mode 100644 index 000000000000..ff9e79d22532 --- /dev/null +++ b/test/integration/job_resource_rules/test_pulsar_arc.py @@ -0,0 +1,50 @@ +"""Dynamic job rule for the Pulsar ARC (Advanced Resource Connector) job runner integration tests. + +This file contains a dynamic job rule for the Pulsar ARC job runner integration tests in +`test/integration/test_pulsar_arc.py`. The rule allows selecting a job destination based on a dynamic reference that can +be read and updated during the tests using `test_pulsar_arc_get_job_destination()` and +`test_pulsar_arc_set_job_destination()` respectively. +""" + +from typing import Optional + +from galaxy.app import UniverseApplication +from galaxy.jobs import JobMappingException # type: ignore[attr-defined] +from galaxy.jobs import ( + JobDestination, +) +from galaxy.model import ( + Job, + User as GalaxyUser, +) +from galaxy.tools import Tool as GalaxyTool + +__all__ = ("test_pulsar_arc", "test_pulsar_arc_get_job_destination", "test_pulsar_arc_set_job_destination") + + +test_pulsar_arc_job_destination_ref: list[JobDestination] = [] + + +def test_pulsar_arc_get_job_destination() -> Optional[JobDestination]: + """ + Return the last stored job destination (if any). + """ + return test_pulsar_arc_job_destination_ref[0] if test_pulsar_arc_job_destination_ref else None + + +def test_pulsar_arc_set_job_destination(job_destination: JobDestination) -> None: + """ + Store a job destination, overwriting any previous one. + """ + test_pulsar_arc_job_destination_ref[:] = [job_destination] + + +def test_pulsar_arc(app: UniverseApplication, job: Job, tool: GalaxyTool, user: GalaxyUser) -> JobDestination: + """ + Dynamic job rule that maps jobs to the stored job destination. 
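    A test typically stores a destination first, e.g. (parameter values hypothetical)::

        test_pulsar_arc_set_job_destination(
            JobDestination(runner="arc_runner", params={"arc_url": "https://arc.example.org:443/arex"})
        )

    after which this rule maps every queued job to that stored destination.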
+ """ + job_destination = test_pulsar_arc_get_job_destination() + if not job_destination: + raise JobMappingException("No job destination set for dynamic job rule.") + + return job_destination diff --git a/test/integration/oidc/test_auth_oidc.py b/test/integration/oidc/test_auth_oidc.py index bf0992db54be..f16f41d0f7e1 100644 --- a/test/integration/oidc/test_auth_oidc.py +++ b/test/integration/oidc/test_auth_oidc.py @@ -93,6 +93,9 @@ class BaseKeycloakIntegrationTestCase(integration_util.IntegrationTestCase): backend_config_file: ClassVar[str] saved_oauthlib_insecure_transport: ClassVar[bool] + REGEX_KEYCLOAK_LOGIN_ACTION = re.compile(r"action=\"(.*)\"\s+") + REGEX_GALAXY_CSRF_TOKEN = re.compile(r"session_csrf_token\": \"(.*)\"") + @classmethod def setUpClass(cls): # By default, the oidc callback must be done over a secure transport, so @@ -158,40 +161,38 @@ def handle_galaxy_oidc_config_kwds(cls, config): def _get_interactor(self, api_key=None, allow_anonymous=False) -> "ApiTestInteractor": return super()._get_interactor(api_key=None, allow_anonymous=True) + def _login_via_keycloak(self, username, password, expected_codes=None, save_cookies=False, session=None): + if expected_codes is None: + expected_codes = [200, 404] + session = session or requests.Session() + response = session.get(f"{self.url}authnz/keycloak/login") + provider_url = response.json()["redirect_uri"] + response = session.get(provider_url, verify=False) + matches = self.REGEX_KEYCLOAK_LOGIN_ACTION.search(response.text) + assert matches + auth_url = html.unescape(str(matches.groups(1)[0])) + response = session.post(auth_url, data={"username": username, "password": password}, verify=False) + assert response.status_code in expected_codes, response + if save_cookies: + self.galaxy_interactor.cookies = session.cookies + return session, response + + def _get_keycloak_access_token( + self, client_id="gxyclient", username=KEYCLOAK_TEST_USERNAME, password=KEYCLOAK_TEST_PASSWORD, scopes=None + ): + data = { + "client_id": client_id, + "client_secret": "dummyclientsecret", + "grant_type": "password", + "username": username, + "password": password, + "scope": scopes or [], + } + response = requests.post(f"{KEYCLOAK_URL}/protocol/openid-connect/token", data=data, verify=False) + return response.json()["access_token"] + class TestGalaxyOIDCLoginIntegration(AbstractTestCases.BaseKeycloakIntegrationTestCase): - REGEX_KEYCLOAK_LOGIN_ACTION = re.compile(r"action=\"(.*)\"\s+") - REGEX_GALAXY_CSRF_TOKEN = re.compile(r"session_csrf_token\": \"(.*)\"") - - def _login_via_keycloak(self, username, password, expected_codes=None, save_cookies=False, session=None): - if expected_codes is None: - expected_codes = [200, 404] - session = session or requests.Session() - response = session.get(f"{self.url}authnz/keycloak/login") - provider_url = response.json()["redirect_uri"] - response = session.get(provider_url, verify=False) - matches = self.REGEX_KEYCLOAK_LOGIN_ACTION.search(response.text) - assert matches - auth_url = html.unescape(str(matches.groups(1)[0])) - response = session.post(auth_url, data={"username": username, "password": password}, verify=False) - assert response.status_code in expected_codes, response - if save_cookies: - self.galaxy_interactor.cookies = session.cookies - return session, response - - def _get_keycloak_access_token( - self, client_id="gxyclient", username=KEYCLOAK_TEST_USERNAME, password=KEYCLOAK_TEST_PASSWORD, scopes=None - ): - data = { - "client_id": client_id, - "client_secret": "dummyclientsecret", - 
"grant_type": "password", - "username": username, - "password": password, - "scope": scopes or [], - } - response = requests.post(f"{KEYCLOAK_URL}/protocol/openid-connect/token", data=data, verify=False) - return response.json()["access_token"] def test_oidc_login_new_user(self): _, response = self._login_via_keycloak(KEYCLOAK_TEST_USERNAME, KEYCLOAK_TEST_PASSWORD, save_cookies=True) diff --git a/test/integration/test_job_files.py b/test/integration/test_job_files.py index 4f244419156b..9a7458730696 100644 --- a/test/integration/test_job_files.py +++ b/test/integration/test_job_files.py @@ -17,8 +17,14 @@ import io import os +import shutil import tempfile -from typing import Dict +from typing import ( + Any, + Dict, + IO, + Optional, +) import requests from sqlalchemy import select @@ -42,26 +48,36 @@ class TestJobFilesIntegration(integration_util.IntegrationTestCase): initialized = False dataset_populator: DatasetPopulator + hist_id: int # cannot use `history_id` as name, it collides with a pytest fixture + input_hda: model.HistoryDatasetAssociation + input_hda_dict: Dict[str, Any] + _nginx_upload_job_files_store: str + @classmethod def handle_galaxy_config_kwds(cls, config): super().handle_galaxy_config_kwds(config) + config["job_config_file"] = SIMPLE_JOB_CONFIG_FILE config["object_store_store_by"] = "uuid" config["server_name"] = "files" - cls.initialized = False + config["nginx_upload_job_files_store"] = tempfile.mkdtemp() + cls._nginx_upload_job_files_store = config["nginx_upload_job_files_store"] + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls._nginx_upload_job_files_store) + super().tearDownClass() def setUp(self): super().setUp() self.dataset_populator = DatasetPopulator(self.galaxy_interactor) - if not TestJobFilesIntegration.initialized: - history_id = self.dataset_populator.new_history() - sa_session = self.sa_session - stmt = select(model.HistoryDatasetAssociation) - assert len(sa_session.scalars(stmt).all()) == 0 - self.input_hda_dict = self.dataset_populator.new_dataset(history_id, content=TEST_INPUT_TEXT, wait=True) - assert len(sa_session.scalars(stmt).all()) == 1 - self.input_hda = sa_session.scalars(stmt).all()[0] - TestJobFilesIntegration.initialized = True + history_id_encoded = self.dataset_populator.new_history() + self.hist_id = self._app.security.decode_id(history_id_encoded) + self.input_hda_dict = self.dataset_populator.new_dataset(history_id_encoded, content=TEST_INPUT_TEXT, wait=True) + sa_session = self.sa_session + stmt = select(model.HistoryDatasetAssociation).where(model.HistoryDatasetAssociation.history_id == self.hist_id) + assert len(sa_session.scalars(stmt).all()) == 1 + self.input_hda = sa_session.scalars(stmt).first() def test_read_by_state(self): job, _, _ = self.create_static_job_with_state("running") @@ -96,9 +112,57 @@ def test_read_fails_if_input_file_purged(self): self.input_hda_dict["history_id"], content_id=self.input_hda_dict["id"], purge=True, wait_for_purge=True ) assert delete_response.status_code == 200 - head_response = requests.get(get_url, params=data) + response = requests.get(get_url, params=data) + assert response.status_code == 400 + assert response.json()["err_msg"] == "Input dataset(s) for job have been purged." 
+ + def test_read_missing_file(self): + job, _, _ = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + data = {"path": self.input_hda.get_file_name() + "_missing", "job_key": job_key} + get_url = self._api_url(f"jobs/{job_id}/files", use_key=True) + + head_response = requests.head(get_url, params=data) + assert head_response.status_code == 404 + + response = requests.get(get_url, params=data) + assert response.status_code == 404 + + def test_read_folder(self): + job, _, _ = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + data = {"path": os.path.dirname(self.input_hda.get_file_name()), "job_key": job_key} + get_url = self._api_url(f"jobs/{job_id}/files", use_key=True) + + head_response = requests.head(get_url, params=data) assert head_response.status_code == 400 - assert head_response.json()["err_msg"] == "Input dataset(s) for job have been purged." + + response = requests.get(get_url, params=data) + assert response.status_code == 400 + + def test_write_no_file(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=data) + assert response.status_code == 400 + + def test_propfind(self): + # remove this test when ALL Galaxy endpoints have been migrated to FastAPI; it will then be FastAPI's + # responsibility to return a status code other than 404 + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + propfind_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.request("PROPFIND", propfind_url, params=data) + assert response.status_code == 501 def test_write_by_state(self): job, output_hda, working_directory = self.create_static_job_with_state("running") @@ -160,6 +224,101 @@ def test_write_with_tus(self): api_asserts.assert_status_code_is_ok(response) assert open(path).read() == "some initial text data" + def test_write_with_nginx_upload_module(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + file: Optional[IO[bytes]] = None + try: + with open(os.path.join(self._app.config.nginx_upload_job_files_store, "nginx_upload"), "wb") as file: + file.write(b"some initial text data") + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=dict(**data, __file_path=file.name)) + + api_asserts.assert_status_code_is_ok(response) + assert not os.path.exists(file.name) + assert os.path.exists(path) + with open(path) as uploaded_file: + assert uploaded_file.read() == "some initial text data" + finally: + # remove `file.name` + try: + if file is not None: + os.remove(file.name) + except FileNotFoundError: + pass + + def test_write_with_session_id(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = 
self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + upload_store = ( + self._app.config.tus_upload_store_job_files + or self._app.config.tus_upload_store + or self._app.config.new_file_path + ) + upload_id = "35a7c8d3-e659-430e-8579-8d085e7e569d" + upload_path = os.path.join(upload_store, "35a7c8d3-e659-430e-8579-8d085e7e569d") + try: + with open(upload_path, "w") as upload_file: + upload_file.write("some initial text data") + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=dict(**data, session_id=upload_id)) + api_asserts.assert_status_code_is_ok(response) + assert not os.path.exists(upload_path) + assert os.path.exists(path) + with open(path) as uploaded_file: + assert uploaded_file.read() == "some initial text data" + finally: + # remove `upload_path` + try: + os.remove(upload_path) + except FileNotFoundError: + pass + + def test_write_with_underscored_file_param(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=data, files={"__file": io.StringIO("some initial text data")}) + api_asserts.assert_status_code_is_ok(response) + assert open(path).read() == "some initial text data" + + def test_write_with_put_request(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + new_file_path = os.path.join(working_directory, "new_file.txt") + put_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.put( + put_url, + params={"path": new_file_path, "job_key": job_key}, + data=b"whole contents of the file", + ) + assert response.status_code == 201 + assert open(new_file_path).read() == "whole contents of the file" + + assert os.path.exists(path) + put_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.put(put_url, params=data, data=b"contents of a replacement file") + assert response.status_code == 204 + assert open(path).read() == "contents of a replacement file" + def test_write_protection(self): job, _, _ = self.create_static_job_with_state("running") job_id, job_key = self._api_job_keys(job) @@ -178,9 +337,13 @@ def sa_session(self): def create_static_job_with_state(self, state): """Create a job with unknown handler so its state won't change.""" sa_session = self.sa_session - hda = sa_session.scalars(select(model.HistoryDatasetAssociation)).all()[0] + stmt_hda = select(model.HistoryDatasetAssociation).where( + model.HistoryDatasetAssociation.history_id == self.hist_id + ) + hda = sa_session.scalars(stmt_hda).first() assert hda - history = sa_session.scalars(select(model.History)).all()[0] + stmt_history = select(model.History).where(model.History.id == self.hist_id) + history = sa_session.scalars(stmt_history).first() assert history user = sa_session.scalars(select(model.User)).all()[0] assert user diff --git a/test/integration/test_pulsar_arc.py b/test/integration/test_pulsar_arc.py new file mode 100644 index 000000000000..811f8faa93ad --- /dev/null +++ b/test/integration/test_pulsar_arc.py @@ -0,0 
+1,283 @@ +"""Integration tests for the Pulsar ARC (Advanced Resource Connector) job runner. + +These tests verify that the ARC endpoint URL and OIDC provider selection logic works correctly when queuing jobs. They +do not test actual job execution, as that is covered by Pulsar's own tests. +""" + +import json +import string +import tempfile +import threading +from functools import lru_cache +from typing import Optional +from unittest.mock import patch + +from sqlalchemy.orm import object_session + +from galaxy.jobs import ( + JobDestination, + JobWrapper, +) +from galaxy.jobs.runners.pulsar import PulsarARCJobRunner +from galaxy.tool_util.verify.interactor import GalaxyInteractorApi +from galaxy_test.base.api import ApiTestInteractor +from galaxy_test.base.api_util import get_admin_api_key +from galaxy_test.base.env import target_url_parts +from galaxy_test.base.populators import DatasetPopulator +from .job_resource_rules.test_pulsar_arc import test_pulsar_arc_set_job_destination as set_job_destination +from .oidc.test_auth_oidc import ( + AbstractTestCases as OIDCAbstractTestCases, + KEYCLOAK_TEST_PASSWORD, + KEYCLOAK_TEST_USERNAME, +) + +JOB_CONFIG_FILE = """ +execution: + default: arc + environments: + arc: + runner: dynamic + url: ${galaxy_url} + type: python + function: test_pulsar_arc + rules_module: integration.job_resource_rules +runners: + arc_runner: + load: galaxy.jobs.runners.pulsar:PulsarARCJobRunner +""" + + +def job_config(template_str: str, **vars_) -> str: + """ + Create a temporary job configuration file from the provided template string. + """ + job_conf_template = string.Template(template_str) + job_conf_str = job_conf_template.substitute(**vars_) + with tempfile.NamedTemporaryFile(suffix="_arc_integration_job_conf.yml", mode="w", delete=False) as job_conf: + job_conf.write(job_conf_str) + return job_conf.name + + +job_wrapper_ref: list[JobWrapper] = [] # keeps a reference to the job wrapper for which a job was last queued +job_wrapper_event = threading.Event() # synchronization event to signal that a job has been queued + + +def set_job_wrapper(job_wrapper: JobWrapper) -> None: + """ + Store a job wrapper reference. + """ + job_wrapper_ref[:] = [job_wrapper] + + +def get_job_wrapper() -> Optional[JobWrapper]: + """ + Return the last stored job wrapper reference (if any). + """ + return job_wrapper_ref[0] if job_wrapper_ref else None + + +def queue_job(self, job_wrapper: JobWrapper) -> None: + """ + Override the `queue_job()` method of the parent class of the Pulsar ARC job runner. + + Fails job queueing and tracks the job wrapper representing the job which should have been queued. Used to test that + the logic overriding the job destination parameters works correctly. + """ + set_job_wrapper(job_wrapper) + job_wrapper_event.set() + raise Exception("Job queueing failed for testing purposes. This is expected.") + + +@patch.object(PulsarARCJobRunner.__mro__[1], "queue_job", new=queue_job) +class TestArcPulsarIntegration(OIDCAbstractTestCases.BaseKeycloakIntegrationTestCase): + """ + Integration test verifying the logic that selects an ARC endpoint URL and an OIDC provider. + """ + + dataset_populator: DatasetPopulator + framework_tool_and_types = True + + _user_api_key: Optional[str] = None + + @classmethod + def handle_galaxy_config_kwds(cls, config): + """ + Inject job configuration file tailored for this test case in the Galaxy configuration. 
+ """ + super().handle_galaxy_config_kwds(config) + host, port, url = target_url_parts() + config["job_config_file"] = job_config(JOB_CONFIG_FILE, galaxy_url=url) + + # login just once, even if the method is called multiple times + @lru_cache(maxsize=1) # noqa: B019 (bounded cache size) + def _login_via_keycloak(self, *args, **kwargs): + """ + Override parent login method to log-in via Keycloak just once and to override the default Galaxy interactor. + + Normally, one would log in within the `setUpClass()` method and call it a day, but since `_login_via_keycloak()` + is not a class method, this workaround is needed. + """ + session, response = super()._login_via_keycloak( + KEYCLOAK_TEST_USERNAME, KEYCLOAK_TEST_PASSWORD, save_cookies=True + ) + api_interactor = GalaxyInteractorApi( + galaxy_url=self.url, + master_api_key=get_admin_api_key(), + test_user="gxyuser@galaxy.org", + # email for `KEYCLOAK_TEST_USERNAME`, defined in test/integration/oidc/galaxy-realm-export.json + ) + self._user_api_key = api_interactor.api_key + return session, response + + def setUp(self): + """ + Log-in via Keycloak (just once), override Galaxy interactor and initialize a dataset populator. + """ + super().setUp() + self._login_via_keycloak(KEYCLOAK_TEST_USERNAME, KEYCLOAK_TEST_PASSWORD, save_cookies=True) # happens just once + self._galaxy_interactor = ApiTestInteractor(self, api_key=self._user_api_key) + self.dataset_populator = DatasetPopulator(self.galaxy_interactor) + self._job_wrapper = None + + def tearDown(self): + self._job_wrapper = None + + def run_job(self) -> Optional[JobWrapper]: + """ + Run a simple job and return the job wrapper. + """ + with self.dataset_populator.test_history() as history_id: + hda = self.dataset_populator.new_dataset(history_id, content="abc") + self.dataset_populator.run_tool( + tool_id="cat", + inputs={ + "input1": {"src": "hda", "id": hda["id"]}, + }, + history_id=history_id, + ) + self.dataset_populator.wait_for_history(history_id, timeout=20) + job_wrapper_event.wait(timeout=1) + job_wrapper = get_job_wrapper() + return job_wrapper + + def test_queue_job_url_and_oidc_provider_selection(self): + """ + Verify that the ARC endpoint URL and OIDC provider selection logic works correctly when queuing jobs. 
+ """ + set_job_destination( + JobDestination( + id="arc", + name="arc", + runner="arc_runner", + ) + ) + job_wrapper = self.run_job() + assert job_wrapper is not None, "No job wrapper created" + assert job_wrapper.job_destination.params["arc_url"] is None, "No ARC URL expected" + assert isinstance( + job_wrapper.job_destination.params["arc_oidc_token"], str + ), f"Unexpected type {type(job_wrapper.job_destination.params['arc_oidc_token'])} for OIDC token" + assert len(job_wrapper.job_destination.params["arc_oidc_token"]) > 0, "Invalid OIDC token" + + set_job_destination( + JobDestination( + id="arc", + name="arc", + runner="arc_runner", + params={ + "arc_url": "https://arc.example.com", + }, + ) + ) + job_wrapper = self.run_job() + assert job_wrapper is not None, "No job wrapper created" + assert job_wrapper.job_destination.params["arc_url"] == "https://arc.example.com", "Unexpected ARC URL" + assert isinstance( + job_wrapper.job_destination.params["arc_oidc_token"], str + ), f"Unexpected type {type(job_wrapper.job_destination.params['arc_oidc_token'])} for OIDC token" + assert len(job_wrapper.job_destination.params["arc_oidc_token"]) > 0, "Invalid OIDC token" + + set_job_destination( + JobDestination( + id="arc", + name="arc", + runner="arc_runner", + params={ + "arc_oidc_provider": "does_not_exist", + }, + ) + ) + job_wrapper = self.run_job() + assert job_wrapper is not None, "No job wrapper created" + assert job_wrapper.job_destination.params["arc_url"] is None, "No ARC URL expected" + assert job_wrapper.job_destination.params["arc_oidc_token"] is None, "No OIDC token expected" + + set_job_destination( + JobDestination( + id="arc", + name="arc", + runner="arc_runner", + params={ + "arc_user_preferences_key": "does_not_exist", + }, + ) + ) + job_wrapper = self.run_job() + assert job_wrapper is not None, "No job wrapper created" + assert job_wrapper.job_destination.params["arc_url"] is None, "No ARC URL expected" + assert isinstance( + job_wrapper.job_destination.params["arc_oidc_token"], str + ), f"Unexpected type {type(job_wrapper.job_destination.params['arc_oidc_token'])} for OIDC token" + assert len(job_wrapper.job_destination.params["arc_oidc_token"]) > 0, "Invalid OIDC token" + + # test parameters defined via extra user preferences + arc_user_preferences_key = "arc_user_preferences" + user = job_wrapper.get_job().user + assert user is not None, "No user associated with job" + extra_user_preferences = user.extra_preferences + extra_user_preferences[f"{arc_user_preferences_key}|arc_url"] = "https://arc-from-user-prefs.example.com" + extra_user_preferences[f"{arc_user_preferences_key}|arc_oidc_provider"] = "keycloak" + user.preferences["extra_user_preferences"] = json.dumps(extra_user_preferences) + session = object_session(user) + assert session is not None, "No database session associated with user" + session.commit() + + set_job_destination( + JobDestination( + id="arc", + name="arc", + runner="arc_runner", + params={ + "arc_user_preferences_key": "does_not_exist", + "arc_url": "https://arc.example.com", + "arc_oidc_provider": "does_not_exist", + }, + ) + ) + job_wrapper = self.run_job() + assert job_wrapper is not None, "No job wrapper created" + assert job_wrapper.job_destination.params["arc_url"] == "https://arc.example.com", "Unexpected ARC URL" + assert job_wrapper.job_destination.params["arc_oidc_token"] is None, "No OIDC token expected" + + set_job_destination( + JobDestination( + id="arc", + name="arc", + runner="arc_runner", + params={ + "arc_user_preferences_key": 
arc_user_preferences_key, + "arc_url": "https://arc.example.com", + "arc_oidc_provider": "does_not_exist", + }, + ) + ) + job_wrapper = self.run_job() + assert job_wrapper is not None, "No job wrapper created" + assert ( + job_wrapper.job_destination.params["arc_url"] + == extra_user_preferences[f"{arc_user_preferences_key}|arc_url"] + ), "Unexpected ARC URL" + assert isinstance( + job_wrapper.job_destination.params["arc_oidc_token"], str + ), f"Unexpected type {type(job_wrapper.job_destination.params['arc_oidc_token'])} for OIDC token" + assert len(job_wrapper.job_destination.params["arc_oidc_token"]) > 0, "Invalid OIDC token"
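
For anyone reviewing the new PUT route by hand, the following is a minimal sketch of the request shape exercised by `test_write_with_put_request` above. It is not part of the change itself; the Galaxy URL, encoded job id, job key, and target path are hypothetical placeholders that a real job runner would obtain from the job it is staging.

import requests

# Hypothetical placeholder values for illustration only.
galaxy_url = "https://galaxy.example.org"
job_id = "ENCODED_JOB_ID"
job_key = "JOB_RUNNER_KEY"
target_path = "/path/inside/job/working/directory/new_file.txt"

put_url = f"{galaxy_url}/api/jobs/{job_id}/files"
params = {"path": target_path, "job_key": job_key}

# Per the test above, the endpoint answers 201 when the PUT creates a new file
# and 204 when a later PUT to the same path replaces an existing one.
response = requests.put(put_url, params=params, data=b"whole contents of the file")
assert response.status_code in (201, 204)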