44import logging
55import os
66import sqlite3
7- import threading
87from logging .config import dictConfig
98from typing import Any
109
@@ -87,14 +86,6 @@ def _get_location(uuid: str) -> str:
8786 _ES [2 ],
8887 )
8988
90- # The active websocket connections (and a thread lock).
91- # A set of WebSocket objects appended to when a connection is made
92- # and where objects are removed when each the connection is closed.
93- # This is used by the "shutdown" event to gracefully close all
94- # active connections.
95- _ACTIVE_CONNECTIONS : set [WebSocket ] = set ()
96- _ACTIVE_CONNECTIONS_LOCK = threading .Lock ()
97-
9889
9990# We use pydantic to declare the model (request payloads) for the internal REST API.
10091# The public API is a WebSocket API and does not require a model.
@@ -125,26 +116,6 @@ class EventStreamGetResponse(BaseModel):
125116 event_streams : list [EventStreamItem ]
126117
127118
128- def _add_connection (websocket : WebSocket ):
129- """Safely add a connection to the active connections set."""
130- with _ACTIVE_CONNECTIONS_LOCK :
131- _ACTIVE_CONNECTIONS .add (websocket )
132-
133-
134- def _remove_connection (websocket : WebSocket ):
135- """Safely remove a connection from the active connections set."""
136- with _ACTIVE_CONNECTIONS_LOCK :
137- _ACTIVE_CONNECTIONS .remove (websocket )
138-
139-
140- @app_public .on_event ("shutdown" )
141- async def shutdown ():
142- """The application is shutting down.
143- Gracefully close all active connections."""
144- for websocket in _ACTIVE_CONNECTIONS :
145- await websocket .close (code = status .WS_1001_GOING_AWAY , reason = "Server shutdown" )
146-
147-
148119# Endpoints for the 'public-facing' event-stream web-socket API ------------------------
149120
150121
@@ -187,13 +158,7 @@ async def event_stream(websocket: WebSocket, uuid: str):
187158 routing_key ,
188159 )
189160 await websocket .accept ()
190- # Add us to the set of active connections
191- _add_connection (websocket )
192- _LOGGER .info (
193- "Accepted connection for %s (%s active connections)" ,
194- es_id ,
195- len (_ACTIVE_CONNECTIONS ),
196- )
161+ _LOGGER .info ("Accepted connection for %s" , es_id )
197162
198163 _LOGGER .debug ("Creating reader for %s..." , es_id )
199164 message_reader = _get_from_queue (routing_key )
@@ -213,9 +178,6 @@ async def event_stream(websocket: WebSocket, uuid: str):
213178 else :
214179 await websocket .send_text (str (message_body ))
215180
216- # Remove us from the set of active connections
217- _remove_connection (websocket )
218-
219181 _LOGGER .info ("Closing %s (uuid=%s)..." , es_id , uuid )
220182 await websocket .close (
221183 code = status .WS_1000_NORMAL_CLOSURE , reason = "The stream has been deleted"
0 commit comments