Skip to content

Commit c11a673

Browse files
author
Alan Christie
committed
feat: Attempt to append offset and timestamp message
1 parent a22c807 commit c11a673

File tree

1 file changed

+7
-1
lines changed

1 file changed

+7
-1
lines changed

app/app.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,9 @@ async def on_message_for_websocket(
331331
# - timestamp: int (milliseconds since Python time epoch)
332332
# It's essentially time.time() x 1000
333333
r_stream = message_context.consumer.get_stream(message_context.subscriber_name)
334-
_LOGGER.info("Got msg='%s' stream=%s es_id=%s", msg, r_stream, es_id)
334+
_LOGGER.info(
335+
"Got msg='%s' (type=%s) stream=%s es_id=%s", msg, type(msg), r_stream, es_id
336+
)
335337
_LOGGER.info(
336338
"With offset=%s timestamp=%s",
337339
message_context.offset,
@@ -344,6 +346,10 @@ async def on_message_for_websocket(
344346
_LOGGER.info("Taking POISON for %s (stopping)...", es_id)
345347
shutdown = True
346348
elif msg:
349+
# Add offset and timestamp to the end of the protobuf string
350+
msg_str: str = msg.decode("utf-8")
351+
msg_str += f"|{message_context.offset}|{message_context.timestamp}"
352+
msg = msg_str.encode("utf-8")
347353
try:
348354
await websocket.send_text(str(msg))
349355
except WebSocketDisconnect:

0 commit comments

Comments
 (0)