Skip to content

Commit ac1d691

Browse files
Add probabilistic account and storage access tracker (#5)
* Add probabilistic account and storage access tracker * Add verbose flag for tracking accesses on client * make access entry fields public * track dup qc and proposals, make event fields public * Fix wrong variable capitalization --------- Co-authored-by: Vukašin Manojlović <vmanojlovic@monad.foundation>
1 parent c9e1154 commit ac1d691

File tree

5 files changed

+250
-60
lines changed

5 files changed

+250
-60
lines changed

backend/src/bin/client.rs

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use clap::Parser;
2-
use execution_events_example::event_filter::ClientMessage;
32
use execution_events_example::event_listener::EventName;
4-
use execution_events_example::serializable_event::SerializableEventData;
3+
use execution_events_example::{event_filter::ClientMessage, server::ServerMessage};
54
use futures_util::{SinkExt, StreamExt};
5+
use std::collections::HashSet;
66
use tokio_tungstenite::{connect_async, tungstenite::Message};
77

88
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -21,8 +21,11 @@ struct Cli {
2121
#[arg(short, long, value_delimiter = ',')]
2222
events: Option<Vec<String>>,
2323

24-
#[arg(short, long, default_value = "false")]
25-
dump_events: bool,
24+
#[arg(long, default_value = "false")]
25+
verbose_events: bool,
26+
27+
#[arg(long, default_value = "false")]
28+
verbose_accesses: bool,
2629
}
2730

2831
#[tokio::main]
@@ -75,6 +78,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7578
// Read messages from the server
7679
let mut events_per_sec_interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
7780
let mut events_witnessed = 0;
81+
let mut seen_block_starts: HashSet<u64> = HashSet::new();
82+
let mut seen_block_qcs: HashSet<u64> = HashSet::new();
7883
loop {
7984
tokio::select! {
8085
msg = read.next() => {
@@ -85,14 +90,43 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
8590
let msg = msg.unwrap();
8691
match msg {
8792
Ok(Message::Text(text)) => {
88-
match serde_json::from_str::<Vec<SerializableEventData>>(&text) {
89-
Ok(events) => {
93+
match serde_json::from_str::<ServerMessage>(&text) {
94+
Ok(ServerMessage::Events(events)) => {
95+
// Check for duplicate BlockStart events
96+
for event in &events {
97+
if event.event_name == EventName::BlockStart {
98+
if let Some(block_number) = event.block_number {
99+
if !seen_block_starts.insert(block_number) {
100+
warn!("Duplicate BlockStart event for block {}", block_number);
101+
}
102+
}
103+
}
104+
if event.event_name == EventName::BlockQC {
105+
if let Some(block_number) = event.block_number {
106+
if !seen_block_qcs.insert(block_number) {
107+
warn!("Duplicate BlockQC event for block {}", block_number);
108+
}
109+
}
110+
}
111+
}
112+
90113
info!("Received {} events", events.len());
91-
if cli.dump_events {
114+
if cli.verbose_events {
92115
info!("Events: {:?}", events);
93116
}
94117
events_witnessed += events.len();
95118
}
119+
Ok(ServerMessage::TopAccesses(top_accesses)) => {
120+
info!("Received top accesses");
121+
if cli.verbose_accesses {
122+
for entry in &top_accesses.storage {
123+
info!("Storage access: address={}, key={}, count={}", entry.key.0, entry.key.1, entry.count);
124+
}
125+
for entry in &top_accesses.account {
126+
info!("Account access: address={}, count={}", entry.key, entry.count);
127+
}
128+
}
129+
}
96130
Err(_) => {
97131
error!("Failed to parse events: {}", text);
98132
}

backend/src/lib.rs

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,18 @@
1-
// Re-export modules from lib/ subdirectory
21
pub mod timestamp {
3-
pub use super::lib::timestamp::*;
2+
include!("lib/timestamp.rs");
43
}
54
pub mod event_filter {
6-
pub use super::lib::event_filter::*;
5+
include!("lib/event_filter.rs");
76
}
87
pub mod event_listener {
9-
pub use super::lib::event_listener::*;
8+
include!("lib/event_listener.rs");
109
}
1110
pub mod serializable_event {
12-
pub use super::lib::serializable_event::*;
11+
include!("lib/serializable_event.rs");
1312
}
1413
pub mod server {
15-
pub use super::lib::server::*;
14+
include!("lib/server.rs");
1615
}
17-
18-
// Internal module containing implementations
19-
mod lib {
20-
pub mod event_filter;
21-
pub mod event_listener;
22-
pub mod serializable_event;
23-
pub mod server;
24-
pub mod timestamp;
16+
pub mod top_k_tracker {
17+
include!("lib/top_k_tracker.rs");
2518
}

backend/src/lib/serializable_event.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -279,14 +279,14 @@ impl From<&ExecEvent> for SerializableExecEvent {
279279
/// Serializable version of EventData with converted payload
280280
#[derive(Serialize, Deserialize, Debug, Clone)]
281281
pub struct SerializableEventData {
282-
event_name: EventName,
282+
pub event_name: EventName,
283283
#[serde(skip_serializing_if = "Option::is_none")]
284-
block_number: Option<u64>,
284+
pub block_number: Option<u64>,
285285
#[serde(skip_serializing_if = "Option::is_none")]
286-
txn_idx: Option<usize>,
287-
payload: SerializableExecEvent,
288-
seqno: u64,
289-
timestamp_ns: NanoTimestamp,
286+
pub txn_idx: Option<usize>,
287+
pub payload: SerializableExecEvent,
288+
pub seqno: u64,
289+
pub timestamp_ns: NanoTimestamp,
290290
}
291291

292292
impl From<&EventData> for SerializableEventData {

backend/src/lib/server.rs

Lines changed: 120 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,40 @@
11
use std::net::SocketAddr;
22

3+
use alloy_primitives::{Address, B256};
34
use futures_util::stream::SplitSink;
45
use futures_util::{stream::SplitStream, SinkExt, StreamExt};
6+
use monad_exec_events::ExecEvent;
57
use tokio::net::{TcpListener, TcpStream};
68
use tokio::sync::broadcast;
79
use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
810
use tracing::{error, info, warn};
11+
use serde::{Deserialize, Serialize};
12+
13+
use crate::event_listener::EventName;
14+
use crate::top_k_tracker::{AccessEntry, TopKTracker};
915

1016
use super::event_filter::{ClientMessage, EventFilter};
1117
use super::event_listener::EventData;
1218
use super::serializable_event::SerializableEventData;
1319

20+
#[derive(Debug, Clone, Serialize, Deserialize)]
21+
pub struct TopAccessesData {
22+
pub account: Vec<AccessEntry<Address>>,
23+
pub storage: Vec<AccessEntry<(Address, B256)>>,
24+
}
25+
26+
#[derive(Debug, Clone)]
27+
pub enum EventDataOrAccesses {
28+
Event(EventData),
29+
TopAccesses(TopAccessesData),
30+
}
31+
32+
#[derive(Debug, Clone, Serialize, Deserialize)]
33+
pub enum ServerMessage {
34+
Events(Vec<SerializableEventData>),
35+
TopAccesses(TopAccessesData),
36+
}
37+
1438
/// Wait for the client to send a subscription message, with a timeout.
1539
/// Returns the filter, or None if the client disconnects or times out.
1640
async fn wait_for_subscription(
@@ -58,49 +82,65 @@ async fn wait_for_subscription(
5882
}
5983

6084
async fn client_write_task(
61-
mut event_broadcast_receiver: broadcast::Receiver<EventData>,
85+
mut event_broadcast_receiver: broadcast::Receiver<EventDataOrAccesses>,
6286
filter: EventFilter,
6387
addr: SocketAddr,
6488
mut ws_sender: SplitSink<WebSocketStream<TcpStream>, Message>,
6589
) {
66-
let mut buf: Vec<SerializableEventData> = Vec::new();
90+
let mut events_buf: Vec<SerializableEventData> = Vec::new();
91+
let mut accesses_buf: Vec<TopAccessesData> = Vec::new();
6792

6893
loop {
6994
let result = event_broadcast_receiver.recv().await;
70-
match result {
71-
Ok(event_data) => {
95+
if result.is_err() {
96+
error!("Broadcast channel error for {}: {}", addr, result.err().unwrap());
97+
break;
98+
}
99+
let event = result.unwrap();
100+
match event {
101+
EventDataOrAccesses::Event(event_data) => {
72102
if filter.matches(&event_data.event_name) {
73-
buf.push(SerializableEventData::from(&event_data));
103+
events_buf.push(SerializableEventData::from(&event_data));
74104
}
75105
}
76-
Err(e) => {
77-
error!("Broadcast channel error for {}: {}", addr, e);
78-
break;
106+
EventDataOrAccesses::TopAccesses(top_accesses_data) => {
107+
accesses_buf.push(top_accesses_data);
79108
}
80109
}
81-
while let Ok(event_data) = event_broadcast_receiver.try_recv() {
82-
if filter.matches(&event_data.event_name) {
83-
buf.push(SerializableEventData::from(&event_data));
110+
while let Ok(event) = event_broadcast_receiver.try_recv() {
111+
match event {
112+
EventDataOrAccesses::Event(event_data) => {
113+
if filter.matches(&event_data.event_name) {
114+
events_buf.push(SerializableEventData::from(&event_data));
115+
}
116+
}
117+
EventDataOrAccesses::TopAccesses(top_accesses_data) => {
118+
accesses_buf.push(top_accesses_data);
119+
}
84120
}
85121
}
86122

87-
if !buf.is_empty() {
123+
if !events_buf.is_empty() {
124+
let server_msg = ServerMessage::Events(std::mem::take(&mut events_buf));
88125
// Serialize batch to JSON
89-
let json_message = match serde_json::to_string(&std::mem::take(&mut buf)) {
90-
Ok(json) => json,
91-
Err(e) => {
92-
error!("Failed to serialize batch for {}: {}", addr, e);
93-
buf.clear();
94-
continue;
95-
}
96-
};
126+
let json_message = serde_json::to_string(&server_msg).unwrap();
97127

98128
// Send batch
99129
if let Err(e) = ws_sender.send(Message::Text(json_message)).await {
100130
error!("Failed to send batch to {}: {}", addr, e);
101131
break;
102132
}
103133
}
134+
if !accesses_buf.is_empty() {
135+
for accesses in std::mem::take(&mut accesses_buf) {
136+
let server_msg = ServerMessage::TopAccesses(accesses);
137+
let json_message = serde_json::to_string(&server_msg).unwrap();
138+
if let Err(e) = ws_sender.send(Message::Text(json_message)).await {
139+
error!("Failed to send batch to {}: {}", addr, e);
140+
break;
141+
}
142+
}
143+
}
104144
}
105145
}
106146

@@ -131,10 +171,63 @@ async fn client_read_task(
131171
}
132172
}
133173

174+
async fn run_event_forwarder_task(
175+
mut event_receiver: tokio::sync::mpsc::Receiver<EventData>,
176+
event_broadcast_sender: broadcast::Sender<EventDataOrAccesses>,
177+
) {
178+
let mut account_accesses = TopKTracker::new(10_000);
179+
let mut storage_accesses = TopKTracker::new(10_000);
180+
let mut stats_interval = tokio::time::interval(std::time::Duration::from_secs(5));
181+
182+
loop {
183+
tokio::select! {
184+
event_data = event_receiver.recv() => {
185+
if event_data.is_none() {
186+
warn!("Event receiver closed");
187+
return;
188+
}
189+
let event_data = event_data.unwrap();
190+
191+
if let EventName::AccountAccess = event_data.event_name {
192+
if let ExecEvent::AccountAccess {
193+
account_access,
194+
..
195+
} = event_data.payload {
196+
let address = Address::from_slice(&account_access.address.bytes);
197+
account_accesses.record(address);
198+
} else {
199+
unreachable!();
200+
}
201+
} else if let EventName::StorageAccess = event_data.event_name {
202+
if let ExecEvent::StorageAccess {
203+
storage_access,
204+
..
205+
} = event_data.payload {
206+
let address = Address::from_slice(&storage_access.address.bytes);
207+
let key = B256::from_slice(&storage_access.key.bytes);
208+
storage_accesses.record((address, key));
209+
} else {
210+
unreachable!();
211+
}
212+
}
213+
214+
let _ = event_broadcast_sender.send(EventDataOrAccesses::Event(event_data));
215+
},
216+
_ = stats_interval.tick() => {
217+
let top_accesses_data = TopAccessesData {
218+
account: account_accesses.top_k(10),
219+
storage: storage_accesses.top_k(10),
220+
};
221+
let _ = event_broadcast_sender.send(EventDataOrAccesses::TopAccesses(top_accesses_data));
222+
}
223+
}
224+
}
225+
}
226+
134227
async fn handle_connection(
135228
stream: TcpStream,
136229
addr: SocketAddr,
137-
event_broadcast_receiver: broadcast::Receiver<EventData>,
230+
event_broadcast_receiver: broadcast::Receiver<EventDataOrAccesses>,
138231
) {
139232
info!("New WebSocket connection from: {}", addr);
140233

@@ -186,23 +279,17 @@ async fn handle_connection(
186279

187280
pub async fn run_websocket_server(
188281
addr: SocketAddr,
189-
mut event_receiver: tokio::sync::mpsc::Receiver<EventData>,
282+
event_receiver: tokio::sync::mpsc::Receiver<EventData>,
190283
) -> Result<(), Box<dyn std::error::Error>> {
191284
// Create a broadcast channel for distributing events to all clients
192-
let (event_broadcast_sender, _) = broadcast::channel::<EventData>(1_000_000);
285+
let (event_broadcast_sender, _) = broadcast::channel::<EventDataOrAccesses>(1_000_000);
193286

194287
// Spawn a task to forward events from the mpsc channel to the broadcast channel
195288
let event_broadcast_sender_clone = event_broadcast_sender.clone();
196-
let broadcast_task = tokio::spawn(async move {
197-
loop {
198-
let event_data = event_receiver.recv().await;
199-
if event_data.is_none() {
200-
warn!("Event receiver closed");
201-
return;
202-
}
203-
let _ = event_broadcast_sender_clone.send(event_data.unwrap());
204-
}
205-
});
289+
let broadcast_task = tokio::spawn(run_event_forwarder_task(
290+
event_receiver,
291+
event_broadcast_sender_clone,
292+
));
206293

207294
// Bind the TCP listener
208295
let listener = TcpListener::bind(&addr).await?;

0 commit comments

Comments
 (0)