File tree Expand file tree Collapse file tree 1 file changed +9
-2
lines changed
Expand file tree Collapse file tree 1 file changed +9
-2
lines changed Original file line number Diff line number Diff line change @@ -352,14 +352,21 @@ async def on_message_for_websocket(
352352 _LOGGER .info ("Taking POISON for %s (stopping)..." , es_id )
353353 shutdown = True
354354 elif msg :
355- # The EventStream Service is permitted to append to the protobuf string
356- # as long as it uses the '|' delimiter. Here qwe add offset and timestamp.
357355 # We know the AMQPMessage (as a string will start "b'" and end "'"
358356 message_string = str (msg )[2 :- 1 ]
359357 if message_string [0 ] != "{" :
358+ # The EventStream Service is permitted to append to the protobuf string
359+ # as long as it uses the '|' delimiter. Here qwe add offset and timestamp.
360360 message_string += (
361361 f"|{ message_context .offset } |{ message_context .timestamp } "
362362 )
363+ else :
364+ # The EventStream Service is permitted to append to the JSON string
365+ # as long as it uses keys with the prefix "ess_"
366+ msg_dict : dict [str , Any ] = json .loads (message_string )
367+ msg_dict ["ess_offset" ] = message_context .offset
368+ msg_dict ["ess_timestamp" ] = message_context .timestamp
369+ message_string = json .dumps (msg_dict )
363370 try :
364371 await websocket .send_text (message_string )
365372 except WebSocketDisconnect :
You can’t perform that action at this time.
0 commit comments