Skip to content

Commit c9e1154

Browse files
Add server-side event filtering by event name (#4)
* Add server-side event filtering by event name * Fix u128 parsing for some events * add serializable timestamp, strongly typed event names * clean up channel names, ws client management --------- Co-authored-by: 0xflashboy <jake@monad.foundation>
1 parent 73b84ad commit c9e1154

File tree

9 files changed

+2935
-155
lines changed

9 files changed

+2935
-155
lines changed

Cargo.lock

Lines changed: 2529 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/src/bin/backend.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
88
use execution_events_example::event_listener;
99
use execution_events_example::event_listener::EventData;
1010
use execution_events_example::server;
11+
use tracing::warn;
1112

1213
#[derive(Debug, Parser)]
1314
#[command(name = "eventwatch", about, long_about = None)]
@@ -41,16 +42,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4142

4243
// Create a channel for communication between event listener thread and server
4344
// Use large buffer to handle bursts
44-
let (tx, rx) = mpsc::channel::<EventData>(100_000);
45+
let (event_sender, event_receiver) = mpsc::channel::<EventData>(100_000);
4546

4647
// Spawn the event listener thread
47-
let _listener_handle = event_listener::start_event_listener(event_ring_path, tx);
48+
let listener_handle = event_listener::run_event_listener(event_ring_path, event_sender);
4849

4950
// Parse server address
5051
let addr: SocketAddr = server_addr.parse()?;
5152

52-
// Run the WebSocket server
53-
server::run_websocket_server(addr, rx).await?;
53+
// Run both tasks and exit when either completes
54+
tokio::select! {
55+
result = server::run_websocket_server(addr, event_receiver) => {
56+
warn!("WebSocket server stopped: {:?}", result);
57+
}
58+
_ = tokio::task::spawn_blocking(move || listener_handle.join()) => {
59+
warn!("Event listener thread stopped");
60+
}
61+
}
5462

5563
Ok(())
5664
}

backend/src/bin/client.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use clap::Parser;
2+
use execution_events_example::event_filter::ClientMessage;
3+
use execution_events_example::event_listener::EventName;
24
use execution_events_example::serializable_event::SerializableEventData;
3-
use futures_util::StreamExt;
5+
use futures_util::{SinkExt, StreamExt};
46
use tokio_tungstenite::{connect_async, tungstenite::Message};
57

68
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -14,6 +16,11 @@ struct Cli {
1416
#[arg(short, long, default_value = "ws://127.0.0.1:3000")]
1517
url: String,
1618

19+
/// Filter events by type (comma-separated).
20+
/// If not specified, all events are received.
21+
#[arg(short, long, value_delimiter = ',')]
22+
events: Option<Vec<String>>,
23+
1724
#[arg(short, long, default_value = "false")]
1825
dump_events: bool,
1926
}
@@ -36,7 +43,34 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3643
let (ws_stream, _) = connect_async(&cli.url).await?;
3744
info!("Connected!");
3845

39-
let (_, mut read) = ws_stream.split();
46+
let (mut write, mut read) = ws_stream.split();
47+
48+
// Parse event names from strings to EventName enum
49+
let event_strings = cli.events.clone().unwrap_or_default();
50+
let events: Vec<EventName> = if event_strings.is_empty() {
51+
Vec::new()
52+
} else {
53+
event_strings
54+
.iter()
55+
.map(|s| {
56+
serde_json::from_value(serde_json::Value::String(s.clone()))
57+
.map_err(|_| format!("Invalid event name: {}", s))
58+
})
59+
.collect::<Result<Vec<_>, _>>()?
60+
};
61+
62+
// Send subscription message
63+
let subscribe_msg = ClientMessage::Subscribe {
64+
events: events.clone(),
65+
};
66+
let subscribe_json = serde_json::to_string(&subscribe_msg)?;
67+
write.send(Message::Text(subscribe_json)).await?;
68+
69+
if events.is_empty() {
70+
info!("Subscribed to all events");
71+
} else {
72+
info!("Subscribed to events: {:?}", events);
73+
}
4074

4175
// Read messages from the server
4276
let mut events_per_sec_interval = tokio::time::interval(tokio::time::Duration::from_secs(1));

