diff --git a/.gitignore b/.gitignore index f1560b9..817cfae 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target /contracts/out +.env \ No newline at end of file diff --git a/crates/uni-v4-common/src/shared_pools.rs b/crates/uni-v4-common/src/shared_pools.rs index 60a9c6a..36c6a67 100644 --- a/crates/uni-v4-common/src/shared_pools.rs +++ b/crates/uni-v4-common/src/shared_pools.rs @@ -1,6 +1,9 @@ use std::{ ops::Deref, - sync::{Arc, atomic::AtomicU64} + sync::{ + Arc, + atomic::{AtomicU64, Ordering} + } }; use alloy_primitives::{B256, FixedBytes}; @@ -71,7 +74,7 @@ impl UniswapPools { pub fn update_pools(&self, mut updates: Vec) { if updates.is_empty() { - return + return; } let mut new_block_number = None; @@ -144,8 +147,8 @@ impl UniswapPools { self.pools.insert(pool_id, state); } PoolUpdate::Slot0Update(update) => { - if update.current_block != update.current_block { - continue + if update.current_block != self.block_number.load(Ordering::Acquire) { + continue; } let Some(mut pool) = self.pools.get_mut(&update.angstrom_pool_id) else { diff --git a/crates/uni-v4-upkeeper/src/pool_manager_service.rs b/crates/uni-v4-upkeeper/src/pool_manager_service.rs index 124e4ab..5442617 100644 --- a/crates/uni-v4-upkeeper/src/pool_manager_service.rs +++ b/crates/uni-v4-upkeeper/src/pool_manager_service.rs @@ -275,6 +275,13 @@ where pool_id ); } + + if let Some(slot0_stream) = &mut self.slot0_stream + && let Some(angstrom_pool_id) = + self.factory.registry().public_key_from_private(pool_id) + { + slot0_stream.subscribe_pools(HashSet::from([angstrom_pool_id])); + } } PoolUpdate::PoolRemoved { pool_id, .. } => { tracing::info!("Pool removed: {:?}", pool_id); @@ -427,8 +434,11 @@ where if let Some(slot0_stream) = this.slot0_stream.as_mut() { let mut slot0_updates = Vec::new(); while let Poll::Ready(Some(update)) = slot0_stream.poll_next_unpin(cx) { + tracing::error!("got slot0 update"); slot0_updates.push(update); } + + tracing::error!("LENGTH WHEN DISPATCHING: {}", slot0_updates.len()); for update in slot0_updates { let pool_update = PoolUpdate::Slot0Update(update); this.dispatch_update(pool_update); diff --git a/examples/pool_manager_initialization_only.rs b/examples/pool_manager_initialization_only.rs index 756c1c9..d077577 100644 --- a/examples/pool_manager_initialization_only.rs +++ b/examples/pool_manager_initialization_only.rs @@ -123,23 +123,23 @@ async fn main() -> Result<()> { // Log the message type match msg { PoolUpdate::NewBlock(block) => { - println!("šŸ“¦ Block #{}: Received NewBlock", block); + println!("šŸ“¦ Block #{block}: Received NewBlock"); } PoolUpdate::FeeUpdate { pool_id, bundle_fee, swap_fee, protocol_fee, .. } => { println!( - "šŸ’° Received FeeUpdate for pool {:?} - bundle: {}, swap: {}, protocol: {}", - pool_id, bundle_fee, swap_fee, protocol_fee + "šŸ’° Received FeeUpdate for pool {pool_id:?} - bundle: {bundle_fee}, swap: \ + {swap_fee}, protocol: {protocol_fee}" ); } PoolUpdate::PoolRemoved { pool_id, .. } => { - println!("šŸ—‘ļø Received PoolRemoved for pool {:?}", pool_id); + println!("šŸ—‘ļø Received PoolRemoved for pool {pool_id:?}"); } PoolUpdate::UpdatedSlot0 { pool_id, .. } => { - println!("šŸ“Š Received UpdatedSlot0 for pool {:?}", pool_id); + println!("šŸ“Š Received UpdatedSlot0 for pool {pool_id:?}"); } PoolUpdate::NewPoolState { pool_id, state } => { local_pools.insert(pool_id, state); - println!("šŸŠ Received NewPoolState for pool {:?}", pool_id); + println!("šŸŠ Received NewPoolState for pool {pool_id:?}"); } PoolUpdate::SwapEvent { .. } => { // This shouldn't happen in InitializationOnly mode @@ -166,12 +166,12 @@ async fn main() -> Result<()> { local_pools.len() ); if filtered_count > 0 { - println!(" āš ļø {} unexpected events received", filtered_count); + println!(" āš ļø {filtered_count} unexpected events received"); } } } - println!("Channel closed after {} messages", message_count); + println!("Channel closed after {message_count} messages"); }); // Main loop - just wait and print status diff --git a/examples/pool_manager_with_channel.rs b/examples/pool_manager_with_channel.rs index ef315bd..4d71c48 100644 --- a/examples/pool_manager_with_channel.rs +++ b/examples/pool_manager_with_channel.rs @@ -51,7 +51,7 @@ async fn main() -> Result<()> { .await .with_stream_mode(stream_mode); - println!(" Using stream mode: {:?}", stream_mode); + println!(" Using stream mode: {stream_mode:?}"); // Create block stream let latest_block = provider @@ -103,8 +103,8 @@ async fn main() -> Result<()> { tokio::spawn(service); // Spawn a task to receive and process updates - let update_processor = tokio::spawn(async move { - let mut local_pools = initial_pools; + let _update_processor = tokio::spawn(async move { + let local_pools = initial_pools; let mut message_count = 0; println!("šŸ“Ø Starting message receiver..."); @@ -115,22 +115,22 @@ async fn main() -> Result<()> { // Log the message type match &msg { PoolUpdate::NewBlock(block) => { - println!("šŸ“¦ Block #{}: Received NewBlock", block); + println!("šŸ“¦ Block #{block}: Received NewBlock"); } PoolUpdate::NewPool { pool_id, .. } => { - println!("šŸŠ Received NewPool config for pool {:?}", pool_id); + println!("šŸŠ Received NewPool config for pool {pool_id:?}"); } PoolUpdate::SwapEvent { pool_id, .. } => { - println!("šŸ’± Received SwapEvent for pool {:?}", pool_id); + println!("šŸ’± Received SwapEvent for pool {pool_id:?}"); } PoolUpdate::LiquidityEvent { pool_id, .. } => { - println!("šŸ’§ Received LiquidityEvent for pool {:?}", pool_id); + println!("šŸ’§ Received LiquidityEvent for pool {pool_id:?}"); } PoolUpdate::NewTicks { pool_id, ticks, .. } => { println!("šŸ“Š Received NewTicks for pool {:?} ({} ticks)", pool_id, ticks.len()); } PoolUpdate::NewPoolState { pool_id, .. } => { - println!("šŸ†• Received NewPoolState with state for pool {:?}", pool_id); + println!("šŸ†• Received NewPoolState with state for pool {pool_id:?}"); } PoolUpdate::Slot0Update(update) => { println!("šŸ”„ Received Slot0Update for pool {:?}", update.angstrom_pool_id); @@ -153,7 +153,7 @@ async fn main() -> Result<()> { } } - println!("Channel closed after {} messages", message_count); + println!("Channel closed after {message_count} messages"); }); // Main loop - just wait and print status diff --git a/examples/test_channel_mode.rs b/examples/test_channel_mode.rs index 944aaef..e146f52 100644 --- a/examples/test_channel_mode.rs +++ b/examples/test_channel_mode.rs @@ -55,16 +55,16 @@ async fn main() -> Result<()> { count += 1; match msg { PoolUpdate::NewBlock(block) => { - println!(" āœ… Received NewBlock #{}", block); + println!(" āœ… Received NewBlock #{block}"); } PoolUpdate::NewPool { pool_id, .. } => { - println!(" āœ… Received NewPool for {:?}", pool_id); + println!(" āœ… Received NewPool for {pool_id:?}"); } PoolUpdate::NewTicks { pool_id, ticks, .. } => { println!(" āœ… Received NewTicks for {:?} ({} ticks)", pool_id, ticks.len()); } PoolUpdate::NewPoolState { pool_id, .. } => { - println!(" āœ… Received NewPoolState for {:?}", pool_id); + println!(" āœ… Received NewPoolState for {pool_id:?}"); } _ => { println!(" āœ… Received other update type"); @@ -73,7 +73,7 @@ async fn main() -> Result<()> { // Exit after receiving a few messages for this test if count >= 3 { - println!("\nšŸŽ‰ Test passed! Received {} messages via channel", count); + println!("\nšŸŽ‰ Test passed! Received {count} messages via channel"); break; } } diff --git a/tests/mainnet_pool_swap_test.rs b/tests/mainnet_pool_swap_test.rs index 04bd7f7..5a7d8bf 100644 --- a/tests/mainnet_pool_swap_test.rs +++ b/tests/mainnet_pool_swap_test.rs @@ -1,14 +1,11 @@ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use alloy::{ primitives::{I256, address}, - providers::{Provider, ProviderBuilder} + providers::ProviderBuilder }; use alloy_primitives::U256; -use uni_v4_structure::sqrt_pricex96::SqrtPriceX96; -use uni_v4_upkeeper::{ - pool_manager_service_builder::PoolManagerServiceBuilder, slot0::NoOpSlot0Stream -}; +use uni_v4_upkeeper::pool_manager_service_builder::PoolManagerServiceBuilder; // Test configuration - Uses ETH_URL environment variable fn get_eth_url() -> Option { @@ -44,7 +41,7 @@ async fn test_specific_pool_at_block() { .unwrap() ); - println!("Loading pools at block {} to find available pools", deploy_block); + println!("Loading pools at block {deploy_block} to find available pools"); // Load pools to see what's available let service = PoolManagerServiceBuilder::new_with_noop_stream( @@ -67,7 +64,7 @@ async fn test_specific_pool_at_block() { // List all pools with their details for (idx, entry) in pools.get_pools().iter().enumerate() { let (pool_id, pool_state) = entry.pair(); - println!("\n[Pool {}] ID: {:?}", idx, pool_id); + println!("\n[Pool {idx}] ID: {pool_id:?}"); println!("block {}", pool_state.block_number()); println!(" Token0: {:?} (decimals: {})", pool_state.token0, pool_state.token0_decimals); println!(" Token1: {:?} (decimals: {})", pool_state.token1, pool_state.token1_decimals); @@ -85,7 +82,7 @@ async fn test_specific_pool_at_block() { println!(" āœ“ Swap successful - t0 out: {}", result.total_d_t0); } Err(e) => { - println!(" āœ— Swap failed: {}", e); + println!(" āœ— Swap failed: {e}"); } } }