|
| 1 | +use std::time::Duration; |
| 2 | + |
| 3 | +use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent}; |
| 4 | +use alloy_node_bindings::Anvil; |
| 5 | +use event_scanner::{EventFilter, EventScanner, EventScannerMessage}; |
| 6 | +use tokio::time::sleep; |
| 7 | +use tokio_stream::StreamExt; |
| 8 | +use tracing::{error, info}; |
| 9 | +use tracing_subscriber::EnvFilter; |
| 10 | + |
| 11 | +sol! { |
| 12 | + #[allow(missing_docs)] |
| 13 | + #[sol(rpc, |
| 14 | +bytecode="608080604052346015576101b0908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c90816306661abd1461016157508063a87d942c14610145578063d732d955146100ad5763e8927fbc14610048575f80fd5b346100a9575f3660031901126100a9575f5460018101809111610095576020817f7ca2ca9527391044455246730762df008a6b47bbdb5d37a890ef78394535c040925f55604051908152a1005b634e487b7160e01b5f52601160045260245ffd5b5f80fd5b346100a9575f3660031901126100a9575f548015610100575f198101908111610095576020817f53a71f16f53e57416424d0d18ccbd98504d42a6f98fe47b09772d8f357c620ce925f55604051908152a1005b60405162461bcd60e51b815260206004820152601860248201527f436f756e742063616e6e6f74206265206e6567617469766500000000000000006044820152606490fd5b346100a9575f3660031901126100a95760205f54604051908152f35b346100a9575f3660031901126100a9576020905f548152f3fea2646970667358221220b846b706f79f5ae1fc4a4238319e723a092f47ce4051404186424739164ab02264736f6c634300081e0033" |
| 15 | +)] contract Counter { |
| 16 | + uint256 public count; |
| 17 | + |
| 18 | + event CountIncreased(uint256 newCount); |
| 19 | + event CountDecreased(uint256 newCount); |
| 20 | + |
| 21 | + function increase() public { |
| 22 | + count += 1; |
| 23 | + emit CountIncreased(count); |
| 24 | + } |
| 25 | + |
| 26 | + function decrease() public { |
| 27 | + require(count > 0, "Count cannot be negative"); |
| 28 | + count -= 1; |
| 29 | + emit CountDecreased(count); |
| 30 | + } |
| 31 | + |
| 32 | + function getCount() public view returns (uint256) { |
| 33 | + return count; |
| 34 | + } |
| 35 | + } |
| 36 | +} |
| 37 | + |
| 38 | +#[tokio::main] |
| 39 | +async fn main() -> anyhow::Result<()> { |
| 40 | + let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init(); |
| 41 | + |
| 42 | + let anvil = Anvil::new().block_time(1).try_spawn()?; |
| 43 | + let wallet = anvil.wallet(); |
| 44 | + let provider = |
| 45 | + ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?; |
| 46 | + let counter_contract = Counter::deploy(provider).await?; |
| 47 | + |
| 48 | + let contract_address = counter_contract.address(); |
| 49 | + |
| 50 | + let increase_filter = EventFilter::new() |
| 51 | + .with_contract_address(*contract_address) |
| 52 | + .with_event(Counter::CountIncreased::SIGNATURE); |
| 53 | + |
| 54 | + info!("Creating historical events..."); |
| 55 | + for i in 0..3 { |
| 56 | + let _ = counter_contract.increase().send().await?.get_receipt().await?; |
| 57 | + info!("Historical event {} created", i + 1); |
| 58 | + } |
| 59 | + |
| 60 | + let mut scanner = EventScanner::sync().connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?; |
| 61 | + |
| 62 | + let mut stream = scanner.create_event_stream(increase_filter); |
| 63 | + |
| 64 | + info!("Starting sync scanner..."); |
| 65 | + let scanner_task = tokio::spawn(async move { |
| 66 | + scanner.start().await.expect("failed to start scanner"); |
| 67 | + }); |
| 68 | + |
| 69 | + // Wait a bit for historical events to be processed |
| 70 | + sleep(Duration::from_secs(2)).await; |
| 71 | + |
| 72 | + info!("Creating live events..."); |
| 73 | + for i in 0..2 { |
| 74 | + let _ = counter_contract.increase().send().await?.get_receipt().await?; |
| 75 | + info!("Live event {} created", i + 1); |
| 76 | + sleep(Duration::from_secs(1)).await; |
| 77 | + } |
| 78 | + |
| 79 | + let mut historical_processed = false; |
| 80 | + let mut live_processed = false; |
| 81 | + |
| 82 | + while let Some(message) = stream.next().await { |
| 83 | + match message { |
| 84 | + EventScannerMessage::Data(logs) => { |
| 85 | + for log in logs { |
| 86 | + let Counter::CountIncreased { newCount } = log.log_decode().unwrap().inner.data; |
| 87 | + if newCount <= 3 { |
| 88 | + info!("Processed historical event: count = {}", newCount); |
| 89 | + historical_processed = true; |
| 90 | + } else { |
| 91 | + info!("Processed live event: count = {}", newCount); |
| 92 | + live_processed = true; |
| 93 | + } |
| 94 | + } |
| 95 | + } |
| 96 | + EventScannerMessage::Error(e) => { |
| 97 | + error!("Received error: {}", e); |
| 98 | + } |
| 99 | + EventScannerMessage::Status(info) => { |
| 100 | + info!("Received status: {:?}", info); |
| 101 | + } |
| 102 | + } |
| 103 | + |
| 104 | + if historical_processed && live_processed { |
| 105 | + info!("Both historical and live events processed successfully!"); |
| 106 | + break; |
| 107 | + } |
| 108 | + } |
| 109 | + |
| 110 | + scanner_task.abort(); |
| 111 | + Ok(()) |
| 112 | +} |
| 113 | + |
0 commit comments