From aad4a00ef1df70f6948c6751c334c5672379c1f5 Mon Sep 17 00:00:00 2001 From: Philip Glazman <8378656+philipglazman@users.noreply.github.com> Date: Mon, 22 Sep 2025 17:31:54 -0400 Subject: [PATCH] Add websocket and rpc request gen to txgen --- Cargo.lock | 259 ++++++++- Cargo.toml | 2 + monad-eth-testutil/Cargo.toml | 2 + monad-eth-testutil/examples/txgen/cli.rs | 11 + monad-eth-testutil/examples/txgen/config.rs | 18 + monad-eth-testutil/examples/txgen/run.rs | 47 +- .../examples/txgen/workers/mod.rs | 1 + .../examples/txgen/workers/rpc_request_gen.rs | 490 ++++++++++++++++++ 8 files changed, 816 insertions(+), 14 deletions(-) create mode 100644 monad-eth-testutil/examples/txgen/workers/rpc_request_gen.rs diff --git a/Cargo.lock b/Cargo.lock index 4fc9e1d1c3..d5068b139f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -399,6 +399,17 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" +[[package]] +name = "alloy-chains" +version = "0.1.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28e2652684758b0d9b389d248b209ed9fd9989ef489a550265fe4bb8454fe7eb" +dependencies = [ + "alloy-primitives", + "num_enum", + "strum 0.27.2", +] + [[package]] name = "alloy-consensus" version = "0.8.3" @@ -575,6 +586,64 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "alloy-provider" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc2dfaddd9a30aa870a78a4e1316e3e115ec1e12e552cbc881310456b85c1f24" +dependencies = [ + "alloy-chains", + "alloy-consensus", + "alloy-eips", + "alloy-json-rpc", + "alloy-network", + "alloy-network-primitives", + "alloy-primitives", + "alloy-pubsub", + "alloy-rpc-client", + "alloy-rpc-types-eth", + "alloy-transport", + "alloy-transport-http", + "alloy-transport-ws", + "async-stream", + "async-trait", + "auto_impl", + "dashmap", + "futures", + "futures-utils-wasm", + "lru", + "parking_lot", + "pin-project", + "reqwest 0.12.23", + "schnellru", + "serde", + "serde_json", + "thiserror 2.0.16", + "tokio", + "tracing", + "url", + "wasmtimer", +] + +[[package]] +name = "alloy-pubsub" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "695809e743628d54510c294ad17a4645bd9f465aeb0d20ee9ce9877c9712dc9c" +dependencies = [ + "alloy-json-rpc", + "alloy-primitives", + "alloy-transport", + "bimap", + "futures", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tower 0.5.2", + "tracing", +] + [[package]] name = "alloy-rlp" version = "0.3.12" @@ -605,8 +674,10 @@ checksum = "531137b283547d5b9a5cafc96b006c64ef76810c681d606f28be9781955293b6" dependencies = [ "alloy-json-rpc", "alloy-primitives", + "alloy-pubsub", "alloy-transport", "alloy-transport-http", + "alloy-transport-ws", "futures", "pin-project", "reqwest 0.12.23", @@ -659,7 +730,7 @@ dependencies = [ "jsonwebtoken", "rand 0.8.5", "serde", - "strum", + "strum 0.26.3", ] [[package]] @@ -845,6 +916,24 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-transport-ws" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fba0e39d181d13c266dbb8ca54ed584a2c66d6e9279afca89c7a6b1825e98abb" +dependencies = [ + "alloy-pubsub", + "alloy-transport", + "futures", + "http 1.3.1", + "rustls 0.23.31", + "serde_json", + "tokio", + "tokio-tungstenite", + "tracing", + "ws_stream_wasm", +] + [[package]] name = "alloy-trie" version = "0.7.9" @@ -1161,7 +1250,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "strum", + "strum 0.26.3", "syn 2.0.106", "thiserror 1.0.69", ] @@ -1223,6 +1312,17 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "async_io_stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" +dependencies = [ + "futures", + "pharos", + "rustc_version 0.4.1", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1820,6 +1920,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" +[[package]] +name = "bimap" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" + [[package]] name = "bincode" version = "1.3.3" @@ -3610,6 +3716,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" + [[package]] name = "hashbrown" version = "0.14.5" @@ -5162,6 +5274,7 @@ dependencies = [ "alloy-json-rpc", "alloy-network", "alloy-primitives", + "alloy-provider", "alloy-rlp", "alloy-rpc-client", "alloy-rpc-types", @@ -5198,6 +5311,7 @@ dependencies = [ "serde_json", "thiserror 1.0.69", "tokio", + "tokio-tungstenite", "toml", "tracing", "tracing-subscriber", @@ -5366,7 +5480,7 @@ dependencies = [ "ratatui", "serde", "serde_json", - "strum", + "strum 0.26.3", "tracing", ] @@ -5578,7 +5692,7 @@ dependencies = [ "monad-secp", "monad-types", "serde", - "strum", + "strum 0.26.3", "toml", ] @@ -6583,6 +6697,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a973b4e44ce6cad84ce69d797acf9a044532e4184c4f267913d1b546a0727b7a" +dependencies = [ + "num_enum_derive", + "rustversion", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77e878c846a8abae00dd069496dbe8751b16ac1c3d6bd2a7283a938e8228f90d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "num_threads" version = "0.1.7" @@ -6944,6 +7079,16 @@ dependencies = [ "sha2", ] +[[package]] +name = "pharos" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" +dependencies = [ + "futures", + "rustc_version 0.4.1", +] + [[package]] name = "pin-project" version = "1.1.10" @@ -7437,7 +7582,7 @@ dependencies = [ "itertools 0.13.0", "lru", "paste", - "strum", + "strum 0.26.3", "unicode-segmentation", "unicode-truncate", "unicode-width 0.2.0", @@ -8053,6 +8198,17 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "schnellru" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "356285bbf17bea63d9e52e96bd18f039672ac92b55b8cb997d6162a2a37d1649" +dependencies = [ + "ahash", + "cfg-if", + "hashbrown 0.13.2", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -8186,6 +8342,12 @@ dependencies = [ "pest", ] +[[package]] +name = "send_wrapper" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" + [[package]] name = "serde" version = "1.0.219" @@ -8581,7 +8743,16 @@ version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" dependencies = [ - "strum_macros", + "strum_macros 0.26.4", +] + +[[package]] +name = "strum" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af23d6f6c1a224baef9d3f61e287d2761385a5b88fdab4eb4c6f11aeb54c4bcf" +dependencies = [ + "strum_macros 0.27.2", ] [[package]] @@ -8597,6 +8768,18 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "strum_macros" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "subtle" version = "2.6.1" @@ -9006,6 +9189,24 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "native-tls", + "rustls 0.23.31", + "rustls-pki-types", + "tokio", + "tokio-native-tls", + "tokio-rustls 0.26.2", + "tungstenite", + "webpki-roots 0.26.11", +] + [[package]] name = "tokio-util" version = "0.7.16" @@ -9315,6 +9516,27 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.3.1", + "httparse", + "log", + "native-tls", + "rand 0.8.5", + "rustls 0.23.31", + "rustls-pki-types", + "sha1", + "thiserror 1.0.69", + "utf-8", +] + [[package]] name = "typed-builder" version = "0.20.1" @@ -9451,6 +9673,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -10073,6 +10301,25 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" +[[package]] +name = "ws_stream_wasm" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c173014acad22e83f16403ee360115b38846fe754e735c5d9d3803fe70c6abc" +dependencies = [ + "async_io_stream", + "futures", + "js-sys", + "log", + "pharos", + "rustc_version 0.4.1", + "send_wrapper", + "thiserror 2.0.16", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wyz" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index d0f2bb30cf..6996d786e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,6 +99,7 @@ alloy-eips = "0.8" alloy-json-rpc = "0.8" alloy-network = "0.8" alloy-primitives = "=0.8.18" +alloy-provider = "0.8" alloy-rlp = "0.3" alloy-rpc-client = "0.8" alloy-rpc-types = "0.8" @@ -205,6 +206,7 @@ tiny-keccak = "2" tokio = { version = "1.39", features = ["sync"] } tokio-retry = "0.3.0" tokio-stream = "0.1.17" +tokio-tungstenite = {version="0.24.0", features = ["native-tls"]} tokio-util = "0.7" toml = "0.7" tracing = "0.1" diff --git a/monad-eth-testutil/Cargo.toml b/monad-eth-testutil/Cargo.toml index 3bcef55c5b..dc96aeb306 100644 --- a/monad-eth-testutil/Cargo.toml +++ b/monad-eth-testutil/Cargo.toml @@ -23,9 +23,11 @@ monad-validator = { workspace = true } alloy-consensus = { workspace = true, features = ["k256"] } alloy-eips = { workspace = true } alloy-primitives = { workspace = true } +alloy-provider = { workspace = true, features = ["ws"] } alloy-signer = { workspace = true } alloy-signer-local = { workspace = true } rand = { workspace = true } +tokio-tungstenite = { workspace = true, features = ["native-tls"]} [dev-dependencies] alloy-json-rpc = { workspace = true } diff --git a/monad-eth-testutil/examples/txgen/cli.rs b/monad-eth-testutil/examples/txgen/cli.rs index ff06bc481c..bb564ca958 100644 --- a/monad-eth-testutil/examples/txgen/cli.rs +++ b/monad-eth-testutil/examples/txgen/cli.rs @@ -28,6 +28,9 @@ pub struct CliConfig { #[arg(long, global = true)] pub rpc_url: Option, + #[arg(long, global = true)] + pub ws_url: Option, + /// Target tps of the generator #[arg(long, global = true)] pub tps: Option, @@ -125,6 +128,14 @@ pub struct CliConfig { #[arg(long, global = true)] pub use_static_tps_interval: Option, + /// Spams rpc with common wallet workflow requests + #[arg(long, global = true)] + pub spam_rpc: Option, + + /// Compares rpc and websocket responses + #[arg(long, global = true)] + pub compare_rpc_ws: Option, + /// Otel endpoint #[arg(long, global = true)] pub otel_endpoint: Option, diff --git a/monad-eth-testutil/examples/txgen/config.rs b/monad-eth-testutil/examples/txgen/config.rs index 3b4069cf06..4fe438375a 100644 --- a/monad-eth-testutil/examples/txgen/config.rs +++ b/monad-eth-testutil/examples/txgen/config.rs @@ -30,6 +30,9 @@ pub struct Config { #[serde(default)] pub rpc_urls: Vec, + #[serde(default)] + pub ws_url: String, + /// Funded private keys used to seed native tokens to sender accounts pub root_private_keys: Vec, @@ -80,6 +83,7 @@ impl Default for Config { fn default() -> Self { Self { rpc_urls: vec!["http://localhost:8545".to_string()], + ws_url: "ws://localhost:8546".to_string(), root_private_keys: vec![ "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".to_string(), "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d".to_string(), @@ -253,6 +257,12 @@ impl Config { }) .collect() } + + pub fn ws_url(&self) -> Result { + self.ws_url + .parse() + .wrap_err_with(|| format!("Failed to parse WS URL: {}", self.ws_url)) + } } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -305,6 +315,12 @@ pub struct TrafficGen { /// How many txs should be generated per sender per cycle. /// Or put another way, how many txs should be generated before refreshing the nonce from chain state pub tx_per_sender: Option, + + // Should the txgen spam rpc and websocket with wallet workflow requests + pub spam_rpc: bool, + + /// Should the txgen compare rpc and websocket responses + pub compare_rpc_ws: bool, } impl Default for TrafficGen { @@ -321,6 +337,8 @@ impl Default for TrafficGen { }), sender_group_size: None, tx_per_sender: None, + spam_rpc: false, + compare_rpc_ws: false, } } } diff --git a/monad-eth-testutil/examples/txgen/run.rs b/monad-eth-testutil/examples/txgen/run.rs index 234c4ac584..e7440a29ab 100644 --- a/monad-eth-testutil/examples/txgen/run.rs +++ b/monad-eth-testutil/examples/txgen/run.rs @@ -30,7 +30,10 @@ use serde::{Deserialize, Serialize}; use crate::{ config::{Config, DeployedContract, TrafficGen}, generators::make_generator, - prelude::*, + prelude::{ + rpc_request_gen::{RpcWalletSpam, RpcWsCompare}, + *, + }, shared::{ ecmul::ECMul, eip7702::EIP7702, erc20::ERC20, eth_json_rpc::EthJsonRpc, uniswap::Uniswap, }, @@ -232,7 +235,7 @@ fn run_traffic_gen( let refresher = Refresher::new( rpc_rx, gen_sender, - read_client, + read_client.clone(), Arc::clone(&metrics), base_fee, Duration::from_secs_f64(config.refresh_delay_secs), @@ -242,12 +245,40 @@ fn run_traffic_gen( Arc::clone(shutdown), )?; - Ok([ - critical_task("Refresher", tokio::spawn(refresher.run())).boxed(), - critical_task("Rpc Sender", tokio::spawn(rpc_sender.run())).boxed(), - critical_task("Generator Harness", tokio::spawn(gen.run())).boxed(), - ] - .into_iter()) + let mut tasks = Vec::new(); + + if traffic_gen.spam_rpc { + let spammer = RpcWalletSpam::new( + read_client.clone(), + config.ws_url().expect("WS URL is not valid"), + ); + tasks.push( + critical_task("Spammer", tokio::spawn(async move { spammer.run().await })).boxed(), + ); + } + + if traffic_gen.compare_rpc_ws { + let compare_rpc_ws = RpcWsCompare::new( + read_client.clone(), + config.ws_url().expect("WS URL is not valid"), + ); + tasks.push( + critical_task( + "Compare RPC WS", + tokio::spawn(async move { compare_rpc_ws.run().await }), + ) + .boxed(), + ); + } + + Ok(tasks + .into_iter() + .chain([ + critical_task("Refresher", tokio::spawn(refresher.run())).boxed(), + critical_task("Rpc Sender", tokio::spawn(rpc_sender.run())).boxed(), + critical_task("Generator Harness", tokio::spawn(gen.run())).boxed(), + ]) + .into_iter()) } async fn helper_task( diff --git a/monad-eth-testutil/examples/txgen/workers/mod.rs b/monad-eth-testutil/examples/txgen/workers/mod.rs index 11b3d204d5..7a084b6efc 100644 --- a/monad-eth-testutil/examples/txgen/workers/mod.rs +++ b/monad-eth-testutil/examples/txgen/workers/mod.rs @@ -27,6 +27,7 @@ pub mod committed_tx_watcher; pub mod gen_harness; pub mod metrics; pub mod refresher; +pub mod rpc_request_gen; pub mod rpc_sender; pub use committed_tx_watcher::*; diff --git a/monad-eth-testutil/examples/txgen/workers/rpc_request_gen.rs b/monad-eth-testutil/examples/txgen/workers/rpc_request_gen.rs new file mode 100644 index 0000000000..3b1f43327b --- /dev/null +++ b/monad-eth-testutil/examples/txgen/workers/rpc_request_gen.rs @@ -0,0 +1,490 @@ +// Copyright (C) 2025 Category Labs, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use alloy_primitives::{Address, BlockNumber, U256, U64}; +use alloy_rpc_client::ReqwestClient; +use alloy_rpc_types::TransactionRequest; +use futures::{ + stream::{FuturesUnordered, SplitStream}, + SinkExt, StreamExt, +}; +use tokio::time::Duration; +use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; +use tracing::{debug, error, warn}; +use url::Url; + +// RpcWalletSpam will send common wallet workflow requests to an rpc and websocket endpoints +pub struct RpcWalletSpam { + rpc_client: ReqwestClient, + ws_url: Url, +} + +impl RpcWalletSpam { + pub fn new(rpc_client: ReqwestClient, ws_url: Url) -> Self { + Self { rpc_client, ws_url } + } + + pub async fn run(&self) { + // number of concurrent websocket connections + let num_connections: usize = 4; + + let mut tasks = FuturesUnordered::new(); + for _ in 0..num_connections { + let ws_url = self.ws_url.clone(); + let http_client = self.rpc_client.clone(); + tasks.push(tokio::spawn(async move { + // Each connection has its own request id counter + let mut next_id: u64 = 1; + loop { + // Open a websocket connection + let (ws_stream, _) = match connect_async(&ws_url.to_string()).await { + Ok(ok) => ok, + Err(err) => { + warn!(?err, "Failed to connect websocket; retrying"); + tokio::time::sleep(Duration::from_millis(200)).await; + continue; + } + }; + let (mut write, mut read) = ws_stream.split(); + + async fn ws_call( + write: &mut futures::stream::SplitSink< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + Message, + >, + read: &mut futures::stream::SplitStream< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + >, + id: u64, + method: &str, + params: serde_json::Value, + ) -> Result { + let req = serde_json::json!({ + "id": id, + "jsonrpc": "2.0", + "method": method, + "params": params, + }); + write + .send(Message::Text(req.to_string())) + .await + .map_err(|e| format!("ws send error: {:?}", e))?; + loop { + let msg = read + .next() + .await + .ok_or_else(|| "ws closed".to_string()) + .and_then(|r| r.map_err(|e| format!("ws recv error: {:?}", e)))?; + match msg { + Message::Text(txt) => { + if let Ok(json) = + serde_json::from_str::(&txt) + { + if json.get("id") == Some(&serde_json::Value::from(id)) { + if let Some(err) = json.get("error") { + return Err(format!("ws error response: {}", err)); + } + if let Some(result) = json.get("result") { + return Ok(result.clone()); + } + } + } + } + Message::Ping(data) => { + // respond to ping + let _ = write.send(Message::Pong(data)).await; + } + Message::Binary(_) + | Message::Pong(_) + | Message::Close(_) + | Message::Frame(_) => {} + } + } + } + + // 1) eth_chainId + let id = { + let v = next_id; + next_id += 1; + v + }; + let (ws_chain, http_chain) = tokio::join!( + ws_call( + &mut write, + &mut read, + id, + "eth_chainId", + serde_json::json!([]) + ), + async { + http_client + .request_noparams::("eth_chainId") + .map_resp(|res| res) + .await + } + ); + match (ws_chain, http_chain) { + (Ok(ws_val), Ok(http_u64)) => { + let ws_u64: U64 = + serde_json::from_value(ws_val).expect("invalid ws chainId"); + assert_eq!(ws_u64, http_u64, "eth_chainId mismatch"); + } + _ => { + warn!("eth_chainId failed; restarting connection"); + continue; + } + } + + // 2) eth_blockNumber + let id = { + let v = next_id; + next_id += 1; + v + }; + let (ws_bn, http_bn) = tokio::join!( + ws_call( + &mut write, + &mut read, + id, + "eth_blockNumber", + serde_json::json!([]) + ), + async { + http_client + .request_noparams::("eth_blockNumber") + .map_resp(|res| res) + .await + } + ); + let block_number: U64 = match (ws_bn, http_bn) { + (Ok(ws_val), Ok(http_u64)) => { + let _ws_u64: U64 = + serde_json::from_value(ws_val).expect("invalid ws blockNumber"); + // Do not assert since blockchain might have progressed since the last request + http_u64 + } + _ => { + warn!("eth_blockNumber failed; restarting connection"); + continue; + } + }; + + // 3) eth_getBlockByNumber for the exact same block + let id = { + let v = next_id; + next_id += 1; + v + }; + let params = serde_json::json!([block_number, true]); + let (ws_block, http_block) = tokio::join!( + ws_call(&mut write, &mut read, id, "eth_getBlockByNumber", params), + async { + http_client + .request::<_, alloy_rpc_types_eth::Block>( + "eth_getBlockByNumber", + (block_number, true), + ) + .await + } + ); + match (ws_block, http_block) { + (Ok(ws_val), Ok(http_blk)) => { + let mut ws_blk: alloy_rpc_types_eth::Block = + serde_json::from_value(ws_val).expect("invalid ws block"); + ws_blk.header.parent_beacon_block_root = None; + let mut http_blk = http_blk; + http_blk.header.parent_beacon_block_root = None; + assert_eq!(ws_blk, http_blk, "eth_getBlockByNumber mismatch"); + } + _ => { + warn!("eth_getBlockByNumber failed; restarting connection"); + continue; + } + } + + // 4) eth_getBalance at the same block + let random_addr = Address::random(); + let id = { + let v = next_id; + next_id += 1; + v + }; + let (ws_bal, http_bal) = tokio::join!( + ws_call( + &mut write, + &mut read, + id, + "eth_getBalance", + serde_json::json!([random_addr, block_number]) + ), + async { + http_client + .request::<_, U256>("eth_getBalance", (&random_addr, block_number)) + .await + } + ); + match (ws_bal, http_bal) { + (Ok(ws_val), Ok(http_u256)) => { + let ws_u256: U256 = + serde_json::from_value(ws_val).expect("invalid ws balance"); + assert_eq!(ws_u256, http_u256, "eth_getBalance mismatch"); + } + _ => warn!("eth_getBalance failed; continuing"), + } + + // 5) eth_getTransactionCount at the same block + let id = { + let v = next_id; + next_id += 1; + v + }; + let (ws_nonce, http_nonce) = tokio::join!( + ws_call( + &mut write, + &mut read, + id, + "eth_getTransactionCount", + serde_json::json!([random_addr, block_number]) + ), + async { + http_client + .request::<_, U256>( + "eth_getTransactionCount", + (&random_addr, block_number), + ) + .await + } + ); + match (ws_nonce, http_nonce) { + (Ok(ws_val), Ok(http_u256)) => { + let ws_u256: U256 = + serde_json::from_value(ws_val).expect("invalid ws nonce"); + assert_eq!(ws_u256, http_u256, "eth_getTransactionCount mismatch"); + } + _ => warn!("eth_getTransactionCount failed; continuing"), + } + + // 6) eth_estimateGas + let estimate_req = TransactionRequest { + from: Some(random_addr), + to: Some(random_addr.into()), + value: Some(U256::from(0)).into(), + ..Default::default() + }; + let id = { + let v = next_id; + next_id += 1; + v + }; + let (ws_gas, http_gas) = tokio::join!( + ws_call( + &mut write, + &mut read, + id, + "eth_estimateGas", + serde_json::json!([estimate_req, block_number]) + ), + async { + http_client + .request::<_, U256>("eth_estimateGas", (estimate_req, block_number)) + .await + } + ); + match (ws_gas, http_gas) { + (Ok(ws_val), Ok(http_u256)) => { + let ws_u256: U256 = + serde_json::from_value(ws_val).expect("invalid ws gas"); + assert_eq!(ws_u256, http_u256, "eth_estimateGas mismatch"); + } + (Err(_), Err(_)) => { + // both failed; acceptable as parity + } + _ => warn!("eth_estimateGas parity failed; continuing"), + } + + // Close and immediately loop to create a fresh connection and continue spamming + } + })); + } + + while let Some(res) = tasks.next().await { + if let Err(err) = res { + warn!(?err, "connection task failed"); + } + } + } +} + +// RpcWsCompare compares results between an rpc and websocket endpoint +pub struct RpcWsCompare { + rpc_client: ReqwestClient, + ws_url: Url, +} + +impl RpcWsCompare { + pub fn new(rpc_client: ReqwestClient, ws_url: Url) -> Self { + Self { rpc_client, ws_url } + } + + pub async fn run(&self) { + let client = self.rpc_client.clone(); + + // Get the current tip from the rpc + let mut tip = client + .request_noparams::("eth_blockNumber") + .map_resp(|res| res.to()) + .await + .unwrap(); + + let (block_sender, mut block_receiver) = tokio::sync::mpsc::channel(100); + let rpc_client_clone = self.rpc_client.clone(); + tokio::spawn(async move { + let client = rpc_client_clone; + loop { + let block = Self::get_block_by_number(&client, tip).await.unwrap(); + block_sender.send(block).await.unwrap(); + tip += 1; + tokio::time::sleep(Duration::from_millis(500)).await; // Add a small delay + } + }); + + // Create a websocket stream to listen to new blocks + let (ws_stream, _) = connect_async(&self.ws_url.to_string()) + .await + .expect("Failed to connect"); + let (mut write, mut read) = ws_stream.split(); + write.send(Message::Text("{ \"id\": 1, \"jsonrpc\": \"2.0\", \"method\": \"eth_subscribe\", \"params\": [\"newHeads\"] }".into())).await.expect("failed to send message"); + Self::wait_for_subscription_id(&mut read, 1) + .await + .expect("failed to get newHeads subscription ID"); + + let (ws_tx, mut ws_rx) = tokio::sync::mpsc::channel(100); + tokio::spawn(async move { + loop { + tokio::select! { + Some(message) = read.next() => { + let message = message.expect("failed to parse message"); + match message { + Message::Text(text) => { + if let Ok(json) = serde_json::from_str::(&text) { + if let Some(result) = json.get("params").unwrap().get("result") { + // Convert result to alloy_rpc_types_eth::Header + let header = serde_json::from_value::(result.clone()).expect("failed to convert result to alloy_rpc_types_eth::Header"); + ws_tx.send(header).await.unwrap(); + } + } + } + Message::Binary(_) => {} + Message::Ping(data) => { + write.send(Message::Pong(data)).await.expect("failed to send message"); + } + Message::Pong(_) => {} + Message::Close(frame) => { + panic!("Received close message: {:?}", frame); + } + Message::Frame(_) => {} + } + } + } + } + }); + + loop { + let ws_header = ws_rx.recv().await.expect("ws block not found"); + let rpc_header = block_receiver + .recv() + .await + .take() + .expect("rpc block not found") + .header; + + if ws_header.number != rpc_header.number { + panic!( + "block number mismatch websocket {} rpc {}", + ws_header.number, rpc_header.number + ); + } + debug!( + "comparing websocket header with rpc header at height {}", + &ws_header.number + ); + assert_eq!(ws_header, rpc_header); + } + } + + async fn get_block_by_number( + client: &ReqwestClient, + block_number: BlockNumber, + ) -> Option { + loop { + match client + .request::<_, alloy_rpc_types_eth::Block>( + "eth_getBlockByNumber", + (U64::from(block_number), true), + ) + .await + { + Ok(block) => return Some(block), + Err(err) => { + warn!(?err, "failed to get block by number"); + tokio::time::sleep(Duration::from_millis(10)).await; + continue; + } + } + } + } + + async fn wait_for_subscription_id( + read: &mut SplitStream< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + >, + json_rpc_id: u32, + ) -> Option { + while let Some(message) = read.next().await { + let message = message.expect("failed to parse message"); + match message { + Message::Text(text) => { + if let Ok(json) = serde_json::from_str::(&text) { + if let Some(id) = json.get("id") { + if id == json_rpc_id { + if let Some(result) = json.get("result") { + if let Some(sub_id) = result.as_str() { + return Some(sub_id.to_string()); + } + } + if let Some(error) = json.get("error") { + error!("Error in subscription response: {}", error); + return None; + } + } + } + } + } + Message::Ping(_) => { + // We don't have access to the write half here, so we can't respond to pings + warn!("Received ping while waiting for subscription ID"); + } + _ => {} + } + } + None + } +}