11use std:: net:: SocketAddr ;
2+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
3+ use std:: sync:: Arc ;
4+ use std:: time:: { SystemTime , UNIX_EPOCH } ;
25
36use alloy_primitives:: { Address , B256 } ;
7+ use http_body_util:: Full ;
8+ use hyper:: body:: Bytes ;
9+ use hyper:: server:: conn:: http1;
10+ use hyper:: service:: service_fn;
11+ use hyper:: { Request , Response , StatusCode } ;
412use futures_util:: stream:: SplitSink ;
513use futures_util:: { stream:: SplitStream , SinkExt , StreamExt } ;
614use monad_exec_events:: ExecEvent ;
@@ -18,6 +26,9 @@ use super::event_filter::EventFilter;
1826use super :: event_listener:: EventData ;
1927use super :: serializable_event:: SerializableEventData ;
2028
29+ /// Stores the Unix timestamp (in seconds) of the last event received from the ring
30+ type LastEventTime = Arc < AtomicU64 > ;
31+
2132#[ derive( Debug , Clone , Serialize , Deserialize ) ]
2233pub struct TopAccessesData {
2334 pub account : Vec < AccessEntry < Address > > ,
@@ -188,6 +199,7 @@ async fn client_read_task(
188199async fn run_event_forwarder_task (
189200 mut event_receiver : tokio:: sync:: mpsc:: Receiver < EventData > ,
190201 event_broadcast_sender : broadcast:: Sender < EventDataOrMetrics > ,
202+ last_event_time : LastEventTime ,
191203) {
192204 let mut account_accesses = TopKTracker :: new ( 1_000 ) ;
193205 let mut storage_accesses = TopKTracker :: new ( 1_000 ) ;
@@ -207,6 +219,13 @@ async fn run_event_forwarder_task(
207219 }
208220 let mut event_data = event_data. unwrap( ) ;
209221
222+ // Update last event timestamp for health check
223+ let now_secs = SystemTime :: now( )
224+ . duration_since( UNIX_EPOCH )
225+ . unwrap_or_default( )
226+ . as_secs( ) ;
227+ last_event_time. store( now_secs, Ordering :: Relaxed ) ;
228+
210229 // Track txn_hash from TxnHeaderStart events
211230 if let EventName :: TxnHeaderStart = event_data. event_name {
212231 if let ExecEvent :: TxnHeaderStart { txn_index, txn_header_start, .. } = & event_data. payload {
@@ -341,20 +360,60 @@ async fn handle_connection(
341360 info ! ( "WebSocket connection closed: {}" , addr) ;
342361}
343362
344- pub async fn run_websocket_server (
345- server_addr : SocketAddr ,
346- event_receiver : tokio:: sync:: mpsc:: Receiver < EventData > ,
347- ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
348- // Create a broadcast channel for distributing events to all clients
349- let ( event_broadcast_sender, _) = broadcast:: channel :: < EventDataOrMetrics > ( 1_000_000 ) ;
363+ async fn health_handler (
364+ last_event_time : LastEventTime ,
365+ ) -> Result < Response < Full < Bytes > > , hyper:: Error > {
366+ let now_secs = SystemTime :: now ( )
367+ . duration_since ( UNIX_EPOCH )
368+ . unwrap_or_default ( )
369+ . as_secs ( ) ;
370+ let last_event = last_event_time. load ( Ordering :: Relaxed ) ;
371+ let is_healthy = now_secs. saturating_sub ( last_event) <= 10 ;
372+
373+ let body = if is_healthy {
374+ info ! ( "Health check passed" ) ;
375+ r#"{"success": true}"#
376+ } else {
377+ warn ! ( "Health check failed - last event time: {} seconds ago" , now_secs. saturating_sub( last_event) ) ;
378+ r#"{"success": false}"#
379+ } ;
350380
351- // Spawn a task to forward events from the mpsc channel to the broadcast channel
352- let event_broadcast_sender_clone = event_broadcast_sender . clone ( ) ;
353- let _ = tokio :: spawn ( run_event_forwarder_task (
354- event_receiver ,
355- event_broadcast_sender_clone ,
356- ) ) ;
381+ Ok ( Response :: builder ( )
382+ . status ( StatusCode :: OK )
383+ . header ( "Content-Type" , "application/json" )
384+ . body ( Full :: new ( Bytes :: from ( body ) ) )
385+ . unwrap ( ) )
386+ }
357387
388+ async fn run_health_server (
389+ health_addr : SocketAddr ,
390+ last_event_time : LastEventTime ,
391+ ) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
392+ let listener = tokio:: net:: TcpListener :: bind ( health_addr) . await ?;
393+ info ! ( "Health server listening on: {}" , health_addr) ;
394+
395+ loop {
396+ let ( stream, _) = listener. accept ( ) . await ?;
397+ let io = hyper_util:: rt:: TokioIo :: new ( stream) ;
398+ let last_event_time = last_event_time. clone ( ) ;
399+
400+ tokio:: spawn ( async move {
401+ let service = service_fn ( move |_req : Request < hyper:: body:: Incoming > | {
402+ let last_event_time = last_event_time. clone ( ) ;
403+ async move { health_handler ( last_event_time) . await }
404+ } ) ;
405+
406+ if let Err ( e) = http1:: Builder :: new ( ) . serve_connection ( io, service) . await {
407+ error ! ( "Health server connection error: {}" , e) ;
408+ }
409+ } ) ;
410+ }
411+ }
412+
413+ async fn run_websocket_server (
414+ server_addr : SocketAddr ,
415+ event_broadcast_sender : broadcast:: Sender < EventDataOrMetrics > ,
416+ ) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
358417 // Bind the TCP listener
359418 let listener = TcpListener :: bind ( & server_addr) . await ?;
360419 info ! ( "WebSocket server listening on: {}" , server_addr) ;
@@ -380,3 +439,39 @@ pub async fn run_websocket_server(
380439 }
381440 }
382441}
442+
443+ pub async fn run_servers (
444+ server_addr : SocketAddr ,
445+ health_server_addr : SocketAddr ,
446+ event_receiver : tokio:: sync:: mpsc:: Receiver < EventData > ,
447+ ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
448+ // Create shared state for tracking last event time (for health checks)
449+ let last_event_time: LastEventTime = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
450+
451+ // Create a broadcast channel for distributing events to all clients
452+ let ( event_broadcast_sender, _) = broadcast:: channel :: < EventDataOrMetrics > ( 1_000_000 ) ;
453+
454+ // Spawn a task to forward events from the mpsc channel to the broadcast channel
455+ let event_broadcast_sender_clone = event_broadcast_sender. clone ( ) ;
456+ let last_event_time_clone = last_event_time. clone ( ) ;
457+ tokio:: spawn ( run_event_forwarder_task (
458+ event_receiver,
459+ event_broadcast_sender_clone,
460+ last_event_time_clone,
461+ ) ) ;
462+
463+ // Spawn both servers and wait for either to complete
464+ let websocket_task = tokio:: spawn ( run_websocket_server ( server_addr, event_broadcast_sender) ) ;
465+ let health_task = tokio:: spawn ( run_health_server ( health_server_addr, last_event_time) ) ;
466+
467+ tokio:: select! {
468+ result = websocket_task => {
469+ error!( "WebSocket server task stopped: {:?}" , result) ;
470+ }
471+ result = health_task => {
472+ error!( "Health server task stopped: {:?}" , result) ;
473+ }
474+ }
475+
476+ Ok ( ( ) )
477+ }
0 commit comments