diff --git a/app.yml.sample b/app.yml.sample index a4bcb3b3..5e9a0005 100644 --- a/app.yml.sample +++ b/app.yml.sample @@ -83,6 +83,18 @@ ## Python library is available and start the Pulsar normally. #message_queue_url: amqp://guest:guest@localhost:5672// +## Alternatively, use pulsar-relay (experimental) with HTTP/HTTPS instead of AMQP. +## This does not require kombu or RabbitMQ. +#message_queue_url: https://relay-server.example.org:9000 +#message_queue_username: admin +#message_queue_password: changeme + +## Optional topic prefix for relay messages. This allows multiple independent +## Galaxy/Pulsar instance pairs to share the same relay server by using different +## prefixes (e.g., "production", "staging"). Must match the relay_topic_prefix +## configured in Galaxy's job_conf.yml if set. +#relay_topic_prefix: production + ## Pulsar loops over waiting for queue messages for a short time before checking ## to see if it has been instructed to shut down. By default this is 0.2 ## seconds. This value is used as the value of the 'timeout' parameter to diff --git a/docs/configure.rst b/docs/configure.rst index 171f0fdf..c8c35a49 100644 --- a/docs/configure.rst +++ b/docs/configure.rst @@ -267,6 +267,16 @@ To configure Pulsar to use pulsar-relay, set the ``message_queue_url`` in The ``http://`` / ``https://`` prefix tells Pulsar to use the proxy communication mode instead of AMQP. +**Optional Topic Prefix** + +You can optionally set a ``relay_topic_prefix`` to namespace your topics. This is useful +when multiple independent Galaxy/Pulsar instance pairs share the same relay:: + + message_queue_url: http://proxy-server.example.org:9000 + message_queue_username: admin + message_queue_password: your_secure_password + relay_topic_prefix: production + .. 
note:: Unlike AMQP mode, the pulsar-relay mode does **not** require the ``kombu`` @@ -286,6 +296,8 @@ with proxy parameters:: proxy_url: http://proxy-server.example.org:9000 proxy_username: your_username proxy_password: your_secure_password + # Optional topic prefix (must match Pulsar configuration) + # relay_topic_prefix: production execution: @@ -298,6 +310,11 @@ with proxy parameters:: # Remote job staging directory jobs_directory: /data/pulsar/staging +.. note:: + + The ``relay_topic_prefix`` must match on both Galaxy and Pulsar sides. + If set on one side but not the other, messages will not be routed correctly. + Authentication `````````````` @@ -368,18 +385,105 @@ In Galaxy's job configuration, route jobs to specific clusters using the manager: cluster_b # ... other settings +Multiple Galaxy/Pulsar Instance Pairs +`````````````````````````````````````` + +You can have multiple independent Galaxy and Pulsar instance pairs all sharing +the same relay by using different topic prefixes. 
This is useful for: + +* Running separate production and staging environments +* Supporting multiple research groups with isolated instances +* Multi-tenant deployments + +**Example: Production and Staging Environments** + +**Production Pulsar** (``app.yml``):: + + message_queue_url: https://shared-relay:9000 + message_queue_username: admin + message_queue_password: password + relay_topic_prefix: production + managers: + cluster_a: + type: queued_slurm + +**Staging Pulsar** (``app.yml``):: + + message_queue_url: https://shared-relay:9000 + message_queue_username: admin + message_queue_password: password + relay_topic_prefix: staging + managers: + cluster_a: + type: queued_slurm + +**Production Galaxy** (``job_conf.yml``):: + + runners: + pulsar: + load: galaxy.jobs.runners.pulsar:PulsarMQJobRunner + proxy_url: https://shared-relay:9000 + proxy_username: admin + proxy_password: password + relay_topic_prefix: production + + execution: + environments: + pulsar_jobs: + runner: pulsar + manager: cluster_a + # ... other settings + +**Staging Galaxy** (``job_conf.yml``):: + + runners: + pulsar: + load: galaxy.jobs.runners.pulsar:PulsarMQJobRunner + proxy_url: https://shared-relay:9000 + proxy_username: admin + proxy_password: password + relay_topic_prefix: staging + + execution: + environments: + pulsar_jobs: + runner: pulsar + manager: cluster_a + # ... 
other settings + +In this setup, the topics will be completely isolated: + +* **Production**: ``production_job_setup_cluster_a``, ``production_job_status_update_cluster_a`` +* **Staging**: ``staging_job_setup_cluster_a``, ``staging_job_status_update_cluster_a`` + Topic Naming ```````````` -Messages are organized by topic with automatic naming based on the manager name: +Messages are organized by topic with automatic naming based on the optional prefix +and manager name: + +* Job setup: ``job_setup`` (default manager, no prefix) +* Job setup: ``job_setup_{manager_name}`` (named manager, no prefix) +* Job setup: ``{prefix}_job_setup`` (default manager, with prefix) +* Job setup: ``{prefix}_job_setup_{manager_name}`` (named manager, with prefix) + +The same pattern applies to other message types: + +* Status requests: ``job_status_request``, ``job_status_request_{manager_name}`` +* Kill commands: ``job_kill``, ``job_kill_{manager_name}`` +* Status updates: ``job_status_update``, ``job_status_update_{manager_name}`` + +When a ``relay_topic_prefix`` is configured, it is prepended to all topic names: + +* ``production_job_setup`` +* ``production_job_setup_cluster_a`` +* ``production_job_status_update_cluster_a`` -* Job setup: ``job_setup_{manager_name}`` or ``job_setup`` (for default manager) -* Status requests: ``job_status_request_{manager_name}`` -* Kill commands: ``job_kill_{manager_name}`` -* Status updates: ``job_status_update_{manager_name}`` +This allows: -This allows multiple Pulsar instances to share the same proxy without message -conflicts. 
+* Multiple Pulsar instances to share the same relay (using different manager names) +* Multiple independent Galaxy/Pulsar instance pairs to share the same relay + (using different topic prefixes) Comparison with AMQP Mode `````````````````````````` diff --git a/pulsar/client/client.py b/pulsar/client/client.py index 94fb0f28..5349b58c 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -604,7 +604,7 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s # Determine topic name based on manager manager_name = self.client_manager.manager_name - topic = f"job_setup_{manager_name}" if manager_name != "_default_" else "job_setup" + topic = self.client_manager._make_topic_name("job_setup", manager_name) # Post message to relay self.client_manager.relay_transport.post_message(topic, launch_params) @@ -618,7 +618,7 @@ def get_status(self): Cached status if available, None otherwise """ manager_name = self.client_manager.manager_name - topic = f"job_status_request_{manager_name}" if manager_name != "_default_" else "job_status_request" + topic = self.client_manager._make_topic_name("job_status_request", manager_name) status_params = { 'job_id': self.job_id, @@ -633,7 +633,7 @@ def get_status(self): def kill(self): """Kill a job by posting a kill message to the relay.""" manager_name = self.client_manager.manager_name - topic = f"job_kill_{manager_name}" if manager_name != "_default_" else "job_kill" + topic = self.client_manager._make_topic_name("job_kill", manager_name) kill_params = {'job_id': self.job_id} self.client_manager.relay_transport.post_message(topic, kill_params) diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py index bb999adf..995e08d8 100644 --- a/pulsar/client/manager.py +++ b/pulsar/client/manager.py @@ -7,7 +7,6 @@ import functools import threading -import time from logging import getLogger from os import getenv from queue import Queue @@ -264,7 +263,7 @@ class 
RelayClientManager(BaseRemoteConfiguredJobClientManager): """ status_cache: Dict[str, Any] - def __init__(self, relay_url: str, relay_username: str, relay_password: str, **kwds: Dict[str, Any]): + def __init__(self, relay_url: str, relay_username: str, relay_password: str, relay_topic_prefix: str = '', **kwds: Dict[str, Any]): super().__init__(**kwds) if not relay_url: @@ -272,10 +271,12 @@ def __init__(self, relay_url: str, relay_username: str, relay_password: str, **k # Initialize relay transport self.relay_transport = RelayTransport(relay_url, relay_username, relay_password) + self.relay_topic_prefix = relay_topic_prefix self.status_cache = {} self.callback_lock = threading.Lock() self.callback_thread = None self.active = True + self.shutdown_event = threading.Event() def callback_wrapper(self, callback, message_data): """Process status update messages from the relay.""" @@ -298,7 +299,7 @@ def callback_wrapper(self, callback, message_data): def status_consumer(self, callback_wrapper): """Long-poll the relay for status update messages.""" manager_name = self.manager_name - topic = f"job_status_update_{manager_name}" if manager_name != "_default_" else "job_status_update" + topic = self._make_topic_name("job_status_update", manager_name) log.info("Starting relay status consumer for topic '%s'", topic) @@ -314,12 +315,14 @@ def status_consumer(self, callback_wrapper): if self.active: log.exception("Exception while polling for status updates from relay, will retry.") # Brief sleep before retrying to avoid tight loop on persistent errors - time.sleep(5) + # Use wait() instead of sleep() to allow immediate interruption on shutdown + if self.shutdown_event.wait(timeout=5): + break else: log.debug("Exception during shutdown, ignoring.") break - log.debug("Leaving Pulsar client relay status consumer, no additional updates will be processed.") + log.info("Done consuming relay status updates for topic %s", topic) def ensure_has_status_update_callback(self, callback): 
"""Start a thread to poll for status updates if not already running.""" @@ -333,7 +336,10 @@ def ensure_has_status_update_callback(self, callback): name="pulsar_client_%s_relay_status_consumer" % self.manager_name, target=run ) - thread.daemon = False # Don't interrupt processing + # Make daemon so Python can exit even if thread is blocked in HTTP request. + # Unlike MessageQueueClientManager which uses AMQP connections that can be + # interrupted cleanly, HTTP long-poll requests block until timeout. + thread.daemon = True thread.start() self.callback_thread = thread @@ -341,9 +347,36 @@ def ensure_has_ack_consumers(self): """No-op for relay client manager, as acknowledgements are handled via HTTP.""" pass + def _make_topic_name(self, base_topic: str, manager_name: str) -> str: + """Create a topic name with optional prefix and manager suffix. + + Args: + base_topic: Base topic name (e.g., 'job_setup', 'job_status_update') + manager_name: Manager name (e.g., '_default_', 'cluster_a') + + Returns: + Fully qualified topic name + """ + parts = [] + + # Add prefix if provided + if self.relay_topic_prefix: + parts.append(self.relay_topic_prefix) + + # Add base topic + parts.append(base_topic) + + # Add manager name if not default + if manager_name != "_default_": + parts.append(manager_name) + + return "_".join(parts) + def shutdown(self, ensure_cleanup: bool = False): """Shutdown the client manager and cleanup resources.""" self.active = False + # Signal the shutdown event to interrupt any waiting threads + self.shutdown_event.set() if ensure_cleanup: if self.callback_thread is not None: self.callback_thread.join() @@ -391,6 +424,7 @@ def build_client_manager( relay_url: Optional[str] = None, relay_username: Optional[str] = None, relay_password: Optional[str] = None, + relay_topic_prefix: Optional[str] = None, amqp_url: Optional[str] = None, k8s_enabled: Optional[bool] = None, tes_enabled: Optional[bool] = None, @@ -401,7 +435,13 @@ def build_client_manager( return 
ClientManager(job_manager=job_manager, **kwargs) # TODO: Consider more separation here. elif relay_url: assert relay_password and relay_username, "relay_url set, but relay_username and relay_password must also be set" - return RelayClientManager(relay_url=relay_url, relay_username=relay_username, relay_password=relay_password, **kwargs) + return RelayClientManager( + relay_url=relay_url, + relay_username=relay_username, + relay_password=relay_password, + relay_topic_prefix=relay_topic_prefix or '', + **kwargs + ) elif amqp_url: return MessageQueueClientManager(amqp_url=amqp_url, **kwargs) elif k8s_enabled or tes_enabled or gcp_batch_enabled: diff --git a/pulsar/client/test/test_relay_transport.py b/pulsar/client/test/test_relay_transport.py new file mode 100644 index 00000000..dc863727 --- /dev/null +++ b/pulsar/client/test/test_relay_transport.py @@ -0,0 +1,251 @@ +""" +Tests for the relay transport implementation. + +Tests retry logic and message ID tracking functionality. +""" +from unittest.mock import Mock, patch +import pytest +import requests + +from pulsar.client.transport.relay import RelayTransport, RelayTransportError + + +class TestRetryLogic: + """Test retry logic with exponential backoff.""" + + @patch('pulsar.client.transport.relay.time.sleep') + def test_post_message_retries_on_connection_error(self, mock_sleep): + """Test that post_message retries indefinitely on connection errors.""" + transport = RelayTransport('http://localhost:8000', 'user', 'pass') + + # Mock the auth manager to return a token + transport.auth_manager.get_token = Mock(return_value='test-token') + + # Mock session.post to fail twice with ConnectionError, then succeed + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'message_id': 'msg_123', + 'topic': 'test-topic', + 'timestamp': '2025-10-27T10:00:00Z' + } + + transport.session.post = Mock( + side_effect=[ + requests.ConnectionError("Connection refused"), + 
requests.ConnectionError("Connection refused"), + mock_response + ] + ) + + result = transport.post_message('test-topic', {'data': 'test'}) + + # Verify it succeeded after retries + assert result['message_id'] == 'msg_123' + assert transport.session.post.call_count == 3 + # Verify exponential backoff was used + assert mock_sleep.call_count == 2 + # First delay should be 1.0, second should be 2.0 + assert mock_sleep.call_args_list[0][0][0] == 1.0 + assert mock_sleep.call_args_list[1][0][0] == 2.0 + + @patch('pulsar.client.transport.relay.time.sleep') + def test_post_message_retries_on_500_error(self, mock_sleep): + """Test that post_message retries on 5xx server errors.""" + transport = RelayTransport('http://localhost:8000', 'user', 'pass') + transport.auth_manager.get_token = Mock(return_value='test-token') + + # Mock responses: 500, 503, then 200 + mock_500 = Mock() + mock_500.status_code = 500 + + mock_503 = Mock() + mock_503.status_code = 503 + + mock_200 = Mock() + mock_200.status_code = 200 + mock_200.json.return_value = { + 'message_id': 'msg_456', + 'topic': 'test-topic', + 'timestamp': '2025-10-27T10:00:00Z' + } + + transport.session.post = Mock(side_effect=[mock_500, mock_503, mock_200]) + + result = transport.post_message('test-topic', {'data': 'test'}) + + assert result['message_id'] == 'msg_456' + assert transport.session.post.call_count == 3 + assert mock_sleep.call_count == 2 + + def test_post_message_does_not_retry_on_400_error(self): + """Test that post_message does NOT retry on 4xx client errors.""" + transport = RelayTransport('http://localhost:8000', 'user', 'pass') + transport.auth_manager.get_token = Mock(return_value='test-token') + + # Mock response with 400 error + mock_400 = Mock() + mock_400.status_code = 400 + + # Create HTTPError with response attached + error = requests.HTTPError("400 Bad Request") + error.response = mock_400 + mock_400.raise_for_status.side_effect = error + + transport.session.post = Mock(return_value=mock_400) + + 
with pytest.raises(RelayTransportError): + transport.post_message('test-topic', {'data': 'test'}) + + # Should only be called once (no retries for 4xx) + assert transport.session.post.call_count == 1 + + @patch('pulsar.client.transport.relay.time.sleep') + def test_post_message_retries_on_timeout(self, mock_sleep): + """Test that post_message retries on timeout.""" + transport = RelayTransport('http://localhost:8000', 'user', 'pass') + transport.auth_manager.get_token = Mock(return_value='test-token') + + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'message_id': 'msg_789', + 'topic': 'test-topic', + 'timestamp': '2025-10-27T10:00:00Z' + } + + transport.session.post = Mock( + side_effect=[ + requests.Timeout("Request timed out"), + mock_response + ] + ) + + result = transport.post_message('test-topic', {'data': 'test'}) + + assert result['message_id'] == 'msg_789' + assert transport.session.post.call_count == 2 + assert mock_sleep.call_count == 1 + + @patch('pulsar.client.transport.relay.time.sleep') + def test_retry_backoff_caps_at_max_delay(self, mock_sleep): + """Test that exponential backoff caps at max_delay.""" + transport = RelayTransport('http://localhost:8000', 'user', 'pass') + transport.auth_manager.get_token = Mock(return_value='test-token') + + # Create many connection errors to test max delay + errors = [requests.ConnectionError("Connection refused")] * 10 + + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'message_id': 'msg_999', + 'topic': 'test-topic', + 'timestamp': '2025-10-27T10:00:00Z' + } + + transport.session.post = Mock(side_effect=errors + [mock_response]) + + result = transport.post_message('test-topic', {'data': 'test'}) + + assert result['message_id'] == 'msg_999' + assert mock_sleep.call_count == 10 + + # Check that delay caps at 60 seconds + delays = [call[0][0] for call in mock_sleep.call_args_list] + # Expected: 1, 2, 4, 8, 16, 32, 
60, 60, 60, 60 + assert delays[0] == 1.0 + assert delays[1] == 2.0 + assert delays[2] == 4.0 + assert delays[3] == 8.0 + assert delays[4] == 16.0 + assert delays[5] == 32.0 + # After this, should cap at 60 + assert all(d == 60.0 for d in delays[6:]) + + +class TestMessageIDTracking: + """Test message ID tracking functionality.""" + + def test_long_poll_tracks_message_ids(self): + """Test that long_poll tracks message IDs per topic.""" + transport = RelayTransport('http://localhost:8000', 'user', 'pass') + transport.auth_manager.get_token = Mock(return_value='test-token') + + # Mock response with messages from different topics + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'messages': [ + {'topic': 'topic1', 'message_id': 'msg_001', 'payload': {'data': 'a'}}, + {'topic': 'topic2', 'message_id': 'msg_002', 'payload': {'data': 'b'}}, + {'topic': 'topic1', 'message_id': 'msg_003', 'payload': {'data': 'c'}}, + ], + 'has_more': False + } + + transport.session.post = Mock(return_value=mock_response) + + messages = transport.long_poll(['topic1', 'topic2']) + + # Verify message IDs are tracked (last message ID per topic) + assert transport.get_last_message_id('topic1') == 'msg_003' + assert transport.get_last_message_id('topic2') == 'msg_002' + assert len(messages) == 3 + + def test_long_poll_uses_tracked_message_ids_in_since(self): + """Test that long_poll includes tracked message IDs in the since parameter.""" + transport = RelayTransport('http://localhost:8000', 'user', 'pass') + transport.auth_manager.get_token = Mock(return_value='test-token') + + # Set some tracked message IDs + transport.set_last_message_id('topic1', 'msg_100') + transport.set_last_message_id('topic2', 'msg_200') + + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'messages': [], + 'has_more': False + } + + transport.session.post = Mock(return_value=mock_response) + + # Call long_poll + 
transport.long_poll(['topic1', 'topic2']) + + # Verify the 'since' parameter was included in the request + call_args = transport.session.post.call_args + request_json = call_args[1]['json'] + + assert 'since' in request_json + assert request_json['since']['topic1'] == 'msg_100' + assert request_json['since']['topic2'] == 'msg_200' + + def test_long_poll_only_includes_since_for_requested_topics(self): + """Test that since only includes tracked IDs for topics in the request.""" + transport = RelayTransport('http://localhost:8000', 'user', 'pass') + transport.auth_manager.get_token = Mock(return_value='test-token') + + # Set tracked message IDs for multiple topics + transport.set_last_message_id('topic1', 'msg_100') + transport.set_last_message_id('topic2', 'msg_200') + transport.set_last_message_id('topic3', 'msg_300') + + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'messages': [], 'has_more': False} + + transport.session.post = Mock(return_value=mock_response) + + # Only poll for topic1 and topic2 + transport.long_poll(['topic1', 'topic2']) + + call_args = transport.session.post.call_args + request_json = call_args[1]['json'] + + # Should only include topic1 and topic2 in since + assert 'since' in request_json + assert 'topic1' in request_json['since'] + assert 'topic2' in request_json['since'] + assert 'topic3' not in request_json['since'] diff --git a/pulsar/client/transport/relay.py b/pulsar/client/transport/relay.py index 9f13c9f5..576aebb7 100644 --- a/pulsar/client/transport/relay.py +++ b/pulsar/client/transport/relay.py @@ -5,7 +5,8 @@ authentication with the relay server. 
""" import logging -from typing import Any, Dict, List, Optional +import time +from typing import Any, Callable, Dict, List, Optional import requests @@ -41,6 +42,7 @@ def __init__(self, relay_url: str, username: str, password: str, timeout: int = self.auth_manager = RelayAuthManager(relay_url, username, password) self.timeout = timeout self.session = requests.Session() + self._last_message_ids: Dict[str, str] = {} # Track last message ID per topic def _get_headers(self) -> Dict[str, str]: """Get HTTP headers including authentication token. @@ -54,6 +56,92 @@ def _get_headers(self) -> Dict[str, str]: 'Content-Type': 'application/json' } + def _retry_with_backoff( + self, + operation: Callable[[], requests.Response], + operation_name: str, + initial_delay: float = 1.0, + max_delay: float = 60.0, + backoff_factor: float = 2.0 + ) -> requests.Response: + """Retry an operation with exponential backoff. + + Retries indefinitely for communication errors and 5xx errors, + but not for 4xx errors (client errors). 
+ + Args: + operation: Function that performs the HTTP request + operation_name: Name of the operation for logging + initial_delay: Initial delay in seconds before first retry + max_delay: Maximum delay in seconds between retries + backoff_factor: Multiplier for exponential backoff + + Returns: + requests.Response from a successful request, or a 401 response returned unretried so the caller can handle it + + Raises: + RelayTransportError: For 4xx client errors (non-retryable) + """ + attempt = 0 + delay = initial_delay + + while True: + attempt += 1 + try: + response = operation() + + # Check for client errors (4xx) - don't retry these + if 400 <= response.status_code < 500: + # Let caller handle 401 separately + if response.status_code == 401: + return response + # Other 4xx errors are client errors, don't retry + response.raise_for_status() + return response + + # Check for server errors (5xx) - retry these + if response.status_code >= 500: + log.warning( + "%s failed with status %d (attempt %d), retrying in %.1f seconds", + operation_name, response.status_code, attempt, delay + ) + time.sleep(delay) + delay = min(delay * backoff_factor, max_delay) + continue + + # Success + return response + + except requests.ConnectionError as e: + log.warning( + "%s connection error (attempt %d): %s, retrying in %.1f seconds", + operation_name, attempt, e, delay + ) + time.sleep(delay) + delay = min(delay * backoff_factor, max_delay) + + except requests.Timeout as e: + log.warning( + "%s timeout (attempt %d): %s, retrying in %.1f seconds", + operation_name, attempt, e, delay + ) + time.sleep(delay) + delay = min(delay * backoff_factor, max_delay) + + except requests.RequestException as e: + # For other request exceptions, check if it's a client error + if hasattr(e, 'response') and e.response is not None: + if 400 <= e.response.status_code < 500: + # Client error, don't retry + raise RelayTransportError(f"{operation_name} failed: {e}") from e + # Otherwise retry + log.warning( + "%s error (attempt %d): %s, retrying in %.1f seconds", +
operation_name, attempt, e, delay + ) + time.sleep(delay) + delay = min(delay * backoff_factor, max_delay) + def post_message( self, topic: str, @@ -63,6 +151,9 @@ def post_message( ) -> Dict[str, Any]: """Post a single message to the relay. + Automatically retries on communication errors and server errors (5xx) + with exponential backoff. Does not retry on client errors (4xx). + Args: topic: Topic name to publish to payload: Message payload (must be JSON-serializable) @@ -73,7 +164,7 @@ def post_message( Response dictionary with message_id, topic, and timestamp Raises: - RelayTransportError: If the request fails + RelayTransportError: If the request fails with a client error (4xx) """ url = f"{self.relay_url}/api/v1/messages" @@ -88,38 +179,40 @@ def post_message( if metadata is not None: message_data['metadata'] = metadata - try: - response = self.session.post( + def _post() -> requests.Response: + return self.session.post( url, json=message_data, headers=self._get_headers(), timeout=self.timeout ) - if response.status_code == 401: - # Token might have expired, invalidate and retry once - log.debug("Received 401, invalidating token and retrying") - self.auth_manager.invalidate() - response = self.session.post( - url, - json=message_data, - headers=self._get_headers(), - timeout=self.timeout - ) + # Use retry logic with exponential backoff + response = self._retry_with_backoff(_post, "post_message") - response.raise_for_status() - result = response.json() + if response.status_code == 401: + # Token might have expired, invalidate and retry + log.debug("Received 401, invalidating token and retrying") + self.auth_manager.invalidate() + response = self._retry_with_backoff(_post, "post_message") - log.debug("Posted message to topic '%s': message_id=%s", topic, result.get('message_id')) - return result + try: + response.raise_for_status() + result = response.json() + except (requests.RequestException, ValueError) as e: + # A second 401 or a non-JSON body must surface as the documented error type + log.error("Failed to post message to topic '%s': %s", topic, e) + raise RelayTransportError(f"Failed to post message: {e}") from e - except requests.RequestException as e: - log.error("Failed to post message to topic '%s': %s", topic, e) -
raise RelayTransportError(f"Failed to post message: {e}") + log.debug("Posted message to topic '%s': message_id=%s", topic, result.get('message_id')) + return result def post_bulk_messages(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]: """Post multiple messages in a single request. + Automatically retries on communication errors and server errors (5xx) + with exponential backoff. Does not retry on client errors (4xx). + Args: messages: List of message dictionaries, each containing 'topic' and 'payload' @@ -127,52 +215,51 @@ def post_bulk_messages(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]: Response dictionary with results and summary Raises: - RelayTransportError: If the request fails + RelayTransportError: If the request fails with a client error (4xx) """ url = f"{self.relay_url}/api/v1/messages/bulk" request_data = {'messages': messages} - try: - response = self.session.post( + def _post() -> requests.Response: + return self.session.post( url, json=request_data, headers=self._get_headers(), timeout=self.timeout ) - if response.status_code == 401: - self.auth_manager.invalidate() - response = self.session.post( - url, - json=request_data, - headers=self._get_headers(), - timeout=self.timeout - ) + # Use retry logic with exponential backoff + response = self._retry_with_backoff(_post, "post_bulk_messages") - response.raise_for_status() - result = response.json() + if response.status_code == 401: + self.auth_manager.invalidate() + response = self._retry_with_backoff(_post, "post_bulk_messages") - log.debug("Posted %d messages in bulk", len(messages)) - return result + try: + response.raise_for_status() + result = response.json() + except (requests.RequestException, ValueError) as e: + # A second 401 or a non-JSON body must surface as the documented error type + log.error("Failed to post bulk messages: %s", e) + raise RelayTransportError(f"Failed to post bulk messages: {e}") from e - except requests.RequestException as e: - log.error("Failed to post bulk messages: %s", e) - raise RelayTransportError(f"Failed to post bulk messages: {e}") + log.debug("Posted %d messages in bulk", len(messages)) + return result def long_poll( self, topics: List[str], - since: Optional[Dict[str, str]] = None, - timeout: 
int = 30 + timeout: int = 30, ) -> List[Dict[str, Any]]: """Poll for messages from specified topics. This is a blocking call that waits up to 'timeout' seconds for new messages. + Automatically tracks the last message ID per topic and includes it in + subsequent poll requests to ensure no messages are missed. Args: topics: List of topic names to subscribe to - since: Optional dict mapping topic names to last seen message IDs timeout: Maximum seconds to wait for messages (1-60) Returns: @@ -188,8 +270,15 @@ def long_poll( 'timeout': min(max(timeout, 1), 60) # Clamp to 1-60 range } - if since is not None: - poll_data['since'] = since + # Build since dict from tracked message IDs for requested topics + tracked_since = {} + for topic in topics: + msg_id = self.get_last_message_id(topic) + if msg_id is not None: + tracked_since[topic] = msg_id + if tracked_since: + poll_data['since'] = tracked_since + log.debug("Using tracked message IDs for poll: %s", tracked_since) try: response = self.session.post( @@ -211,9 +300,26 @@ def long_poll( response.raise_for_status() result = response.json() - messages = result.get('messages', []) - if messages: - log.debug("Received %d messages from long poll", len(messages)) + messages = result.get('messages') or [] + + # Track the last message ID per topic + for message in messages: + topic = message.get('topic') + message_id = message.get('message_id') + if topic and message_id: + self.set_last_message_id(topic, message_id) + + # Build dict of tracked IDs for the requested topics + tracked_ids = {} + for topic in topics: + msg_id = self.get_last_message_id(topic) + if msg_id is not None: + tracked_ids[topic] = msg_id + log.debug( + "Received %d messages from long poll, tracked IDs: %s", + len(messages), + tracked_ids + ) return messages @@ -226,6 +332,52 @@ def long_poll( log.error("Failed to long poll: %s", e) raise RelayTransportError(f"Failed to long poll: {e}") + def get_last_message_id(self, topic: str) -> Optional[str]: + """Get 
the last tracked message ID for a topic. + + Args: + topic: Topic name + + Returns: + Last message ID for the topic, or None if not tracked + """ + return self._last_message_ids.get(topic) + + def get_all_tracked_message_ids(self) -> Dict[str, str]: + """Get all tracked message IDs. + + Returns: + Dictionary mapping topic names to last message IDs + """ + return self._last_message_ids.copy() + + def set_last_message_id(self, topic: str, message_id: str) -> None: + """Manually set the last message ID for a topic. + + This can be useful for resuming from a specific message ID + after a restart. + + Args: + topic: Topic name + message_id: Message ID to set as the last seen message + """ + self._last_message_ids[topic] = message_id + + def clear_tracked_message_ids(self, topic: Optional[str] = None) -> None: + """Clear tracked message IDs. + + Args: + topic: If provided, clears only the specified topic. + If None, clears all tracked message IDs. + """ + if topic is not None: + if topic in self._last_message_ids: + del self._last_message_ids[topic] + log.debug("Cleared tracked message ID for topic '%s'", topic) + else: + self._last_message_ids.clear() + log.debug("Cleared all tracked message IDs") + def close(self): """Close the transport and cleanup resources.""" self.session.close() diff --git a/pulsar/messaging/bind_relay.py b/pulsar/messaging/bind_relay.py index 2d90fdcf..10150beb 100644 --- a/pulsar/messaging/bind_relay.py +++ b/pulsar/messaging/bind_relay.py @@ -34,6 +34,9 @@ def bind_manager_to_relay(manager, relay_state: RelayState, relay_url, conf): if not password: raise Exception("message_queue_password is required for relay communication") + # Extract optional relay topic prefix + relay_topic_prefix = conf.get('relay_topic_prefix', '') + # Create relay transport relay_transport = RelayTransport(relay_url, username, password) @@ -42,11 +45,11 @@ def bind_manager_to_relay(manager, relay_state: RelayState, relay_url, conf): process_kill_messages = 
functools.partial(__process_kill_message, manager) process_status_messages = functools.partial(__process_status_message, manager) - # Determine topics based on manager name - setup_topic = f"job_setup_{manager_name}" if manager_name != "_default_" else "job_setup" - status_request_topic = f"job_status_request_{manager_name}" if manager_name != "_default_" else "job_status_request" - kill_topic = f"job_kill_{manager_name}" if manager_name != "_default_" else "job_kill" - status_update_topic = f"job_status_update_{manager_name}" if manager_name != "_default_" else "job_status_update" + # Determine topics based on manager name and optional prefix + setup_topic = __make_topic_name(relay_topic_prefix, "job_setup", manager_name) + status_request_topic = __make_topic_name(relay_topic_prefix, "job_status_request", manager_name) + kill_topic = __make_topic_name(relay_topic_prefix, "job_kill", manager_name) + status_update_topic = __make_topic_name(relay_topic_prefix, "job_status_update", manager_name) # Start consumer threads if message_queue_consume is enabled if conf.get("message_queue_consume", True): @@ -206,3 +209,36 @@ def __client_job_id_from_body(body): """ job_id = body.get("job_id", None) return job_id + + +def __make_topic_name(prefix, base_topic, manager_name): + """Create a topic name with optional prefix and manager suffix. 
+ + Args: + prefix: Optional prefix string (e.g., 'galaxy1', 'prod') + base_topic: Base topic name (e.g., 'job_setup', 'job_status_update') + manager_name: Manager name (e.g., '_default_', 'cluster_a') + + Returns: + Fully qualified topic name + + Examples: + __make_topic_name('', 'job_setup', '_default_') -> 'job_setup' + __make_topic_name('', 'job_setup', 'cluster_a') -> 'job_setup_cluster_a' + __make_topic_name('prod', 'job_setup', '_default_') -> 'prod_job_setup' + __make_topic_name('prod', 'job_setup', 'cluster_a') -> 'prod_job_setup_cluster_a' + """ + parts = [] + + # Add prefix if provided + if prefix: + parts.append(prefix) + + # Add base topic + parts.append(base_topic) + + # Add manager name if not default + if manager_name != "_default_": + parts.append(manager_name) + + return "_".join(parts)