@@ -356,30 +356,13 @@ async def on_message_for_websocket(
356356 )
357357
358358 shutdown : bool = False
359- decoded_msg : str = msg .decode (encoding = "utf-8" )
360- if decoded_msg == "POISON" :
359+ # decoded_msg: str = msg.decode(encoding="utf-8")
360+ if msg == b "POISON" :
361361 _LOGGER .info ("Taking POISON for %s (stopping)..." , es_id )
362362 shutdown = True
363- elif decoded_msg :
364- # Insert the Stream offset and timestamp.
365- # The message is either a protobuf String or a JSON string.
366- if decoded_msg [0 ] == "{" :
367- # JSON string.
368- # An EventStreaming service is permitted to add any properties
369- # as long as it uses the prefix "ess_"
370- msg_dict = json .loads (decoded_msg )
371- msg_dict ["ess_offset" ] = message_context .offset
372- msg_dict ["ess_timestamp" ] = message_context .timestamp
373- decoded_msg = json .dumps (msg_dict )
374- else :
375- # An EventStreaming service is permitted to add any properties
376- # as long as it uses the separator "|str()" and places properties at the end
377- # of the received string.
378- decoded_msg = f"{ decoded_msg } |{ message_context .offset } |{ message_context .timestamp } |"
379- text = decoded_msg .encode ("utf-8" )
380- _LOGGER .info ("Sending msg for %s (%s)..." , es_id , text )
363+ elif msg :
381364 try :
382- await websocket .send_text (text )
365+ await websocket .send_text (str ( msg ) )
383366 except WebSocketDisconnect :
384367 _LOGGER .info ("Got WebSocketDisconnect for %s (stopping)..." , es_id )
385368 shutdown = True
0 commit comments