11"""The entrypoint for the Squonk2 FastAPI WebSocket service."""
22
3+ import json
34import logging
45import os
5- from typing import Dict
6+ import sqlite3
7+ from logging .config import dictConfig
8+ from typing import Any , Dict
69
710import aio_pika
811import shortuuid
912from fastapi import FastAPI , HTTPException , WebSocket , status
1013from pydantic import BaseModel
1114
12- logging .basicConfig (level = logging .DEBUG )
15+ # Configure logging
16+ print ("Configuring logging..." )
17+ _LOGGING_CONFIG : Dict [str , Any ] = {}
18+ with open ("logging.config" , "r" , encoding = "utf8" ) as stream :
19+ try :
20+ _LOGGING_CONFIG = json .loads (stream .read ())
21+ except json .decoder .JSONDecodeError as exc :
22+ print (exc )
23+ dictConfig (_LOGGING_CONFIG )
24+ print ("Configured logging." )
25+
1326_LOGGER = logging .getLogger (__name__ )
1427
1528# Public (event-stream) and internal (REST) services
1629app_public = FastAPI ()
1730app_internal = FastAPI ()
1831
19- # A map of event streams (a short UUID) to their routing keys.
20- _ES_ROUTING_MAP : Dict [str , str ] = {}
21- # A map of event streams IDs to their UUIDs.
22- _ES_UUID_MAP : Dict [int , str ] = {}
23-
2432_INGRESS_LOCATION : str = os .getenv ("INGRESS_LOCATION" )
2533assert _INGRESS_LOCATION , "INGRESS_LOCATION environment variable must be set"
2634
2937assert _AMPQ_URL , "AMPQ_URL environment variable must be set"
3038_LOGGER .info ("AMPQ_URL: %s" , _AMPQ_URL )
3139
40+ # Create our local database.
41+ # A table to record allocated Event Streams.
42+ # The table 'id' is an INTEGER PRIMARY KEY and so becomes an auto-incrementing
43+ # value when NONE is passed in as it's value.
44+ _DATABASE_PATH = "/data/event-streams.db"
45+ _LOGGER .info ("Creating SQLite database (%s)..." , _DATABASE_PATH )
46+
47+ _DB_CONNECTION = sqlite3 .connect (_DATABASE_PATH )
48+ _CUR = _DB_CONNECTION .cursor ()
49+ _CUR .execute (
50+ "CREATE TABLE IF NOT EXISTS es (id INTEGER PRIMARY KEY, uuid TEXT, routing_key TEXT)"
51+ )
52+ _DB_CONNECTION .commit ()
53+ _DB_CONNECTION .close ()
54+
55+ _LOGGER .info ("Created." )
56+
3257
3358# We use pydantic to declare the model (request payloads) for the internal REST API.
3459# The public API is a WebSocket API and does not require a model.
@@ -41,33 +66,44 @@ class EventStreamPostRequestBody(BaseModel):
4166# Endpoints for the 'public-facing' event-stream web-socket API ------------------------
4267
4368
44- @app_public .websocket ("/event-stream/{es_id }" )
45- async def event_stream (websocket : WebSocket , es_id : str ):
69+ @app_public .websocket ("/event-stream/{uuid }" )
70+ async def event_stream (websocket : WebSocket , uuid : str ):
4671 """The websocket handler for the event-stream."""
47- _LOGGER .info ("_ES_UUID_MAP: %s" , _ES_UUID_MAP )
4872
49- # if not es_id in _ES_UUID_MAP:
50- # raise HTTPException(
51- # status_code=status.HTTP_404_NOT_FOUND,
52- # detail=f"EventStream {es_id} is not known",
53- # )
73+ # Get the DB record for this UUID...
74+ db = sqlite3 .connect (_DATABASE_PATH )
75+ cursor = db .cursor ()
76+ res = cursor .execute (f"SELECT * FROM es WHERE uuid='{ uuid } '" )
77+ es = res .fetchone ()
78+ db .close ()
79+ if not es :
80+ raise HTTPException (
81+ status_code = status .HTTP_404_NOT_FOUND ,
82+ detail = f"EventStream { uuid } is not known" ,
83+ )
84+
85+ # Get the ID (for diagnostics)
86+ # and the routing key for the queue...
87+ es_id = es [0 ]
88+ routing_key : str = es [2 ]
89+
5490 await websocket .accept ()
55- _LOGGER .info ("Accepted connection for event-stream %s" , es_id )
91+ _LOGGER .info (
92+ "Accepted connection for %s (uuid=%s routing_key=%s)" , es_id , uuid , routing_key
93+ )
5694
57- # routing_key: str = _ES_UUID_MAP[es_id]
58- routing_key : str = "abcdefgh"
59- _LOGGER .info ("Creating reader for routing key %s (%s)" , routing_key , es_id )
60- message_reader = get_from_queue (routing_key )
95+ _LOGGER .info ("Creating reader for %s..." , es_id )
96+ message_reader = _get_from_queue (routing_key )
6197
62- _LOGGER .info ("Reading from %s (%s) ..." , routing_key , es_id )
98+ _LOGGER .info ("Reading from %s..." , es_id )
6399 while True :
64100 reader = anext (message_reader )
65101 message_body = await reader
66- _LOGGER .info ("Got %s from %s (%s)" , message_body , routing_key , es_id )
102+ _LOGGER .info ("Got %s for %s (routing_key= %s)" , message_body , es_id , routing_key )
67103 await websocket .send_text (str (message_body ))
68104
69105
70- async def get_from_queue (routing_key : str ):
106+ async def _get_from_queue (routing_key : str ):
71107 """An asynchronous generator yielding message bodies from the queue
72108 based on the provided routing key.
73109 """
@@ -103,31 +139,59 @@ def post_es(request_body: EventStreamPostRequestBody):
103139 and an ID the event stream is known by (that can be used to delete the stream).
104140 In our case, it's a WebSocket URL like 'ws://localhost:8000/event-stream/0000'.
105141 """
106- uuid_str = shortuuid .uuid ()
107- next_id = len (_ES_UUID_MAP ) + 1
108- _ES_ROUTING_MAP [uuid_str ] = request_body .routing_key
109- _ES_UUID_MAP [next_id ] = uuid_str
110- _LOGGER .info ("_ES_ROUTING_MAP: %s" , _ES_ROUTING_MAP )
111- _LOGGER .info ("_ES_UUID_MAP: %s" , _ES_UUID_MAP )
142+ uuid_str : str = shortuuid .uuid ()
143+ location : str = f"ws://{ _INGRESS_LOCATION } /event-stream/{ uuid_str } "
144+
145+ # Create a new ES record...
146+ # An ID is assigned automatically -
147+ # we just need to supply a short UUID and corresponding location.
148+ _LOGGER .info ("Creating new event stream: %s" , uuid_str )
149+
150+ db = sqlite3 .connect (_DATABASE_PATH )
151+ cursor = db .cursor ()
152+ cursor .execute (
153+ f"INSERT INTO es VALUES (NULL, '{ uuid_str } ', '{ request_body .routing_key } ')"
154+ )
155+ db .commit ()
156+ # Now pull the record back to get the assigned numeric ID...
157+ res = cursor .execute (f"SELECT * FROM es WHERE uuid='{ uuid_str } '" )
158+ es = res .fetchone ()
159+ assert es , "Failed to insert new event stream"
160+ db .close ()
161+
162+ _LOGGER .info ("Created %s" , es )
112163
113164 return {
114- "id" : next_id ,
115- "location" : f"ws:// { _INGRESS_LOCATION } /event-stream/ { uuid_str } " ,
165+ "id" : es [ 0 ] ,
166+ "location" : location ,
116167 }
117168
118169
119170@app_internal .delete ("/event-stream/{es_id}" )
120171def delete_es (es_id : int ):
121172 """Destroys an existing event-stream."""
122- _LOGGER .info ("_ES_UUID_MAP: %s" , _ES_UUID_MAP )
123- if es_id not in _ES_UUID_MAP :
173+
174+ _LOGGER .info ("Deleting event stream: %s" , es_id )
175+
176+ # Get the ES record (by primary key)
177+ db = sqlite3 .connect (_DATABASE_PATH )
178+ cursor = db .cursor ()
179+ res = cursor .execute (f"SELECT * FROM es WHERE id={ es_id } " )
180+ es = res .fetchone ()
181+ db .close ()
182+ if not es :
124183 raise HTTPException (
125184 status_code = status .HTTP_404_NOT_FOUND ,
126185 detail = f"EventStream { es_id } is not known" ,
127186 )
128187
129- es_uuid = _ES_UUID_MAP [es_id ]
130- _ = _ES_UUID_MAP .pop (es_id )
131- _ = _ES_ROUTING_MAP .pop (es_uuid )
188+ # Delete the ES record...
189+ db = sqlite3 .connect (_DATABASE_PATH )
190+ cursor = db .cursor ()
191+ cursor .execute (f"DELETE FROM es WHERE id={ es_id } " )
192+ db .commit ()
193+ db .close ()
194+
195+ _LOGGER .info ("Deleted %s" , es_id )
132196
133197 return {}
0 commit comments