Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added config-suite.toml
Empty file.
12 changes: 4 additions & 8 deletions git_hg_sync/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ def signal_handler(_sig: int, _frame: FrameType | None) -> None:
self._worker.run()

def _handle_push_event(self, push_event: Push) -> None:
logger.debug(
f"Handling push event: {push_event.push_id} for {push_event.repo_url}"
)
logger.debug(f"Handling event {push_event}")
synchronizer = self._repo_synchronizers[push_event.repo_url]
operations_by_destination: dict[str, list[SyncOperation]] = {}

Expand All @@ -59,7 +57,7 @@ def _handle_push_event(self, push_event: Push) -> None:
).append(match.operation)

if not operations_by_destination:
logger.warning(f"No operation for {push_event}")
logger.warning(f"No operation for event {push_event}")
return

for destination, operations in operations_by_destination.items():
Expand All @@ -76,13 +74,11 @@ def _handle_push_event(self, push_event: Push) -> None:
}
)
logger.warning(
f"An error prevented completion of the following sync operations. {error_data}",
f"An error prevented completion of the following sync operations from event {push_event}. {error_data}",
exc_info=True,
)
raise exc
logger.info(
f"Successfully handled event: {push_event.push_id} for {push_event.repo_url}"
)
logger.info(f"Successfully handled event {push_event}")

def _handle_event(self, event: Event) -> None:
if event.repo_url not in self._repo_synchronizers:
Expand Down
3 changes: 3 additions & 0 deletions git_hg_sync/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class Push(BaseModel):
user: str
push_json_url: str

def __str__(self) -> str:
    """Compact human-readable identifier: the push id and its source repo."""
    return "Push {} for {}".format(self.push_id, self.repo_url)

@model_validator(mode="after")
def check_branch_tags(self) -> Self:
"""Check that at least one of branches or tags is not empty."""
Expand Down
8 changes: 7 additions & 1 deletion git_hg_sync/pulse_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ def get_consumers(
_channel: Any,
) -> list[kombu.Consumer]:
consumer = consumer_class(
self.task_queue, auto_declare=False, callbacks=[self.on_task]
self.task_queue,
auto_declare=False,
callbacks=[self.on_task],
# We only fetch one message at a time in case processing it fails.
# This allows us to ensure strict ordering, by re-receiving the same message
# on the next loop after having requeued it after failure.
prefetch_count=1,
)
logger.debug(f"Using consumer {consumer=}")
return [consumer]
Expand Down
12 changes: 6 additions & 6 deletions rabbitmq/definitions.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
"bindings": [
{
"arguments": {},
"destination": "queue/git-hg-sync/sync",
"destination": "queue/guest/test",
"destination_type": "queue",
"routing_key": "git-hg-sync",
"source": "exchange/git-hg-sync/test",
"routing_key": "#",
"source": "exchange/guest/test",
"vhost": "/"
}
],
Expand All @@ -15,8 +15,8 @@
"auto_delete": false,
"durable": true,
"internal": false,
"name": "exchange/git-hg-sync/test",
"type": "direct",
"name": "exchange/guest/test",
"type": "topic",
"vhost": "/"
}
],
Expand Down Expand Up @@ -44,7 +44,7 @@
"arguments": {},
"auto_delete": false,
"durable": true,
"name": "queue/git-hg-sync/sync",
"name": "queue/guest/test",
"vhost": "/"
}
],
Expand Down
4 changes: 2 additions & 2 deletions tests/pulse_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def send_pulse_message(
connection.connect()

with connection:
ex = kombu.Exchange(exchange, type="direct")
ex = kombu.Exchange(exchange, type="topic")
queue = kombu.Queue(
name=queue_name,
exchange=exchange,
Expand All @@ -48,7 +48,7 @@ def send_pulse_message(
exclusive=False,
auto_delete=False,
)
queue(connection).declare()
queue(connection).queue_declare()
if purge:
channel = connection.channel()
channel.queue_purge(queue.name)
Expand Down
45 changes: 45 additions & 0 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from git_hg_sync.__main__ import get_connection, get_queue, start_app
from git_hg_sync.config import Config, PulseConfig
from git_hg_sync.events import Event
from git_hg_sync.pulse_worker import PulseWorker

NO_RABBITMQ = os.getenv("RABBITMQ") != "true"
Expand Down Expand Up @@ -139,3 +140,47 @@ def test_no_duplicated_ack_messages(
worker.run()

callback.assert_called_once()


@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq")
def test_messages_in_order(
    test_config: Config,
    get_payload: Callable,
) -> None:
    """Check that failed messages are redelivered before later ones, in order.

    The handler below rejects the first delivery of each event and accepts
    the second, so the broker must redeliver a failed message before moving
    on to the next one; the expected processing order is 0, 0, 1, 1.

    The test may also time out, which is likely indicative of the same
    redelivery/ordering issue.
    """
    connection = get_connection(test_config.pulse)
    queue = get_queue(test_config.pulse)
    # Ensure the queue exists and is bound before any message is published.
    queue(connection).queue_declare()
    queue(connection).queue_bind()

    # one_shot=False: keep consuming until should_stop is set by the handler.
    worker = PulseWorker(connection, queue, one_shot=False)

    # Records the push_id of every delivery, including redeliveries.
    events_log: list[int] = []

    def event_handler(event: Event) -> None:
        # Fail the first delivery of each push_id to force a redelivery.
        push_id = event.push_id
        already_seen = push_id in events_log

        events_log.append(push_id)

        # Terminate the worker after processing the expected number of messages.
        if len(events_log) == 4:
            worker.should_stop = True

        # First time we see this push_id: raise so the worker requeues it.
        if not already_seen:
            raise Exception("Not seen yet")

    worker.event_handler = event_handler

    # purge=True clears any messages left over from previous runs; the
    # second send must not purge, or it would drop the first message.
    pulse_utils.send_pulse_message(
        test_config.pulse, get_payload(push_id=0), purge=True
    )
    pulse_utils.send_pulse_message(
        test_config.pulse, get_payload(push_id=1), purge=False
    )
    worker.run()

    # Each message is seen exactly twice, in strict FIFO order.
    assert events_log == [0, 0, 1, 1]