8989 retry_for = [MemcacheUnexpectedCloseError ],
9090)
9191
92+ # Message handler stats interval.
93+ # The recurring number of messages received before stats are emitted.
94+ _MESSAGE_STATS_INTERVAL : int = int (os .getenv ("ESS_MESSAGE_STATS_INTERVAL" , "800" ))
95+ _MESSAGE_STATS_KEY_RECEIVED : str = "received"
96+ _MESSAGE_STATS_KEY_SENT : str = "sent"
97+
9298# SQLite database path
9399_DATABASE_PATH = "/data/event-streams.db"
94100
@@ -361,7 +367,12 @@ async def event_stream(
361367
362368
363369async def generate_on_message_for_websocket (
364- websocket : WebSocket , * , es_id : str , es_routing_key : str , es_websocket_uuid : str
370+ websocket : WebSocket ,
371+ * ,
372+ es_id : str ,
373+ es_routing_key : str ,
374+ es_websocket_uuid : str ,
375+ message_stats : dict [str , Any ],
365376):
366377 """Here we use "currying" to append pre-set parameters
367378 to a function that will be used as the stream consumer message callback handler.
@@ -385,7 +396,7 @@ async def on_message_for_websocket(
385396 # - timestamp: int (milliseconds since Python time epoch)
386397 # It's essentially time.time() x 1000
387398 r_stream = message_context .consumer .get_stream (message_context .subscriber_name )
388- _LOGGER .info (
399+ _LOGGER .debug (
389400 "Got msg='%s' stream=%s es_id=%s" ,
390401 msg ,
391402 r_stream ,
@@ -396,6 +407,9 @@ async def on_message_for_websocket(
396407 message_context .offset ,
397408 message_context .timestamp ,
398409 )
410+ # Update message received count
411+ num_messages_received : int = message_stats [_MESSAGE_STATS_KEY_RECEIVED ] + 1
412+ message_stats [_MESSAGE_STATS_KEY_RECEIVED ] = num_messages_received
399413
400414 shutdown : bool = False
401415 # We shutdown if...
@@ -434,13 +448,23 @@ async def on_message_for_websocket(
434448 msg_dict ["ess_timestamp" ] = message_context .timestamp
435449 message_string = json .dumps (msg_dict )
436450 try :
451+ # Pass on and count
437452 await websocket .send_text (message_string )
453+ message_stats [_MESSAGE_STATS_KEY_SENT ] = (
454+ message_stats [_MESSAGE_STATS_KEY_SENT ] + 1
455+ )
438456 except WebSocketDisconnect :
439457 _LOGGER .info ("Got WebSocketDisconnect for %s (stopping)..." , es_id )
440458 shutdown = True
441459
442460 _LOGGER .debug ("Handled msg for %s..." , es_id )
443461
462+ # Consider regular INFO summary.
463+ # Stats will ultimately be produced if the socket closes,
464+ # so we just have to consider regular updates here.
465+ if num_messages_received % _MESSAGE_STATS_INTERVAL == 0 :
466+ _LOGGER .info ("Message stats for %s: %s" , es_id , message_stats )
467+
444468 if shutdown :
445469 _LOGGER .info ("Stopping consumer for %s (shutdown)..." , es_id )
446470 message_context .consumer .stop ()
@@ -460,11 +484,20 @@ async def _consume(
460484 """An asynchronous generator yielding message bodies from the queue
461485 based on the provided routing key.
462486 """
463- on_message = await generate_on_message_for_websocket (
487+ # A dictionary we pass to the message handler.
488+ # It can add material to this
489+ # (most importantly a count of received and sent messages).
490+ # We'll print this when we leave this consumer.
491+ message_stats : dict [str , Any ] = {
492+ _MESSAGE_STATS_KEY_RECEIVED : 0 ,
493+ _MESSAGE_STATS_KEY_SENT : 0 ,
494+ }
495+ message_handler = await generate_on_message_for_websocket (
464496 websocket ,
465497 es_id = es_id ,
466498 es_routing_key = es_routing_key ,
467499 es_websocket_uuid = es_websocket_uuid ,
500+ message_stats = message_stats ,
468501 )
469502
470503 _LOGGER .info (
@@ -479,7 +512,7 @@ async def _consume(
479512 try :
480513 await consumer .subscribe (
481514 stream = es_routing_key ,
482- callback = on_message ,
515+ callback = message_handler ,
483516 decoder = amqp_decoder ,
484517 offset_specification = offset_specification ,
485518 )
@@ -492,9 +525,9 @@ async def _consume(
492525 await consumer .run ()
493526 _LOGGER .info ("Stopped %s (closing)..." , es_id )
494527 await consumer .close ()
495- _LOGGER .info ("Closed %s" , es_id )
528+ _LOGGER .info ("Closed %s (message_stats=%s) " , es_id , message_stats )
496529
497- _LOGGER .info ("Stopped consuming %s" , es_id )
530+ _LOGGER .info ("Stopped consuming %s (message_stats=%s) " , es_id , message_stats )
498531
499532
500533# Endpoints for the 'internal' event-stream management API -----------------------------
0 commit comments