Skip to content

Commit 3dcf382

Browse files
committed
pulse_worker: error handling for payload
1 parent be7f9d0 commit 3dcf382

File tree

2 files changed

+39
-4
lines changed

2 files changed

+39
-4
lines changed

git_hg_sync/application.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def _handle_push_event(self, push_event: Push) -> None:
5353

5454
def _handle_event(self, event: Push | Tag) -> None:
5555
if event.repo_url not in self._repo_synchronizers:
56-
logger.info("Ignoring event for untracked repository: %()s", event.repo_url)
56+
logger.info(f"Ignoring event for untracked repository: {event.repo_url}")
5757
return
5858
match event:
5959
case Push():

git_hg_sync/pulse_worker.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
from typing import Any, Protocol
23

34
import kombu
@@ -34,7 +35,7 @@ def __init__(
3435
self.one_shot = one_shot
3536

3637
@staticmethod
37-
def parse_entity(raw_entity: Any) -> Push | Tag:
38+
def parse_entity(raw_entity: dict) -> Push | Tag:
3839
logger.debug(f"parse_entity: {raw_entity}")
3940
message_type = raw_entity.pop("type")
4041
match message_type:
@@ -53,12 +54,46 @@ def get_consumers(
5354
consumer = consumer_class(
5455
self.task_queue, auto_declare=False, callbacks=[self.on_task]
5556
)
57+
logger.info(f"{consumer=}")
5658
return [consumer]
5759

5860
def on_task(self, body: Any, message: kombu.Message) -> None:
5961
logger.info(f"Received message: {body}")
60-
raw_entity = body["payload"]
61-
event = PulseWorker.parse_entity(raw_entity)
62+
63+
if isinstance(body, str):
64+
logger.debug("Message is a string. Trying to parse as JSON ...")
65+
try:
66+
body = json.loads(body)
67+
except json.JSONDecodeError:
68+
pass # We'll deal with the incorrect type next.
69+
if not isinstance(body, dict):
70+
logger.warning(f"Invalid message, rejecting ... `{body}`")
71+
message.reject()
72+
return
73+
74+
if not (raw_entity := body.get("payload")):
75+
logger.warning(f"Missing or empty payload, rejecting ... `{body}`")
76+
message.reject()
77+
return
78+
79+
if not isinstance(raw_entity, dict):
80+
logger.warning(f"Invalid payload, rejecting ... `{raw_entity}`")
81+
message.reject()
82+
return
83+
84+
try:
85+
event = PulseWorker.parse_entity(raw_entity)
86+
except KeyError as e:
87+
logger.warning(
88+
f"Invalid payload: missing {e}, rejecting ... `{raw_entity}`"
89+
)
90+
message.reject()
91+
return
92+
except (EntityTypeError, TypeError) as e:
93+
logger.warning(f"Invalid payload: {e}, rejecting ... `{raw_entity}`")
94+
message.reject()
95+
return
96+
6297
if self.event_handler:
6398
self.event_handler(event)
6499
message.ack()

0 commit comments

Comments
 (0)