From 9f17f5748f93247594c595d9ac41f41a408fdb5f Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Wed, 14 Jan 2026 14:03:12 +1100 Subject: [PATCH 01/15] application: log push_id consistently --- git_hg_sync/application.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index 89e89c6..7506205 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -45,9 +45,8 @@ def signal_handler(_sig: int, _frame: FrameType | None) -> None: self._worker.run() def _handle_push_event(self, push_event: Push) -> None: - logger.debug( - f"Handling push event: {push_event.push_id} for {push_event.repo_url}" - ) + push_event_str = f"{push_event.push_id} for {push_event.repo_url}" + logger.debug(f"Handling push event: {push_event_str}") synchronizer = self._repo_synchronizers[push_event.repo_url] operations_by_destination: dict[str, list[SyncOperation]] = {} @@ -59,7 +58,7 @@ def _handle_push_event(self, push_event: Push) -> None: ).append(match.operation) if not operations_by_destination: - logger.warning(f"No operation for {push_event}") + logger.warning(f"No operation for push event: {push_event_str}") return for destination, operations in operations_by_destination.items(): @@ -76,13 +75,11 @@ def _handle_push_event(self, push_event: Push) -> None: } ) logger.warning( - f"An error prevented completion of the following sync operations. {error_data}", + f"An error prevented completion of the following sync operations from push event {push_event_str}. {error_data}", exc_info=True, ) raise exc - logger.info( - f"Successfully handled event: {push_event.push_id} for {push_event.repo_url}" - ) + logger.info(f"Successfully handled event: {push_event_str}") def _handle_event(self, event: Event) -> None: if event.repo_url not in self._repo_synchronizers: From 7157351c91fcdac0bc4830d6bcfa56367dcd75ca Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Wed, 14 Jan 2026 14:51:37 +1100 Subject: [PATCH 02/15] tests: Pulse exchanges MUST be topic exchanges So we match this in the test code. https://wiki.mozilla.org/Auto-tools/Projects/Pulse#:~:text=Exchanges%20MUST%20be%20topic%20exchanges --- rabbitmq/definitions.json | 12 ++++++------ tests/pulse_utils.py | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/rabbitmq/definitions.json b/rabbitmq/definitions.json index c502c14..f92494b 100644 --- a/rabbitmq/definitions.json +++ b/rabbitmq/definitions.json @@ -2,10 +2,10 @@ "bindings": [ { "arguments": {}, - "destination": "queue/git-hg-sync/sync", + "destination": "queue/guest/test", "destination_type": "queue", - "routing_key": "git-hg-sync", - "source": "exchange/git-hg-sync/test", + "routing_key": "#", + "source": "exchange/guest/test", "vhost": "/" } ], @@ -15,8 +15,8 @@ "auto_delete": false, "durable": true, "internal": false, - "name": "exchange/git-hg-sync/test", - "type": "direct", + "name": "exchange/guest/test", + "type": "topic", "vhost": "/" } ], @@ -44,7 +44,7 @@ "arguments": {}, "auto_delete": false, "durable": true, - "name": "queue/git-hg-sync/sync", + "name": "queue/guest/test", "vhost": "/" } ], diff --git a/tests/pulse_utils.py b/tests/pulse_utils.py index 53cfee8..250e868 100644 --- a/tests/pulse_utils.py +++ b/tests/pulse_utils.py @@ -39,7 +39,7 @@ def send_pulse_message( connection.connect() with connection: - ex = kombu.Exchange(exchange, type="direct") + ex = kombu.Exchange(exchange, type="topic") queue = kombu.Queue( name=queue_name, exchange=exchange, @@ -48,7 +48,7 @@ def send_pulse_message( exclusive=False, auto_delete=False, ) - queue(connection).declare() + queue(connection).queue_declare() if purge: channel = connection.channel() channel.queue_purge(queue.name) From 7accc403e1a2774ae21dae4d35257e79cbdcadb9 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Wed, 14 Jan 2026 15:50:58 +1100 Subject: [PATCH 03/15] test_integration: test reordering on message requeing (bug 1981900) --- tests/test_integration.py | 45 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/tests/test_integration.py b/tests/test_integration.py index e3373df..446ed00 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -15,6 +15,7 @@ from git_hg_sync.__main__ import get_connection, get_queue, start_app from git_hg_sync.config import Config, PulseConfig +from git_hg_sync.events import Event from git_hg_sync.pulse_worker import PulseWorker NO_RABBITMQ = os.getenv("RABBITMQ") != "true" @@ -139,3 +140,47 @@ def test_no_duplicated_ack_messages( worker.run() callback.assert_called_once() + + +@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq") +def test_messages_in_order( + test_config: Config, + get_payload: Callable, +) -> None: + """This test checks that long-running messages are not processed more than once. + + It may also timeout, which is likely indicative of the same issue. + """ + connection = get_connection(test_config.pulse) + queue = get_queue(test_config.pulse) + queue(connection).queue_declare() + queue(connection).queue_bind() + + worker = PulseWorker(connection, queue, one_shot=False) + + events_log = [] + + def event_handler(event: Event) -> None: + push_id = event.push_id + already_seen = push_id in events_log + + events_log.append(push_id) + + # Terminate the worker after processing the expected number of messages. + if len(events_log) == 4: + worker.should_stop = True + + if not already_seen: + raise Exception("Not seen yet") + + worker.event_handler = event_handler + + pulse_utils.send_pulse_message( + test_config.pulse, get_payload(push_id=0), purge=True + ) + pulse_utils.send_pulse_message( + test_config.pulse, get_payload(push_id=1), purge=False + ) + worker.run() + + assert events_log == [0, 0, 1, 1] From 43213f9392bf5768df50e5200293680c196bf99b Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Wed, 14 Jan 2026 16:10:46 +1100 Subject: [PATCH 04/15] pulse_worker: don't prefetch more than one message (bug 1981900) If the first message fails and needs requeuing, the Consumer will immediately be given the next message in the batch. This leads to those message being processed out-of-order. By only fetching one message at a time, the Consumer will retrieve the failed message again on the next fetch. It will continue looping on this message forever until it succeeds, or someone intervenes. This is the desired behaviour. This ensures that no message goes silently unprocessed. --- git_hg_sync/pulse_worker.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/git_hg_sync/pulse_worker.py b/git_hg_sync/pulse_worker.py index b241a75..8afdb04 100644 --- a/git_hg_sync/pulse_worker.py +++ b/git_hg_sync/pulse_worker.py @@ -52,7 +52,10 @@ def get_consumers( _channel: Any, ) -> list[kombu.Consumer]: consumer = consumer_class( - self.task_queue, auto_declare=False, callbacks=[self.on_task] + self.task_queue, + auto_declare=False, + callbacks=[self.on_task], + prefetch_count=1, ) logger.debug(f"Using consumer {consumer=}") return [consumer] From 7d90b152a867d41f25800212d04bf7737df485e2 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Thu, 15 Jan 2026 12:50:19 +1100 Subject: [PATCH 05/15] fixup! application: log push_id consistently --- git_hg_sync/application.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index 7506205..7f46230 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -45,8 +45,7 @@ def signal_handler(_sig: int, _frame: FrameType | None) -> None: self._worker.run() def _handle_push_event(self, push_event: Push) -> None: - push_event_str = f"{push_event.push_id} for {push_event.repo_url}" - logger.debug(f"Handling push event: {push_event_str}") + logger.debug(f"Handling push event: {push_event}") synchronizer = self._repo_synchronizers[push_event.repo_url] operations_by_destination: dict[str, list[SyncOperation]] = {} From 4715c8db31c3263750ad5a0fac031d6ed6792253 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Thu, 15 Jan 2026 12:50:19 +1100 Subject: [PATCH 06/15] fixup! application: log push_id consistently --- git_hg_sync/application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index 7f46230..b3f2914 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -57,7 +57,7 @@ def _handle_push_event(self, push_event: Push) -> None: ).append(match.operation) if not operations_by_destination: - logger.warning(f"No operation for push event: {push_event_str}") + logger.warning(f"No operation for push event: {push_event}") return for destination, operations in operations_by_destination.items(): From b9faaa2871b8fb3a3bb1220126da079869a4f5bb Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Thu, 15 Jan 2026 12:50:19 +1100 Subject: [PATCH 07/15] fixup! application: log push_id consistently --- git_hg_sync/application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index b3f2914..b12f4be 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -74,7 +74,7 @@ def _handle_push_event(self, push_event: Push) -> None: } ) logger.warning( - f"An error prevented completion of the following sync operations from push event {push_event_str}. {error_data}", + f"An error prevented completion of the following sync operations from push event {push_event}. {error_data}", exc_info=True, ) raise exc From e94ad4342ae85cd83dfd87ad63eba51d7f554263 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Thu, 15 Jan 2026 12:50:19 +1100 Subject: [PATCH 08/15] fixup! application: log push_id consistently --- git_hg_sync/application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index b12f4be..1d02652 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -78,7 +78,7 @@ def _handle_push_event(self, push_event: Push) -> None: exc_info=True, ) raise exc - logger.info(f"Successfully handled event: {push_event_str}") + logger.info(f"Successfully handled event: {push_event}") def _handle_event(self, event: Event) -> None: if event.repo_url not in self._repo_synchronizers: From 3fb90dbca5680be5a64aa6cda6d4124b2fbe5201 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Thu, 15 Jan 2026 12:50:19 +1100 Subject: [PATCH 09/15] fixup! pulse_worker: don't prefetch more than one message (bug 1981900) --- git_hg_sync/pulse_worker.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/git_hg_sync/pulse_worker.py b/git_hg_sync/pulse_worker.py index 8afdb04..5d77cdb 100644 --- a/git_hg_sync/pulse_worker.py +++ b/git_hg_sync/pulse_worker.py @@ -55,6 +55,9 @@ def get_consumers( self.task_queue, auto_declare=False, callbacks=[self.on_task], + # We only fetch one message at a time in case processing it fails. + # This allows us to ensure strict ordering, by re-receiving the same message + # on the next loop after having requeued it after failure. prefetch_count=1, ) logger.debug(f"Using consumer {consumer=}") From 07fd80c4a29852a02b5ec80c2982e7003a01a43c Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Thu, 15 Jan 2026 15:08:17 +1100 Subject: [PATCH 10/15] events: add __str__ showing push_id and repo_url --- config-suite.toml | 0 git_hg_sync/events.py | 3 +++ 2 files changed, 3 insertions(+) create mode 100644 config-suite.toml diff --git a/config-suite.toml b/config-suite.toml new file mode 100644 index 0000000..e69de29 diff --git a/git_hg_sync/events.py b/git_hg_sync/events.py index 65c7a6e..a97142e 100644 --- a/git_hg_sync/events.py +++ b/git_hg_sync/events.py @@ -20,6 +20,9 @@ class Push(BaseModel): user: str push_json_url: str + def __str__(self) -> str: + return f"{self.push_id} for {self.repo_url}" + @model_validator(mode="after") def check_branch_tags(self) -> Self: """Check that at least one of branches or tags is not empty.""" From 0616856ee29af84369158ea72b52e5c21cea26a4 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Thu, 15 Jan 2026 15:17:17 +1100 Subject: [PATCH 11/15] fixup! 7d90b152a867d41f25800212d04bf7737df485e2 --- git_hg_sync/application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index 1d02652..04c966b 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -45,7 +45,7 @@ def signal_handler(_sig: int, _frame: FrameType | None) -> None: self._worker.run() def _handle_push_event(self, push_event: Push) -> None: - logger.debug(f"Handling push event: {push_event}") + logger.debug(f"Handling event {push_event}") synchronizer = self._repo_synchronizers[push_event.repo_url] operations_by_destination: dict[str, list[SyncOperation]] = {} From 09519075c6bd2a4fe44cb8bf6ed38007a49bc7b8 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Thu, 15 Jan 2026 15:17:17 +1100 Subject: [PATCH 12/15] fixup! 4715c8db31c3263750ad5a0fac031d6ed6792253 --- git_hg_sync/application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index 04c966b..076af0c 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -57,7 +57,7 @@ def _handle_push_event(self, push_event: Push) -> None: ).append(match.operation) if not operations_by_destination: - logger.warning(f"No operation for push event: {push_event}") + logger.warning(f"No operation for event {push_event}") return for destination, operations in operations_by_destination.items(): From 9e600d689a7a9e4251cd77cb8c5e99bee41af8a6 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Thu, 15 Jan 2026 15:17:17 +1100 Subject: [PATCH 13/15] fixup! b9faaa2871b8fb3a3bb1220126da079869a4f5bb --- git_hg_sync/application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index 076af0c..bb2d8e2 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -74,7 +74,7 @@ def _handle_push_event(self, push_event: Push) -> None: } ) logger.warning( - f"An error prevented completion of the following sync operations from push event {push_event}. {error_data}", + f"An error prevented completion of the following sync operations from event {push_event}. {error_data}", exc_info=True, ) raise exc From 8a5c0cf088b0ec980814b87fd648d6a5c318dac0 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Thu, 15 Jan 2026 15:17:17 +1100 Subject: [PATCH 14/15] fixup! e94ad4342ae85cd83dfd87ad63eba51d7f554263 --- git_hg_sync/application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index bb2d8e2..89603bb 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -78,7 +78,7 @@ def _handle_push_event(self, push_event: Push) -> None: exc_info=True, ) raise exc - logger.info(f"Successfully handled event: {push_event}") + logger.info(f"Successfully handled event {push_event}") def _handle_event(self, event: Event) -> None: if event.repo_url not in self._repo_synchronizers: From e927f3ca9a6467021bdc9f102227feb69d9b9e21 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Thu, 15 Jan 2026 15:17:17 +1100 Subject: [PATCH 15/15] fixup! events: add __str__ showing push_id and repo_url --- git_hg_sync/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/git_hg_sync/events.py b/git_hg_sync/events.py index a97142e..bb43ded 100644 --- a/git_hg_sync/events.py +++ b/git_hg_sync/events.py @@ -21,7 +21,7 @@ class Push(BaseModel): push_json_url: str def __str__(self) -> str: - return f"{self.push_id} for {self.repo_url}" + return f"Push {self.push_id} for {self.repo_url}" @model_validator(mode="after") def check_branch_tags(self) -> Self: