12 changes: 12 additions & 0 deletions app.yml.sample
@@ -83,6 +83,18 @@
## Python library is available and start the Pulsar normally.
#message_queue_url: amqp://guest:guest@localhost:5672//

## Alternatively, use pulsar-relay (experimental) with HTTP/HTTPS instead of AMQP.
## This does not require kombu or RabbitMQ.
#message_queue_url: https://relay-server.example.org:9000
#message_queue_username: admin
#message_queue_password: changeme

## Optional topic prefix for relay messages. This allows multiple independent
## Galaxy/Pulsar instance pairs to share the same relay server by using different
## prefixes (e.g., "production", "staging"). Must match the relay_topic_prefix
## configured in Galaxy's job_conf.yml if set.
#relay_topic_prefix: production

## Pulsar loops over waiting for queue messages for a short time before checking
## to see if it has been instructed to shut down. By default this is 0.2
## seconds. This value is used as the value of the 'timeout' parameter to
118 changes: 111 additions & 7 deletions docs/configure.rst
@@ -267,6 +267,16 @@ To configure Pulsar to use pulsar-relay, set the ``message_queue_url`` in
The ``http://`` / ``https://`` prefix tells Pulsar to use the proxy communication mode instead
of AMQP.

**Optional Topic Prefix**

You can optionally set a ``relay_topic_prefix`` to namespace your topics. This is useful
when multiple independent Galaxy/Pulsar instance pairs share the same relay::

message_queue_url: http://proxy-server.example.org:9000
message_queue_username: admin
message_queue_password: your_secure_password
relay_topic_prefix: production

.. note::

Unlike AMQP mode, the pulsar-relay mode does **not** require the ``kombu``
@@ -286,6 +296,8 @@ with proxy parameters::
proxy_url: http://proxy-server.example.org:9000
proxy_username: your_username
proxy_password: your_secure_password
# Optional topic prefix (must match Pulsar configuration)
# relay_topic_prefix: production


execution:
@@ -298,6 +310,11 @@ with proxy parameters::
# Remote job staging directory
jobs_directory: /data/pulsar/staging

.. note::

The ``relay_topic_prefix`` must match on both Galaxy and Pulsar sides.
If set on one side but not the other, messages will not be routed correctly.


Authentication
``````````````
@@ -368,18 +385,105 @@ In Galaxy's job configuration, route jobs to specific clusters using the
manager: cluster_b
# ... other settings

Multiple Galaxy/Pulsar Instance Pairs
``````````````````````````````````````

You can have multiple independent Galaxy and Pulsar instance pairs all sharing
the same relay by using different topic prefixes. This is useful for:

* Running separate production and staging environments
* Supporting multiple research groups with isolated instances
* Multi-tenant deployments

**Example: Production and Staging Environments**

**Production Pulsar** (``app.yml``)::

message_queue_url: https://shared-relay:9000
message_queue_username: admin
message_queue_password: password
relay_topic_prefix: production
managers:
cluster_a:
type: queued_slurm

**Staging Pulsar** (``app.yml``)::

message_queue_url: https://shared-relay:9000
message_queue_username: admin
message_queue_password: password
relay_topic_prefix: staging
managers:
cluster_a:
type: queued_slurm

**Production Galaxy** (``job_conf.yml``)::

runners:
pulsar:
load: galaxy.jobs.runners.pulsar:PulsarMQJobRunner
proxy_url: https://shared-relay:9000
proxy_username: admin
proxy_password: password
relay_topic_prefix: production

execution:
environments:
pulsar_jobs:
runner: pulsar
manager: cluster_a
# ... other settings

**Staging Galaxy** (``job_conf.yml``)::

runners:
pulsar:
load: galaxy.jobs.runners.pulsar:PulsarMQJobRunner
proxy_url: https://shared-relay:9000
proxy_username: admin
proxy_password: password
relay_topic_prefix: staging

execution:
environments:
pulsar_jobs:
runner: pulsar
manager: cluster_a
# ... other settings

In this setup, the topics will be completely isolated:

* **Production**: ``production_job_setup_cluster_a``, ``production_job_status_update_cluster_a``
* **Staging**: ``staging_job_setup_cluster_a``, ``staging_job_status_update_cluster_a``

Topic Naming
````````````

Messages are organized by topic with automatic naming based on the manager name:
Messages are organized by topic with automatic naming based on the optional prefix
and manager name:

* Job setup: ``job_setup`` (default manager, no prefix)
* Job setup: ``job_setup_{manager_name}`` (named manager, no prefix)
* Job setup: ``{prefix}_job_setup`` (default manager, with prefix)
* Job setup: ``{prefix}_job_setup_{manager_name}`` (named manager, with prefix)

The same pattern applies to other message types:

* Status requests: ``job_status_request``, ``job_status_request_{manager_name}``
* Kill commands: ``job_kill``, ``job_kill_{manager_name}``
* Status updates: ``job_status_update``, ``job_status_update_{manager_name}``

When a ``relay_topic_prefix`` is configured, it is prepended to all topic names:

* ``production_job_setup``
* ``production_job_setup_cluster_a``
* ``production_job_status_update_cluster_a``

* Job setup: ``job_setup_{manager_name}`` or ``job_setup`` (for default manager)
* Status requests: ``job_status_request_{manager_name}``
* Kill commands: ``job_kill_{manager_name}``
* Status updates: ``job_status_update_{manager_name}``
This allows:

This allows multiple Pulsar instances to share the same proxy without message
conflicts.
* Multiple Pulsar instances to share the same relay (using different manager names)
* Multiple independent Galaxy/Pulsar instance pairs to share the same relay
(using different topic prefixes)
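
The naming scheme described above can be sketched as a small standalone helper, a simplified version of the ``_make_topic_name`` method introduced later in this change::

```python
def make_topic_name(base_topic, manager_name, prefix=""):
    """Build a relay topic name from an optional prefix, the base
    topic, and the manager name (omitted for the default manager)."""
    parts = []
    if prefix:
        parts.append(prefix)          # e.g. "production" or "staging"
    parts.append(base_topic)          # e.g. "job_setup"
    if manager_name != "_default_":
        parts.append(manager_name)    # e.g. "cluster_a"
    return "_".join(parts)
```

For example, ``make_topic_name("job_setup", "cluster_a", "production")`` yields ``production_job_setup_cluster_a``, matching the production topics listed above.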

Comparison with AMQP Mode
``````````````````````````
6 changes: 3 additions & 3 deletions pulsar/client/client.py
@@ -604,7 +604,7 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s

# Determine topic name based on manager
manager_name = self.client_manager.manager_name
topic = f"job_setup_{manager_name}" if manager_name != "_default_" else "job_setup"
topic = self.client_manager._make_topic_name("job_setup", manager_name)

# Post message to relay
self.client_manager.relay_transport.post_message(topic, launch_params)
@@ -618,7 +618,7 @@ def get_status(self):
Cached status if available, None otherwise
"""
manager_name = self.client_manager.manager_name
topic = f"job_status_request_{manager_name}" if manager_name != "_default_" else "job_status_request"
topic = self.client_manager._make_topic_name("job_status_request", manager_name)

status_params = {
'job_id': self.job_id,
Expand All @@ -633,7 +633,7 @@ def get_status(self):
def kill(self):
"""Kill a job by posting a kill message to the relay."""
manager_name = self.client_manager.manager_name
topic = f"job_kill_{manager_name}" if manager_name != "_default_" else "job_kill"
topic = self.client_manager._make_topic_name("job_kill", manager_name)

kill_params = {'job_id': self.job_id}
self.client_manager.relay_transport.post_message(topic, kill_params)
54 changes: 47 additions & 7 deletions pulsar/client/manager.py
@@ -7,7 +7,6 @@

import functools
import threading
import time
from logging import getLogger
from os import getenv
from queue import Queue
@@ -264,18 +263,20 @@ class RelayClientManager(BaseRemoteConfiguredJobClientManager):
"""
status_cache: Dict[str, Any]

def __init__(self, relay_url: str, relay_username: str, relay_password: str, **kwds: Dict[str, Any]):
def __init__(self, relay_url: str, relay_username: str, relay_password: str, relay_topic_prefix: str = '', **kwds: Dict[str, Any]):
super().__init__(**kwds)

if not relay_url:
raise Exception("relay_url is required for RelayClientManager")

# Initialize relay transport
self.relay_transport = RelayTransport(relay_url, relay_username, relay_password)
self.relay_topic_prefix = relay_topic_prefix
self.status_cache = {}
self.callback_lock = threading.Lock()
self.callback_thread = None
self.active = True
self.shutdown_event = threading.Event()

def callback_wrapper(self, callback, message_data):
"""Process status update messages from the relay."""
@@ -298,7 +299,7 @@ def callback_wrapper(self, callback, message_data):
def status_consumer(self, callback_wrapper):
"""Long-poll the relay for status update messages."""
manager_name = self.manager_name
topic = f"job_status_update_{manager_name}" if manager_name != "_default_" else "job_status_update"
topic = self._make_topic_name("job_status_update", manager_name)

log.info("Starting relay status consumer for topic '%s'", topic)

@@ -314,12 +315,14 @@ def status_consumer(self, callback_wrapper):
if self.active:
log.exception("Exception while polling for status updates from relay, will retry.")
# Brief sleep before retrying to avoid tight loop on persistent errors
time.sleep(5)
# Use wait() instead of sleep() to allow immediate interruption on shutdown
if self.shutdown_event.wait(timeout=5):
break
else:
log.debug("Exception during shutdown, ignoring.")
break

log.debug("Leaving Pulsar client relay status consumer, no additional updates will be processed.")
log.info("Done consuming relay status updates for topic %s", topic)

def ensure_has_status_update_callback(self, callback):
"""Start a thread to poll for status updates if not already running."""
@@ -333,17 +336,47 @@ def ensure_has_status_update_callback(self, callback):
name="pulsar_client_%s_relay_status_consumer" % self.manager_name,
target=run
)
thread.daemon = False # Don't interrupt processing
# Make daemon so Python can exit even if thread is blocked in HTTP request.
# Unlike MessageQueueClientManager which uses AMQP connections that can be
# interrupted cleanly, HTTP long-poll requests block until timeout.
thread.daemon = True
thread.start()
self.callback_thread = thread

def ensure_has_ack_consumers(self):
"""No-op for relay client manager, as acknowledgements are handled via HTTP."""
pass

def _make_topic_name(self, base_topic: str, manager_name: str) -> str:
"""Create a topic name with optional prefix and manager suffix.

Args:
base_topic: Base topic name (e.g., 'job_setup', 'job_status_update')
manager_name: Manager name (e.g., '_default_', 'cluster_a')

Returns:
Fully qualified topic name
"""
parts = []

# Add prefix if provided
if self.relay_topic_prefix:
parts.append(self.relay_topic_prefix)

# Add base topic
parts.append(base_topic)

# Add manager name if not default
if manager_name != "_default_":
parts.append(manager_name)

return "_".join(parts)

def shutdown(self, ensure_cleanup: bool = False):
"""Shutdown the client manager and cleanup resources."""
self.active = False
# Signal the shutdown event to interrupt any waiting threads
self.shutdown_event.set()
if ensure_cleanup:
if self.callback_thread is not None:
self.callback_thread.join()
@@ -391,6 +424,7 @@ def build_client_manager(
relay_url: Optional[str] = None,
relay_username: Optional[str] = None,
relay_password: Optional[str] = None,
relay_topic_prefix: Optional[str] = None,
amqp_url: Optional[str] = None,
k8s_enabled: Optional[bool] = None,
tes_enabled: Optional[bool] = None,
@@ -401,7 +435,13 @@
return ClientManager(job_manager=job_manager, **kwargs) # TODO: Consider more separation here.
elif relay_url:
assert relay_password and relay_username, "relay_url set, but relay_username and relay_password must also be set"
return RelayClientManager(relay_url=relay_url, relay_username=relay_username, relay_password=relay_password, **kwargs)
return RelayClientManager(
relay_url=relay_url,
relay_username=relay_username,
relay_password=relay_password,
relay_topic_prefix=relay_topic_prefix or '',
**kwargs
)
elif amqp_url:
return MessageQueueClientManager(amqp_url=amqp_url, **kwargs)
elif k8s_enabled or tes_enabled or gcp_batch_enabled: