2121)
2222from pydantic import BaseModel
2323from rstream import (
24- AMQPMessage ,
2524 Consumer ,
2625 ConsumerOffsetSpecification ,
2726 MessageContext ,
@@ -321,9 +320,10 @@ async def generate_on_message_for_websocket(websocket: WebSocket, es_id: str):
321320 """
322321 assert websocket
323322
324- async def on_message_for_websocket (
325- msg : AMQPMessage , message_context : MessageContext
326- ):
323+ async def on_message_for_websocket (msg : bytes , message_context : MessageContext ):
324+ # The message is expected to be formed from an
325+ # AMQPMessage generated in the AS using 'body=bytes(message_string, "utf-8")'
326+ #
327327 # The MessageContext contains: -
328328 # - consumer: The Consumer object
329329 # - subscriber_name: str
@@ -332,8 +332,9 @@ async def on_message_for_websocket(
332332 # It's essentially time.time() x 1000
333333 r_stream = message_context .consumer .get_stream (message_context .subscriber_name )
334334 _LOGGER .info (
335- "Got msg='%s' stream=%s es_id=%s" ,
335+ "Got msg='%s' (type=%s) stream=%s es_id=%s" ,
336336 msg ,
337+ type (msg ),
337338 r_stream ,
338339 es_id ,
339340 )
@@ -350,7 +351,7 @@ async def on_message_for_websocket(
350351 shutdown = True
351352 elif msg :
352353 try :
353- await websocket .send_text (str ( msg ))
354+ await websocket .send_text (msg . decode ( "utf-8" ))
354355 except WebSocketDisconnect :
355356 _LOGGER .info ("Got WebSocketDisconnect for %s (stopping)..." , es_id )
356357 shutdown = True
0 commit comments