backend/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
11
// Re-export modules from lib/ subdirectory
2+
pub mod timestamp {
3+
pub use super::lib::timestamp::*;
4+
}
5+
pub mod event_filter {
6+
pub use super::lib::event_filter::*;
7+
}
28
pub mod event_listener {
39
pub use super::lib::event_listener::*;
410
}
@@ -11,7 +17,9 @@ pub mod server {
1117

1218
// Internal module containing implementations
1319
mod lib {
20+
pub mod event_filter;
1421
pub mod event_listener;
1522
pub mod serializable_event;
1623
pub mod server;
24+
pub mod timestamp;
1725
}

backend/src/lib/event_filter.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use serde::{Deserialize, Serialize};
2+
use std::collections::HashSet;
3+
4+
use super::event_listener::EventName;
5+
6+
/// Message sent by client to subscribe to specific event types
7+
#[derive(Debug, Clone, Serialize, Deserialize)]
8+
#[serde(tag = "type")]
9+
pub enum ClientMessage {
10+
/// Subscribe to specific event types. Empty list means subscribe to all events.
11+
/// If any event name fails to deserialize, the connection will be rejected.
12+
#[serde(rename = "subscribe")]
13+
Subscribe { events: Vec<EventName> },
14+
}
15+
16+
/// Filter for event types
17+
#[derive(Clone, Debug, Default)]
18+
pub struct EventFilter {
19+
/// Set of event types to include. If empty, all events pass through.
20+
event_types: HashSet<EventName>,
21+
}
22+
23+
impl EventFilter {
24+
/// Create a filter from a list of event names
25+
pub fn from_event_names(events: Vec<EventName>) -> Self {
26+
Self {
27+
event_types: events.into_iter().collect(),
28+
}
29+
}
30+
31+
/// Check if an event name matches the filter using direct enum comparison
32+
/// Returns true if the filter is empty (accept all) or if the event name is in the filter
33+
pub fn matches(&self, event_name: &EventName) -> bool {
34+
self.event_types.is_empty() || self.event_types.contains(event_name)
35+
}
36+
37+
/// Check if filter accepts all events
38+
pub fn accepts_all(&self) -> bool {
39+
self.event_types.is_empty()
40+
}
41+
}

backend/src/lib/event_listener.rs

Lines changed: 111 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
use std::{ffi::CStr, time::Duration};
2-
3-
use chrono::{DateTime, Local, TimeZone};
1+
use chrono::{DateTime, Local};
42
use lazy_static::lazy_static;
53
use monad_event_ring::{
64
DecodedEventRing, EventDescriptor, EventDescriptorInfo, EventNextResult, EventPayloadResult,
@@ -11,8 +9,13 @@ use monad_exec_events::{
119
ffi::{g_monad_exec_event_metadata, MONAD_EXEC_EVENT_COUNT},
1210
ExecEventDecoder, ExecEventDescriptorExt, ExecEventRing, ExecSnapshotEventRing,
1311
};
12+
use serde::{Deserialize, Serialize};
13+
use std::{ffi::CStr, time::Duration};
1414
use tracing::{debug, error, info, warn};
1515

16+
use super::timestamp::get_unix_time_ns;
17+
use super::timestamp::NanoTimestamp;
18+
1619
lazy_static! {
1720
static ref EXEC_EVENT_NAMES: [&'static str; MONAD_EXEC_EVENT_COUNT] =
1821
std::array::from_fn(|event_type| unsafe {
@@ -22,11 +25,105 @@ lazy_static! {
2225
});
2326
}
2427

28+
/// Type-safe enum for event names based on ExecEvent variants
29+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
30+
#[serde(rename_all = "PascalCase")]
31+
pub enum EventName {
32+
RecordError,
33+
BlockStart,
34+
BlockReject,
35+
BlockPerfEvmEnter,
36+
BlockPerfEvmExit,
37+
BlockEnd,
38+
BlockQC,
39+
BlockFinalized,
40+
BlockVerified,
41+
TxnHeaderStart,
42+
TxnAccessListEntry,
43+
TxnAuthListEntry,
44+
TxnHeaderEnd,
45+
TxnReject,
46+
TxnPerfEvmEnter,
47+
TxnPerfEvmExit,
48+
TxnEvmOutput,
49+
TxnLog,
50+
TxnCallFrame,
51+
TxnEnd,
52+
AccountAccessListHeader,
53+
AccountAccess,
54+
StorageAccess,
55+
EvmError,
56+
}
57+
58+
impl EventName {
59+
/// Convert EventName to string representation
60+
pub fn as_str(&self) -> &'static str {
61+
match self {
62+
EventName::RecordError => "RecordError",
63+
EventName::BlockStart => "BlockStart",
64+
EventName::BlockReject => "BlockReject",
65+
EventName::BlockPerfEvmEnter => "BlockPerfEvmEnter",
66+
EventName::BlockPerfEvmExit => "BlockPerfEvmExit",
67+
EventName::BlockEnd => "BlockEnd",
68+
EventName::BlockQC => "BlockQC",
69+
EventName::BlockFinalized => "BlockFinalized",
70+
EventName::BlockVerified => "BlockVerified",
71+
EventName::TxnHeaderStart => "TxnHeaderStart",
72+
EventName::TxnAccessListEntry => "TxnAccessListEntry",
73+
EventName::TxnAuthListEntry => "TxnAuthListEntry",
74+
EventName::TxnHeaderEnd => "TxnHeaderEnd",
75+
EventName::TxnReject => "TxnReject",
76+
EventName::TxnPerfEvmEnter => "TxnPerfEvmEnter",
77+
EventName::TxnPerfEvmExit => "TxnPerfEvmExit",
78+
EventName::TxnEvmOutput => "TxnEvmOutput",
79+
EventName::TxnLog => "TxnLog",
80+
EventName::TxnCallFrame => "TxnCallFrame",
81+
EventName::TxnEnd => "TxnEnd",
82+
EventName::AccountAccessListHeader => "AccountAccessListHeader",
83+
EventName::AccountAccess => "AccountAccess",
84+
EventName::StorageAccess => "StorageAccess",
85+
EventName::EvmError => "EvmError",
86+
}
87+
}
88+
89+
pub fn from_str(s: &str) -> Option<Self> {
90+
match s {
91+
"RECORD_ERROR" => Some(EventName::RecordError),
92+
"BLOCK_START" => Some(EventName::BlockStart),
93+
"BLOCK_REJECT" => Some(EventName::BlockReject),
94+
"BLOCK_PERF_EVM_ENTER" => Some(EventName::BlockPerfEvmEnter),
95+
"BLOCK_PERF_EVM_EXIT" => Some(EventName::BlockPerfEvmExit),
96+
"BLOCK_END" => Some(EventName::BlockEnd),
97+
"BLOCK_QC" => Some(EventName::BlockQC),
98+
"BLOCK_FINALIZED" => Some(EventName::BlockFinalized),
99+
"BLOCK_VERIFIED" => Some(EventName::BlockVerified),
100+
"TXN_HEADER_START" => Some(EventName::TxnHeaderStart),
101+
"TXN_ACCESS_LIST_ENTRY" => Some(EventName::TxnAccessListEntry),
102+
"TXN_AUTH_LIST_ENTRY" => Some(EventName::TxnAuthListEntry),
103+
"TXN_HEADER_END" => Some(EventName::TxnHeaderEnd),
104+
"TXN_REJECT" => Some(EventName::TxnReject),
105+
"TXN_PERF_EVM_ENTER" => Some(EventName::TxnPerfEvmEnter),
106+
"TXN_PERF_EVM_EXIT" => Some(EventName::TxnPerfEvmExit),
107+
"TXN_EVM_OUTPUT" => Some(EventName::TxnEvmOutput),
108+
"TXN_LOG" => Some(EventName::TxnLog),
109+
"TXN_CALL_FRAME" => Some(EventName::TxnCallFrame),
110+
"TXN_END" => Some(EventName::TxnEnd),
111+
"ACCOUNT_ACCESS_LIST_HEADER" => Some(EventName::AccountAccessListHeader),
112+
"ACCOUNT_ACCESS" => Some(EventName::AccountAccess),
113+
"STORAGE_ACCESS" => Some(EventName::StorageAccess),
114+
"EVM_ERROR" => Some(EventName::EvmError),
115+
_ => {
116+
warn!("Unknown event name: {}", s);
117+
None
118+
}
119+
}
120+
}
121+
}
122+
25123
#[derive(Debug, Clone)]
26124
pub struct EventData {
27-
pub timestamp: String,
28-
pub event_name: String,
29-
pub event_type: u16,
125+
pub timestamp_ns: NanoTimestamp,
126+
pub event_name: EventName,
30127
pub seqno: u64,
31128
pub block_number: Option<u64>,
32129
pub txn_idx: Option<usize>,
@@ -55,16 +152,14 @@ fn event_to_data(event: &EventDescriptor<ExecEventDecoder>) -> Option<EventData>
55152
let EventDescriptorInfo {
56153
seqno,
57154
event_type,
58-
record_epoch_nanos,
155+
record_epoch_nanos: _,
59156
flow_info,
60157
} = event.info();
61158

62-
let timestamp = Local
63-
.timestamp_nanos(record_epoch_nanos as i64)
64-
.format("%H:%M:%S.%9f")
65-
.to_string();
159+
let timestamp_ns = get_unix_time_ns();
66160

67-
let event_name = EXEC_EVENT_NAMES[event_type as usize].to_string();
161+
// Convert event_type to EventName enum for type safety
162+
let event_name = EventName::from_str(EXEC_EVENT_NAMES[event_type as usize])?;
68163

69164
// Get block number if present
70165
let block_number = if flow_info.block_seqno != 0 {
@@ -86,19 +181,18 @@ fn event_to_data(event: &EventDescriptor<ExecEventDecoder>) -> Option<EventData>
86181
};
87182

88183
Some(EventData {
89-
timestamp,
184+
timestamp_ns,
90185
event_name,
91-
event_type,
92186
seqno,
93187
block_number,
94188
txn_idx,
95189
payload,
96190
})
97191
}
98192

99-
pub fn start_event_listener(
193+
pub fn run_event_listener(
100194
event_ring_path: EventRingPath,
101-
tx: tokio::sync::mpsc::Sender<EventData>,
195+
event_sender: tokio::sync::mpsc::Sender<EventData>,
102196
) -> std::thread::JoinHandle<()> {
103197
std::thread::spawn(move || {
104198
info!("Starting event listener thread");
@@ -196,7 +290,7 @@ pub fn start_event_listener(
196290
if let Some(event_data) = event_to_data(&event) {
197291
// Send to channel; if receiver is dropped, exit thread
198292
// Use blocking_send since we're in a blocking thread
199-
if tx.blocking_send(event_data).is_err() {
293+
if event_sender.blocking_send(event_data).is_err() {
200294
warn!("Channel receiver dropped, exiting listener thread");
201295
return;
202296
}

0 commit comments

Comments
 (0)