Skip to content

Commit 360c8fe

Browse files
committed
Add an optional relay topic prefix
1 parent 2772269 commit 360c8fe

File tree

5 files changed

+203
-18
lines changed

5 files changed

+203
-18
lines changed

app.yml.sample

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,18 @@
8383
## Python library is available and start the Pulsar normally.
8484
#message_queue_url: amqp://guest:guest@localhost:5672//
8585

86+
## Alternatively, use pulsar-relay (experimental) with HTTP/HTTPS instead of AMQP.
87+
## This does not require kombu or RabbitMQ.
88+
#message_queue_url: https://relay-server.example.org:9000
89+
#message_queue_username: admin
90+
#message_queue_password: changeme
91+
92+
## Optional topic prefix for relay messages. This allows multiple independent
93+
## Galaxy/Pulsar instance pairs to share the same relay server by using different
94+
## prefixes (e.g., "production", "staging"). Must match the relay_topic_prefix
95+
## configured in Galaxy's job_conf.yml if set.
96+
#relay_topic_prefix: production
97+
8698
## Pulsar loops over waiting for queue messages for a short time before checking
8799
## to see if it has been instructed to shut down. By default this is 0.2
88100
## seconds. This value is used as the value of the 'timeout' parameter to

docs/configure.rst

Lines changed: 111 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,16 @@ To configure Pulsar to use pulsar-relay, set the ``message_queue_url`` in
267267
The ``http://`` / ``https://`` prefix tells Pulsar to use the proxy communication mode instead
268268
of AMQP.
269269

270+
**Optional Topic Prefix**
271+
272+
You can optionally set a ``relay_topic_prefix`` to namespace your topics. This is useful
273+
when multiple independent Galaxy/Pulsar instance pairs share the same relay::
274+
275+
message_queue_url: http://proxy-server.example.org:9000
276+
message_queue_username: admin
277+
message_queue_password: your_secure_password
278+
relay_topic_prefix: production
279+
270280
.. note::
271281

272282
Unlike AMQP mode, the pulsar-relay mode does **not** require the ``kombu``
@@ -286,6 +296,8 @@ with proxy parameters::
286296
proxy_url: http://proxy-server.example.org:9000
287297
proxy_username: your_username
288298
proxy_password: your_secure_password
299+
# Optional topic prefix (must match Pulsar configuration)
300+
# relay_topic_prefix: production
289301

290302

291303
execution:
@@ -298,6 +310,11 @@ with proxy parameters::
298310
# Remote job staging directory
299311
jobs_directory: /data/pulsar/staging
300312

313+
.. note::
314+
315+
The ``relay_topic_prefix`` must match on both Galaxy and Pulsar sides.
316+
If set on one side but not the other, messages will not be routed correctly.
317+
301318

302319
Authentication
303320
``````````````
@@ -368,18 +385,105 @@ In Galaxy's job configuration, route jobs to specific clusters using the
368385
manager: cluster_b
369386
# ... other settings
370387

