Skip to content

Commit af4e678

Browse files
authored
add client stateful tracking, use ring timestamps (#7)
* add client stateful tracking, use ring timestamps * add pct savings * compact savings log line * clean up event ordering
1 parent 6cd8e23 commit af4e678

File tree

5 files changed

+124
-71
lines changed

5 files changed

+124
-71
lines changed

backend/src/bin/client.rs

Lines changed: 86 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
use alloy_primitives::B256;
12
use clap::Parser;
23
use execution_events_example::event_listener::EventName;
4+
use execution_events_example::serializable_event::SerializableExecEvent;
35
use execution_events_example::{event_filter::ClientMessage, server::ServerMessage};
46
use futures_util::{SinkExt, StreamExt};
5-
use std::collections::HashSet;
7+
use std::collections::HashMap;
68
use tokio_tungstenite::{connect_async, tungstenite::Message};
79

810
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -28,6 +30,30 @@ struct Cli {
2830
verbose_accesses: bool,
2931
}
3032

33+
macro_rules! log_event {
34+
// Entry: just message
35+
($msg:expr) => {
36+
tracing::info!("------> {}", $msg)
37+
};
38+
// Entry: message with arbitrary number of key=value pairs
39+
($msg:expr, $($key:ident = $value:expr),+ $(,)?) => {{
40+
let mut s = format!("------> {}", $msg);
41+
$(
42+
s.push_str(&format!(" {}={:?}", stringify!($key), $value));
43+
)*
44+
tracing::info!("{}", s);
45+
}};
46+
}
47+
48+
#[derive(Default)]
49+
struct ClientState {
50+
events_witnessed: usize,
51+
block_start_ns: u64,
52+
txs_start_ns: HashMap<usize, (B256, u64)>,
53+
block_txns_total_duration: std::time::Duration,
54+
current_block_number: u64,
55+
}
56+
3157
#[tokio::main]
3258
async fn main() -> Result<(), Box<dyn std::error::Error>> {
3359
// Initialize tracing subscriber
@@ -77,9 +103,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
77103

78104
// Read messages from the server
79105
let mut events_per_sec_interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
80-
let mut events_witnessed = 0;
81-
let mut seen_block_starts: HashSet<u64> = HashSet::new();
82-
let mut seen_block_qcs: HashSet<u64> = HashSet::new();
106+
let mut client_state = ClientState::default();
107+
83108
loop {
84109
tokio::select! {
85110
msg = read.next() => {
@@ -92,29 +117,72 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
92117
Ok(Message::Text(text)) => {
93118
match serde_json::from_str::<ServerMessage>(&text) {
94119
Ok(ServerMessage::Events(events)) => {
95-
// Check for duplicate BlockStart events
96120
for event in &events {
97-
if event.event_name == EventName::BlockStart {
98-
if let Some(block_number) = event.block_number {
99-
if !seen_block_starts.insert(block_number) {
100-
warn!("Duplicate BlockStart event for block {}", block_number);
101-
}
121+
match event.event_name {
122+
EventName::TxnPerfEvmEnter => {
123+
println!("tx enter");
124+
}
125+
EventName::TxnPerfEvmExit => {
126+
println!("tx exit");
102127
}
128+
_ => ()
103129
}
104-
if event.event_name == EventName::BlockQC {
105-
if let Some(block_number) = event.block_number {
106-
if !seen_block_qcs.insert(block_number) {
107-
warn!("Duplicate BlockQC event for block {}", block_number);
108-
}
130+
match event.payload {
131+
SerializableExecEvent::BlockStart { block_number, base_fee_per_gas, .. } => {
132+
log_event!("BlockStart", block_number = block_number, base_fee = base_fee_per_gas);
133+
client_state.current_block_number = u64::max(client_state.current_block_number, block_number);
109134
}
135+
SerializableExecEvent::BlockPerfEvmEnter => {
136+
log_event!("BlockPerfEvmEnter");
137+
client_state.block_start_ns = event.timestamp_ns;
138+
}
139+
SerializableExecEvent::TxnHeaderStart { txn_hash, txn_index, .. } => {
140+
log_event!("TxnHeaderStart", txn_index = txn_index, txn_hash = txn_hash);
141+
client_state.txs_start_ns.insert(txn_index, (txn_hash, event.timestamp_ns));
142+
},
143+
SerializableExecEvent::TxnEvmOutput { txn_index, .. } => {
144+
if let Some((txn_hash, txn_start_ns)) = client_state.txs_start_ns.remove(&txn_index) {
145+
let txn_duration = std::time::Duration::from_nanos((event.timestamp_ns - txn_start_ns) as u64);
146+
client_state.block_txns_total_duration += txn_duration;
147+
148+
log_event!("TxnEvmOutput", txn_index = txn_index, txn_hash = txn_hash, duration = txn_duration);
149+
} else {
150+
warn!("TxnPerfEvmExit event received without TxnPerfEvmEnter event: {:?}", txn_index);
151+
}
152+
},
153+
SerializableExecEvent::BlockPerfEvmExit => {
154+
log_event!("BlockPerfEvmExit");
155+
let block_duration = std::time::Duration::from_nanos((event.timestamp_ns - client_state.block_start_ns) as u64);
156+
let parallel_execution_savings = client_state.block_txns_total_duration.checked_sub(block_duration);
157+
let savings_pct = if parallel_execution_savings.is_none() { // This only happens with really small/empty blocks
158+
error!("Parallel execution savings is negative: txs={:?} block={:?} height={}", client_state.block_txns_total_duration, block_duration, client_state.current_block_number);
159+
None
160+
} else {
161+
Some(100.0 * (1.0 - (block_duration.as_nanos() as f64 / client_state.block_txns_total_duration.as_nanos() as f64)))
162+
};
163+
164+
log_event!("BlockPerfEvmExit", height = client_state.current_block_number, block_duration = block_duration, tx_total_duration = client_state.block_txns_total_duration, savings_pct = savings_pct);
165+
166+
client_state.block_txns_total_duration = std::time::Duration::from_nanos(0);
167+
},
168+
SerializableExecEvent::BlockEnd { gas_used, .. } => {
169+
log_event!("BlockEnd", gas_used = gas_used, block_number = client_state.current_block_number);
170+
},
171+
SerializableExecEvent::BlockQC { block_number, .. } => {
172+
log_event!("BlockQC", block_number = block_number);
173+
},
174+
SerializableExecEvent::BlockFinalized { block_number, .. } => {
175+
log_event!("BlockFinalized", block_number = block_number);
176+
},
177+
_ => ()
110178
}
111179
}
112180

113181
info!("Received {} events", events.len());
114182
if cli.verbose_events {
115183
info!("Events: {:?}", events);
116184
}
117-
events_witnessed += events.len();
185+
client_state.events_witnessed += events.len();
118186
}
119187
Ok(ServerMessage::TopAccesses(top_accesses)) => {
120188
info!("Received top accesses");
@@ -147,8 +215,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
147215
}
148216
}
149217
_ = events_per_sec_interval.tick() => {
150-
info!("Events per second: {}", events_witnessed);
151-
events_witnessed = 0;
218+
info!("Events per second: {}", client_state.events_witnessed);
219+
client_state.events_witnessed = 0;
152220
}
153221
}
154222
}

backend/src/lib.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
pub mod timestamp {
2-
include!("lib/timestamp.rs");
3-
}
41
pub mod event_filter {
52
include!("lib/event_filter.rs");
63
}

backend/src/lib/event_listener.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@ use serde::{Deserialize, Serialize};
1313
use std::{ffi::CStr, time::Duration};
1414
use tracing::{debug, error, info, warn};
1515

16-
use super::timestamp::get_unix_time_ns;
17-
use super::timestamp::NanoTimestamp;
18-
1916
lazy_static! {
2017
static ref EXEC_EVENT_NAMES: [&'static str; MONAD_EXEC_EVENT_COUNT] =
2118
std::array::from_fn(|event_type| unsafe {
@@ -122,7 +119,7 @@ impl EventName {
122119

123120
#[derive(Debug, Clone)]
124121
pub struct EventData {
125-
pub timestamp_ns: NanoTimestamp,
122+
pub timestamp_ns: u64,
126123
pub event_name: EventName,
127124
pub seqno: u64,
128125
pub block_number: Option<u64>,
@@ -152,12 +149,10 @@ fn event_to_data(event: &EventDescriptor<ExecEventDecoder>) -> Option<EventData>
152149
let EventDescriptorInfo {
153150
seqno,
154151
event_type,
155-
record_epoch_nanos: _,
152+
record_epoch_nanos,
156153
flow_info,
157154
} = event.info();
158-
159-
let timestamp_ns = get_unix_time_ns();
160-
155+
161156
// Convert event_type to EventName enum for type safety
162157
let event_name = EventName::from_str(EXEC_EVENT_NAMES[event_type as usize])?;
163158

@@ -181,7 +176,7 @@ fn event_to_data(event: &EventDescriptor<ExecEventDecoder>) -> Option<EventData>
181176
};
182177

183178
Some(EventData {
184-
timestamp_ns,
179+
timestamp_ns: record_epoch_nanos,
185180
event_name,
186181
seqno,
187182
block_number,

backend/src/lib/serializable_event.rs

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ use alloy_primitives::{Address, Bytes, B256, U256};
33
use monad_exec_events::{ffi::*, ExecEvent};
44
use serde::{Deserialize, Serialize};
55

6-
use super::timestamp::NanoTimestamp;
7-
86
/// Serializable version of ExecEvent using alloy-primitives for type safety
97
#[derive(Debug, Clone, Serialize, Deserialize)]
108
#[serde(tag = "type")]
119
pub enum SerializableExecEvent {
1210
RecordError {
1311
error_type: u16,
12+
dropped_event_type: u16,
13+
truncated_payload_size: u32,
14+
requested_payload_size: u64,
1415
},
1516
BlockStart {
1617
block_number: u64,
@@ -41,6 +42,7 @@ pub enum SerializableExecEvent {
4142
round: u64,
4243
},
4344
BlockFinalized {
45+
block_id: B256,
4446
block_number: u64,
4547
},
4648
BlockVerified {
@@ -50,13 +52,21 @@ pub enum SerializableExecEvent {
5052
txn_index: usize,
5153
txn_hash: B256,
5254
sender: Address,
53-
to: Address,
55+
txn_type: u8,
56+
chain_id: U256,
57+
nonce: u64,
5458
gas_limit: u64,
5559
max_fee_per_gas: U256,
5660
max_priority_fee_per_gas: U256,
5761
value: U256,
5862
data: Bytes,
59-
blob_data: Bytes,
63+
to: Address,
64+
is_contract_creation: bool,
65+
r: U256,
66+
s: U256,
67+
y_parity: bool,
68+
access_list_count: u32,
69+
auth_list_count: u32,
6070
},
6171
TxnAccessListEntry {
6272
txn_index: usize,
@@ -76,11 +86,13 @@ pub enum SerializableExecEvent {
7686
TxnPerfEvmExit,
7787
TxnEvmOutput {
7888
txn_index: usize,
89+
log_count: u32,
7990
status: bool,
8091
gas_used: u64,
8192
},
8293
TxnLog {
8394
txn_index: usize,
95+
log_index: u32,
8496
address: Address,
8597
topics: Bytes,
8698
data: Bytes,
@@ -134,6 +146,9 @@ impl From<&ExecEvent> for SerializableExecEvent {
134146
match event {
135147
ExecEvent::RecordError(err) => Self::RecordError {
136148
error_type: err.error_type,
149+
dropped_event_type: err.dropped_event_type,
150+
truncated_payload_size: err.truncated_payload_size,
151+
requested_payload_size: err.requested_payload_size,
137152
},
138153
ExecEvent::BlockStart(block) => Self::BlockStart {
139154
block_number: block.block_tag.block_number,
@@ -162,6 +177,7 @@ impl From<&ExecEvent> for SerializableExecEvent {
162177
round: qc.round,
163178
},
164179
ExecEvent::BlockFinalized(finalized) => Self::BlockFinalized {
180+
block_id: B256::from_slice(&finalized.id.bytes),
165181
block_number: finalized.block_number,
166182
},
167183
ExecEvent::BlockVerified(verified) => Self::BlockVerified {
@@ -171,20 +187,28 @@ impl From<&ExecEvent> for SerializableExecEvent {
171187
txn_index,
172188
txn_header_start,
173189
data_bytes,
174-
blob_bytes,
190+
..
175191
} => Self::TxnHeaderStart {
176192
txn_index: *txn_index,
177193
txn_hash: B256::from_slice(&txn_header_start.txn_hash.bytes),
178194
sender: Address::from_slice(&txn_header_start.sender.bytes),
179-
to: Address::from_slice(&txn_header_start.txn_header.to.bytes),
195+
txn_type: txn_header_start.txn_header.txn_type,
196+
chain_id: uint256_from_c(&txn_header_start.txn_header.chain_id),
197+
nonce: txn_header_start.txn_header.nonce,
180198
gas_limit: txn_header_start.txn_header.gas_limit,
181199
max_fee_per_gas: uint256_from_c(&txn_header_start.txn_header.max_fee_per_gas),
182200
max_priority_fee_per_gas: uint256_from_c(
183201
&txn_header_start.txn_header.max_priority_fee_per_gas,
184202
),
185203
value: uint256_from_c(&txn_header_start.txn_header.value),
204+
to: Address::from_slice(&txn_header_start.txn_header.to.bytes),
205+
is_contract_creation: txn_header_start.txn_header.is_contract_creation,
206+
r: uint256_from_c(&txn_header_start.txn_header.r),
207+
s: uint256_from_c(&txn_header_start.txn_header.s),
208+
y_parity: txn_header_start.txn_header.y_parity,
209+
access_list_count: txn_header_start.txn_header.access_list_count,
210+
auth_list_count: txn_header_start.txn_header.auth_list_count,
186211
data: Bytes::copy_from_slice(data_bytes),
187-
blob_data: Bytes::copy_from_slice(blob_bytes),
188212
},
189213
ExecEvent::TxnAccessListEntry {
190214
txn_index,
@@ -212,6 +236,7 @@ impl From<&ExecEvent> for SerializableExecEvent {
212236
ExecEvent::TxnEvmOutput { txn_index, output } => Self::TxnEvmOutput {
213237
txn_index: *txn_index,
214238
status: output.receipt.status,
239+
log_count: output.receipt.log_count,
215240
gas_used: output.receipt.gas_used,
216241
},
217242
ExecEvent::TxnLog {
@@ -221,6 +246,7 @@ impl From<&ExecEvent> for SerializableExecEvent {
221246
data_bytes,
222247
} => Self::TxnLog {
223248
txn_index: *txn_index,
249+
log_index: txn_log.index,
224250
address: Address::from_slice(&txn_log.address.bytes),
225251
topics: Bytes::copy_from_slice(topic_bytes),
226252
data: Bytes::copy_from_slice(data_bytes),
@@ -286,7 +312,7 @@ pub struct SerializableEventData {
286312
pub txn_idx: Option<usize>,
287313
pub payload: SerializableExecEvent,
288314
pub seqno: u64,
289-
pub timestamp_ns: NanoTimestamp,
315+
pub timestamp_ns: u64,
290316
}
291317

292318
impl From<&EventData> for SerializableEventData {

backend/src/lib/timestamp.rs

Lines changed: 0 additions & 33 deletions
This file was deleted.

0 commit comments

Comments
 (0)