Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 86 additions & 18 deletions backend/src/bin/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use alloy_primitives::B256;
use clap::Parser;
use execution_events_example::event_listener::EventName;
use execution_events_example::serializable_event::SerializableExecEvent;
use execution_events_example::{event_filter::ClientMessage, server::ServerMessage};
use futures_util::{SinkExt, StreamExt};
use std::collections::HashSet;
use std::collections::HashMap;
use tokio_tungstenite::{connect_async, tungstenite::Message};

use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
Expand All @@ -28,6 +30,30 @@ struct Cli {
verbose_accesses: bool,
}

macro_rules! log_event {
// Entry: just message
($msg:expr) => {
tracing::info!("------> {}", $msg)
};
// Entry: message with arbitrary number of key=value pairs
($msg:expr, $($key:ident = $value:expr),+ $(,)?) => {{
let mut s = format!("------> {}", $msg);
$(
s.push_str(&format!(" {}={:?}", stringify!($key), $value));
)*
tracing::info!("{}", s);
}};
}

#[derive(Default)]
struct ClientState {
events_witnessed: usize,
block_start_ns: u64,
txs_start_ns: HashMap<usize, (B256, u64)>,
block_txns_total_duration: std::time::Duration,
current_block_number: u64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize tracing subscriber
Expand Down Expand Up @@ -77,9 +103,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// Read messages from the server
let mut events_per_sec_interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
let mut events_witnessed = 0;
let mut seen_block_starts: HashSet<u64> = HashSet::new();
let mut seen_block_qcs: HashSet<u64> = HashSet::new();
let mut client_state = ClientState::default();

loop {
tokio::select! {
msg = read.next() => {
Expand All @@ -92,29 +117,72 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(Message::Text(text)) => {
match serde_json::from_str::<ServerMessage>(&text) {
Ok(ServerMessage::Events(events)) => {
// Check for duplicate BlockStart events
for event in &events {
if event.event_name == EventName::BlockStart {
if let Some(block_number) = event.block_number {
if !seen_block_starts.insert(block_number) {
warn!("Duplicate BlockStart event for block {}", block_number);
}
match event.event_name {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need it? Was it just used for debugging?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mostly for debugging + wanted to confirm my assumption that receiving duplicate proposals and voted blocks is uncommon, which it is

this client bin is more of an ideating space for the frontend part, not essential to the core functionality

EventName::TxnPerfEvmEnter => {
println!("tx enter");
}
EventName::TxnPerfEvmExit => {
println!("tx exit");
}
_ => ()
}
if event.event_name == EventName::BlockQC {
if let Some(block_number) = event.block_number {
if !seen_block_qcs.insert(block_number) {
warn!("Duplicate BlockQC event for block {}", block_number);
}
match event.payload {
SerializableExecEvent::BlockStart { block_number, base_fee_per_gas, .. } => {
log_event!("BlockStart", block_number = block_number, base_fee = base_fee_per_gas);
client_state.current_block_number = u64::max(client_state.current_block_number, block_number);
}
SerializableExecEvent::BlockPerfEvmEnter => {
log_event!("BlockPerfEvmEnter");
client_state.block_start_ns = event.timestamp_ns;
}
SerializableExecEvent::TxnHeaderStart { txn_hash, txn_index, .. } => {
log_event!("TxnHeaderStart", txn_index = txn_index, txn_hash = txn_hash);
client_state.txs_start_ns.insert(txn_index, (txn_hash, event.timestamp_ns));
},
SerializableExecEvent::TxnEvmOutput { txn_index, .. } => {
if let Some((txn_hash, txn_start_ns)) = client_state.txs_start_ns.remove(&txn_index) {
let txn_duration = std::time::Duration::from_nanos((event.timestamp_ns - txn_start_ns) as u64);
client_state.block_txns_total_duration += txn_duration;

log_event!("TxnEvmOutput", txn_index = txn_index, txn_hash = txn_hash, duration = txn_duration);
} else {
warn!("TxnPerfEvmExit event received without TxnPerfEvmEnter event: {:?}", txn_index);
}
},
SerializableExecEvent::BlockPerfEvmExit => {
log_event!("BlockPerfEvmExit");
let block_duration = std::time::Duration::from_nanos((event.timestamp_ns - client_state.block_start_ns) as u64);
let parallel_execution_savings = client_state.block_txns_total_duration.checked_sub(block_duration);
let savings_pct = if parallel_execution_savings.is_none() { // This only happens with really small/empty blocks
error!("Parallel execution savings is negative: txs={:?} block={:?} height={}", client_state.block_txns_total_duration, block_duration, client_state.current_block_number);
None
} else {
Some(100.0 * (1.0 - (block_duration.as_nanos() as f64 / client_state.block_txns_total_duration.as_nanos() as f64)))
};

log_event!("BlockPerfEvmExit", height = client_state.current_block_number, block_duration = block_duration, tx_total_duration = client_state.block_txns_total_duration, savings_pct = savings_pct);

client_state.block_txns_total_duration = std::time::Duration::from_nanos(0);
},
SerializableExecEvent::BlockEnd { gas_used, .. } => {
log_event!("BlockEnd", gas_used = gas_used, block_number = client_state.current_block_number);
},
SerializableExecEvent::BlockQC { block_number, .. } => {
log_event!("BlockQC", block_number = block_number);
},
SerializableExecEvent::BlockFinalized { block_number, .. } => {
log_event!("BlockFinalized", block_number = block_number);
},
_ => ()
}
}

info!("Received {} events", events.len());
if cli.verbose_events {
info!("Events: {:?}", events);
}
events_witnessed += events.len();
client_state.events_witnessed += events.len();
}
Ok(ServerMessage::TopAccesses(top_accesses)) => {
info!("Received top accesses");
Expand Down Expand Up @@ -147,8 +215,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
_ = events_per_sec_interval.tick() => {
info!("Events per second: {}", events_witnessed);
events_witnessed = 0;
info!("Events per second: {}", client_state.events_witnessed);
client_state.events_witnessed = 0;
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions backend/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
pub mod timestamp {
include!("lib/timestamp.rs");
}
pub mod event_filter {
include!("lib/event_filter.rs");
}
Expand Down
13 changes: 4 additions & 9 deletions backend/src/lib/event_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ use serde::{Deserialize, Serialize};
use std::{ffi::CStr, time::Duration};
use tracing::{debug, error, info, warn};

use super::timestamp::get_unix_time_ns;
use super::timestamp::NanoTimestamp;

lazy_static! {
static ref EXEC_EVENT_NAMES: [&'static str; MONAD_EXEC_EVENT_COUNT] =
std::array::from_fn(|event_type| unsafe {
Expand Down Expand Up @@ -122,7 +119,7 @@ impl EventName {

#[derive(Debug, Clone)]
pub struct EventData {
pub timestamp_ns: NanoTimestamp,
pub timestamp_ns: u64,
pub event_name: EventName,
pub seqno: u64,
pub block_number: Option<u64>,
Expand Down Expand Up @@ -152,12 +149,10 @@ fn event_to_data(event: &EventDescriptor<ExecEventDecoder>) -> Option<EventData>
let EventDescriptorInfo {
seqno,
event_type,
record_epoch_nanos: _,
record_epoch_nanos,
flow_info,
} = event.info();

let timestamp_ns = get_unix_time_ns();


// Convert event_type to EventName enum for type safety
let event_name = EventName::from_str(EXEC_EVENT_NAMES[event_type as usize])?;

Expand All @@ -181,7 +176,7 @@ fn event_to_data(event: &EventDescriptor<ExecEventDecoder>) -> Option<EventData>
};

Some(EventData {
timestamp_ns,
timestamp_ns: record_epoch_nanos,
event_name,
seqno,
block_number,
Expand Down
42 changes: 34 additions & 8 deletions backend/src/lib/serializable_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ use alloy_primitives::{Address, Bytes, B256, U256};
use monad_exec_events::{ffi::*, ExecEvent};
use serde::{Deserialize, Serialize};

use super::timestamp::NanoTimestamp;

/// Serializable version of ExecEvent using alloy-primitives for type safety
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SerializableExecEvent {
RecordError {
error_type: u16,
dropped_event_type: u16,
truncated_payload_size: u32,
requested_payload_size: u64,
},
BlockStart {
block_number: u64,
Expand Down Expand Up @@ -41,6 +42,7 @@ pub enum SerializableExecEvent {
round: u64,
},
BlockFinalized {
block_id: B256,
block_number: u64,
},
BlockVerified {
Expand All @@ -50,13 +52,21 @@ pub enum SerializableExecEvent {
txn_index: usize,
txn_hash: B256,
sender: Address,
to: Address,
txn_type: u8,
chain_id: U256,
nonce: u64,
gas_limit: u64,
max_fee_per_gas: U256,
max_priority_fee_per_gas: U256,
value: U256,
data: Bytes,
blob_data: Bytes,
to: Address,
is_contract_creation: bool,
r: U256,
s: U256,
y_parity: bool,
access_list_count: u32,
auth_list_count: u32,
},
TxnAccessListEntry {
txn_index: usize,
Expand All @@ -76,11 +86,13 @@ pub enum SerializableExecEvent {
TxnPerfEvmExit,
TxnEvmOutput {
txn_index: usize,
log_count: u32,
status: bool,
gas_used: u64,
},
TxnLog {
txn_index: usize,
log_index: u32,
address: Address,
topics: Bytes,
data: Bytes,
Expand Down Expand Up @@ -134,6 +146,9 @@ impl From<&ExecEvent> for SerializableExecEvent {
match event {
ExecEvent::RecordError(err) => Self::RecordError {
error_type: err.error_type,
dropped_event_type: err.dropped_event_type,
truncated_payload_size: err.truncated_payload_size,
requested_payload_size: err.requested_payload_size,
},
ExecEvent::BlockStart(block) => Self::BlockStart {
block_number: block.block_tag.block_number,
Expand Down Expand Up @@ -162,6 +177,7 @@ impl From<&ExecEvent> for SerializableExecEvent {
round: qc.round,
},
ExecEvent::BlockFinalized(finalized) => Self::BlockFinalized {
block_id: B256::from_slice(&finalized.id.bytes),
block_number: finalized.block_number,
},
ExecEvent::BlockVerified(verified) => Self::BlockVerified {
Expand All @@ -171,20 +187,28 @@ impl From<&ExecEvent> for SerializableExecEvent {
txn_index,
txn_header_start,
data_bytes,
blob_bytes,
..
} => Self::TxnHeaderStart {
txn_index: *txn_index,
txn_hash: B256::from_slice(&txn_header_start.txn_hash.bytes),
sender: Address::from_slice(&txn_header_start.sender.bytes),
to: Address::from_slice(&txn_header_start.txn_header.to.bytes),
txn_type: txn_header_start.txn_header.txn_type,
chain_id: uint256_from_c(&txn_header_start.txn_header.chain_id),
nonce: txn_header_start.txn_header.nonce,
gas_limit: txn_header_start.txn_header.gas_limit,
max_fee_per_gas: uint256_from_c(&txn_header_start.txn_header.max_fee_per_gas),
max_priority_fee_per_gas: uint256_from_c(
&txn_header_start.txn_header.max_priority_fee_per_gas,
),
value: uint256_from_c(&txn_header_start.txn_header.value),
to: Address::from_slice(&txn_header_start.txn_header.to.bytes),
is_contract_creation: txn_header_start.txn_header.is_contract_creation,
r: uint256_from_c(&txn_header_start.txn_header.r),
s: uint256_from_c(&txn_header_start.txn_header.s),
y_parity: txn_header_start.txn_header.y_parity,
access_list_count: txn_header_start.txn_header.access_list_count,
auth_list_count: txn_header_start.txn_header.auth_list_count,
data: Bytes::copy_from_slice(data_bytes),
blob_data: Bytes::copy_from_slice(blob_bytes),
},
ExecEvent::TxnAccessListEntry {
txn_index,
Expand Down Expand Up @@ -212,6 +236,7 @@ impl From<&ExecEvent> for SerializableExecEvent {
ExecEvent::TxnEvmOutput { txn_index, output } => Self::TxnEvmOutput {
txn_index: *txn_index,
status: output.receipt.status,
log_count: output.receipt.log_count,
gas_used: output.receipt.gas_used,
},
ExecEvent::TxnLog {
Expand All @@ -221,6 +246,7 @@ impl From<&ExecEvent> for SerializableExecEvent {
data_bytes,
} => Self::TxnLog {
txn_index: *txn_index,
log_index: txn_log.index,
address: Address::from_slice(&txn_log.address.bytes),
topics: Bytes::copy_from_slice(topic_bytes),
data: Bytes::copy_from_slice(data_bytes),
Expand Down Expand Up @@ -286,7 +312,7 @@ pub struct SerializableEventData {
pub txn_idx: Option<usize>,
pub payload: SerializableExecEvent,
pub seqno: u64,
pub timestamp_ns: NanoTimestamp,
pub timestamp_ns: u64,
}

impl From<&EventData> for SerializableEventData {
Expand Down
33 changes: 0 additions & 33 deletions backend/src/lib/timestamp.rs

This file was deleted.