388+
Multiple Galaxy/Pulsar Instance Pairs
389+
``````````````````````````````````````
390+
391+
You can have multiple independent Galaxy and Pulsar instance pairs all sharing
392+
the same relay by using different topic prefixes. This is useful for:
393+
394+
* Running separate production and staging environments
395+
* Supporting multiple research groups with isolated instances
396+
* Multi-tenant deployments
397+
398+
**Example: Production and Staging Environments**
399+
400+
**Production Pulsar** (``app.yml``)::
401+
402+
message_queue_url: https://shared-relay:9000
403+
message_queue_username: admin
404+
message_queue_password: password
405+
relay_topic_prefix: production
406+
managers:
407+
cluster_a:
408+
type: queued_slurm
409+
410+
**Staging Pulsar** (``app.yml``)::
411+
412+
message_queue_url: https://shared-relay:9000
413+
message_queue_username: admin
414+
message_queue_password: password
415+
relay_topic_prefix: staging
416+
managers:
417+
cluster_a:
418+
type: queued_slurm
419+
420+
**Production Galaxy** (``job_conf.yml``)::
421+
422+
runners:
423+
pulsar:
424+
load: galaxy.jobs.runners.pulsar:PulsarMQJobRunner
425+
proxy_url: https://shared-relay:9000
426+
proxy_username: admin
427+
proxy_password: password
428+
relay_topic_prefix: production
429+
430+
execution:
431+
environments:
432+
pulsar_jobs:
433+
runner: pulsar
434+
manager: cluster_a
435+
# ... other settings
436+
437+
**Staging Galaxy** (``job_conf.yml``)::
438+
439+
runners:
440+
pulsar:
441+
load: galaxy.jobs.runners.pulsar:PulsarMQJobRunner
442+
proxy_url: https://shared-relay:9000
443+
proxy_username: admin
444+
proxy_password: password
445+
relay_topic_prefix: staging
446+
447+
execution:
448+
environments:
449+
pulsar_jobs:
450+
runner: pulsar
451+
manager: cluster_a
452+
# ... other settings
453+
454+
In this setup, the topics will be completely isolated:
455+
456+
* **Production**: ``production_job_setup_cluster_a``, ``production_job_status_update_cluster_a``
457+
* **Staging**: ``staging_job_setup_cluster_a``, ``staging_job_status_update_cluster_a``
458+
371459
Topic Naming
372460
````````````
373461

374-
Messages are organized by topic with automatic naming based on the manager name:
462+
Messages are organized by topic with automatic naming based on the optional prefix
463+
and manager name:
464+
465+
* Job setup: ``job_setup`` (default manager, no prefix)
466+
* Job setup: ``job_setup_{manager_name}`` (named manager, no prefix)
467+
* Job setup: ``{prefix}_job_setup`` (default manager, with prefix)
468+
* Job setup: ``{prefix}_job_setup_{manager_name}`` (named manager, with prefix)
469+
470+
The same pattern applies to other message types:
471+
472+
* Status requests: ``job_status_request``, ``job_status_request_{manager_name}``
473+
* Kill commands: ``job_kill``, ``job_kill_{manager_name}``
474+
* Status updates: ``job_status_update``, ``job_status_update_{manager_name}``
475+
476+
When a ``relay_topic_prefix`` is configured, it is prepended to all topic names:
477+
478+
* ``production_job_setup``
479+
* ``production_job_setup_cluster_a``
480+
* ``production_job_status_update_cluster_a``
375481

376-
* Job setup: ``job_setup_{manager_name}`` or ``job_setup`` (for default manager)
377-
* Status requests: ``job_status_request_{manager_name}``
378-
* Kill commands: ``job_kill_{manager_name}``
379-
* Status updates: ``job_status_update_{manager_name}``
482+
This allows:
380483

381-
This allows multiple Pulsar instances to share the same proxy without message
382-
conflicts.
484+
* Multiple Pulsar instances to share the same relay (using different manager names)
485+
* Multiple independent Galaxy/Pulsar instance pairs to share the same relay
486+
(using different topic prefixes)
383487

384488
Comparison with AMQP Mode
385489
``````````````````````````

pulsar/client/client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s
604604

605605
# Determine topic name based on manager
606606
manager_name = self.client_manager.manager_name
607-
topic = f"job_setup_{manager_name}" if manager_name != "_default_" else "job_setup"
607+
topic = self.client_manager._make_topic_name("job_setup", manager_name)
608608

