@@ -332,15 +332,22 @@ async def event_stream(
332332 # That's fine - the on_message_for_websocket() function will notice this
333333 # on the next message and should shut itself down.
334334 new_socket_uuid : str = str (python_uuid .uuid4 ())
335- _LOGGER .info ("Assigning connection ID %s to %s..." , new_socket_uuid , es_id )
335+ _LOGGER .info (
336+ "Assigned connection ID /%s/ to %s (uuid=%s)" , new_socket_uuid , es_id , uuid
337+ )
336338 existing_socket_uuid : bytes = _MEMCACHED_CLIENT .get (es_routing_key )
337339 if existing_socket_uuid and existing_socket_uuid .decode ("utf-8" ) != new_socket_uuid :
338- _LOGGER .warning ("Replacing existing connection ID with ours for %s" , es_id )
340+ _LOGGER .warning (
341+ "This replaces connection ID %s for %s (uuid=%s)" ,
342+ existing_socket_uuid .decode ("utf-8" ),
343+ es_id ,
344+ uuid ,
345+ )
339346 _MEMCACHED_CLIENT .set (es_routing_key , new_socket_uuid )
340347
341348 # Start consuming the stream.
342349 # We don't return from here until there's an error.
343- _LOGGER .debug ("Consuming %s..." , es_id )
350+ _LOGGER .debug ("Consuming %s /%s/ ..." , es_id , new_socket_uuid )
344351 await _consume (
345352 consumer ,
346353 es_id = es_id ,
@@ -361,9 +368,11 @@ async def event_stream(
361368 # We don't care - we have to just get out of here
362369 # so errors in tear-down have to be ignored,
363370 # and there's no apparent way to know whether calling 'close()' is safe.
364- _LOGGER .debug ("Ignoring RuntimeError from close() for %s" , es_id )
371+ _LOGGER .debug (
372+ "Ignoring RuntimeError from close() for %s /%s/" , es_id , new_socket_uuid
373+ )
365374
366- _LOGGER .info ("Closed WebSocket for %s (uuid=%s)" , es_id , uuid )
375+ _LOGGER .info ("Closed WebSocket for %s /%s/ (uuid=%s)" , es_id , new_socket_uuid , uuid )
367376
368377
369378async def generate_on_message_for_websocket (
@@ -423,14 +432,16 @@ async def on_message_for_websocket(
423432 shutdown = True
424433 elif stream_socket_uuid .decode ("utf-8" ) != es_websocket_uuid :
425434 _LOGGER .info (
426- "There is a new consumer of %s (%s) , and it is not us (%s) (stopping)..." ,
435+ "There is a new consumer of %s /%s/ , and it is not us /%s/ (stopping)..." ,
427436 es_id ,
428437 stream_socket_uuid .decode ("utf-8" ),
429438 es_websocket_uuid ,
430439 )
431440 shutdown = True
432441 elif msg == b"POISON" :
433- _LOGGER .info ("Taking POISON for %s (stopping)..." , es_id )
442+ _LOGGER .info (
443+ "Taking POISON for %s /%s/ (stopping)..." , es_id , es_websocket_uuid
444+ )
434445 shutdown = True
435446 elif msg :
436447 # We know the AMQPMessage (as a string will start "b'" and end "'"
@@ -454,19 +465,27 @@ async def on_message_for_websocket(
454465 message_stats [_MESSAGE_STATS_KEY_SENT ] + 1
455466 )
456467 except WebSocketDisconnect :
457- _LOGGER .info ("Got WebSocketDisconnect for %s (stopping)..." , es_id )
468+ _LOGGER .info (
469+ "Got WebSocketDisconnect for %s /%s/ (stopping)..." ,
470+ es_id ,
471+ es_websocket_uuid ,
472+ )
458473 shutdown = True
459474
460- _LOGGER .debug ("Handled msg for %s..." , es_id )
475+ _LOGGER .debug ("Handled msg for %s /%s/ ..." , es_id , es_websocket_uuid )
461476
462477 # Consider regular INFO summary.
463478 # Stats will ultimately be produced if the socket closes,
464479 # so we just have to consider regular updates here.
465480 if num_messages_received % _MESSAGE_STATS_INTERVAL == 0 :
466- _LOGGER .info ("Message stats for %s: %s" , es_id , message_stats )
481+ _LOGGER .info (
482+ "Message stats for %s /%s/: %s" , es_id , es_websocket_uuid , message_stats
483+ )
467484
468485 if shutdown :
469- _LOGGER .info ("Stopping consumer for %s (shutdown)..." , es_id )
486+ _LOGGER .info (
487+ "Stopping consumer for %s /%s/ (shutdown)..." , es_id , es_websocket_uuid
488+ )
470489 message_context .consumer .stop ()
471490
472491 return on_message_for_websocket
@@ -501,13 +520,14 @@ async def _consume(
501520 )
502521
503522 _LOGGER .info (
504- "Starting consumer %s (offset type=%s offset=%s)..." ,
523+ "Starting consumer %s /%s/ (offset type=%s offset=%s)..." ,
505524 es_id ,
525+ es_websocket_uuid ,
506526 offset_specification .offset_type .name ,
507527 offset_specification .offset ,
508528 )
509529 await consumer .start ()
510- _LOGGER .info ("Subscribing %s..." , es_id )
530+ _LOGGER .info ("Subscribing %s /%s/ ..." , es_id , es_websocket_uuid )
511531 subscribed : bool = True
512532 try :
513533 await consumer .subscribe (
@@ -521,13 +541,18 @@ async def _consume(
521541 subscribed = False
522542
523543 if subscribed :
524- _LOGGER .info ("Running %s..." , es_id )
544+ _LOGGER .info ("Running %s /%s/ ..." , es_id , es_websocket_uuid )
525545 await consumer .run ()
526- _LOGGER .info ("Stopped %s (closing)..." , es_id )
546+ _LOGGER .info ("Stopped %s /%s/ (closing)..." , es_id , es_websocket_uuid )
527547 await consumer .close ()
528- _LOGGER .info ("Closed %s (message_stats=%s) " , es_id , message_stats )
548+ _LOGGER .info ("Closed %s /%s/ " , es_id , es_websocket_uuid )
529549
530- _LOGGER .info ("Stopped consuming %s (message_stats=%s)" , es_id , message_stats )
550+ _LOGGER .info (
551+ "Stopped consuming %s /%s/ (message_stats=%s)" ,
552+ es_id ,
553+ es_websocket_uuid ,
554+ message_stats ,
555+ )
531556
532557
533558# Endpoints for the 'internal' event-stream management API -----------------------------
0 commit comments