2020 status ,
2121)
2222from pydantic import BaseModel
23- from pymemcache .client .base import Client
23+ from pymemcache .client .base import Client , KeepaliveOpts
2424from pymemcache .client .retrying import RetryingClient
2525from pymemcache .exceptions import MemcacheUnexpectedCloseError
2626from rstream import (
7373# The location is either a host ("localhost")
7474# or host and port if the port is not the expected default of 11211 ("localhost:1234")
7575_MEMCACHED_LOCATION : str = os .getenv ("ESS_MEMCACHED_LOCATION" , "localhost" )
76+ _MEMCACHED_KEEPALIVE : KeepaliveOpts = KeepaliveOpts (idle = 35 , intvl = 8 , cnt = 5 )
7677_MEMCACHED_BASE_CLIENT : Client = Client (
77- _MEMCACHED_LOCATION , connect_timeout = 4 , timeout = 0.5
78+ _MEMCACHED_LOCATION ,
79+ connect_timeout = 4 ,
80+ encoding = "utf-8" ,
81+ timeout = 0.5 ,
82+ socket_keepalive = _MEMCACHED_KEEPALIVE ,
7883)
7984_MEMCACHED_CLIENT : RetryingClient = RetryingClient (
8085 _MEMCACHED_BASE_CLIENT ,
8186 attempts = 3 ,
8287 retry_delay = 0.01 ,
8388 retry_for = [MemcacheUnexpectedCloseError ],
8489)
90+ _LOGGER .info ("memcached version=%s" , _MEMCACHED_CLIENT .version ())
8591
8692# SQLite database path
8793_DATABASE_PATH = "/data/event-streams.db"
@@ -120,18 +126,22 @@ def _get_location(uuid: str) -> str:
120126 _LOGGER .info ("Created (or exists)" )
121127
122128 # List existing event streams
129+ # and ensure the memcached cache reflects this...
123130 _DB_CONNECTION = sqlite3 .connect (_DATABASE_PATH )
124131 _CUR = _DB_CONNECTION .cursor ()
125132 _RES = _CUR .execute ("SELECT * FROM es" )
126133 _EVENT_STREAMS = _RES .fetchall ()
127134 _DB_CONNECTION .close ()
128135 for _ES in _EVENT_STREAMS :
136+ routing_key : str = _ES [2 ]
137+ ess_uuid : str = _ES [1 ]
129138 _LOGGER .info (
130139 "Existing EventStream: %s (id=%s routing_key='%s')" ,
131140 _get_location (_ES [1 ]),
132- _ES [ 0 ] ,
133- _ES [ 2 ] ,
141+ ess_uuid ,
142+ routing_key ,
134143 )
144+ _MEMCACHED_CLIENT .set (routing_key , ess_uuid )
135145
136146
137147# We use pydantic to declare the model (request payloads) for the internal REST API.
@@ -282,13 +292,13 @@ async def event_stream(
282292 # Get the ID (for diagnostics)
283293 # and the routing key for the queue...
284294 es_id = es [0 ]
285- routing_key : str = es [2 ]
295+ es_routing_key : str = es [2 ]
286296
287297 _LOGGER .info (
288298 "Creating Consumer for %s (uuid=%s) [%s]..." ,
289299 es_id ,
290300 uuid ,
291- routing_key ,
301+ es_routing_key ,
292302 )
293303 consumer : Consumer = Consumer (
294304 _AMPQ_HOSTNAME ,
@@ -300,7 +310,7 @@ async def event_stream(
300310 # Before continuing ... does the stream exist?
301311 # If we don't check it now we'll fail later anyway.
302312 # The AS is expected to create and delete the streams.
303- if not await consumer .stream_exists (routing_key ):
313+ if not await consumer .stream_exists (es_routing_key ):
304314 msg : str = f"EventStream { uuid } cannot be found"
305315 _LOGGER .warning (msg )
306316 await websocket .close (code = status .WS_1013_TRY_AGAIN_LATER , reason = msg )
@@ -311,7 +321,7 @@ async def event_stream(
311321 _LOGGER .debug ("Consuming %s..." , es_id )
312322 await _consume (
313323 consumer = consumer ,
314- stream_name = routing_key ,
324+ stream_name = es_routing_key ,
315325 es_id = es_id ,
316326 websocket = websocket ,
317327 offset_specification = offset_specification ,
@@ -464,21 +474,22 @@ def post_es(request_body: EventStreamPostRequestBody) -> EventStreamPostResponse
464474 # Create a new ES record.
465475 # An ID is assigned automatically -
466476 # we just need to provide a UUID and the routing key.
467- routing_key : str = request_body .routing_key
477+ es_routing_key : str = request_body .routing_key
468478 _LOGGER .info (
469- "Creating new event stream %s (routing_key='%s')..." , uuid_str , routing_key
479+ "Creating new event stream %s (routing_key='%s')..." , uuid_str , es_routing_key
470480 )
471481
472482 db = sqlite3 .connect (_DATABASE_PATH )
473483 cursor = db .cursor ()
474- cursor .execute (f"INSERT INTO es VALUES (NULL, '{ uuid_str } ', '{ routing_key } ')" )
484+ cursor .execute (f"INSERT INTO es VALUES (NULL, '{ uuid_str } ', '{ es_routing_key } ')" )
475485 db .commit ()
476486 # Now pull the record back to get the assigned record ID...
477487 es = cursor .execute (f"SELECT * FROM es WHERE uuid='{ uuid_str } '" ).fetchone ()
478488 db .close ()
479489 if not es :
480490 msg : str = (
481- f"Failed to get new EventStream record ID for { uuid_str } (routing_key='{ routing_key } ')"
491+ f"Failed to get new EventStream record ID for { uuid_str } "
492+ f" (routing_key='{ es_routing_key } ')"
482493 )
483494 _LOGGER .error (msg )
484495 raise HTTPException (
@@ -498,8 +509,8 @@ def post_es(request_body: EventStreamPostRequestBody) -> EventStreamPostResponse
498509 # This might 'knock-out' a previous stream registered against that key.
499510 # That's fine - the asyncio process will notice this on the next message
500511 # and will kill itself.
501- existing_routing_key_uuid : str = _MEMCACHED_CLIENT .set (routing_key )
502- if existing_routing_key_uuid != uuid_str :
512+ existing_routing_key_uuid : str = _MEMCACHED_CLIENT .get (es_routing_key )
513+ if existing_routing_key_uuid and existing_routing_key_uuid != uuid_str :
503514 _LOGGER .warning (
504515 "Replaced routing key %s UUID (%s) with ours (%s)" ,
505- routing_key ,
516+ es_routing_key ,
@@ -556,15 +567,18 @@ def delete_es(es_id: int):
556567 detail = msg ,
557568 )
558569
559- routing_key : str = es [2 ]
570+ es_routing_key : str = es [2 ]
560571 _LOGGER .info (
561- "Deleting event stream %s (uuid=%s routing_key='%s')" , es_id , es [1 ], routing_key
572+ "Deleting event stream %s (uuid=%s routing_key='%s')" ,
573+ es_id ,
574+ es [1 ],
575+ es_routing_key ,
562576 )
563577
564578 # Clear the memcached value of this routing key.
565579 # Any asyncio process using this routing key
566580 # will notice this and kill itself (at some point)
567- _MEMCACHED_CLIENT .delete (routing_key )
581+ _MEMCACHED_CLIENT .delete (es_routing_key )
568582
569583 # Delete the ES record...
570584 # This will prevent any further connections.
0 commit comments