609609
# Post message to relay
610610
self.client_manager.relay_transport.post_message(topic, launch_params)
@@ -618,7 +618,7 @@ def get_status(self):
618618
Cached status if available, None otherwise
619619
"""
620620
manager_name = self.client_manager.manager_name
621-
topic = f"job_status_request_{manager_name}" if manager_name != "_default_" else "job_status_request"
621+
topic = self.client_manager._make_topic_name("job_status_request", manager_name)
622622

623623
status_params = {
624624
'job_id': self.job_id,
@@ -633,7 +633,7 @@ def get_status(self):
633633
def kill(self):
634634
"""Kill a job by posting a kill message to the relay."""
635635
manager_name = self.client_manager.manager_name
636-
topic = f"job_kill_{manager_name}" if manager_name != "_default_" else "job_kill"
636+
topic = self.client_manager._make_topic_name("job_kill", manager_name)
637637

638638
kill_params = {'job_id': self.job_id}
639639
self.client_manager.relay_transport.post_message(topic, kill_params)

pulsar/client/manager.py

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,14 +264,15 @@ class RelayClientManager(BaseRemoteConfiguredJobClientManager):
264264
"""
265265
status_cache: Dict[str, Any]
266266

267-
def __init__(self, relay_url: str, relay_username: str, relay_password: str, **kwds: Dict[str, Any]):
267+
def __init__(self, relay_url: str, relay_username: str, relay_password: str, relay_topic_prefix: str = '', **kwds: Dict[str, Any]):
268268
super().__init__(**kwds)
269269

270270
if not relay_url:
271271
raise Exception("relay_url is required for RelayClientManager")
272272

273273
# Initialize relay transport
274274
self.relay_transport = RelayTransport(relay_url, relay_username, relay_password)
275+
self.relay_topic_prefix = relay_topic_prefix
275276
self.status_cache = {}
276277
self.callback_lock = threading.Lock()
277278
self.callback_thread = None
@@ -298,7 +299,7 @@ def callback_wrapper(self, callback, message_data):
298299
def status_consumer(self, callback_wrapper):
299300
"""Long-poll the relay for status update messages."""
300301
manager_name = self.manager_name
301-
topic = f"job_status_update_{manager_name}" if manager_name != "_default_" else "job_status_update"
302+
topic = self._make_topic_name("job_status_update", manager_name)
302303

303304
log.info("Starting relay status consumer for topic '%s'", topic)
304305

@@ -341,6 +342,31 @@ def ensure_has_ack_consumers(self):
341342
"""No-op for relay client manager, as acknowledgements are handled via HTTP."""
342343
pass
343344

