Skip to content

Commit 2f57a74

Browse files
authored
Merge pull request #419 from mvdbeek/pulsar_proxy
Add pulsar relay mode
2 parents 837d97a + 39c38b6 commit 2f57a74

File tree

15 files changed

+1061
-22
lines changed

15 files changed

+1061
-22
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ docs: ready-docs
112112

113113
lint-docs: ready-docs
114114
if [ -f .venv/bin/activate ]; then . .venv/bin/activate; fi; $(MAKE) -C docs clean
115-
if [ -f .venv/bin/activate ]; then . .venv/bin/activate; fi; ! (make -C docs html 2>&1 | grep -v 'nonlocal image URI found\|included in any toctree' | grep WARNING)
115+
if [ -f .venv/bin/activate ]; then . .venv/bin/activate; fi; ! (make -C docs html 2>&1 | grep -v 'more than one target found\|nonlocal image URI found\|included in any toctree' | grep WARNING)
116116

117117
_open-docs:
118118
open docs/_build/html/index.html || xdg-open docs/_build/html/index.html

docs/configure.rst

Lines changed: 185 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ You can consult the `Kombu documentation
113113
even more information.
114114

115115
User Authentication/Authorization
116-
`````````````
116+
`````````````````````````````````
117117

118118
You can configure Pulsar to authenticate user during request processing and check
119119
if this user is allowed to run a job.
@@ -222,6 +222,190 @@ In the event that the connection to the AMQP server is lost during message
222222
publish, the Pulsar server can retry the connection, governed by the
223223
``amqp_publish*`` options documented in `app.yml.sample`_.
224224

225+
Message Queue (pulsar-relay)
226+
-----------------------------
227+
228+
Pulsar can also communicate with Galaxy via an experimental **pulsar-relay** server,
229+
an HTTP-based message proxy. This mode is similar to the AMQP message queue mode but uses
230+
HTTP long-polling instead of a message broker like RabbitMQ. This can help when:
231+
232+
* Galaxy cannot directly reach Pulsar (e.g., due to firewall restrictions)
233+
* You want to avoid deploying and managing a RabbitMQ server
234+
* You prefer HTTP-based communication for simplicity and observability
235+
236+
Architecture
237+
````````````
238+
239+
In this mode:
240+
241+
1. **Galaxy → Pulsar**: Galaxy posts control messages (job setup, status requests,
242+
kill commands) to the proxy via HTTP POST
243+
2. **Pulsar → Galaxy**: Pulsar polls the proxy via HTTP long-polling to receive
244+
these messages
245+
3. **Pulsar → Galaxy**: Pulsar posts status updates to the proxy
246+
4. **Galaxy → Pulsar**: Galaxy polls the proxy to receive status updates
247+
5. **File Transfers**: Pulsar transfers files directly to/from Galaxy via HTTP
248+
(not through the proxy)
249+
250+
::
251+
252+
Galaxy ──POST messages──> pulsar-relay ──poll──> Pulsar Server
253+
254+
255+
Galaxy <────────direct HTTP for file transfers─────────┘
256+
257+
Pulsar Configuration
258+
````````````````````
259+
260+
To configure Pulsar to use pulsar-relay, set the ``message_queue_url`` in
261+
``app.yml`` with a ``http://`` or ``https://`` prefix::
262+
263+
message_queue_url: http://proxy-server.example.org:9000
264+
message_queue_username: admin
265+
message_queue_password: your_secure_password
266+
267+
The ``http://`` / ``https://`` prefix tells Pulsar to use the proxy communication mode instead
268+
of AMQP.
269+
270+
.. note::
271+
272+
Unlike AMQP mode, the pulsar-relay mode does **not** require the ``kombu``
273+
Python dependency. It only requires the ``requests`` library, which is a
274+
standard dependency of Pulsar.
275+
276+
Galaxy Configuration
277+
````````````````````
278+
279+
In Galaxy's job configuration (``job_conf.yml``), configure a Pulsar destination
280+
with proxy parameters::
281+
282+
runners:
283+
pulsar:
284+
load: galaxy.jobs.runners.pulsar:PulsarMQJobRunner
285+
# Proxy connection
286+
proxy_url: http://proxy-server.example.org:9000
287+
proxy_username: your_username
288+
proxy_password: your_secure_password
289+
290+
291+
execution:
292+
default: pulsar_relay
293+
environments:
294+
pulsar_relay:
295+
runner: pulsar
296+
# Galaxy's URL (for Pulsar to reach back for file transfers)
297+
url: http://galaxy-server.example.org:8080
298+
# Remote job staging directory
299+
jobs_directory: /data/pulsar/staging
300+
301+
302+
Authentication
303+
``````````````
304+
305+
The pulsar-relay uses JWT (JSON Web Token) authentication. Galaxy and Pulsar
306+
authenticate with the proxy using the username and password provided in the
307+
configuration. Tokens are automatically managed and refreshed as needed.
308+
309+
.. tip::
310+
311+
In production, always use HTTPS for the proxy URL to encrypt credentials
312+
and message content during transit::
313+
314+
message_queue_url: https://proxy-server.example.org:443
315+
316+
Security Considerations
317+
```````````````````````
318+
319+
* **Use HTTPS**: Always use HTTPS for the proxy URL in production
320+
* **Strong Passwords**: Use strong, unique passwords for proxy authentication
321+
* **Network Isolation**: Deploy the proxy in a DMZ accessible to both Galaxy
322+
and Pulsar
323+
* **Firewall Rules**:
324+
* Galaxy → Proxy: Allow outbound HTTPS
325+
* Pulsar → Proxy: Allow outbound HTTPS
326+
* Pulsar → Galaxy: Allow outbound HTTP/HTTPS for file transfers
327+
328+
Multiple Pulsar Instances
329+
``````````````````````````
330+
331+
You can deploy multiple Pulsar instances with different managers, all using the
332+
same proxy. Messages are routed by topic names that include the manager name.
333+
334+
For example, configure two Pulsar servers:
335+
336+
**Pulsar Server 1** (``app.yml``)::
337+
338+
message_queue_url: http://proxy-server:9000
339+
message_queue_username: admin
340+
message_queue_password: password
341+
managers:
342+
cluster_a:
343+
type: queued_slurm
344+
345+
**Pulsar Server 2** (``app.yml``)::
346+
347+
message_queue_url: http://proxy-server:9000
348+
message_queue_username: admin
349+
message_queue_password: password
350+
managers:
351+
cluster_b:
352+
type: queued_condor
353+
354+
In Galaxy's job configuration, route jobs to specific clusters using the
355+
``manager`` parameter::
356+
357+
execution:
358+
environments:
359+
cluster_a_jobs:
360+
runner: pulsar
361+
proxy_url: http://proxy-server:9000
362+
manager: cluster_a
363+
# ... other settings
364+
365+
cluster_b_jobs:
366+
runner: pulsar
367+
proxy_url: http://proxy-server:9000
368+
manager: cluster_b
369+
# ... other settings
370+
371+
Topic Naming
372+
````````````
373+
374+
Messages are organized by topic with automatic naming based on the manager name:
375+
376+
* Job setup: ``job_setup_{manager_name}`` or ``job_setup`` (for default manager)
377+
* Status requests: ``job_status_request_{manager_name}``
378+
* Kill commands: ``job_kill_{manager_name}``
379+
* Status updates: ``job_status_update_{manager_name}``
380+
381+
This allows multiple Pulsar instances to share the same proxy without message
382+
conflicts.
383+
384+
Comparison with AMQP Mode
385+
``````````````````````````
386+
387+
+------------------------+---------------------------+-------------------------+
388+
| Feature | AMQP (RabbitMQ) | pulsar-relay |
389+
+========================+===========================+=========================+
390+
| Protocol | AMQP over TCP | HTTP/HTTPS |
391+
+------------------------+---------------------------+-------------------------+
392+
| Dependencies | kombu, RabbitMQ server | requests (built-in) |
393+
+------------------------+---------------------------+-------------------------+
394+
| Deployment Complexity | Moderate (broker setup) | Simple (HTTP service) |
395+
+------------------------+---------------------------+-------------------------+
396+
| Message Delivery | Push-based | Long-polling |
397+
+------------------------+---------------------------+-------------------------+
398+
| Observability | Queue monitoring tools | HTTP access logs |
399+
+------------------------+---------------------------+-------------------------+
400+
| SSL/TLS | Via AMQPS | Via HTTPS |
401+
+------------------------+---------------------------+-------------------------+
402+
| Firewall Friendly | Moderate | High (standard HTTP) |
403+
+------------------------+---------------------------+-------------------------+
404+
405+
For more information on deploying pulsar-relay, see the `pulsar-relay documentation`_.
406+
407+
.. _pulsar-relay documentation: https://github.com/galaxyproject/pulsar-relay
408+
225409
Caching (Experimental)
226410
----------------------
227411

