1+ import json
12from typing import Any , Protocol
23
34import kombu
@@ -34,7 +35,7 @@ def __init__(
3435 self .one_shot = one_shot
3536
3637 @staticmethod
37- def parse_entity (raw_entity : Any ) -> Push | Tag :
38+ def parse_entity (raw_entity : dict ) -> Push | Tag :
3839 logger .debug (f"parse_entity: { raw_entity } " )
3940 message_type = raw_entity .pop ("type" )
4041 match message_type :
@@ -53,12 +54,46 @@ def get_consumers(
5354 consumer = consumer_class (
5455 self .task_queue , auto_declare = False , callbacks = [self .on_task ]
5556 )
57+ logger .info (f"{ consumer = } " )
5658 return [consumer ]
5759
5860 def on_task (self , body : Any , message : kombu .Message ) -> None :
5961 logger .info (f"Received message: { body } " )
60- raw_entity = body ["payload" ]
61- event = PulseWorker .parse_entity (raw_entity )
62+
63+ if isinstance (body , str ):
64+ logger .debug ("Message is a string. Trying to parse as JSON ..." )
65+ try :
66+ body = json .loads (body )
67+ except json .JSONDecodeError :
68+ pass # We'll deal with the incorrect type next.
69+ if not isinstance (body , dict ):
70+ logger .warning (f"Invalid message, rejecting ... `{ body } `" )
71+ message .reject ()
72+ return
73+
74+ if not (raw_entity := body .get ("payload" )):
75+ logger .warning (f"Missing or empty payload, rejecting ... `{ body } `" )
76+ message .reject ()
77+ return
78+
79+ if not isinstance (raw_entity , dict ):
80+ logger .warning (f"Invalid payload, rejecting ... `{ raw_entity } `" )
81+ message .reject ()
82+ return
83+
84+ try :
85+ event = PulseWorker .parse_entity (raw_entity )
86+ except KeyError as e :
87+ logger .warning (
88+ f"Invalid payload: missing { e } , rejecting ... `{ raw_entity } `"
89+ )
90+ message .reject ()
91+ return
92+ except (EntityTypeError , TypeError ) as e :
93+ logger .warning (f"Invalid payload: { e } , rejecting ... `{ raw_entity } `" )
94+ message .reject ()
95+ return
96+
6297 if self .event_handler :
6398 self .event_handler (event )
6499 message .ack ()
0 commit comments