2121)
2222from pydantic import BaseModel
2323from rstream import (
24+ AMQPMessage ,
2425 Consumer ,
2526 ConsumerOffsetSpecification ,
2627 MessageContext ,
@@ -320,7 +321,9 @@ async def generate_on_message_for_websocket(websocket: WebSocket, es_id: str):
320321 """
321322 assert websocket
322323
323- async def on_message_for_websocket (msg : bytes , message_context : MessageContext ):
324+ async def on_message_for_websocket (
325+ msg : AMQPMessage , message_context : MessageContext
326+ ):
324327 # The message is expected to be formed from an
325328 # AMQPMessage generated in the AS using 'body=bytes(message_string, "utf-8")'
326329 #
@@ -332,9 +335,8 @@ async def on_message_for_websocket(msg: bytes, message_context: MessageContext):
332335 # It's essentially time.time() x 1000
333336 r_stream = message_context .consumer .get_stream (message_context .subscriber_name )
334337 _LOGGER .info (
335- "Got msg='%s' (type=%s) stream=%s es_id=%s" ,
338+ "Got msg='%s' stream=%s es_id=%s" ,
336339 msg ,
337- type (msg ),
338340 r_stream ,
339341 es_id ,
340342 )
@@ -350,8 +352,23 @@ async def on_message_for_websocket(msg: bytes, message_context: MessageContext):
350352 _LOGGER .info ("Taking POISON for %s (stopping)..." , es_id )
351353 shutdown = True
352354 elif msg :
355+ # The EventStream Service is permitted to append to the protobuf string
356+ # as long as it uses the '|' delimiter. Here qwe add offset and timestamp.
357+ # We know the AMQPMessage (as a string will start "b'" and end "'"
358+ message_string = str (msg )[2 :- 1 ]
359+ if message_string [0 ] != "{" :
360+ message_string += (
361+ f"|{ message_context .offset } |{ message_context .timestamp } "
362+ )
363+ patched_msg : AMQPMessage = AMQPMessage (
364+ body = bytes (message_string , "utf-8" )
365+ )
366+ else :
367+ patched_msg : AMQPMessage = AMQPMessage (
368+ body = bytes (message_string , "utf-8" )
369+ )
353370 try :
354- await websocket .send_text (msg . decode ( "utf-8" ))
371+ await websocket .send_text (str ( patched_msg ))
355372 except WebSocketDisconnect :
356373 _LOGGER .info ("Got WebSocketDisconnect for %s (stopping)..." , es_id )
357374 shutdown = True
0 commit comments