@@ -14,7 +14,7 @@ use crate::event_filter::{is_restricted_mode, load_restricted_filters};
1414use crate :: event_listener:: EventName ;
1515use crate :: top_k_tracker:: { AccessEntry , TopKTracker } ;
1616
17- use super :: event_filter:: { ClientMessage , EventFilter } ;
17+ use super :: event_filter:: EventFilter ;
1818use super :: event_listener:: EventData ;
1919use super :: serializable_event:: SerializableEventData ;
2020
@@ -67,61 +67,6 @@ impl TPSTracker {
6767 }
6868}
6969
70- /// Wait for the client to send a subscription message, with a timeout.
71- /// Returns the filter, or None if the client disconnects or times out.
72- async fn wait_for_subscription (
73- ws_receiver : & mut SplitStream < WebSocketStream < TcpStream > > ,
74- addr : SocketAddr ,
75- restricted_filters : Option < EventFilter > ,
76- ) -> Option < EventFilter > {
77- let timeout = tokio:: time:: Duration :: from_secs ( 10 ) ;
78-
79- match tokio:: time:: timeout ( timeout, ws_receiver. next ( ) ) . await {
80- Ok ( Some ( Ok ( Message :: Text ( text) ) ) ) => match serde_json:: from_str :: < ClientMessage > ( & text) {
81- Ok ( ClientMessage :: Subscribe { event_filters } ) => {
82- let filter = EventFilter :: new ( event_filters. clone ( ) ) ;
83- if let Some ( restricted_filters) = restricted_filters {
84- if filter != restricted_filters {
85- warn ! (
86- "Client {} subscription does not match restricted filters, closing connection" ,
87- addr
88- ) ;
89- return None ;
90- }
91- }
92- if filter. accepts_all ( ) {
93- info ! ( "Client {} subscribed to all events" , addr) ;
94- } else {
95- info ! ( "Client {} subscribed with {} event filters" , addr, event_filters. len( ) ) ;
96- }
97- Some ( filter)
98- }
99- Err ( e) => {
100- warn ! (
101- "Client {} sent invalid subscription: {} - {}" ,
102- addr, e, text
103- ) ;
104- None
105- }
106- } ,
107- Ok ( Some ( Ok ( _) ) ) => {
108- warn ! ( "Client {} sent non-text message before subscribing" , addr) ;
109- None
110- }
111- Ok ( Some ( Err ( e) ) ) => {
112- warn ! ( "WebSocket error from {} before subscription: {}" , addr, e) ;
113- None
114- }
115- Ok ( None ) => {
116- warn ! ( "Client {} disconnected before subscribing" , addr) ;
117- None
118- }
119- Err ( _) => {
120- warn ! ( "Client {} timed out waiting for subscription" , addr) ;
121- None
122- }
123- }
124- }
12570
12671// Helper function to process events into buffers
12772fn process_event (
@@ -352,7 +297,7 @@ async fn handle_connection(
352297 stream : TcpStream ,
353298 addr : SocketAddr ,
354299 event_broadcast_receiver : broadcast:: Receiver < EventDataOrMetrics > ,
355- restricted_filters : Option < EventFilter > ,
300+ filter : EventFilter ,
356301) {
357302 info ! ( "New WebSocket connection from: {}" , addr) ;
358303
@@ -364,13 +309,7 @@ async fn handle_connection(
364309 }
365310 } ;
366311
367- let ( ws_sender, mut ws_receiver) = ws_stream. split ( ) ;
368-
369- // Wait for subscription message before streaming events
370- let filter = match wait_for_subscription ( & mut ws_receiver, addr, restricted_filters) . await {
371- Some ( f) => f,
372- None => return ,
373- } ;
312+ let ( ws_sender, ws_receiver) = ws_stream. split ( ) ;
374313
375314 // Spawn a task to receive events from the broadcast channel and send batches to this client
376315 let mut send_task = tokio:: spawn ( client_write_task (
@@ -381,7 +320,7 @@ async fn handle_connection(
381320 ) ) ;
382321
383322 // Spawn a task to handle incoming messages from the client (mostly for ping/pong)
384- let mut recv_task = tokio:: spawn ( client_read_task ( addr. clone ( ) , ws_receiver) ) ;
323+ let mut recv_task = tokio:: spawn ( client_read_task ( addr, ws_receiver) ) ;
385324
386325 // Wait for either task to finish
387326 tokio:: select! {
@@ -420,20 +359,20 @@ pub async fn run_websocket_server(
420359 let listener = TcpListener :: bind ( & server_addr) . await ?;
421360 info ! ( "WebSocket server listening on: {}" , server_addr) ;
422361
423- let restricted_filters : Option < EventFilter > = if is_restricted_mode ( ) {
362+ let filter : EventFilter = if is_restricted_mode ( ) {
424363 info ! ( "Running in restricted mode" ) ;
425- Some ( load_restricted_filters ( ) )
364+ load_restricted_filters ( )
426365 } else {
427366 info ! ( "Running in unrestricted mode" ) ;
428- None
367+ EventFilter :: default ( )
429368 } ;
430369
431370 // Accept incoming connections
432371 loop {
433372 match listener. accept ( ) . await {
434- Ok ( ( stream, client_addr) ) => {
373+ Ok ( ( stream, client_addr) ) => {
435374 let event_broadcast_receiver = event_broadcast_sender. subscribe ( ) ;
436- tokio:: spawn ( handle_connection ( stream, client_addr, event_broadcast_receiver, restricted_filters . clone ( ) ) ) ;
375+ tokio:: spawn ( handle_connection ( stream, client_addr, event_broadcast_receiver, filter . clone ( ) ) ) ;
437376 }
438377 Err ( e) => {
439378 error ! ( "Error accepting connection: {}" , e) ;
0 commit comments