@@ -332,7 +332,7 @@ async def event_stream(
332332 _LOGGER .warning (
333333 "Assigning unique WebSocket ID (%s) for this EventStream (routing_key=%s uuid=%s)" ,
334334 new_socket_uuid ,
335- routing_key ,
335+ es_routing_key ,
336336 uuid ,
337337 )
338338 existing_routing_key_uuid : str = _MEMCACHED_CLIENT .get (routing_key )
@@ -349,8 +349,7 @@ async def event_stream(
349349 # We don't return from here until there's an error.
350350 _LOGGER .debug ("Consuming %s..." , es_id )
351351 await _consume (
352- consumer = consumer ,
353- stream_name = es_routing_key ,
352+ consumer ,
354353 es_id = es_id ,
355354 es_routing_key = es_routing_key ,
356355 es_websocket_uuid = new_socket_uuid ,
@@ -366,7 +365,7 @@ async def event_stream(
366365
367366
368367async def generate_on_message_for_websocket (
369- websocket : WebSocket , es_id : str , es_routing_key : str , es_websocket_uuid : str
368+ websocket : WebSocket , * , es_id : str , es_routing_key : str , es_websocket_uuid : str
370369):
371370 """Here we use "currying" to append pre-set parameters
372371 to a function that will be used as the stream consumer message callback handler.
@@ -451,9 +450,8 @@ async def on_message_for_websocket(
451450
452451
453452async def _consume (
454- * ,
455453 consumer : Consumer ,
456- stream_name : str ,
454+ * ,
457455 es_id : str ,
458456 es_routing_key : str ,
459457 es_websocket_uuid : str ,
@@ -464,7 +462,7 @@ async def _consume(
464462 based on the provided routing key.
465463 """
466464 on_message = await generate_on_message_for_websocket (
467- websocket = websocket ,
465+ websocket ,
468466 es_id = es_id ,
469467 es_routing_key = es_routing_key ,
470468 es_websocket_uuid = es_websocket_uuid ,
@@ -481,13 +479,13 @@ async def _consume(
481479 subscribed : bool = True
482480 try :
483481 await consumer .subscribe (
484- stream = stream_name ,
482+ stream = es_routing_key ,
485483 callback = on_message ,
486484 decoder = amqp_decoder ,
487485 offset_specification = offset_specification ,
488486 )
489487 except StreamDoesNotExist :
490- _LOGGER .warning ("Stream '%s' for %s does not exist" , stream_name , es_id )
488+ _LOGGER .warning ("Stream '%s' for %s does not exist" , es_routing_key , es_id )
491489 subscribed = False
492490
493491 if subscribed :
0 commit comments