@@ -395,6 +395,11 @@ async def generate_on_message_for_websocket(
395395 async def on_message_for_websocket (
396396 msg : AMQPMessage , message_context : MessageContext
397397 ):
398+ # CAUTION: EVERY possible exception needs to be handled here
399+ # otherwise the problem will not be seen,
400+ # as exceptions are not exposed to the log and
401+ # messages just get dropped.
402+
398403 # The message is expected to be formed from an
399404 # AMQPMessage generated in the AS using 'body=bytes(message_string, "utf-8")'
400405 #
@@ -451,7 +456,10 @@ async def on_message_for_websocket(
451456 # The msg (an AMQPMessage) cannot be decoded directly, but we can
452457 # invoke its built-in __bytes__() representation to get the message as bytes.
453458 # We can then decode it to get a string we can manipulate.
454- message_string = bytes (msg ).decode ("utf-8" )
459+ try :
460+ message_string = bytes (msg ).decode ("utf-8" )
461+ except Exception as ex : # pylint: disable=broad-exception-caught
462+ _LOGGER .error ("EXCEPTION %s" , ex )
455463 _LOGGER .info ("TRIMMED %s" , message_context .offset )
456464 if message_string [0 ] == "{" :
457465 _LOGGER .info ("IS JSON %s" , message_context .offset )
@@ -463,7 +471,12 @@ async def on_message_for_websocket(
463471 json .decoder .JSONDecodeError
464472 ) as jde : # pylint: disable=broad-exception-caught
465473 _LOGGER .error (
466- "Got JSONDecodeError %s (%s)" , message_context .offset , jde
474+ "JSONDecodeError for offset %s on %s /%s/ '%s' (skipping) message=%s" ,
475+ message_context .offset ,
476+ es_id ,
477+ es_websocket_uuid ,
478+ jde ,
479+ message_string ,
467480 )
468481 return
469482 msg_dict ["ess_ordinal" ] = message_context .offset
0 commit comments