docs/pulsar.client.rst

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,14 @@ pulsar.client.config\_util module
5454
:undoc-members:
5555
:show-inheritance:
5656

57+
pulsar.client.container\_job\_config module
58+
-------------------------------------------
59+
60+
.. automodule:: pulsar.client.container_job_config
61+
:members:
62+
:undoc-members:
63+
:show-inheritance:
64+
5765
pulsar.client.decorators module
5866
-------------------------------
5967

@@ -110,6 +118,14 @@ pulsar.client.path\_mapper module
110118
:undoc-members:
111119
:show-inheritance:
112120

121+
pulsar.client.relay\_auth module
122+
--------------------------------
123+
124+
.. automodule:: pulsar.client.relay_auth
125+
:members:
126+
:undoc-members:
127+
:show-inheritance:
128+
113129
pulsar.client.server\_interface module
114130
--------------------------------------
115131

docs/pulsar.client.transport.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ pulsar.client.transport.poster module
2020
:undoc-members:
2121
:show-inheritance:
2222

23+
pulsar.client.transport.relay module
24+
------------------------------------
25+
26+
.. automodule:: pulsar.client.transport.relay
27+
:members:
28+
:undoc-members:
29+
:show-inheritance:
30+
2331
pulsar.client.transport.requests module
2432
---------------------------------------
2533

