11"""The entrypoint for the Squonk2 FastAPI WebSocket service."""
22
3- import uuid
3+ import logging
4+ import os
45from typing import Dict
56
7+ import aio_pika
8+ import shortuuid
69from fastapi import FastAPI , HTTPException , WebSocket , status
710from pydantic import BaseModel
811
12+ logging .basicConfig (level = logging .DEBUG )
13+ _LOGGER = logging .getLogger (__name__ )
14+
915# Public (event-stream) and internal (REST) services
1016app_public = FastAPI ()
1117app_internal = FastAPI ()
1218
13- # A map of event stream IDs (a UUID with a "es-" prefix) to their routing keys.
14- _ES_UUID_MAP : Dict = {}
19+ # A map of event streams (a short UUID) to their routing keys.
20+ _ES_ROUTING_MAP : Dict [str , str ] = {}
21+ # A map of event streams IDs to their UUIDs.
22+ _ES_UUID_MAP : Dict [int , str ] = {}
23+
24+ _INGRESS_LOCATION : str = os .getenv ("INGRESS_LOCATION" )
25+ assert _INGRESS_LOCATION , "INGRESS_LOCATION environment variable must be set"
26+
27+ _AMPQ_EXCHANGE : str = "event-streams"
28+ _AMPQ_URL : str = os .getenv ("AMPQ_URL" )
29+ assert _AMPQ_URL , "AMPQ_URL environment variable must be set"
30+ _LOGGER .info ("AMPQ_URL: %s" , _AMPQ_URL )
1531
1632
1733# We use pydantic to declare the model (request payloads) for the internal REST API.
@@ -28,39 +44,90 @@ class EventStreamPostRequestBody(BaseModel):
2844@app_public .websocket ("/event-stream/{es_id}" )
2945async def event_stream (websocket : WebSocket , es_id : str ):
3046 """The websocket handler for the event-stream."""
31- if not es_id in _ES_UUID_MAP :
32- raise HTTPException (
33- status_code = status .HTTP_404_NOT_FOUND ,
34- detail = f"EventStream { es_id } is not known" ,
35- )
47+ _LOGGER .info ("_ES_UUID_MAP: %s" , _ES_UUID_MAP )
48+
49+ # if not es_id in _ES_UUID_MAP:
50+ # raise HTTPException(
51+ # status_code=status.HTTP_404_NOT_FOUND,
52+ # detail=f"EventStream {es_id} is not known",
53+ # )
3654 await websocket .accept ()
55+ _LOGGER .info ("Accepted connection for event-stream %s" , es_id )
56+
57+ # routing_key: str = _ES_UUID_MAP[es_id]
58+ routing_key : str = "abcdefgh"
59+ _LOGGER .info ("Creating reader for routing key %s (%s)" , routing_key , es_id )
60+ message_reader = get_from_queue (routing_key )
61+
62+ _LOGGER .info ("Reading from %s (%s)..." , routing_key , es_id )
3763 while True :
38- data = await websocket .receive_text ()
39- await websocket .send_text (f"Message text was: { data } " )
64+ reader = anext (message_reader )
65+ message_body = await reader
66+ _LOGGER .info ("Got %s from %s (%s)" , message_body , routing_key , es_id )
67+ await websocket .send_text (str (message_body ))
68+
69+
70+ async def get_from_queue (routing_key : str ):
71+ """An asynchronous generator yielding message bodies from the queue
72+ based on the provided routing key.
73+ """
74+ connection = await aio_pika .connect_robust (_AMPQ_URL )
75+
76+ async with connection :
77+ channel = await connection .channel ()
78+ es_exchange = await channel .declare_exchange (
79+ _AMPQ_EXCHANGE ,
80+ aio_pika .ExchangeType .DIRECT ,
81+ )
82+ queue = await channel .declare_queue (exclusive = True )
83+ await queue .bind (es_exchange , routing_key = routing_key )
84+
85+ async with queue .iterator () as queue_iter :
86+ async for message in queue_iter :
87+ async with message .process ():
88+ yield message .body
4089
4190
4291# Endpoints for the 'internal' event-stream management API -----------------------------
4392
4493
4594@app_internal .post ("/event-stream/" )
4695def post_es (request_body : EventStreamPostRequestBody ):
47- """Create a new event-stream."""
48- uuid_str = f"es-{ uuid .uuid4 ()} "
49- _ES_UUID_MAP [uuid_str ] = request_body .routing_key
50- print (_ES_UUID_MAP )
96+ """Create a new event-stream returning the endpoint location.
5197
52- return {"id" : uuid_str }
98+ The AS provides a routing key to this endpoint and expects a event stream location
99+ in return.
100+
101+ This is one of the required endpoints for the Squonk2 event-stream service.
102+ If successful it must return the location the client can use to read data
103+ and an ID the event stream is known by (that can be used to delete the stream).
104+ In our case, it's a WebSocket URL like 'ws://localhost:8000/event-stream/0000'.
105+ """
106+ uuid_str = shortuuid .uuid ()
107+ next_id = len (_ES_UUID_MAP ) + 1
108+ _ES_ROUTING_MAP [uuid_str ] = request_body .routing_key
109+ _ES_UUID_MAP [next_id ] = uuid_str
110+ _LOGGER .info ("_ES_ROUTING_MAP: %s" , _ES_ROUTING_MAP )
111+ _LOGGER .info ("_ES_UUID_MAP: %s" , _ES_UUID_MAP )
112+
113+ return {
114+ "id" : next_id ,
115+ "location" : f"ws://{ _INGRESS_LOCATION } /event-stream/{ uuid_str } " ,
116+ }
53117
54118
55119@app_internal .delete ("/event-stream/{es_id}" )
56- def delete_es (es_id : str ):
120+ def delete_es (es_id : int ):
57121 """Destroys an existing event-stream."""
58- print ( _ES_UUID_MAP )
122+ _LOGGER . info ( "_ES_UUID_MAP: %s" , _ES_UUID_MAP )
59123 if es_id not in _ES_UUID_MAP :
60124 raise HTTPException (
61125 status_code = status .HTTP_404_NOT_FOUND ,
62126 detail = f"EventStream { es_id } is not known" ,
63127 )
128+
129+ es_uuid = _ES_UUID_MAP [es_id ]
64130 _ = _ES_UUID_MAP .pop (es_id )
131+ _ = _ES_ROUTING_MAP .pop (es_uuid )
65132
66133 return {}
0 commit comments