345+
def _make_topic_name(self, base_topic: str, manager_name: str) -> str:
346+
"""Create a topic name with optional prefix and manager suffix.
347+
348+
Args:
349+
base_topic: Base topic name (e.g., 'job_setup', 'job_status_update')
350+
manager_name: Manager name (e.g., '_default_', 'cluster_a')
351+
352+
Returns:
353+
Fully qualified topic name
354+
"""
355+
parts = []
356+
357+
# Add prefix if provided
358+
if self.relay_topic_prefix:
359+
parts.append(self.relay_topic_prefix)
360+
361+
# Add base topic
362+
parts.append(base_topic)
363+
364+
# Add manager name if not default
365+
if manager_name != "_default_":
366+
parts.append(manager_name)
367+
368+
return "_".join(parts)
369+
344370
def shutdown(self, ensure_cleanup: bool = False):
345371
"""Shutdown the client manager and cleanup resources."""
346372
self.active = False
@@ -391,6 +417,7 @@ def build_client_manager(
391417
relay_url: Optional[str] = None,
392418
relay_username: Optional[str] = None,
393419
relay_password: Optional[str] = None,
420+
relay_topic_prefix: Optional[str] = None,
394421
amqp_url: Optional[str] = None,
395422
k8s_enabled: Optional[bool] = None,
396423
tes_enabled: Optional[bool] = None,
@@ -401,7 +428,13 @@ def build_client_manager(
401428
return ClientManager(job_manager=job_manager, **kwargs) # TODO: Consider more separation here.
402429
elif relay_url:
403430
assert relay_password and relay_username, "relay_url set, but relay_username and relay_password must also be set"
404-
return RelayClientManager(relay_url=relay_url, relay_username=relay_username, relay_password=relay_password, **kwargs)
431+
return RelayClientManager(
432+
relay_url=relay_url,
433+
relay_username=relay_username,
434+
relay_password=relay_password,
435+
relay_topic_prefix=relay_topic_prefix or '',
436+
**kwargs
437+
)
405438
elif amqp_url:
406439
return MessageQueueClientManager(amqp_url=amqp_url, **kwargs)
407440
elif k8s_enabled or tes_enabled or gcp_batch_enabled:

pulsar/messaging/bind_relay.py

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ def bind_manager_to_relay(manager, relay_state: RelayState, relay_url, conf):
3434
if not password:
3535
raise Exception("message_queue_password is required for relay communication")
3636

37+
# Extract optional relay topic prefix
38+
relay_topic_prefix = conf.get('relay_topic_prefix', '')
39+
3740
# Create relay transport
3841
relay_transport = RelayTransport(relay_url, username, password)
3942

@@ -42,11 +45,11 @@ def bind_manager_to_relay(manager, relay_state: RelayState, relay_url, conf):
4245
process_kill_messages = functools.partial(__process_kill_message, manager)
4346
process_status_messages = functools.partial(__process_status_message, manager)
4447

45-
# Determine topics based on manager name
46-
setup_topic = f"job_setup_{manager_name}" if manager_name != "_default_" else "job_setup"
47-
status_request_topic = f"job_status_request_{manager_name}" if manager_name != "_default_" else "job_status_request"
48-
kill_topic = f"job_kill_{manager_name}" if manager_name != "_default_" else "job_kill"
49-
status_update_topic = f"job_status_update_{manager_name}" if manager_name != "_default_" else "job_status_update"
48+
# Determine topics based on manager name and optional prefix
49+
setup_topic = __make_topic_name(relay_topic_prefix, "job_setup", manager_name)
50+
status_request_topic = __make_topic_name(relay_topic_prefix, "job_status_request", manager_name)
51+
kill_topic = __make_topic_name(relay_topic_prefix, "job_kill", manager_name)
52+
status_update_topic = __make_topic_name(relay_topic_prefix, "job_status_update", manager_name)
5053

5154
# Start consumer threads if message_queue_consume is enabled
5255
if conf.get("message_queue_consume", True):
@@ -206,3 +209,36 @@ def __client_job_id_from_body(body):
206209
"""
207210
job_id = body.get("job_id", None)
208211
return job_id
212+
213+
214+
def __make_topic_name(prefix, base_topic, manager_name):
215+
"""Create a topic name with optional prefix and manager suffix.
216+
217+
Args:
218+
prefix: Optional prefix string (e.g., 'galaxy1', 'prod')
219+
base_topic: Base topic name (e.g., 'job_setup', 'job_status_update')
220+
manager_name: Manager name (e.g., '_default_', 'cluster_a')
221+
222+
Returns:
223+
Fully qualified topic name
224+
225+
Examples:
226+
__make_topic_name('', 'job_setup', '_default_') -> 'job_setup'
227+
__make_topic_name('', 'job_setup', 'cluster_a') -> 'job_setup_cluster_a'
228+
__make_topic_name('prod', 'job_setup', '_default_') -> 'prod_job_setup'
229+
__make_topic_name('prod', 'job_setup', 'cluster_a') -> 'prod_job_setup_cluster_a'
230+
"""
231+
parts = []
232+
233+
# Add prefix if provided
234+
if prefix:
235+
parts.append(prefix)
236+
237+
# Add base topic
238+
parts.append(base_topic)
239+
240+
# Add manager name if not default
241+
if manager_name != "_default_":
242+
parts.append(manager_name)
243+
244+
return "_".join(parts)

0 commit comments

Comments
 (0)