@@ -324,6 +324,8 @@ async def event_stream(
324324 consumer = consumer ,
325325 stream_name = es_routing_key ,
326326 es_id = es_id ,
327+ es_routing_key = es_routing_key ,
328+ es_uuid = uuid ,
327329 websocket = websocket ,
328330 offset_specification = offset_specification ,
329331 )
@@ -335,12 +337,17 @@ async def event_stream(
335337 _LOGGER .info ("Closed WebSocket for %s (uuid=%s)" , es_id , uuid )
336338
337339
338- async def generate_on_message_for_websocket (websocket : WebSocket , es_id : str ):
340+ async def generate_on_message_for_websocket (
341+ websocket : WebSocket , es_id : str , es_routing_key : str , es_uuid : str
342+ ):
339343 """Here we use "currying" to append pre-set parameters
340- to a function that wil be used as the stream consumer message callback handler.
344+ to a function that will be used as the stream consumer message callback handler.
341345 We need the callback to 'know' the WebSocket and (for diagnostics) the ES ID
342346 """
343347 assert websocket
348+ assert es_id
349+ assert es_routing_key
350+ assert es_uuid
344351
345352 async def on_message_for_websocket (
346353 msg : AMQPMessage , message_context : MessageContext
@@ -368,7 +375,21 @@ async def on_message_for_websocket(
368375 )
369376
370377 shutdown : bool = False
371- if msg == b"POISON" :
378+ # We shutdown if...
379+ # 1. we are no longer the source of events fo the stream
380+ # (e.g. our UUID is not the value of the cached routing key im memcached).
381+ # This typically means we've been replaced by a new stream.
382+ # 2. We get a POISON message
383+ stream_uuid : str = _MEMCACHED_CLIENT .get (es_routing_key )
384+ if stream_uuid != es_uuid :
385+ _LOGGER .info (
386+ "There is a new owner of %s (uuid=%s). It is not me (uuid=%s) (stopping)..." ,
387+ es_id ,
388+ stream_uuid ,
389+ es_uuid ,
390+ )
391+ shutdown = True
392+ elif msg == b"POISON" :
372393 _LOGGER .info ("Taking POISON for %s (stopping)..." , es_id )
373394 shutdown = True
374395 elif msg :
@@ -406,13 +427,17 @@ async def _consume(
406427 consumer : Consumer ,
407428 stream_name : str ,
408429 es_id : str ,
430+ es_routing_key : str ,
431+ es_uuid : str ,
409432 websocket : WebSocket ,
410433 offset_specification : ConsumerOffsetSpecification ,
411434):
412435 """An asynchronous generator yielding message bodies from the queue
413436 based on the provided routing key.
414437 """
415- on_message = await generate_on_message_for_websocket (websocket , es_id )
438+ on_message = await generate_on_message_for_websocket (
439+ websocket , es_id , es_routing_key , es_uuid
440+ )
416441
417442 _LOGGER .info (
418443 "Starting consumer %s (offset type=%s offset=%s)..." ,
0 commit comments