docs/pulsar.managers.util.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@ pulsar.managers.util.external module
3939
:undoc-members:
4040
:show-inheritance:
4141

42+
pulsar.managers.util.gcp\_util module
43+
-------------------------------------
44+
45+
.. automodule:: pulsar.managers.util.gcp_util
46+
:members:
47+
:undoc-members:
48+
:show-inheritance:
49+
4250
pulsar.managers.util.kill module
4351
--------------------------------
4452

docs/pulsar.messaging.rst

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,22 @@ pulsar.messaging.bind\_amqp module
1212
:undoc-members:
1313
:show-inheritance:
1414

15+
pulsar.messaging.bind\_relay module
16+
-----------------------------------
17+
18+
.. automodule:: pulsar.messaging.bind_relay
19+
:members:
20+
:undoc-members:
21+
:show-inheritance:
22+
23+
pulsar.messaging.relay\_state module
24+
------------------------------------
25+
26+
.. automodule:: pulsar.messaging.relay_state
27+
:members:
28+
:undoc-members:
29+
:show-inheritance:
30+
1531
Module contents
1632
---------------
1733

pulsar/client/client.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,78 @@ def kill(self):
568568
pass
569569

570570

571+
class RelayJobClient(BaseMessageJobClient):
572+
"""Client that communicates with Pulsar via pulsar-relay.
573+
574+
This client posts control messages (setup, status, kill) to the relay,
575+
which are then consumed by the Pulsar server. File transfers happen
576+
directly between Pulsar and Galaxy via HTTP.
577+
"""
578+
579+
def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
580+
dynamic_file_sources=None, token_endpoint=None):
581+
"""Submit a job by posting a setup message to the relay.
582+
583+
Args:
584+
command_line: Command to execute on Pulsar
585+
dependencies_description: Tool dependencies
586+
env: Environment variables
587+
remote_staging: Remote staging configuration
588+
job_config: Job configuration
589+
dynamic_file_sources: Dynamic file sources
590+
token_endpoint: Token endpoint for file access
591+
592+
Returns:
593+
None (async operation)
594+
"""
595+
launch_params = self._build_setup_message(
596+
command_line,
597+
dependencies_description=dependencies_description,
598+
env=env,
599+
remote_staging=remote_staging,
600+
job_config=job_config,
601+
dynamic_file_sources=dynamic_file_sources,
602+
token_endpoint=token_endpoint,
603+
)
604+
605+
# Determine topic name based on manager
606+
manager_name = self.client_manager.manager_name
607+
topic = f"job_setup_{manager_name}" if manager_name != "_default_" else "job_setup"
608+
609+
# Post message to relay
610+
self.client_manager.relay_transport.post_message(topic, launch_params)
611+
log.info("Job %s published to relay topic '%s'", self.job_id, topic)
612+
return None
613+
614+
def get_status(self):
615+
"""Request job status by posting a status request message to the relay.
616+
617+
Returns:
618+
Cached status if available, None otherwise
619+
"""
620+
manager_name = self.client_manager.manager_name
621+
topic = f"job_status_request_{manager_name}" if manager_name != "_default_" else "job_status_request"
622+
623+
status_params = {
624+
'job_id': self.job_id,
625+
}
626+
627+
self.client_manager.relay_transport.post_message(topic, status_params)
628+
log.debug("Job status request for %s published to relay topic '%s'", self.job_id, topic)
629+
630+
# Return cached status if available
631+
return self.client_manager.status_cache.get(self.job_id, {}).get('status', None)
632+
633+
def kill(self):
634+
"""Kill a job by posting a kill message to the relay."""
635+
manager_name = self.client_manager.manager_name
636+
topic = f"job_kill_{manager_name}" if manager_name != "_default_" else "job_kill"
637+
638+
kill_params = {'job_id': self.job_id}
639+
self.client_manager.relay_transport.post_message(topic, kill_params)
640+
log.info("Job kill request for %s published to relay topic '%s'", self.job_id, topic)
641+
642+
571643
class ExecutionType(str, Enum):
572644
# containers run one after each other with similar configuration
573645
# like in TES or AWS Batch

0 commit comments

Comments
 (0)