Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions backend/src/bin/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use clap::Parser;
use execution_events_example::event_filter::ClientMessage;
use execution_events_example::event_listener::EventName;
use execution_events_example::serializable_event::SerializableEventData;
use execution_events_example::{event_filter::ClientMessage, server::ServerMessage};
use futures_util::{SinkExt, StreamExt};
use std::collections::HashSet;
use tokio_tungstenite::{connect_async, tungstenite::Message};

use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
Expand All @@ -21,8 +21,11 @@ struct Cli {
#[arg(short, long, value_delimiter = ',')]
events: Option<Vec<String>>,

#[arg(short, long, default_value = "false")]
dump_events: bool,
#[arg(long, default_value = "false")]
verbose_events: bool,

#[arg(long, default_value = "false")]
verbose_accesses: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -75,6 +78,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Read messages from the server
let mut events_per_sec_interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
let mut events_witnessed = 0;
let mut seen_block_starts: HashSet<u64> = HashSet::new();
let mut seen_block_qcs: HashSet<u64> = HashSet::new();
loop {
tokio::select! {
msg = read.next() => {
Expand All @@ -85,14 +90,43 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let msg = msg.unwrap();
match msg {
Ok(Message::Text(text)) => {
match serde_json::from_str::<Vec<SerializableEventData>>(&text) {
Ok(events) => {
match serde_json::from_str::<ServerMessage>(&text) {
Ok(ServerMessage::Events(events)) => {
// Check for duplicate BlockStart events
for event in &events {
if event.event_name == EventName::BlockStart {
if let Some(block_number) = event.block_number {
if !seen_block_starts.insert(block_number) {
warn!("Duplicate BlockStart event for block {}", block_number);
}
}
}
if event.event_name == EventName::BlockQC {
if let Some(block_number) = event.block_number {
if !seen_block_qcs.insert(block_number) {
warn!("Duplicate BlockQC event for block {}", block_number);
}
}
}
}

info!("Received {} events", events.len());
if cli.dump_events {
if cli.verbose_events {
info!("Events: {:?}", events);
}
events_witnessed += events.len();
}
Ok(ServerMessage::TopAccesses(top_accesses)) => {
info!("Received top accesses");
if cli.verbose_accesses {
for entry in &top_accesses.storage {
info!("Storage access: address={}, key={}, count={}", entry.key.0, entry.key.1, entry.count);
}
for entry in &top_accesses.account {
info!("Account access: address={}, count={}", entry.key, entry.count);
}
}
}
Err(_) => {
error!("Failed to parse events: {}", text);
}
Expand Down
21 changes: 7 additions & 14 deletions backend/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,18 @@
// Re-export modules from lib/ subdirectory
pub mod timestamp {
pub use super::lib::timestamp::*;
include!("lib/timestamp.rs");
}
pub mod event_filter {
pub use super::lib::event_filter::*;
include!("lib/event_filter.rs");
}
pub mod event_listener {
pub use super::lib::event_listener::*;
include!("lib/event_listener.rs");
}
pub mod serializable_event {
pub use super::lib::serializable_event::*;
include!("lib/serializable_event.rs");
}
pub mod server {
pub use super::lib::server::*;
include!("lib/server.rs");
}

// Internal module containing implementations
mod lib {
pub mod event_filter;
pub mod event_listener;
pub mod serializable_event;
pub mod server;
pub mod timestamp;
pub mod top_k_tracker {
include!("lib/top_k_tracker.rs");
}
12 changes: 6 additions & 6 deletions backend/src/lib/serializable_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,14 @@ impl From<&ExecEvent> for SerializableExecEvent {
/// Serializable version of EventData with converted payload
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SerializableEventData {
event_name: EventName,
pub event_name: EventName,
#[serde(skip_serializing_if = "Option::is_none")]
block_number: Option<u64>,
pub block_number: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
txn_idx: Option<usize>,
payload: SerializableExecEvent,
seqno: u64,
timestamp_ns: NanoTimestamp,
pub txn_idx: Option<usize>,
pub payload: SerializableExecEvent,
pub seqno: u64,
pub timestamp_ns: NanoTimestamp,
}

impl From<&EventData> for SerializableEventData {
Expand Down
153 changes: 120 additions & 33 deletions backend/src/lib/server.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,40 @@
use std::net::SocketAddr;

use alloy_primitives::{Address, B256};
use futures_util::stream::SplitSink;
use futures_util::{stream::SplitStream, SinkExt, StreamExt};
use monad_exec_events::ExecEvent;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast;
use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
use tracing::{error, info, warn};
use serde::{Deserialize, Serialize};

use crate::event_listener::EventName;
use crate::top_k_tracker::{AccessEntry, TopKTracker};

use super::event_filter::{ClientMessage, EventFilter};
use super::event_listener::EventData;
use super::serializable_event::SerializableEventData;

/// Snapshot of the most frequently accessed accounts and storage slots,
/// as accumulated by the server's top-K trackers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopAccessesData {
    /// Top accessed accounts, keyed by account address.
    pub account: Vec<AccessEntry<Address>>,
    /// Top accessed storage slots, keyed by (contract address, storage key).
    pub storage: Vec<AccessEntry<(Address, B256)>>,
}

/// Internal payload carried on the broadcast channel between the event
/// forwarder task and per-client write tasks: either a single raw execution
/// event or a periodic top-accesses snapshot. Not serialized directly;
/// converted into a `ServerMessage` before being sent to clients.
#[derive(Debug, Clone)]
pub enum EventDataOrAccesses {
    /// One execution event, still in its unserialized in-process form.
    Event(EventData),
    /// A point-in-time snapshot of the top account/storage accesses.
    TopAccesses(TopAccessesData),
}

/// Wire format for messages sent from the server to WebSocket clients
/// (JSON-encoded; the enum variant tags the message type).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ServerMessage {
    /// A batch of execution events that passed the client's filter.
    Events(Vec<SerializableEventData>),
    /// A snapshot of the most frequently accessed accounts/storage slots.
    TopAccesses(TopAccessesData),
}

/// Wait for the client to send a subscription message, with a timeout.
/// Returns the filter, or None if the client disconnects or times out.
async fn wait_for_subscription(
Expand Down Expand Up @@ -58,49 +82,65 @@ async fn wait_for_subscription(
}

async fn client_write_task(
mut event_broadcast_receiver: broadcast::Receiver<EventData>,
mut event_broadcast_receiver: broadcast::Receiver<EventDataOrAccesses>,
filter: EventFilter,
addr: SocketAddr,
mut ws_sender: SplitSink<WebSocketStream<TcpStream>, Message>,
) {
let mut buf: Vec<SerializableEventData> = Vec::new();
let mut events_buf: Vec<SerializableEventData> = Vec::new();
let mut accesses_buf: Vec<TopAccessesData> = Vec::new();

loop {
let result = event_broadcast_receiver.recv().await;
match result {
Ok(event_data) => {
if result.is_err() {
error!("Broadcast channel error for {}: {}", addr, result.err().unwrap());
break;
}
let event = result.unwrap();
match event {
EventDataOrAccesses::Event(event_data) => {
if filter.matches(&event_data.event_name) {
buf.push(SerializableEventData::from(&event_data));
events_buf.push(SerializableEventData::from(&event_data));
}
}
Err(e) => {
error!("Broadcast channel error for {}: {}", addr, e);
break;
EventDataOrAccesses::TopAccesses(top_accesses_data) => {
accesses_buf.push(top_accesses_data);
}
}
while let Ok(event_data) = event_broadcast_receiver.try_recv() {
if filter.matches(&event_data.event_name) {
buf.push(SerializableEventData::from(&event_data));
while let Ok(event) = event_broadcast_receiver.try_recv() {
match event {
EventDataOrAccesses::Event(event_data) => {
if filter.matches(&event_data.event_name) {
events_buf.push(SerializableEventData::from(&event_data));
}
}
EventDataOrAccesses::TopAccesses(top_accesses_data) => {
accesses_buf.push(top_accesses_data);
}
}
}

if !buf.is_empty() {
if !events_buf.is_empty() {
let server_msg = ServerMessage::Events(std::mem::take(&mut events_buf));
// Serialize batch to JSON
let json_message = match serde_json::to_string(&std::mem::take(&mut buf)) {
Ok(json) => json,
Err(e) => {
error!("Failed to serialize batch for {}: {}", addr, e);
buf.clear();
continue;
}
};
let json_message = serde_json::to_string(&server_msg).unwrap();

// Send batch
if let Err(e) = ws_sender.send(Message::Text(json_message)).await {
error!("Failed to send batch to {}: {}", addr, e);
break;
}
}
if !accesses_buf.is_empty() {
for accesses in std::mem::take(&mut accesses_buf) {
let server_msg = ServerMessage::TopAccesses(accesses);
let json_message = serde_json::to_string(&server_msg).unwrap();
if let Err(e) = ws_sender.send(Message::Text(json_message)).await {
error!("Failed to send batch to {}: {}", addr, e);
break;
}
}
}
}
}

Expand Down Expand Up @@ -131,10 +171,63 @@ async fn client_read_task(
}
}

/// Forwards events from the single-consumer mpsc channel onto the client
/// broadcast channel, while maintaining top-K access statistics.
///
/// Every incoming `EventData` is re-broadcast wrapped as
/// `EventDataOrAccesses::Event`. Account- and storage-access events are
/// additionally recorded into `TopKTracker`s, and every 5 seconds the
/// current top-10 entries of each tracker are broadcast as
/// `EventDataOrAccesses::TopAccesses`.
///
/// Returns when the mpsc sender side is dropped (`recv()` yields `None`).
async fn run_event_forwarder_task(
    mut event_receiver: tokio::sync::mpsc::Receiver<EventData>,
    event_broadcast_sender: broadcast::Sender<EventDataOrAccesses>,
) {
    // Capacity 10_000 distinct keys per tracker.
    // NOTE(review): behavior once capacity is exceeded (eviction/approximation)
    // depends on TopKTracker's implementation — confirm there.
    let mut account_accesses = TopKTracker::new(10_000);
    let mut storage_accesses = TopKTracker::new(10_000);
    // Periodic stats broadcast. Tokio intervals fire immediately on the first
    // tick, so one (empty) snapshot is also sent right at startup.
    let mut stats_interval = tokio::time::interval(std::time::Duration::from_secs(5));

    loop {
        tokio::select! {
            event_data = event_receiver.recv() => {
                // `None` means every sender was dropped: shut the task down.
                if event_data.is_none() {
                    warn!("Event receiver closed");
                    return;
                }
                let event_data = event_data.unwrap();

                // Record access events into the top-K trackers. The event name
                // and the payload variant are produced together upstream, so a
                // mismatch would be a producer bug — hence `unreachable!()`.
                if let EventName::AccountAccess = event_data.event_name {
                    if let ExecEvent::AccountAccess {
                        account_access,
                        ..
                    } = event_data.payload {
                        let address = Address::from_slice(&account_access.address.bytes);
                        account_accesses.record(address);
                    } else {
                        unreachable!();
                    }
                } else if let EventName::StorageAccess = event_data.event_name {
                    if let ExecEvent::StorageAccess {
                        storage_access,
                        ..
                    } = event_data.payload {
                        let address = Address::from_slice(&storage_access.address.bytes);
                        let key = B256::from_slice(&storage_access.key.bytes);
                        storage_accesses.record((address, key));
                    } else {
                        unreachable!();
                    }
                }

                // Forward the event to all connected clients. A send error just
                // means there are currently no subscribers — safe to ignore.
                let _ = event_broadcast_sender.send(EventDataOrAccesses::Event(event_data));
            },
            _ = stats_interval.tick() => {
                // Broadcast the current top-10 of each tracker.
                let top_accesses_data = TopAccessesData {
                    account: account_accesses.top_k(10),
                    storage: storage_accesses.top_k(10),
                };
                let _ = event_broadcast_sender.send(EventDataOrAccesses::TopAccesses(top_accesses_data));
            }
        }
    }
}

async fn handle_connection(
stream: TcpStream,
addr: SocketAddr,
event_broadcast_receiver: broadcast::Receiver<EventData>,
event_broadcast_receiver: broadcast::Receiver<EventDataOrAccesses>,
) {
info!("New WebSocket connection from: {}", addr);

Expand Down Expand Up @@ -186,23 +279,17 @@ async fn handle_connection(

pub async fn run_websocket_server(
addr: SocketAddr,
mut event_receiver: tokio::sync::mpsc::Receiver<EventData>,
event_receiver: tokio::sync::mpsc::Receiver<EventData>,
) -> Result<(), Box<dyn std::error::Error>> {
// Create a broadcast channel for distributing events to all clients
let (event_broadcast_sender, _) = broadcast::channel::<EventData>(1_000_000);
let (event_broadcast_sender, _) = broadcast::channel::<EventDataOrAccesses>(1_000_000);

// Spawn a task to forward events from the mpsc channel to the broadcast channel
let event_broadcast_sender_clone = event_broadcast_sender.clone();
let broadcast_task = tokio::spawn(async move {
loop {
let event_data = event_receiver.recv().await;
if event_data.is_none() {
warn!("Event receiver closed");
return;
}
let _ = event_broadcast_sender_clone.send(event_data.unwrap());
}
});
let broadcast_task = tokio::spawn(run_event_forwarder_task(
event_receiver,
event_broadcast_sender_clone,
));

// Bind the TCP listener
let listener = TcpListener::bind(&addr).await?;
Expand Down
Loading