diff --git a/dev-requirements.txt b/dev-requirements.txt
index b781363d..e209d613 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -31,6 +31,7 @@ types-pycurl
 types-requests
 types-psutil
 sentry-sdk
+types-google-cloud-ndb
 
 # For release build
diff --git a/docs/containers.rst b/docs/containers.rst
index 01d639df..570163ba 100644
--- a/docs/containers.rst
+++ b/docs/containers.rst
@@ -132,6 +132,164 @@ GA4GH TES
 
    GA4GH TES job execution with Conda dependencies for the tool and no message queue.
 
+A Galaxy job configuration (job_conf.yml) for using TES with Pulsar and RabbitMQ might look like:
+
+::
+
+  runners:
+    local:
+      load: galaxy.jobs.runners.local:LocalJobRunner
+    pulsar_tes:
+      load: galaxy.jobs.runners.pulsar:PulsarTesJobRunner
+      # RabbitMQ URL from Galaxy server.
+      amqp_url:
+      # If Pulsar needs to talk to Galaxy at a particular host and port, set that here.
+      #galaxy_url:
+
+  execution:
+    default: pulsar_tes
+    environments:
+      local:
+        runner: local
+        local_slots: 1
+      pulsar_tes:
+        runner: pulsar_tes
+        # TES URL to use.
+        tes_url:
+        pulsar_app_config:
+          # This must point at the same RabbitMQ server, but via the host
+          # and port that your TES nodes will use to reach it.
+          message_queue_url:
+
+  tools:
+  - class: local
+    environment: local
+
+For testing on a MacBook with RabbitMQ installed via Homebrew, Docker Desktop available,
+and a Funnel server running locally with its default configuration, the setup might look like:
+
+::
+
+  runners:
+    local:
+      load: galaxy.jobs.runners.local:LocalJobRunner
+    pulsar_tes:
+      load: galaxy.jobs.runners.pulsar:PulsarTesJobRunner
+      # RabbitMQ URL from Galaxy server.
+      amqp_url: amqp://guest:guest@localhost:5672//
+      # Communicate to Pulsar nodes that Galaxy should be accessed on the Docker
+      # host - the MacBook.
+      galaxy_url: http://host.docker.internal:8080/
+
+  execution:
+    default: pulsar_tes
+    environments:
+      local:
+        runner: local
+        local_slots: 1
+      pulsar_tes:
+        runner: pulsar_tes
+        # Funnel will run on 8000 by default.
+        tes_url: http://localhost:8000
+        pulsar_app_config:
+          message_queue_url: amqp://guest:guest@host.docker.internal:5672//
+
+  tools:
+  - class: local
+    environment: local
+
+
+Google Cloud Platform Batch
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. figure:: pulsar_gcp_coexecution_deployment.plantuml.svg
+
+   GCP Batch job execution with a biocontainer for the tool and no message queue.
+
+.. figure:: pulsar_gcp_deployment.plantuml.svg
+
+   GCP Batch job execution with Conda dependencies for the tool and no message queue.
+
+Pulsar job destination options for configuring these scenarios:
+
+.. figure:: job_destination_parameters_gcp.png
+
+A Galaxy job configuration (job_conf.yml) for using GCP Batch with Pulsar and RabbitMQ might look like:
+
+::
+
+  runners:
+    local:
+      load: galaxy.jobs.runners.local:LocalJobRunner
+    pulsar_gcp:
+      load: galaxy.jobs.runners.pulsar:PulsarGcpBatchJobRunner
+      # RabbitMQ URL from Galaxy server.
+      amqp_url:
+      # If Pulsar needs to talk to Galaxy at a particular host and port, set that here.
+      #galaxy_url:
+
+  execution:
+    default: pulsar_gcp
+    environments:
+      local:
+        runner: local
+        local_slots: 1
+      pulsar_gcp:
+        runner: pulsar_gcp
+        # GCP project ID to use (required).
+        project_id: project-id-here
+        # GCP region or zone to use (optional).
+        #region: us-central1
+        # Max walltime to use in seconds (defaults to 60 * 60 * 24).
+        #walltime_limit: 216000
+        # GCP credentials setup.
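+        # If unset, Application Default Credentials are used (see gcp_util.gcp_client).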
+        #credentials_file: ~/.config/gcloud/application_default_credentials.json
+        pulsar_app_config:
+          # RabbitMQ URL the execute nodes should use to connect to the AMQP server.
+          message_queue_url:
+
+  tools:
+  - class: local
+    environment: local
+
+For testing these configurations, John set up a production-ish RabbitMQ server on
+173.255.213.165 with user ``john`` and password ``password`` that is accessible from
+anywhere. John also opened the router ports to expose their MacBook and set Galaxy
+to bind to ``0.0.0.0`` using the ``bind`` option in the ``gunicorn`` section of ``galaxy.yml``.
+
+The job configuration for this test setup looked something like:
+
+::
+
+  runners:
+    local:
+      load: galaxy.jobs.runners.local:LocalJobRunner
+    pulsar_gcp:
+      load: galaxy.jobs.runners.pulsar:PulsarGcpBatchJobRunner
+      amqp_url: "amqp://john:password@173.255.213.165/"
+      # If Pulsar needs to talk to Galaxy at a particular host and port, set that here.
+      galaxy_url: http://71.162.7.202:8080/
+
+  execution:
+    default: pulsar_gcp
+    environments:
+      local:
+        runner: local
+        local_slots: 1
+      pulsar_gcp:
+        runner: pulsar_gcp
+        project_id: tonal-bloom-123435
+        region: us-central1
+        walltime_limit: 216000
+        pulsar_app_config:
+          # RabbitMQ URL the execute nodes should use to connect to the AMQP server.
+          message_queue_url: "amqp://john:password@173.255.213.165/"
+
+  tools:
+  - class: local
+    environment: local
+
+
 AWS Batch
 ~~~~~~~~~~
diff --git a/docs/galaxy_conf.rst b/docs/galaxy_conf.rst
index 19af161e..be6026f8 100644
--- a/docs/galaxy_conf.rst
+++ b/docs/galaxy_conf.rst
@@ -90,10 +90,11 @@ making use of the HTTP transport method:
 .. literalinclude:: files/job_conf_sample_mq_rsync.xml
    :language: xml
 
-Targeting Apache Mesos (Prototype)
-``````````````````````````````````
+Targeting GCP Batch, Kubernetes, or TES
+```````````````````````````````````````
 
-See `commit message `_ for initial work on this and `this post on galaxy-dev `_.
+Check out :ref:`containers` for information on using Pulsar with these
+container-native execution environments.
 
 Generating Galaxy Metadata in Pulsar Jobs
 `````````````````````````````````````````
diff --git a/docs/gen_erd_diagrams.py b/docs/gen_erd_diagrams.py
new file mode 100644
index 00000000..56d120a5
--- /dev/null
+++ b/docs/gen_erd_diagrams.py
@@ -0,0 +1,16 @@
+import os
+import sys
+
+import erdantic as erd
+
+sys.path.insert(1, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
+
+from pulsar.client.container_job_config import GcpJobParams
+
+DOC_SOURCE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__)))
+class_to_diagram = {
+    GcpJobParams: "job_destination_parameters_gcp",
+}
+
+for clazz, diagram_name in class_to_diagram.items():
+    erd.draw(clazz, out=f"{DOC_SOURCE_DIR}/{diagram_name}.png")
\ No newline at end of file
diff --git a/docs/job_destination_parameters_gcp.png b/docs/job_destination_parameters_gcp.png
new file mode 100644
index 00000000..6f4b4e90
Binary files /dev/null and b/docs/job_destination_parameters_gcp.png differ
diff --git a/docs/job_managers.rst b/docs/job_managers.rst
index e243a5ae..f24ef224 100644
--- a/docs/job_managers.rst
+++ b/docs/job_managers.rst
@@ -158,7 +158,7 @@ the Galaxy mailing list.
 More Options
 -------------------------------
 
-Any manager can override the ``staging_directory`` used by setting this
+Most managers can override the ``staging_directory`` used by setting this
 property in its configuration section.
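
For example, a hypothetical manager section in Pulsar's ``app.yml`` overriding the
staging directory might look like (the path here is illustrative):

::

  managers:
    _default_:
      type: queued_python
      staging_directory: /scratch/pulsar/staging
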
The ``min_polling_interval: 0.5`` option can be set on any manager to control
diff --git a/docs/pulsar_gcp_coexecution_deployment.plantuml.svg b/docs/pulsar_gcp_coexecution_deployment.plantuml.svg
new file mode 100644
index 00000000..9955e6c0
--- /dev/null
+++ b/docs/pulsar_gcp_coexecution_deployment.plantuml.svg
@@ -0,0 +1,170 @@
+Google Cloud PlatformJobgalaxyObject StorePulsar Tasks run with a batch_v1.AllocationPolicy.Disk of type "local-ssd"that is used for Pulsar's staging directory and shared across pulsar stagings andtool/biocontainer containers.Google Batch v1 APIpulsar Taskbiocontainer TaskDisk is unrestricted and doesnot need to be shared betweenPulsar and Galaxy.create, delete, get (for status)[manages]stage in and out
\ No newline at end of file
diff --git a/docs/pulsar_gcp_coexecution_deployment.plantuml.txt b/docs/pulsar_gcp_coexecution_deployment.plantuml.txt
new file mode 100644
index 00000000..d892ffde
--- /dev/null
+++ b/docs/pulsar_gcp_coexecution_deployment.plantuml.txt
@@ -0,0 +1,48 @@
+@startuml
+
+!include plantuml_options.txt
+
+component galaxy as "galaxy" {
+
+}
+
+storage disk as "Object Store" {
+
+}
+
+note as disknote
+    Disk is unrestricted and does
+    not need to be shared between
+    Pulsar and Galaxy.
+end note
+
+disk ... disknote
+
+cloud cluster as "Google Cloud Platform" {
+    queue api as "Google Batch v1 API" {
+
+    }
+
+    frame pod as "Job" {
+
+        component staging as "pulsar Task" {
+        }
+
+        component tool as "biocontainer Task" {
+        }
+
+    }
+
+    note as stagingnote
+        Pulsar Tasks run with a batch_v1.AllocationPolicy.Disk of type "local-ssd"
+        that is used for Pulsar's staging directory and shared across pulsar stagings and
+        tool/biocontainer containers.
+    end note
+    pod ... stagingnote
+}
+
+galaxy --> disk
+galaxy --> api : create, delete, get (for status)
+api -[dashed]-> pod : [manages]
+staging --> galaxy : stage in and out
+@enduml
diff --git a/docs/pulsar_gcp_deployment.plantuml.svg b/docs/pulsar_gcp_deployment.plantuml.svg
new file mode 100644
index 00000000..96f1297d
--- /dev/null
+++ b/docs/pulsar_gcp_deployment.plantuml.svg
@@ -0,0 +1,150 @@
+Google Cloud PlatformJobgalaxyObject StoreGoogle Batch v1 APIpulsar+conda TaskDisk is unrestricted and doesnot need to be shared betweenPulsar and Galaxy.create, delete, get (for status)[manages]stage in and out
\ No newline at end of file
diff --git a/docs/pulsar_gcp_deployment.plantuml.txt b/docs/pulsar_gcp_deployment.plantuml.txt
new file mode 100644
index 00000000..54cfe75b
--- /dev/null
+++ b/docs/pulsar_gcp_deployment.plantuml.txt
@@ -0,0 +1,39 @@
+@startuml
+
+!include plantuml_options.txt
+
+component galaxy as "galaxy" {
+
+}
+
+storage disk as "Object Store" {
+
+}
+
+note as disknote
+    Disk is unrestricted and does
+    not need to be shared between
+    Pulsar and Galaxy.
+end note
+
+disk ... disknote
+
+cloud cluster as "Google Cloud Platform" {
+    queue api as "Google Batch v1 API" {
+
+    }
+
+    frame pod as "Job" {
+
+        component staging as "pulsar+conda Task" {
+        }
+
+    }
+
+}
+
+galaxy --> disk
+galaxy --> api : create, delete, get (for status)
+api -[dashed]-> pod : [manages]
+staging --> galaxy : stage in and out
+@enduml
diff --git a/pulsar/client/client.py b/pulsar/client/client.py
index f480fdf3..0dcc785b 100644
--- a/pulsar/client/client.py
+++ b/pulsar/client/client.py
@@ -3,10 +3,9 @@
 from enum import Enum
 from typing import (
     Any,
+    cast,
     Callable,
     Dict,
-    List,
-    NamedTuple,
     Optional,
 )
 from typing_extensions import Protocol
@@ -17,9 +16,7 @@
     TesExecutor,
     TesState,
     TesTask,
-    tes_client_from_dict,
     tes_galaxy_instance_id,
-    tes_resources,
 )
 from pulsar.managers.util.pykube_util import (
     ensure_pykube,
@@ -33,6 +30,25 @@
     pykube_client_from_dict,
     stop_job,
 )
+from pulsar.managers.util.gcp_util import (
+    batch_v1,
+    delete_gcp_job,
+    ensure_client as ensure_gcp_client,
+    gcp_client,
+    get_gcp_job,
+)
+from pulsar.client.container_job_config import (
+    CoexecutionContainerCommand,
+    container_command_to_gcp_runnable,
+    gcp_galaxy_instance_id,
+    gcp_job_request,
+    gcp_job_template,
+    parse_gcp_job_params,
+    parse_tes_job_params,
+    tes_client_from_params,
+    tes_resources,
+)
+
 from pulsar.managers import status as manager_status
 from .action_mapper import (
     actions,
@@ -341,6 +357,10 @@ class BaseRemoteConfiguredJobClient(BaseJobClient):
     client_manager: ClientManagerProtocol
 
     def __init__(self, destination_params, job_id, client_manager):
+        if "jobs_directory" not in destination_params:
+            default_staging_directory = self.default_staging_directory(destination_params)
+            if default_staging_directory:
+                destination_params["jobs_directory"] = default_staging_directory
         super().__init__(destination_params, job_id)
         if not self.job_directory:
             error_message = "Message-queue based Pulsar client requires the destination to define a remote job_directory to stage files into."
@@ -374,6 +394,9 @@ def _build_setup_message(self, command_line, dependencies_description, env, remo launch_params["setup_params"] = setup_params return launch_params + def default_staging_directory(self, destination_params): + return None + def get_pulsar_app_config( self, pulsar_app_config, @@ -394,7 +417,7 @@ def get_pulsar_app_config( if ( "staging_directory" not in manager_config and "staging_directory" not in pulsar_app_config ): - pulsar_app_config["staging_directory"] = CONTAINER_STAGING_DIRECTORY + pulsar_app_config["staging_directory"] = self.default_staging_directory(self.destination_params) if self.amqp_key_prefix: pulsar_app_config["amqp_key_prefix"] = self.amqp_key_prefix @@ -545,14 +568,6 @@ def kill(self): pass -class CoexecutionContainerCommand(NamedTuple): - image: str - command: str - args: List[str] - working_directory: str - ports: Optional[List[int]] = None - - class ExecutionType(str, Enum): # containers run one after each other with similar configuration # like in TES or AWS Batch @@ -565,6 +580,9 @@ class CoexecutionLaunchMixin(BaseRemoteConfiguredJobClient): execution_type: ExecutionType pulsar_container_image: str + def default_staging_directory(self, destination_params): + return CONTAINER_STAGING_DIRECTORY + def launch( self, command_line, @@ -725,6 +743,9 @@ class LaunchesTesContainersMixin(CoexecutionLaunchMixin): ensure_library_available = ensure_tes_client execution_type = ExecutionType.SEQUENTIAL + def default_staging_directory(self, destination_params): + return CONTAINER_STAGING_DIRECTORY + def _launch_containers( self, pulsar_submit_container: CoexecutionContainerCommand, @@ -749,7 +770,7 @@ def _launch_containers( name=name, executors=executors, volumes=volumes, - resources=tes_resources(self.destination_params) + resources=tes_resources(self._tes_job_params) ) created_task = self._tes_client.create_task(tes_task) return ExternalId(created_task.id) @@ -765,7 +786,7 @@ def _container_to_executor(self, container: CoexecutionContainerCommand) -> TesE @property def _tes_client(self) -> TesClient: - return tes_client_from_dict(self.destination_params) + return tes_client_from_params(self._tes_job_params) @property def _tes_job_name(self): @@ -791,6 +812,11 @@ def raw_check_complete(self) -> Dict[str, Any]: "complete": "true" if tes_state_is_complete(tes_state) else "false", # Ancient John, what were you thinking? } + @property + def _tes_job_params(self): + tes_job_params = parse_tes_job_params(self.destination_params) + return tes_job_params + class TesPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesTesContainersMixin): """A client that co-executes pods via GA4GH TES and depends on amqp for status updates.""" @@ -999,6 +1025,141 @@ def _raw_check_complete(self): } +class LaunchesGcpContainersMixin(CoexecutionLaunchMixin): + ensure_library_available = ensure_gcp_client + # https://cloud.google.com/php/docs/reference/cloud-batch/latest/V1.Runnable.Barrier + # can we do barriers here to allow sequential? It would allow separate containers for startup + # and shutdown that don't run parallel to the job? 
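+    # For now the Pulsar and tool containers run in parallel and coordinate
+    # through the shared staging volume rather than via sequential barriers.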
+ execution_type = ExecutionType.PARALLEL + + def default_staging_directory(self, destination_params): + ssd_name = destination_params.get("ssd_name", "pulsar_staging") + return f"/mnt/disks/{ssd_name}" + + def _setup_gcp_batch_client_properties(self, destination_params): + self.instance_id = gcp_galaxy_instance_id(destination_params) + + def _launch_containers( + self, + pulsar_submit_container: CoexecutionContainerCommand, + tool_container: Optional[CoexecutionContainerCommand], + pulsar_finish_container: Optional[CoexecutionContainerCommand] + ) -> None: + assert pulsar_finish_container is None + gcp_job_params = self._gcp_job_params + job = gcp_job_template(gcp_job_params) + runnable = container_command_to_gcp_runnable("pulsar-container", pulsar_submit_container) + job.task_groups[0].task_spec.runnables.append(runnable) + + if tool_container: + tool_runnable = container_command_to_gcp_runnable("tool-container", tool_container) + job.task_groups[0].task_spec.runnables.append(tool_runnable) + + job_name = self._job_name + create_request = gcp_job_request(gcp_job_params, job, job_name) + client = gcp_client(gcp_job_params.credentials_file) + job = client.create_job(create_request) + + @property + def _job_name(self): + # currently just _k8s_job_prefix... which might be fine? + job_id = self.job_id + job_name = produce_unique_k8s_job_name(app_prefix="pulsar", job_id=job_id, instance_id=self.instance_id) + return job_name + + @property + def _gcp_job_params(self): + gcp_job_params = parse_gcp_job_params(self.destination_params) + return gcp_job_params + + +class GcpMessageCoexecutionJobClient(BaseMessageCoexecutionJobClient, LaunchesGcpContainersMixin): + """A client that co-executes pods via GCP and depends on amqp for status updates.""" + + def __init__(self, destination_params, job_id, client_manager): + super().__init__(destination_params, job_id, client_manager) + self._setup_gcp_batch_client_properties(destination_params) + + +class GcpPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesGcpContainersMixin): + """A client that co-executes pods via GCP and doesn't depend on amqp.""" + + def __init__(self, destination_params, job_id, client_manager): + super().__init__(destination_params, job_id, client_manager) + self._setup_gcp_batch_client_properties(destination_params) + + def kill(self): + gcp_job_params = self._gcp_job_params + delete_gcp_job(gcp_job_params.project_id, gcp_job_params.region, self._job_name, gcp_job_params.credentials_file) + + def clean(self): + pass + + def full_status(self): + status = self.raw_check_complete() + return status + + def raw_check_complete(self) -> Dict[str, Any]: + gcp_job_params = self._gcp_job_params + job = get_gcp_job(gcp_job_params.project_id, gcp_job_params.region, self._job_name, gcp_job_params.credentials_file) + status = job.status + state = status.state + return { + "status": gcp_state_to_pulsar_status(state), + "complete": "true" if gcp_state_is_complete(state) else "false", # Ancient John, what were you thinking? + } + + +def gcp_state_to_pulsar_status(state: Optional["batch_v1.JobStatus.State"]) -> str: + state = state or cast(batch_v1.JobStatus.State, batch_v1.JobStatus.State.STATE_UNSPECIFIED) + # STATE_UNSPECIFIED Job state unspecified. + # QUEUED Job is admitted (validated and persisted) and waiting for resources. + # SCHEDULED Job is scheduled to run as soon as resource allocation is ready. The resource + # allocation may happen at a later time but with a high chance to succeed. 
+    # RUNNING Resource allocation has been successful. At least one Task in the Job is RUNNING.
+    # SUCCEEDED All Tasks in the Job have finished successfully.
+    # FAILED At least one Task in the Job has failed.
+    # DELETION_IN_PROGRESS The Job will be deleted, but has not been deleted yet. Typically this is because resources
+    # used by the Job are still being cleaned up.
+    # CANCELLATION_IN_PROGRESS The Job cancellation is in progress, this is because the resources used by the Job are
+    # still being cleaned up.
+    # CANCELLED The Job has been cancelled, the task executions were stopped and the resources were cleaned up.
+    state_map = {
+        batch_v1.JobStatus.State.STATE_UNSPECIFIED: manager_status.FAILED,
+        batch_v1.JobStatus.State.QUEUED: manager_status.PREPROCESSING,
+        batch_v1.JobStatus.State.SCHEDULED: manager_status.PREPROCESSING,
+        batch_v1.JobStatus.State.RUNNING: manager_status.RUNNING,
+        batch_v1.JobStatus.State.SUCCEEDED: manager_status.COMPLETE,
+        batch_v1.JobStatus.State.FAILED: manager_status.FAILED,
+        batch_v1.JobStatus.State.DELETION_IN_PROGRESS: manager_status.FAILED,
+        batch_v1.JobStatus.State.CANCELLATION_IN_PROGRESS: manager_status.CANCELLED,
+        batch_v1.JobStatus.State.CANCELLED: manager_status.CANCELLED,
+    }
+    if state not in state_map:
+        log.warning(f"Unknown GCP Batch state encountered [{state}]")
+        return manager_status.FAILED
+    else:
+        return state_map[state]
+
+
+def gcp_state_is_complete(state: Optional["batch_v1.JobStatus.State"]) -> bool:
+    state = state or cast(batch_v1.JobStatus.State, batch_v1.JobStatus.State.STATE_UNSPECIFIED)
+    state_map = {
+        batch_v1.JobStatus.State.STATE_UNSPECIFIED: True,
+        batch_v1.JobStatus.State.QUEUED: False,
+        batch_v1.JobStatus.State.SCHEDULED: False,
+        batch_v1.JobStatus.State.RUNNING: False,
+        batch_v1.JobStatus.State.SUCCEEDED: True,
+        batch_v1.JobStatus.State.FAILED: True,
+        batch_v1.JobStatus.State.DELETION_IN_PROGRESS: True,
+        batch_v1.JobStatus.State.CANCELLATION_IN_PROGRESS: True,
+        batch_v1.JobStatus.State.CANCELLED: True,
+    }
+    if state not in state_map:
+        log.warning(f"Unknown GCP Batch state encountered [{state}]")
+        return True
+    else:
+        return state_map[state]
+
+
 class LaunchesAwsBatchContainersMixin(CoexecutionLaunchMixin):
     """..."""
     execution_type = ExecutionType.SEQUENTIAL
diff --git a/pulsar/client/container_job_config.py b/pulsar/client/container_job_config.py
new file mode 100644
index 00000000..2f63affa
--- /dev/null
+++ b/pulsar/client/container_job_config.py
@@ -0,0 +1,292 @@
+"""Setup config objects for Pulsar client container jobs.
+
+In a traditional batch Pulsar setup, job execution is configured per destination
+via the manager the Pulsar client connects to. In a container job setup, there is
+no Pulsar server component, so the Pulsar client itself is responsible for the job
+configuration. This module provides the necessary configuration objects and
+documents how to map Galaxy job environment configuration onto the container
+scheduling infrastructure.
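+
+A minimal usage sketch (``my-project`` is a hypothetical project id and
+google-cloud-batch must be installed)::
+
+    params = parse_gcp_job_params({"project_id": "my-project"})
+    job = gcp_job_template(params)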
+""" +import base64 +import re +from typing import ( + Dict, + List, + NamedTuple, + Optional, +) + +from galaxy.util import listify +from pydantic import ( + BaseModel, + Field, +) +from pydantictes.models import TesResources +from typing_extensions import Literal + +from pulsar.managers.util.gcp_util import ( + batch_v1, + ensure_client as ensure_gcp_client, +) +from pulsar.managers.util.tes import TesClient + + +DEFAULT_GCP_WALLTIME_LIMIT = 60 * 60 * 24 # Default wall time limit in seconds + + +class CoexecutionContainerCommand(NamedTuple): + image: str + command: str + args: List[str] + working_directory: str + ports: Optional[List[int]] = None + + +def attribute_docs(gcp_class_name: str, attribute: str) -> Optional[str]: + """ + Extracts the documentation string for a given attribute from a class docstring. + + Args: + cls: The class object containing the docstring. + attr_name: The attribute name to extract documentation for. + + Returns: + A string containing the attribute's documentation, or None if not found. + """ + gcp_class = getattr(batch_v1, gcp_class_name, None) + if not gcp_class: + return None + + doc = gcp_class.__doc__ + if not doc: + return None + + lines = doc.expandtabs().splitlines() + inside_attributes = False + current_attr = None + current_lines: List[str] = [] + attr_docs = {} + + attr_pattern = re.compile(r" (\w*).*:.*") + for line in lines: + stripped = line.strip() + + if not inside_attributes: + if stripped == "Attributes:": + inside_attributes = True + continue + + if inside_attributes: + if line.startswith(" ") and not line.startswith(" "): # attr line + match = attr_pattern.match(line) + if match: + if current_attr: + # Save previous attribute + attr_docs[current_attr] = "\n".join(current_lines).strip() + current_lines = [] + current_attr = match.group(1) + else: + continue + elif line.startswith(" ") and current_attr: + current_lines.append(stripped) + + if current_attr and current_lines: + attr_docs[current_attr] = "\n".join(current_lines).strip() + + docs = attr_docs.get(attribute) + if docs: + return f"Docs from {gcp_class_name}.{attribute}:\n{docs}" + else: + return None + + +class GcpJobParams(BaseModel): + project_id: str = Field( + ..., description="GCP project ID to use for job creation." + ) + credentials_file: Optional[str] = Field( + None, description="Path to GCP service account credentials file." + ) + region: str = Field( + "us-central1", description="GCP region where the job will run." + ) + walltime_limit: int = Field( + DEFAULT_GCP_WALLTIME_LIMIT, + description=f"Maximum wall time for the job in seconds. Maps to TaskSpec.max_run_duration.\n{attribute_docs('TaskSpec', 'max_run_duration')}" + ) + retry_count: int = Field( + 2, description=f"Maximum number of retries for the job. Maps to TaskSpec.max_retry_count.\n{attribute_docs('TaskSpec', 'max_retry_count')}" + ) + ssd_name: Optional[str] = Field( + None, description=f"Name of the SSD volume to be mounted in the task. Maps to Volume.device_name.\n{attribute_docs('Volume', 'device_name')}" + ) + disk_size: int = Field( + 375, description="Size of the shared local SSD disk in GB (must be a multiple of 375). Maps to AllocationPolicy.Disk.size_gb." + ) + machine_type: str = Field( + "n1-standard-1", description="Machine type for the job's VM." + ) + labels: Optional[Dict[str, str]] = Field(None) + + +def parse_gcp_job_params(params: dict) -> GcpJobParams: + """ + Parse GCP job parameters from a dictionary (e.g., Galaxy's job destination/environment params). 
+ """ + return GcpJobParams(**params) + + +def gcp_job_template(params: GcpJobParams) -> "batch_v1.Job": + ensure_gcp_client() + + # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/batch/create/create_with_ssd.py + task = batch_v1.TaskSpec() + task.max_retry_count = params.retry_count + task.max_run_duration = f"{params.walltime_limit}s" + + ssd_name = params.ssd_name or "pulsar_staging" + + volume = batch_v1.Volume() + volume.device_name = ssd_name + mount_path = f"/mnt/disks/{ssd_name}" + volume.mount_path = mount_path + task.volumes = [volume] + + # override the staging directory since we cannot set the location of this mount path + # the way we can in K8S based on @jmchilton's initial testing. + environment = batch_v1.Environment( + variables={ + "PULSAR_CONFIG_OVERRIDE_STAGING_DIRECTORY": mount_path, + } + ) + task.environment = environment + + # Tasks are grouped inside a job using TaskGroups. + # Currently, it's possible to have only one task group. + group = batch_v1.TaskGroup() + group.task_count = 1 + group.task_spec = task + + disk = batch_v1.AllocationPolicy.Disk() + disk.type_ = "local-ssd" + # The size of all the local SSDs in GB. Each local SSD is 375 GB, + # so this value must be a multiple of 375 GB. + # For example, for 2 local SSDs, set this value to 750 GB. + disk.size_gb = params.disk_size + assert disk.size_gb % 375 == 0 + + # Policies are used to define on what kind of virtual machines the tasks will run on. + # The allowed number of local SSDs depends on the machine type for your job's VMs. + # In this case, we tell the system to use "n1-standard-1" machine type, which require to attach local ssd manually. + # Read more about local disks here: https://cloud.google.com/compute/docs/disks/local-ssd#lssd_disk_options + policy = batch_v1.AllocationPolicy.InstancePolicy() + policy.machine_type = params.machine_type + + attached_disk = batch_v1.AllocationPolicy.AttachedDisk() + attached_disk.new_disk = disk + attached_disk.device_name = ssd_name + policy.disks = [attached_disk] + + instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate() + instances.policy = policy + + allocation_policy = batch_v1.AllocationPolicy() + allocation_policy.instances = [instances] + + job = batch_v1.Job() + job.task_groups = [group] + job.allocation_policy = allocation_policy + job.labels = params.labels or {} + # We use Cloud Logging as it's an out of the box available option + job.logs_policy = batch_v1.LogsPolicy() + job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING # type: ignore[assignment] + + return job + + +def gcp_job_request(params: GcpJobParams, job: "batch_v1.Job", job_name: str) -> "batch_v1.CreateJobRequest": + create_request = batch_v1.CreateJobRequest() + create_request.job = job + create_request.job_id = job_name + region = params.region + project_id = params.project_id + create_request.parent = f"projects/{project_id}/locations/{region}" + return create_request + + +def container_command_to_gcp_runnable(name: str, container: CoexecutionContainerCommand) -> "batch_v1.Runnable": + runnable = batch_v1.Runnable() + runnable.container = batch_v1.Runnable.Container() + runnable.container.image_uri = container.image + runnable.container.commands = [container.command] + container.args + # ports not supported currently + return runnable + + +def gcp_galaxy_instance_id(destination_params: Dict[str, str]) -> Optional[str]: + return destination_params.get("galaxy_instance_id") + + +class BasicAuth(BaseModel): + username: str = 
Field(..., description="Username for basic authentication.") + password: str = Field(..., description="Password for basic authentication.") + + +class TesJobParams(TesResources): + tes_url: str = Field(..., description="URL of the TES service.") + authorization: Literal["none", "basic"] = Field( + "none", description="Authorization type for TES service." + ) + basic_auth: Optional[BasicAuth] = Field(None, description="Authorization for TES service.") + + +def parse_tes_job_params(params: dict) -> TesJobParams: + """ + Parse GCP job parameters parameters from a dictionary (e.g., Galaxy's job destination/environment params). + """ + legacy_style_keys = { + "tes_cpu_cores": "cpu_cores", + "tes_preemptible": "preemptible", + "tes_ram_gb": "ram_gb", + "tes_disk_gb": "disk_gb", + "tes_zones": "zones", + "tes_backend_parameters": "backend_parameters", + "tes_backend_parameters_strict": "backend_parameters_strict", + "tes_galaxy_instance_id": "galaxy_instance_id", + } + expanded_params = {} + for key, value in params.items(): + if key in legacy_style_keys: + new_key = legacy_style_keys[key] + expanded_params[new_key] = value + else: + expanded_params[key] = value + + if "zones" in expanded_params: + expanded_params["zones"] = listify(expanded_params["zones"]) + + return TesJobParams(**expanded_params) + + +def tes_client_from_params(tes_params: TesJobParams) -> TesClient: + tes_url = tes_params.tes_url + assert tes_url + auth_type = tes_params.authorization # Default to "none" + + headers = {} + + if auth_type == "basic": + basic_auth = tes_params.basic_auth + username = basic_auth.username if basic_auth else None + password = basic_auth.password if basic_auth else None + if username and password: + auth_string = f"{username}:{password}" + auth_base64 = base64.b64encode(auth_string.encode()).decode() + headers["Authorization"] = f"Basic {auth_base64}" + + return TesClient(url=tes_url, headers=headers) + + +def tes_resources(tes_params: TesJobParams) -> TesResources: + # TesJobParams subclasses it so just pass through as is. + return tes_params diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py index 9c429734..2b239212 100644 --- a/pulsar/client/manager.py +++ b/pulsar/client/manager.py @@ -23,6 +23,8 @@ BaseJobClient, InputCachingJobClient, JobClient, + GcpMessageCoexecutionJobClient, + GcpPollingCoexecutionJobClient, K8sMessageCoexecutionJobClient, K8sPollingCoexecutionJobClient, MessageCLIJobClient, @@ -240,6 +242,8 @@ def get_client(self, destination_params, job_id, **kwargs): return K8sMessageCoexecutionJobClient(destination_params, job_id, self) elif destination_params.get("tes_url", False): return TesMessageCoexecutionJobClient(destination_params, job_id, self) + elif destination_params.get("project_id", False): + return GcpMessageCoexecutionJobClient(destination_params, job_id, self) else: return MessageJobClient(destination_params, job_id, self) @@ -256,6 +260,8 @@ def get_client(self, destination_params, job_id, **kwargs): return K8sPollingCoexecutionJobClient(destination_params, job_id, self) elif destination_params.get("tes_url", False): return TesPollingCoexecutionJobClient(destination_params, job_id, self) + elif destination_params.get("project_id", False): + return GcpPollingCoexecutionJobClient(destination_params, job_id, self) else: raise Exception("Unknown client configuration") @@ -268,7 +274,7 @@ def build_client_manager(**kwargs: Dict[str, Any]) -> ClientManagerInterface: return ClientManager(**kwargs) # TODO: Consider more separation here. 
elif kwargs.get('amqp_url', None): return MessageQueueClientManager(**kwargs) - elif kwargs.get("k8s_enabled") or kwargs.get("tes_url"): + elif kwargs.get("k8s_enabled") or kwargs.get("tes_enabled") or kwargs.get("gcp_batch_enabled"): return PollingJobClientManager(**kwargs) else: return ClientManager(**kwargs) diff --git a/pulsar/managers/util/gcp_util.py b/pulsar/managers/util/gcp_util.py new file mode 100644 index 00000000..cfba0b74 --- /dev/null +++ b/pulsar/managers/util/gcp_util.py @@ -0,0 +1,80 @@ +import logging +from typing import ( + Any, + Optional, +) + +try: + from google.cloud import batch_v1 # type: ignore + from google.oauth2 import service_account # type: ignore +except ImportError as exc: + service_account = None # type: ignore[assignment] + batch_v1 = None # type: ignore[assignment] + GCP_IMPORT_MESSAGE = ( + "The Python google-cloud-batch package is required to use " + "this feature, please install it or correct the " + f"following error:\nImportError {exc}" + ) + +log = logging.getLogger(__name__) + + +def ensure_client(): + if batch_v1 is None: + raise Exception(GCP_IMPORT_MESSAGE) + + +def gcp_client(credentials_file: Optional[str]) -> "batch_v1.BatchServiceClient": + if credentials_file: + credentials = service_account.Credentials.from_service_account_file(credentials_file) + client = batch_v1.BatchServiceClient(credentials=credentials) + else: + client = batch_v1.BatchServiceClient() + return client + + +def get_gcp_job( + project_id: str, + region: str, + job_name: str, + credentials_file: Optional[str] = None, +) -> "batch_v1.Job": + """ + Retrieve a GCP Batch job by its name. + + Args: + project_id: GCP project ID. + region: GCP region where the job is located. + job_name: Name of the job to retrieve. + credentials_file: Path to GCP service account credentials file (optional). + + Returns: + The GCP Batch job object. 
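+
+    Example (hypothetical identifiers):
+        job = get_gcp_job("my-project", "us-central1", "pulsar-abc123")
+        print(job.status.state)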
+ """ + ensure_client() + client = gcp_client(credentials_file) + return client.get_job( + name=f"projects/{project_id}/locations/{region}/jobs/{job_name}" + ) + + +def delete_gcp_job( + project_id: str, + region: str, + job_name: str, + credentials_file: Optional[str] = None, +) -> Any: + ensure_client() + client = gcp_client(credentials_file) + return client.delete_job( + name=f"projects/{project_id}/locations/{region}/jobs/{job_name}" + ) + + +__all__ = ( + "ensure_client", + "gcp_client", + "batch_v1", + "get_gcp_job", + "delete_gcp_job", +) diff --git a/pulsar/managers/util/tes.py b/pulsar/managers/util/tes.py index 8b8989f5..8bdb3993 100644 --- a/pulsar/managers/util/tes.py +++ b/pulsar/managers/util/tes.py @@ -1,16 +1,9 @@ -import base64 from typing import ( Any, - cast, Dict, - List, Optional, ) -from galaxy.util import ( - asbool, - listify, -) IMPORT_MESSAGE = None try: @@ -40,73 +33,8 @@ def ensure_tes_client() -> None: raise Exception(IMPORT_MESSAGE) -def tes_client_from_dict(destination_params: Dict[str, Any]) -> TesClient: - tes_url = destination_params.get("tes_url") - auth_type = destination_params.get("authorization", "none") # Default to "none" - - headers = {} - - if auth_type == "basic": - basic_auth = destination_params.get("basic_auth", {}) - username = basic_auth.get("username") - password = basic_auth.get("password") - if username and password: - auth_string = f"{username}:{password}" - auth_base64 = base64.b64encode(auth_string.encode()).decode() - headers["Authorization"] = f"Basic {auth_base64}" - - return TesClient(url=tes_url, headers=headers) - - -def tes_resources(destination_params: Dict[str, Any]) -> TesResources: - cpu_cores: Optional[int] - preemptible: Optional[bool] - ram_gb: Optional[float] - disk_gb: Optional[float] - zones: Optional[List[str]] - backend_parameters: Optional[Dict[str, str]] = None - backend_parameters_strict: Optional[bool] - - raw_cpu_cores = destination_params.get("tes_cpu_cores") - cpu_cores = int(raw_cpu_cores) if raw_cpu_cores is not None else None - - raw_preemptible = destination_params.get("tes_preemptible") - preemptible = asbool(raw_preemptible) if raw_preemptible is not None else None - - raw_ram_gb = destination_params.get("tes_ram_gb") - ram_gb = float(raw_ram_gb) if raw_ram_gb is not None else None - - raw_disk_gb = destination_params.get("tes_disk_gb") - disk_gb = float(raw_disk_gb) if raw_disk_gb is not None else None - - raw_zones = destination_params.get("tes_zones") - zones = listify(raw_zones) if raw_zones is not None else None - - raw_backend_parameters = destination_params.get("tes_backend_parameters") - if raw_backend_parameters is not None: - backend_parameters = {} - for k, v in cast(dict, raw_backend_parameters).items(): - backend_parameters[str(k)] = str(v) - - raw_backend_parameters_strict = destination_params.get("tes_backend_parameters_strict") - if raw_backend_parameters_strict is not None: - backend_parameters_strict = asbool(raw_backend_parameters_strict) - else: - backend_parameters_strict = None - - return TesResources( - cpu_cores=cpu_cores, - preemptible=preemptible, - ram_gb=ram_gb, - disk_gb=disk_gb, - zones=zones, - backend_parameters=backend_parameters, - backend_parameters_strict=backend_parameters_strict, - ) - - -def tes_galaxy_instance_id(destinaton_params: Dict[str, Any]) -> Optional[str]: - return destinaton_params.get("tes_galaxy_instance_id") +def tes_galaxy_instance_id(destination_params: Dict[str, Any]) -> Optional[str]: + return destination_params.get("galaxy_instance_id") or 
destination_params.get("tes_galaxy_instance_id") __all__ = ( diff --git a/requirements.txt b/requirements.txt index 1db30fa4..5c135f1b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,7 @@ typing-extensions pydantic-tes>=0.1.5 pyjwt tuspy +google-cloud-batch ## Uncomment if using DRMAA queue manager. #drmaa diff --git a/test/container_job_config_test.py b/test/container_job_config_test.py new file mode 100644 index 00000000..1e35bf1f --- /dev/null +++ b/test/container_job_config_test.py @@ -0,0 +1,36 @@ +from pulsar.client.container_job_config import ( + GcpJobParams, + gcp_job_template, + parse_gcp_job_params, + DEFAULT_GCP_WALLTIME_LIMIT, + TesJobParams, +) + + +def test_docs(): + print(GcpJobParams.schema()) + print(TesJobParams.schema()) + + +def test_gcp_defaults(): + params = parse_gcp_job_params({"project_id": "moo"}) + assert params.project_id == "moo" + assert params.credentials_file is None + assert params.walltime_limit == DEFAULT_GCP_WALLTIME_LIMIT + + +def test_gcp_job_template(): + params = parse_gcp_job_params({"project_id": "moo"}) + job = gcp_job_template(params) + assert job is not None + assert len(job.task_groups) == 1 + task_group = job.task_groups[0] + task = task_group.task_spec + assert len(task.volumes) == 1 + + +def test_gcp_custom_walltime(): + custom_walltime = "7200" # 2 hours in seconds + params = parse_gcp_job_params(dict(project_id="moo", credentials_file="path/to/credentials.json", walltime_limit=custom_walltime)) + assert params.credentials_file == "path/to/credentials.json" + assert params.walltime_limit == int(custom_walltime) diff --git a/test/tes_test.py b/test/tes_test.py index 9b0849a3..ee9da63f 100644 --- a/test/tes_test.py +++ b/test/tes_test.py @@ -1,35 +1,40 @@ -from pulsar.managers.util.tes import tes_resources +from pulsar.client.container_job_config import ( + parse_tes_job_params, + tes_resources, +) def test_tes_resources_from_xml(): - resources = tes_resources({ + resources = tes_resources(parse_tes_job_params({ + "tes_url": "http://moo", "tes_cpu_cores": "2", "tes_preemptible": "true", "tes_ram_gb": "128.0", "tes_disk_gb": "512.0", "tes_zones": "us-west-1,us-east-1", - }) + })) assert resources.cpu_cores == 2 assert resources.preemptible is True assert resources.ram_gb == 128.0 assert resources.disk_gb == 512.0 assert resources.zones == ["us-west-1", "us-east-1"] assert resources.backend_parameters is None - assert resources.backend_parameters_strict is None + assert resources.backend_parameters_strict is False def test_tes_resources_from_yaml(): - resources = tes_resources({ + resources = tes_resources(parse_tes_job_params({ + "tes_url": "http://moo", "tes_cpu_cores": 4, "tes_ram_gb": 127.0, "tes_disk_gb": 513.0, "tes_zones": ["us-west-1", "us-east-1"], "tes_backend_parameters": {"moo": "cow"}, - }) + })) assert resources.cpu_cores == 4 assert resources.preemptible is None assert resources.ram_gb == 127.0 assert resources.disk_gb == 513.0 assert resources.zones == ["us-west-1", "us-east-1"] assert resources.backend_parameters == {"moo": "cow"} - assert resources.backend_parameters_strict is None + assert resources.backend_parameters_strict is False