@@ -332,8 +332,14 @@ async def on_message_for_websocket(
332332 # It's essentially time.time() x 1000
333333 r_stream = message_context .consumer .get_stream (message_context .subscriber_name )
334334 _LOGGER .info (
335- "Got msg='%s' (type=%s) stream=%s es_id=%s" , msg , type (msg ), r_stream , es_id
335+ "Got msg='%s' (type=%sn encoding=%s) stream=%s es_id=%s" ,
336+ msg ,
337+ type (msg ),
338+ json .detect_encoding (msg ),
339+ r_stream ,
340+ es_id ,
336341 )
342+ assert isinstance (msg , bytes )
337343 _LOGGER .info (
338344 "With offset=%s timestamp=%s" ,
339345 message_context .offset ,
@@ -346,10 +352,13 @@ async def on_message_for_websocket(
346352 _LOGGER .info ("Taking POISON for %s (stopping)..." , es_id )
347353 shutdown = True
348354 elif msg :
349- # Add offset and timestamp to the end of the protobuf string
350- msg_str : str = msg .decode ("utf-8" )
351- msg_str += f"|{ message_context .offset } |{ message_context .timestamp } "
352- msg = msg_str .encode ("utf-8" )
355+ if not msg .startswith (b"{" ):
356+ # Add offset and timestamp to the end of the protobuf string.
357+ # The EventStream Service is permitted to extend the string
358+ # as long as it uses '|' as a delimiter and adds material at the end.
359+ msg_str : str = msg .decode ("utf-8" )
360+ msg_str += f"|{ message_context .offset } |{ message_context .timestamp } "
361+ msg = msg_str .encode ("utf-8" )
353362 try :
354363 await websocket .send_text (str (msg ))
355364 except WebSocketDisconnect :
0 commit comments