Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/target
/contracts/out
.env
11 changes: 7 additions & 4 deletions crates/uni-v4-common/src/shared_pools.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{
ops::Deref,
sync::{Arc, atomic::AtomicU64}
sync::{
Arc,
atomic::{AtomicU64, Ordering}
}
};

use alloy_primitives::{B256, FixedBytes};
Expand Down Expand Up @@ -71,7 +74,7 @@ impl UniswapPools {

pub fn update_pools(&self, mut updates: Vec<PoolUpdate>) {
if updates.is_empty() {
return
return;
}

let mut new_block_number = None;
Expand Down Expand Up @@ -144,8 +147,8 @@ impl UniswapPools {
self.pools.insert(pool_id, state);
}
PoolUpdate::Slot0Update(update) => {
if update.current_block != update.current_block {
continue
if update.current_block != self.block_number.load(Ordering::Acquire) {
continue;
}

let Some(mut pool) = self.pools.get_mut(&update.angstrom_pool_id) else {
Expand Down
10 changes: 10 additions & 0 deletions crates/uni-v4-upkeeper/src/pool_manager_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,13 @@ where
pool_id
);
}

if let Some(slot0_stream) = &mut self.slot0_stream
&& let Some(angstrom_pool_id) =
self.factory.registry().public_key_from_private(pool_id)
{
slot0_stream.subscribe_pools(HashSet::from([angstrom_pool_id]));
}
}
PoolUpdate::PoolRemoved { pool_id, .. } => {
tracing::info!("Pool removed: {:?}", pool_id);
Expand Down Expand Up @@ -427,8 +434,11 @@ where
if let Some(slot0_stream) = this.slot0_stream.as_mut() {
let mut slot0_updates = Vec::new();
while let Poll::Ready(Some(update)) = slot0_stream.poll_next_unpin(cx) {
tracing::error!("got slot0 update");
slot0_updates.push(update);
}

tracing::error!("LENGTH WHEN DISPATCHING: {}", slot0_updates.len());
for update in slot0_updates {
let pool_update = PoolUpdate::Slot0Update(update);
this.dispatch_update(pool_update);
Expand Down
16 changes: 8 additions & 8 deletions examples/pool_manager_initialization_only.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,23 +123,23 @@ async fn main() -> Result<()> {
// Log the message type
match msg {
PoolUpdate::NewBlock(block) => {
println!("📦 Block #{}: Received NewBlock", block);
println!("📦 Block #{block}: Received NewBlock");
}
PoolUpdate::FeeUpdate { pool_id, bundle_fee, swap_fee, protocol_fee, .. } => {
println!(
"💰 Received FeeUpdate for pool {:?} - bundle: {}, swap: {}, protocol: {}",
pool_id, bundle_fee, swap_fee, protocol_fee
"💰 Received FeeUpdate for pool {pool_id:?} - bundle: {bundle_fee}, swap: \
{swap_fee}, protocol: {protocol_fee}"
);
}
PoolUpdate::PoolRemoved { pool_id, .. } => {
println!("🗑️ Received PoolRemoved for pool {:?}", pool_id);
println!("🗑️ Received PoolRemoved for pool {pool_id:?}");
}
PoolUpdate::UpdatedSlot0 { pool_id, .. } => {
println!("📊 Received UpdatedSlot0 for pool {:?}", pool_id);
println!("📊 Received UpdatedSlot0 for pool {pool_id:?}");
}
PoolUpdate::NewPoolState { pool_id, state } => {
local_pools.insert(pool_id, state);
println!("🏊 Received NewPoolState for pool {:?}", pool_id);
println!("🏊 Received NewPoolState for pool {pool_id:?}");
}
PoolUpdate::SwapEvent { .. } => {
// This shouldn't happen in InitializationOnly mode
Expand All @@ -166,12 +166,12 @@ async fn main() -> Result<()> {
local_pools.len()
);
if filtered_count > 0 {
println!(" ⚠️ {} unexpected events received", filtered_count);
println!(" ⚠️ {filtered_count} unexpected events received");
}
}
}

println!("Channel closed after {} messages", message_count);
println!("Channel closed after {message_count} messages");
});

// Main loop - just wait and print status
Expand Down
18 changes: 9 additions & 9 deletions examples/pool_manager_with_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn main() -> Result<()> {
.await
.with_stream_mode(stream_mode);

println!(" Using stream mode: {:?}", stream_mode);
println!(" Using stream mode: {stream_mode:?}");

// Create block stream
let latest_block = provider
Expand Down Expand Up @@ -103,8 +103,8 @@ async fn main() -> Result<()> {
tokio::spawn(service);

// Spawn a task to receive and process updates
let update_processor = tokio::spawn(async move {
let mut local_pools = initial_pools;
let _update_processor = tokio::spawn(async move {
let local_pools = initial_pools;
let mut message_count = 0;

println!("📨 Starting message receiver...");
Expand All @@ -115,22 +115,22 @@ async fn main() -> Result<()> {
// Log the message type
match &msg {
PoolUpdate::NewBlock(block) => {
println!("📦 Block #{}: Received NewBlock", block);
println!("📦 Block #{block}: Received NewBlock");
}
PoolUpdate::NewPool { pool_id, .. } => {
println!("🏊 Received NewPool config for pool {:?}", pool_id);
println!("🏊 Received NewPool config for pool {pool_id:?}");
}
PoolUpdate::SwapEvent { pool_id, .. } => {
println!("💱 Received SwapEvent for pool {:?}", pool_id);
println!("💱 Received SwapEvent for pool {pool_id:?}");
}
PoolUpdate::LiquidityEvent { pool_id, .. } => {
println!("💧 Received LiquidityEvent for pool {:?}", pool_id);
println!("💧 Received LiquidityEvent for pool {pool_id:?}");
}
PoolUpdate::NewTicks { pool_id, ticks, .. } => {
println!("📊 Received NewTicks for pool {:?} ({} ticks)", pool_id, ticks.len());
}
PoolUpdate::NewPoolState { pool_id, .. } => {
println!("🆕 Received NewPoolState with state for pool {:?}", pool_id);
println!("🆕 Received NewPoolState with state for pool {pool_id:?}");
}
PoolUpdate::Slot0Update(update) => {
println!("🔄 Received Slot0Update for pool {:?}", update.angstrom_pool_id);
Expand All @@ -153,7 +153,7 @@ async fn main() -> Result<()> {
}
}

println!("Channel closed after {} messages", message_count);
println!("Channel closed after {message_count} messages");
});

// Main loop - just wait and print status
Expand Down
8 changes: 4 additions & 4 deletions examples/test_channel_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ async fn main() -> Result<()> {
count += 1;
match msg {
PoolUpdate::NewBlock(block) => {
println!(" ✅ Received NewBlock #{}", block);
println!(" ✅ Received NewBlock #{block}");
}
PoolUpdate::NewPool { pool_id, .. } => {
println!(" ✅ Received NewPool for {:?}", pool_id);
println!(" ✅ Received NewPool for {pool_id:?}");
}
PoolUpdate::NewTicks { pool_id, ticks, .. } => {
println!(" ✅ Received NewTicks for {:?} ({} ticks)", pool_id, ticks.len());
}
PoolUpdate::NewPoolState { pool_id, .. } => {
println!(" ✅ Received NewPoolState for {:?}", pool_id);
println!(" ✅ Received NewPoolState for {pool_id:?}");
}
_ => {
println!(" ✅ Received other update type");
Expand All @@ -73,7 +73,7 @@ async fn main() -> Result<()> {

// Exit after receiving a few messages for this test
if count >= 3 {
println!("\n🎉 Test passed! Received {} messages via channel", count);
println!("\n🎉 Test passed! Received {count} messages via channel");
break;
}
}
Expand Down
15 changes: 6 additions & 9 deletions tests/mainnet_pool_swap_test.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use alloy::{
primitives::{I256, address},
providers::{Provider, ProviderBuilder}
providers::ProviderBuilder
};
use alloy_primitives::U256;
use uni_v4_structure::sqrt_pricex96::SqrtPriceX96;
use uni_v4_upkeeper::{
pool_manager_service_builder::PoolManagerServiceBuilder, slot0::NoOpSlot0Stream
};
use uni_v4_upkeeper::pool_manager_service_builder::PoolManagerServiceBuilder;

// Test configuration - Uses ETH_URL environment variable
fn get_eth_url() -> Option<String> {
Expand Down Expand Up @@ -44,7 +41,7 @@ async fn test_specific_pool_at_block() {
.unwrap()
);

println!("Loading pools at block {} to find available pools", deploy_block);
println!("Loading pools at block {deploy_block} to find available pools");

// Load pools to see what's available
let service = PoolManagerServiceBuilder::new_with_noop_stream(
Expand All @@ -67,7 +64,7 @@ async fn test_specific_pool_at_block() {
// List all pools with their details
for (idx, entry) in pools.get_pools().iter().enumerate() {
let (pool_id, pool_state) = entry.pair();
println!("\n[Pool {}] ID: {:?}", idx, pool_id);
println!("\n[Pool {idx}] ID: {pool_id:?}");
println!("block {}", pool_state.block_number());
println!(" Token0: {:?} (decimals: {})", pool_state.token0, pool_state.token0_decimals);
println!(" Token1: {:?} (decimals: {})", pool_state.token1, pool_state.token1_decimals);
Expand All @@ -85,7 +82,7 @@ async fn test_specific_pool_at_block() {
println!(" ✓ Swap successful - t0 out: {}", result.total_d_t0);
}
Err(e) => {
println!(" ✗ Swap failed: {}", e);
println!(" ✗ Swap failed: {e}");
}
}
}
Expand Down