diff --git a/config-suite.toml b/config-suite.toml new file mode 100644 index 0000000..e69de29 diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index 89e89c6..89603bb 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -45,9 +45,7 @@ def signal_handler(_sig: int, _frame: FrameType | None) -> None: self._worker.run() def _handle_push_event(self, push_event: Push) -> None: - logger.debug( - f"Handling push event: {push_event.push_id} for {push_event.repo_url}" - ) + logger.debug(f"Handling event {push_event}") synchronizer = self._repo_synchronizers[push_event.repo_url] operations_by_destination: dict[str, list[SyncOperation]] = {} @@ -59,7 +57,7 @@ def _handle_push_event(self, push_event: Push) -> None: ).append(match.operation) if not operations_by_destination: - logger.warning(f"No operation for {push_event}") + logger.warning(f"No operation for event {push_event}") return for destination, operations in operations_by_destination.items(): @@ -76,13 +74,11 @@ def _handle_push_event(self, push_event: Push) -> None: } ) logger.warning( - f"An error prevented completion of the following sync operations. {error_data}", + f"An error prevented completion of the following sync operations from event {push_event}. 
{error_data}", exc_info=True, ) raise exc - logger.info( - f"Successfully handled event: {push_event.push_id} for {push_event.repo_url}" - ) + logger.info(f"Successfully handled event {push_event}") def _handle_event(self, event: Event) -> None: if event.repo_url not in self._repo_synchronizers: diff --git a/git_hg_sync/events.py b/git_hg_sync/events.py index 65c7a6e..bb43ded 100644 --- a/git_hg_sync/events.py +++ b/git_hg_sync/events.py @@ -20,6 +20,9 @@ class Push(BaseModel): user: str push_json_url: str + def __str__(self) -> str: + return f"Push {self.push_id} for {self.repo_url}" + @model_validator(mode="after") def check_branch_tags(self) -> Self: """Check that at least one of branches or tags is not empty.""" diff --git a/git_hg_sync/pulse_worker.py b/git_hg_sync/pulse_worker.py index b241a75..5d77cdb 100644 --- a/git_hg_sync/pulse_worker.py +++ b/git_hg_sync/pulse_worker.py @@ -52,7 +52,13 @@ def get_consumers( _channel: Any, ) -> list[kombu.Consumer]: consumer = consumer_class( - self.task_queue, auto_declare=False, callbacks=[self.on_task] + self.task_queue, + auto_declare=False, + callbacks=[self.on_task], + # We only fetch one message at a time in case processing it fails. + # This allows us to ensure strict ordering, by re-receiving the same message + # on the next loop after having requeued it after failure. 
+ prefetch_count=1, ) logger.debug(f"Using consumer {consumer=}") return [consumer] diff --git a/rabbitmq/definitions.json b/rabbitmq/definitions.json index c502c14..f92494b 100644 --- a/rabbitmq/definitions.json +++ b/rabbitmq/definitions.json @@ -2,10 +2,10 @@ "bindings": [ { "arguments": {}, - "destination": "queue/git-hg-sync/sync", + "destination": "queue/guest/test", "destination_type": "queue", - "routing_key": "git-hg-sync", - "source": "exchange/git-hg-sync/test", + "routing_key": "#", + "source": "exchange/guest/test", "vhost": "/" } ], @@ -15,8 +15,8 @@ "auto_delete": false, "durable": true, "internal": false, - "name": "exchange/git-hg-sync/test", - "type": "direct", + "name": "exchange/guest/test", + "type": "topic", "vhost": "/" } ], @@ -44,7 +44,7 @@ "arguments": {}, "auto_delete": false, "durable": true, - "name": "queue/git-hg-sync/sync", + "name": "queue/guest/test", "vhost": "/" } ], diff --git a/tests/pulse_utils.py b/tests/pulse_utils.py index 53cfee8..250e868 100644 --- a/tests/pulse_utils.py +++ b/tests/pulse_utils.py @@ -39,7 +39,7 @@ def send_pulse_message( connection.connect() with connection: - ex = kombu.Exchange(exchange, type="direct") + ex = kombu.Exchange(exchange, type="topic") queue = kombu.Queue( name=queue_name, exchange=exchange, @@ -48,7 +48,7 @@ def send_pulse_message( exclusive=False, auto_delete=False, ) - queue(connection).declare() + queue(connection).queue_declare() if purge: channel = connection.channel() channel.queue_purge(queue.name) diff --git a/tests/test_integration.py b/tests/test_integration.py index e3373df..446ed00 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -15,6 +15,7 @@ from git_hg_sync.__main__ import get_connection, get_queue, start_app from git_hg_sync.config import Config, PulseConfig +from git_hg_sync.events import Event from git_hg_sync.pulse_worker import PulseWorker NO_RABBITMQ = os.getenv("RABBITMQ") != "true" @@ -139,3 +140,47 @@ def test_no_duplicated_ack_messages( 
     worker.run()
 
     callback.assert_called_once()
+
+
+@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq")
+def test_messages_in_order(
+    test_config: Config,
+    get_payload: Callable,
+) -> None:
+    """This test checks that messages are processed in strict order, even on failure.
+
+    The event handler raises on the first delivery of each message, forcing a
+    requeue; strict ordering requires that the same message be re-received and
+    handled before the next one, hence the expected log of [0, 0, 1, 1].
+
+    The test may also timeout, which is likely indicative of an ordering issue.
+    """
+    connection = get_connection(test_config.pulse)
+    queue = get_queue(test_config.pulse)
+    queue(connection).queue_declare()
+    queue(connection).queue_bind()
+
+    worker = PulseWorker(connection, queue, one_shot=False)
+
+    events_log = []
+
+    def event_handler(event: Event) -> None:
+        push_id = event.push_id
+        already_seen = push_id in events_log
+
+        events_log.append(push_id)
+
+        # Terminate the worker after processing the expected number of messages.
+        if len(events_log) == 4:
+            worker.should_stop = True
+
+        if not already_seen:
+            raise Exception("Not seen yet")
+
+    worker.event_handler = event_handler
+
+    pulse_utils.send_pulse_message(
+        test_config.pulse, get_payload(push_id=0), purge=True
+    )
+    pulse_utils.send_pulse_message(
+        test_config.pulse, get_payload(push_id=1), purge=False
+    )
+    worker.run()
+
+    assert events_log == [0, 0, 1, 1]