@@ -411,11 +411,11 @@ async def on_message_for_websocket(
411411 # It's essentially time.time() x 1000
412412 r_stream = message_context .consumer .get_stream (message_context .subscriber_name )
413413 _LOGGER .debug (
414- "Got msg=%s stream=%s es_id=%s /%s/" ,
415- msg ,
414+ "Handling message stream=%s es_id=%s /%s/ msg=%s" ,
416415 r_stream ,
417416 es_id ,
418417 es_websocket_uuid ,
418+ msg ,
419419 )
420420 _LOGGER .debug (
421421 "With offset=%s timestamp=%s /%s/" ,
@@ -426,7 +426,23 @@ async def on_message_for_websocket(
426426 # Update message received count
427427 num_messages_received : int = message_stats [_MESSAGE_STATS_KEY_RECEIVED ] + 1
428428 message_stats [_MESSAGE_STATS_KEY_RECEIVED ] = num_messages_received
429- _LOGGER .info ("COUNTED %s" , message_context .offset )
429+
430+ # At the time of writing we could not be decoded directly, but we can invoke
431+ # its built-in __str__() representation to get the message as a string.
432+ # As a result we get a series of bytes represented as a string (e.g. "b'xyz'").
433+ # With this we can eval() it to get _real_ bytes and then decode it
434+ # (we know it's utf-8) in order to get the message as a string! :-O
435+ try :
436+ msg_str = eval (str (msg )).decode ("utf-8" ) # pylint: disable=eval-used
437+ except Exception as ex : # pylint: disable=broad-exception-caught
438+ _LOGGER .error (
439+ "Exception trying to decode message for %s /%s/ msg=%s (%s)" ,
440+ es_id ,
441+ es_websocket_uuid ,
442+ msg ,
443+ ex ,
444+ )
445+ return
430446
431447 shutdown : bool = False
432448 # We shutdown if...
@@ -446,26 +462,13 @@ async def on_message_for_websocket(
446462 es_websocket_uuid ,
447463 )
448464 shutdown = True
449- elif msg == b "POISON" :
465+ elif msg_str == "POISON" :
450466 _LOGGER .info (
451467 "Taking POISON for %s /%s/ (stopping)..." , es_id , es_websocket_uuid
452468 )
453469 shutdown = True
454- elif msg :
455- _LOGGER .info ("PREPARING %s" , message_context .offset )
456- # At the time of writing it seemed as though the msg (an AMQPMessage)
457- # could not be decoded directly, but we can invoke its built-in __str__()
458- # representation to get the message as a string. We get a series of bytes
459- # represented as a string (e.g. "b'xyz'"). With this we can eval() it
460- # to get _real_ bytes and then decode it (assuming it to be utf-8)
461- # in order to get the message as a string! :-O
462- try :
463- msg_str = eval (str (msg )).decode ("utf-8" ) # pylint: disable=eval-used
464- except Exception as ex : # pylint: disable=broad-exception-caught
465- _LOGGER .error ("Exception trying to decode message %s" , ex )
466- _LOGGER .info ("TRIMMED %s" , message_context .offset )
470+ elif msg_str :
467471 if msg_str [0 ] == "{" :
468- _LOGGER .info ("IS JSON %s" , message_context .offset )
469472 # The EventStream Service is permitted to append to the JSON string
470473 # as long as it uses keys with the prefix "ess_"
471474 try :
@@ -474,7 +477,7 @@ async def on_message_for_websocket(
474477 json .decoder .JSONDecodeError
475478 ) as jde : # pylint: disable=broad-exception-caught
476479 _LOGGER .error (
477- "JSONDecodeError for offset %s on %s /%s/ '%s' (skipping) message =%s" ,
480+ "JSONDecodeError for offset %s on %s /%s/ '%s' (skipping) msg_str =%s" ,
478481 message_context .offset ,
479482 es_id ,
480483 es_websocket_uuid ,
@@ -494,9 +497,7 @@ async def on_message_for_websocket(
494497
495498 try :
496499 # Pass on and count
497- _LOGGER .info ("SENDING %s" , message_context .offset )
498500 await websocket .send_text (msg_str )
499- _LOGGER .info ("SENT %s" , message_context .offset )
500501 message_stats [_MESSAGE_STATS_KEY_SENT ] = (
501502 message_stats [_MESSAGE_STATS_KEY_SENT ] + 1
502503 )
@@ -508,7 +509,7 @@ async def on_message_for_websocket(
508509 )
509510 shutdown = True
510511
511- _LOGGER .debug ("Handled msg for %s /%s/..." , es_id , es_websocket_uuid )
512+ _LOGGER .debug ("Handled message for %s /%s/..." , es_id , es_websocket_uuid )
512513
513514 # Consider regular INFO summary.
514515 # Stats will ultimately be produced if the socket closes,
0 commit comments