From 31548f17428521136d0ad5ed06d688f5eb64aad1 Mon Sep 17 00:00:00 2001 From: Naman Garg <0708ng@gmail.com> Date: Wed, 25 Jun 2025 11:50:06 +0530 Subject: [PATCH] chore: clippy happy + fmt --- examples/00_symbols.rs | 2 +- examples/03_exchange_provider.rs | 2 +- examples/04_websocket.rs | 2 +- examples/05_builder_orders.rs | 2 +- src/lib.rs | 5 +- src/providers/agent.rs | 83 +++++---- src/providers/batcher.rs | 92 +++++----- src/providers/exchange.rs | 306 +++++++++++++++++-------------- src/providers/info.rs | 6 +- src/providers/mod.rs | 25 ++- src/providers/nonce.rs | 48 ++--- src/providers/order_tracker.rs | 5 +- src/providers/websocket.rs | 140 +++++++------- src/signers/mod.rs | 4 +- src/signers/privy.rs | 61 +++--- src/signers/signer.rs | 2 +- src/types/actions.rs | 22 ++- src/types/eip712.rs | 6 +- src/types/mod.rs | 2 +- src/types/symbols.rs | 4 +- tests/approve_agent_test.rs | 16 +- tests/managed_exchange_test.rs | 75 ++++---- tests/order_tracking_test.rs | 43 ++--- 23 files changed, 520 insertions(+), 433 deletions(-) diff --git a/examples/00_symbols.rs b/examples/00_symbols.rs index ce50dda..abb9ac8 100644 --- a/examples/00_symbols.rs +++ b/examples/00_symbols.rs @@ -1,6 +1,6 @@ //! Example showing how to use the Symbol type -use ferrofluid::types::{Symbol, symbols}; +use ferrofluid::types::{symbols, Symbol}; fn main() { // Using predefined constants diff --git a/examples/03_exchange_provider.rs b/examples/03_exchange_provider.rs index 6131e5c..0f09502 100644 --- a/examples/03_exchange_provider.rs +++ b/examples/03_exchange_provider.rs @@ -2,7 +2,7 @@ use alloy::primitives::Address; use alloy::signers::local::PrivateKeySigner; use ferrofluid::constants::TIF_GTC; use ferrofluid::types::requests::OrderRequest; -use ferrofluid::{ExchangeProvider, signers::AlloySigner}; +use ferrofluid::{signers::AlloySigner, ExchangeProvider}; use uuid::Uuid; #[tokio::main] diff --git a/examples/04_websocket.rs b/examples/04_websocket.rs index e303c8d..88bbe23 100644 --- a/examples/04_websocket.rs +++ b/examples/04_websocket.rs @@ -1,6 +1,6 @@ //! Example of using the WebSocket provider for real-time data -use ferrofluid::{Network, providers::WsProvider, types::ws::Message}; +use ferrofluid::{providers::WsProvider, types::ws::Message, Network}; #[tokio::main] async fn main() -> Result<(), Box> { diff --git a/examples/05_builder_orders.rs b/examples/05_builder_orders.rs index 9ec6366..5fb2586 100644 --- a/examples/05_builder_orders.rs +++ b/examples/05_builder_orders.rs @@ -1,6 +1,6 @@ //! Example of using builder functionality for orders -use alloy::primitives::{B256, address}; +use alloy::primitives::{address, B256}; use alloy::signers::local::PrivateKeySigner; use ferrofluid::{providers::ExchangeProvider, signers::AlloySigner}; diff --git a/src/lib.rs b/src/lib.rs index bd22dfa..3df23e7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,6 @@ pub mod utils; pub use constants::Network; pub use errors::HyperliquidError; pub use providers::{ - ExchangeProvider, InfoProvider, - WsProvider, RawWsProvider, ManagedWsProvider, WsConfig, - ManagedExchangeProvider + ExchangeProvider, InfoProvider, ManagedExchangeProvider, ManagedWsProvider, + RawWsProvider, WsConfig, WsProvider, }; diff --git a/src/providers/agent.rs b/src/providers/agent.rs index 0059bc3..6de7ef4 100644 --- a/src/providers/agent.rs +++ b/src/providers/agent.rs @@ -2,13 +2,13 @@ use std::sync::Arc; use std::time::{Duration, Instant}; + use alloy::primitives::Address; use alloy::signers::local::PrivateKeySigner; use tokio::sync::RwLock; + use crate::{ - signers::HyperliquidSigner, - errors::HyperliquidError, - providers::nonce::NonceManager, + errors::HyperliquidError, providers::nonce::NonceManager, signers::HyperliquidSigner, Network, }; @@ -48,7 +48,7 @@ impl AgentWallet { status: AgentStatus::Active, } } - + /// Check if agent should be rotated based on TTL pub fn should_rotate(&self, ttl: Duration) -> bool { match self.status { @@ -56,7 +56,7 @@ impl AgentWallet { AgentStatus::PendingRotation | AgentStatus::Deregistered => true, } } - + /// Get next nonce for this agent pub fn next_nonce(&self) -> u64 { self.nonce_manager.next_nonce(None) @@ -106,70 +106,85 @@ impl AgentManager { network, } } - + /// Get or create an agent, rotating if necessary - pub async fn get_or_rotate_agent(&self, name: &str) -> Result { + pub async fn get_or_rotate_agent( + &self, + name: &str, + ) -> Result { let mut agents = self.agents.write().await; - + // Check if we have an active agent if let Some(agent) = agents.get(name) { - let effective_ttl = self.config.ttl.saturating_sub(self.config.proactive_rotation_buffer); - + let effective_ttl = self + .config + .ttl + .saturating_sub(self.config.proactive_rotation_buffer); + if !agent.should_rotate(effective_ttl) { return Ok(agent.clone()); } - + // Mark for rotation let mut agent_mut = agent.clone(); agent_mut.status = AgentStatus::PendingRotation; agents.insert(name.to_string(), agent_mut); } - + // Create new agent let new_agent = self.create_new_agent(name).await?; agents.insert(name.to_string(), new_agent.clone()); - + Ok(new_agent) } - + /// Create and approve a new agent - async fn create_new_agent(&self, name: &str) -> Result { + async fn create_new_agent( + &self, + name: &str, + ) -> Result { // Generate new key for agent let agent_signer = PrivateKeySigner::random(); let agent_wallet = AgentWallet::new(agent_signer.clone()); - + // We need to approve this agent using the exchange provider // This is a bit circular, but we'll handle it carefully - self.approve_agent_internal(agent_wallet.address, Some(name.to_string())).await?; - + self.approve_agent_internal(agent_wallet.address, Some(name.to_string())) + .await?; + Ok(agent_wallet) } - + /// Internal method to approve agent (will use exchange provider) - async fn approve_agent_internal(&self, agent_address: Address, name: Option) -> Result<(), HyperliquidError> { + async fn approve_agent_internal( + &self, + agent_address: Address, + name: Option, + ) -> Result<(), HyperliquidError> { use crate::providers::RawExchangeProvider; - + // Create a temporary raw provider just for agent approval let raw_provider = match self.network { Network::Mainnet => RawExchangeProvider::mainnet(self.master_signer.clone()), Network::Testnet => RawExchangeProvider::testnet(self.master_signer.clone()), }; - + // Approve the agent raw_provider.approve_agent(agent_address, name).await?; - + Ok(()) } - + /// Get all active agents pub async fn get_active_agents(&self) -> Vec<(String, AgentWallet)> { let agents = self.agents.read().await; - agents.iter() + agents + .iter() .filter(|(_, agent)| agent.status == AgentStatus::Active) .map(|(name, agent)| (name.clone(), agent.clone())) .collect() } - + /// Mark an agent as deregistered pub async fn mark_deregistered(&self, name: &str) { let mut agents = self.agents.write().await; @@ -177,7 +192,7 @@ impl AgentManager { agent.status = AgentStatus::Deregistered; } } - + /// Clean up deregistered agents pub async fn cleanup_deregistered(&self) { let mut agents = self.agents.write().await; @@ -188,27 +203,27 @@ impl AgentManager { #[cfg(test)] mod tests { use super::*; - + #[test] fn test_agent_rotation_check() { let signer = PrivateKeySigner::random(); let agent = AgentWallet::new(signer); - + // Should not rotate immediately assert!(!agent.should_rotate(Duration::from_secs(24 * 60 * 60))); - + // Test with zero duration (should always rotate) assert!(agent.should_rotate(Duration::ZERO)); } - + #[test] fn test_agent_nonce_generation() { let signer = PrivateKeySigner::random(); let agent = AgentWallet::new(signer); - + let nonce1 = agent.next_nonce(); let nonce2 = agent.next_nonce(); - + assert!(nonce2 > nonce1); } -} \ No newline at end of file +} diff --git a/src/providers/batcher.rs b/src/providers/batcher.rs index c2d8be2..cfabde7 100644 --- a/src/providers/batcher.rs +++ b/src/providers/batcher.rs @@ -1,15 +1,17 @@ //! Order batching for high-frequency trading strategies +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use std::pin::Pin; -use std::future::Future; -use tokio::sync::{Mutex, mpsc}; + +use tokio::sync::{mpsc, Mutex}; use tokio::time::interval; use uuid::Uuid; -use crate::types::requests::{OrderRequest, CancelRequest}; -use crate::types::responses::ExchangeResponseStatus; + use crate::errors::HyperliquidError; +use crate::types::requests::{CancelRequest, OrderRequest}; +use crate::types::responses::ExchangeResponseStatus; type BoxFuture = Pin + Send + 'static>>; @@ -19,7 +21,8 @@ pub struct PendingOrder { pub order: OrderRequest, pub nonce: u64, pub id: Uuid, - pub response_tx: mpsc::UnboundedSender>, + pub response_tx: + mpsc::UnboundedSender>, } /// Cancel with metadata for batching @@ -28,7 +31,8 @@ pub struct PendingCancel { pub cancel: CancelRequest, pub nonce: u64, pub id: Uuid, - pub response_tx: mpsc::UnboundedSender>, + pub response_tx: + mpsc::UnboundedSender>, } /// Order type classification for priority batching @@ -91,57 +95,57 @@ impl OrderBatcher { /// Create a new order batcher pub fn new(config: BatchConfig) -> (Self, BatcherHandle) { let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - + let batcher = Self { pending_orders: Arc::new(Mutex::new(Vec::new())), pending_cancels: Arc::new(Mutex::new(Vec::new())), _config: config, shutdown_tx, }; - + let handle = BatcherHandle { pending_orders: batcher.pending_orders.clone(), pending_cancels: batcher.pending_cancels.clone(), shutdown_rx, }; - + (batcher, handle) } - + /// Add an order to the batch queue pub async fn add_order(&self, order: OrderRequest, nonce: u64) -> OrderHandle { let id = Uuid::new_v4(); let (tx, rx) = mpsc::unbounded_channel(); - + let pending = PendingOrder { order, nonce, id, response_tx: tx, }; - + self.pending_orders.lock().await.push(pending); - + OrderHandle::Pending { id, rx } } - + /// Add a cancel to the batch queue pub async fn add_cancel(&self, cancel: CancelRequest, nonce: u64) -> OrderHandle { let id = Uuid::new_v4(); let (tx, rx) = mpsc::unbounded_channel(); - + let pending = PendingCancel { cancel, nonce, id, response_tx: tx, }; - + self.pending_cancels.lock().await.push(pending); - + OrderHandle::Pending { id, rx } } - + /// Shutdown the batcher pub async fn shutdown(self) { let _ = self.shutdown_tx.send(()).await; @@ -157,17 +161,21 @@ pub struct BatcherHandle { impl BatcherHandle { /// Run the batching loop (should be spawned as a task) - pub async fn run( - mut self, - mut order_executor: F, - mut cancel_executor: G, - ) + pub async fn run(mut self, mut order_executor: F, mut cancel_executor: G) where - F: FnMut(Vec) -> BoxFuture>> + Send, - G: FnMut(Vec) -> BoxFuture>> + Send, + F: FnMut( + Vec, + ) + -> BoxFuture>> + + Send, + G: FnMut( + Vec, + ) + -> BoxFuture>> + + Send, { let mut interval = interval(Duration::from_millis(100)); // Fixed interval for now - + loop { tokio::select! { _ = interval.tick() => { @@ -176,14 +184,14 @@ impl BatcherHandle { let mut pending = self.pending_orders.lock().await; std::mem::take(&mut *pending) }; - + if !orders.is_empty() { // Separate ALO from regular orders - let (alo_orders, regular_orders): (Vec<_>, Vec<_>) = + let (alo_orders, regular_orders): (Vec<_>, Vec<_>) = orders.into_iter().partition(|o| { o.order.is_alo() }); - + // Process ALO orders first (priority) if !alo_orders.is_empty() { let results = order_executor(alo_orders.clone()).await; @@ -191,7 +199,7 @@ impl BatcherHandle { let _ = order.response_tx.send(result); } } - + // Process regular orders if !regular_orders.is_empty() { let results = order_executor(regular_orders.clone()).await; @@ -200,13 +208,13 @@ impl BatcherHandle { } } } - + // Process cancels let cancels = { let mut pending = self.pending_cancels.lock().await; std::mem::take(&mut *pending) }; - + if !cancels.is_empty() { let results = cancel_executor(cancels.clone()).await; for (cancel, result) in cancels.into_iter().zip(results) { @@ -214,7 +222,7 @@ impl BatcherHandle { } } } - + _ = self.shutdown_rx.recv() => { // Graceful shutdown break; @@ -239,13 +247,13 @@ impl OrderRequest { #[cfg(test)] mod tests { use super::*; - use crate::types::requests::{OrderType, Limit}; - + use crate::types::requests::{Limit, OrderType}; + #[tokio::test] async fn test_order_batching() { let config = BatchConfig::default(); let (batcher, _handle) = OrderBatcher::new(config); - + // Create a test order let order = OrderRequest { asset: 0, @@ -253,14 +261,16 @@ mod tests { limit_px: "50000".to_string(), sz: "0.1".to_string(), reduce_only: false, - order_type: OrderType::Limit(Limit { tif: "Gtc".to_string() }), + order_type: OrderType::Limit(Limit { + tif: "Gtc".to_string(), + }), cloid: None, }; - + // Add to batch let handle = batcher.add_order(order, 123456789).await; - + // Should return pending handle assert!(matches!(handle, OrderHandle::Pending { .. })); } -} \ No newline at end of file +} diff --git a/src/providers/exchange.rs b/src/providers/exchange.rs index 29d61cd..9f7496f 100644 --- a/src/providers/exchange.rs +++ b/src/providers/exchange.rs @@ -3,24 +3,24 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; -use alloy::primitives::{Address, B256, keccak256}; +use alloy::primitives::{keccak256, Address, B256}; use http_body_util::{BodyExt, Full}; -use hyper::{Method, Request, body::Bytes}; +use hyper::{body::Bytes, Method, Request}; use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; -use hyper_util::client::legacy::{Client, connect::HttpConnector}; +use hyper_util::client::legacy::{connect::HttpConnector, Client}; use serde::Serialize; -use serde_json::{Value, json}; +use serde_json::{json, Value}; use uuid::Uuid; use crate::{ constants::*, errors::HyperliquidError, + providers::order_tracker::{OrderStatus, OrderTracker, TrackedOrder}, signers::{HyperliquidSignature, HyperliquidSigner}, types::{ actions::*, eip712::HyperliquidAction, requests::*, responses::ExchangeResponseStatus, Symbol, }, - providers::order_tracker::{OrderTracker, OrderStatus, TrackedOrder}, }; type Result = std::result::Result; @@ -28,7 +28,7 @@ type Result = std::result::Result; /// Format a float for use in API requests /// Formats to 8 decimal places and removes trailing zeros fn format_float_string(value: f64) -> String { - let mut x = format!("{:.8}", value); + let mut x = format!("{value:.8}"); while x.ends_with('0') { x.pop(); } @@ -269,23 +269,24 @@ impl RawExchangeProvider { // Auto-generate CLOID if tracking is enabled and order doesn't have one let mut order = order.clone(); let cloid = if let Some(tracker) = &self.order_tracker { - let cloid = order.cloid + let cloid = order + .cloid .as_ref() .and_then(|c| Uuid::parse_str(c).ok()) .unwrap_or_else(Uuid::new_v4); - + // Ensure the order has a cloid if order.cloid.is_none() { order = order.with_cloid(Some(cloid)); } - + // Track the order let timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); tracker.track_order(cloid, order.clone(), timestamp); - + Some(cloid) } else { order.cloid.as_ref().and_then(|c| Uuid::parse_str(c).ok()) @@ -301,7 +302,7 @@ impl RawExchangeProvider { }; let result = self.send_l1_action("order", &bulk_order).await; - + // Update tracking status based on result if let Some(tracker) = &self.order_tracker { if let Some(cloid) = cloid { @@ -310,20 +311,20 @@ impl RawExchangeProvider { tracker.update_order_status( &cloid, OrderStatus::Submitted, - Some(response.clone()) + Some(response.clone()), ); } Err(e) => { tracker.update_order_status( &cloid, OrderStatus::Failed(e.to_string()), - None + None, ); } } } } - + result } @@ -337,23 +338,24 @@ impl RawExchangeProvider { // Auto-generate CLOID if tracking is enabled and order doesn't have one let mut order = order.clone(); let cloid = if let Some(tracker) = &self.order_tracker { - let cloid = order.cloid + let cloid = order + .cloid .as_ref() .and_then(|c| Uuid::parse_str(c).ok()) .unwrap_or_else(Uuid::new_v4); - + // Ensure the order has a cloid if order.cloid.is_none() { order = order.with_cloid(Some(cloid)); } - + // Track the order let timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); tracker.track_order(cloid, order.clone(), timestamp); - + Some(cloid) } else { order.cloid.as_ref().and_then(|c| Uuid::parse_str(c).ok()) @@ -369,7 +371,7 @@ impl RawExchangeProvider { }; let result = self.send_l1_action("order", &bulk_order).await; - + // Update tracking status based on result if let Some(tracker) = &self.order_tracker { if let Some(cloid) = cloid { @@ -378,20 +380,20 @@ impl RawExchangeProvider { tracker.update_order_status( &cloid, OrderStatus::Submitted, - Some(response.clone()) + Some(response.clone()), ); } Err(e) => { tracker.update_order_status( &cloid, OrderStatus::Failed(e.to_string()), - None + None, ); } } } } - + result } @@ -583,7 +585,7 @@ impl RawExchangeProvider { let action = UsdSend { signature_chain_id: chain_id, hyperliquid_chain: chain.to_string(), - destination: format!("{:#x}", destination), + destination: format!("{destination:#x}"), amount: amount.to_string(), time: Self::current_nonce(), }; @@ -606,7 +608,7 @@ impl RawExchangeProvider { let action = Withdraw { signature_chain_id: chain_id, hyperliquid_chain: chain.to_string(), - destination: format!("{:#x}", destination), + destination: format!("{destination:#x}"), amount: amount.to_string(), time: Self::current_nonce(), }; @@ -631,7 +633,7 @@ impl RawExchangeProvider { let action = SpotSend { signature_chain_id: chain_id, hyperliquid_chain: chain.to_string(), - destination: format!("{:#x}", destination), + destination: format!("{destination:#x}"), token: symbol.as_str().to_string(), amount: amount.to_string(), time: Self::current_nonce(), @@ -662,25 +664,27 @@ impl RawExchangeProvider { self.send_user_action(&action).await } - + /// Approve a new agent, generating a random key like the original SDK /// Returns (private_key_hex, response) pub async fn approve_agent_new(&self) -> Result<(String, ExchangeResponseStatus)> { - use rand::Rng; - use alloy::signers::local::PrivateKeySigner; use alloy::primitives::B256; - + use alloy::signers::local::PrivateKeySigner; + use rand::Rng; + // Generate random key let mut rng = rand::thread_rng(); let mut key_bytes = [0u8; 32]; rng.fill(&mut key_bytes); let key_hex = hex::encode(key_bytes); - + // Create a signer from the key to get the address - let signer = PrivateKeySigner::from_bytes(&B256::from(key_bytes)) - .map_err(|e| HyperliquidError::InvalidRequest(format!("Failed to create signer: {}", e)))?; + let signer = + PrivateKeySigner::from_bytes(&B256::from(key_bytes)).map_err(|e| { + HyperliquidError::InvalidRequest(format!("Failed to create signer: {e}")) + })?; let agent_address = signer.address(); - + // Get chain info let (_, _) = self.infer_network(); let chain = if self.endpoint.contains("testnet") { @@ -688,7 +692,7 @@ impl RawExchangeProvider { } else { "Mainnet" }; - + // Create the action with proper Address type let action = ApproveAgent { signature_chain_id: 421614, // Always use Arbitrum Sepolia chain ID @@ -697,10 +701,10 @@ impl RawExchangeProvider { agent_name: None, nonce: Self::current_nonce(), }; - + // Use send_user_action which handles EIP-712 signing let response = self.send_user_action(&action).await?; - + Ok((key_hex, response)) } @@ -814,13 +818,17 @@ impl RawExchangeProvider { "approveAgent" => ActionWrapper::ApproveAgent(action), "approveBuilderFee" => ActionWrapper::ApproveBuilderFee(action), "withdraw3" => ActionWrapper::Withdraw3(action), - _ => return Err(HyperliquidError::InvalidRequest(format!("Unknown action type: {}", action_type))), + _ => { + return Err(HyperliquidError::InvalidRequest(format!( + "Unknown action type: {action_type}" + ))) + } }; // NOTE: Hyperliquid uses MessagePack (rmp_serde) for action serialization // This is different from typical EVM systems that use RLP let mut bytes = rmp_serde::to_vec_named(&wrapped).map_err(|e| { - HyperliquidError::InvalidRequest(format!("Failed to serialize action: {}", e)) + HyperliquidError::InvalidRequest(format!("Failed to serialize action: {e}")) })?; bytes.extend(timestamp.to_be_bytes()); if let Some(vault) = vault_address { @@ -907,12 +915,12 @@ impl RawExchangeProvider { "ApproveBuilderFee" => "approveBuilderFee", _ => action_type, }; - + // Add type tag if let Value::Object(ref mut map) = action_value { map.insert("type".to_string(), json!(type_tag)); } - + // Send directly without L1 wrapping for user actions self.post(action_value, signature, nonce).await } @@ -923,7 +931,6 @@ impl RawExchangeProvider { signature: HyperliquidSignature, nonce: u64, ) -> Result { - // Hyperliquid expects signature as an object with r, s, v fields // not as a concatenated hex string let payload = json!({ @@ -961,7 +968,7 @@ impl RawExchangeProvider { // Always try to deserialize the response as ExchangeResponseStatus // The API returns this format even for error status codes serde_json::from_slice(&body_bytes).map_err(|e| { - // If deserialization fails and we have an error status, + // If deserialization fails and we have an error status, // return the HTTP error with the body if !status.is_success() { let body_text = String::from_utf8_lossy(&body_bytes); @@ -971,8 +978,7 @@ impl RawExchangeProvider { } } else { HyperliquidError::InvalidResponse(format!( - "Failed to parse exchange response: {}", - e + "Failed to parse exchange response: {e}" )) } }) @@ -1087,13 +1093,15 @@ impl<'a, S: HyperliquidSigner> OrderBuilder<'a, S> { let sz = self.sz.ok_or(HyperliquidError::InvalidRequest( "sz must be specified".to_string(), ))?; - + // Parse and format the prices to match API expectations - let limit_px_f64 = limit_px.parse::() - .map_err(|_| HyperliquidError::InvalidRequest("Invalid limit_px format".to_string()))?; - let sz_f64 = sz.parse::() - .map_err(|_| HyperliquidError::InvalidRequest("Invalid sz format".to_string()))?; - + let limit_px_f64 = limit_px.parse::().map_err(|_| { + HyperliquidError::InvalidRequest("Invalid limit_px format".to_string()) + })?; + let sz_f64 = sz.parse::().map_err(|_| { + HyperliquidError::InvalidRequest("Invalid sz format".to_string()) + })?; + Ok(OrderRequest { asset: self.asset, is_buy: self.is_buy.ok_or(HyperliquidError::InvalidRequest( @@ -1117,7 +1125,7 @@ impl<'a, S: HyperliquidSigner> OrderBuilder<'a, S> { } impl RawExchangeProvider { - pub fn order(&self, asset: u32) -> OrderBuilder { + pub fn order(&self, asset: u32) -> OrderBuilder<'_, S> { OrderBuilder::new(self, asset) } } @@ -1125,10 +1133,11 @@ impl RawExchangeProvider { // ==================== Managed Exchange Provider ==================== use tokio::sync::Mutex as TokioMutex; + use crate::providers::{ - agent::{AgentManager, AgentConfig, AgentWallet}, + agent::{AgentConfig, AgentManager, AgentWallet}, + batcher::{BatchConfig, OrderBatcher, OrderHandle}, nonce::NonceManager, - batcher::{OrderBatcher, BatchConfig, OrderHandle}, }; /// Configuration for managed exchange provider @@ -1138,15 +1147,15 @@ pub struct ManagedExchangeConfig { pub batch_orders: bool, /// Batch configuration pub batch_config: BatchConfig, - + /// Agent lifecycle management pub auto_rotate_agents: bool, /// Agent configuration pub agent_config: AgentConfig, - + /// Nonce isolation per subaccount pub isolate_subaccount_nonces: bool, - + /// Safety features pub prevent_agent_address_queries: bool, pub warn_on_high_nonce_velocity: bool, @@ -1170,17 +1179,17 @@ impl Default for ManagedExchangeConfig { pub struct ManagedExchangeProvider { /// Inner raw provider inner: Arc>, - + /// Agent manager for lifecycle agent_manager: Option>>, - + /// Nonce tracking nonce_manager: Arc, - + /// Order batching batcher: Option>, batcher_handle: Option>>>>, - + /// Configuration config: ManagedExchangeConfig, } @@ -1190,7 +1199,7 @@ impl ManagedExchangeProvider { pub fn builder(signer: S) -> ManagedExchangeProviderBuilder { ManagedExchangeProviderBuilder::new(signer) } - + /// Create with default configuration for mainnet pub async fn mainnet(signer: S) -> Result> { Self::builder(signer) @@ -1198,7 +1207,7 @@ impl ManagedExchangeProvider { .build() .await } - + /// Create with default configuration for testnet pub async fn testnet(signer: S) -> Result> { Self::builder(signer) @@ -1206,7 +1215,7 @@ impl ManagedExchangeProvider { .build() .await } - + /// Place an order with all managed features pub async fn place_order(&self, order: &OrderRequest) -> Result { // Get nonce based on configuration @@ -1229,18 +1238,18 @@ impl ManagedExchangeProvider { self.nonce_manager.next_nonce(None) } }; - + // Check nonce validity if !NonceManager::is_valid_nonce(nonce) { return Err(HyperliquidError::InvalidRequest( - "Generated nonce is outside valid time bounds".to_string() + "Generated nonce is outside valid time bounds".to_string(), )); } - + // For now, we always use the main provider // In a full implementation, we'd need to handle agent signing differently // This is a limitation of the current design where we can't easily swap signers - + // Batch or direct execution if self.config.batch_orders { if let Some(batcher) = &self.batcher { @@ -1256,17 +1265,20 @@ impl ManagedExchangeProvider { Ok(OrderHandle::Immediate(Ok(result))) } } - + /// Place order immediately, bypassing batch - pub async fn place_order_immediate(&self, order: &OrderRequest) -> Result { + pub async fn place_order_immediate( + &self, + order: &OrderRequest, + ) -> Result { self.inner.place_order(order).await } - + /// Access the raw provider for advanced usage pub fn raw(&self) -> &RawExchangeProvider { &self.inner } - + /// Get current agent status pub async fn get_agent_status(&self) -> Option> { if let Some(agent_mgr) = &self.agent_manager { @@ -1275,7 +1287,7 @@ impl ManagedExchangeProvider { None } } - + /// Shutdown the managed provider cleanly pub async fn shutdown(self: Arc) { // Stop batcher if running @@ -1308,52 +1320,52 @@ impl ManagedExchangeProviderBuilder { builder_address: None, } } - + /// Set network pub fn with_network(mut self, network: Network) -> Self { self.network = network; self } - + /// Enable automatic order batching pub fn with_auto_batching(mut self, interval: std::time::Duration) -> Self { self.config.batch_orders = true; self.config.batch_config.interval = interval; self } - + /// Configure agent rotation pub fn with_agent_rotation(mut self, ttl: std::time::Duration) -> Self { self.config.auto_rotate_agents = true; self.config.agent_config.ttl = ttl; self } - + /// Start with an agent pub fn with_agent(mut self, name: Option) -> Self { self.initial_agent = name; self.config.auto_rotate_agents = true; self } - + /// Set vault address pub fn with_vault(mut self, vault: Address) -> Self { self.vault_address = Some(vault); self } - + /// Set builder address pub fn with_builder(mut self, builder: Address) -> Self { self.builder_address = Some(builder); self } - + /// Disable agent rotation pub fn without_agent_rotation(mut self) -> Self { self.config.auto_rotate_agents = false; self } - + /// Build the provider pub async fn build(self) -> Result>> { // Create raw provider @@ -1377,85 +1389,107 @@ impl ManagedExchangeProviderBuilder { } } }; - + let inner = Arc::new(raw); - + // Create agent manager if needed let agent_manager = if self.config.auto_rotate_agents { Some(Arc::new(AgentManager::new( self.signer, self.config.agent_config.clone(), - self.network + self.network, ))) } else { None }; - + // Create nonce manager - let nonce_manager = Arc::new(NonceManager::new( - self.config.isolate_subaccount_nonces - )); - + let nonce_manager = + Arc::new(NonceManager::new(self.config.isolate_subaccount_nonces)); + // Create batcher if needed let (batcher, batcher_handle) = if self.config.batch_orders { let (batcher, handle) = OrderBatcher::new(self.config.batch_config.clone()); let batcher = Arc::new(batcher); - + // Spawn batch processing task let inner_clone = inner.clone(); let inner_clone2 = inner.clone(); let handle_future = tokio::spawn(async move { - handle.run( - move |orders| { - let inner = inner_clone.clone(); - Box::pin(async move { - // Execute batch - let order_requests: Vec = orders.iter() - .map(|o| o.order.clone()) - .collect(); - - match inner.bulk_orders(order_requests).await { - Ok(status) => { - // Return same status for all orders in batch - orders.iter().map(|_| Ok(status.clone())).collect() - } - Err(e) => { - // Return same error for all orders in batch - let err_str = e.to_string(); - orders.iter().map(|_| Err(HyperliquidError::InvalidResponse(err_str.clone()))).collect() + handle + .run( + move |orders| { + let inner = inner_clone.clone(); + Box::pin(async move { + // Execute batch + let order_requests: Vec = + orders.iter().map(|o| o.order.clone()).collect(); + + match inner.bulk_orders(order_requests).await { + Ok(status) => { + // Return same status for all orders in batch + orders + .iter() + .map(|_| Ok(status.clone())) + .collect() + } + Err(e) => { + // Return same error for all orders in batch + let err_str = e.to_string(); + orders + .iter() + .map(|_| { + Err(HyperliquidError::InvalidResponse( + err_str.clone(), + )) + }) + .collect() + } } - } - }) - }, - move |cancels| { - let inner = inner_clone2.clone(); - Box::pin(async move { - // Execute cancel batch - let cancel_requests: Vec = cancels.iter() - .map(|c| c.cancel.clone()) - .collect(); - - match inner.bulk_cancel(cancel_requests).await { - Ok(status) => { - // Return same status for all cancels in batch - cancels.iter().map(|_| Ok(status.clone())).collect() + }) + }, + move |cancels| { + let inner = inner_clone2.clone(); + Box::pin(async move { + // Execute cancel batch + let cancel_requests: Vec = + cancels.iter().map(|c| c.cancel.clone()).collect(); + + match inner.bulk_cancel(cancel_requests).await { + Ok(status) => { + // Return same status for all cancels in batch + cancels + .iter() + .map(|_| Ok(status.clone())) + .collect() + } + Err(e) => { + // Return same error for all cancels in batch + let err_str = e.to_string(); + cancels + .iter() + .map(|_| { + Err(HyperliquidError::InvalidResponse( + err_str.clone(), + )) + }) + .collect() + } } - Err(e) => { - // Return same error for all cancels in batch - let err_str = e.to_string(); - cancels.iter().map(|_| Err(HyperliquidError::InvalidResponse(err_str.clone()))).collect() - } - } - }) - } - ).await; + }) + }, + ) + .await; }); - - (Some(batcher), Some(Arc::new(TokioMutex::new(Some(handle_future))))) + + ( + Some(batcher), + Some(Arc::new(TokioMutex::new(Some(handle_future)))), + ) } else { (None, None) }; - + let provider = Arc::new(ManagedExchangeProvider { inner, agent_manager, @@ -1464,14 +1498,14 @@ impl ManagedExchangeProviderBuilder { batcher_handle, config: self.config, }); - + // Initialize agent if requested if let Some(agent_name) = self.initial_agent { if let Some(agent_mgr) = &provider.agent_manager { agent_mgr.get_or_rotate_agent(&agent_name).await?; } } - + Ok(provider) } } diff --git a/src/providers/info.rs b/src/providers/info.rs index 1de7b91..97d95ac 100644 --- a/src/providers/info.rs +++ b/src/providers/info.rs @@ -7,7 +7,7 @@ use http::{Method, Request}; use http_body_util::{BodyExt, Full}; use hyper::body::Bytes; use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; -use hyper_util::client::legacy::{Client, connect::HttpConnector}; +use hyper_util::client::legacy::{connect::HttpConnector, Client}; use hyper_util::rt::TokioExecutor; use serde_json::json; @@ -292,7 +292,7 @@ impl InfoProvider { // ==================== Builder Pattern Methods ==================== - pub fn candles(&self, coin: impl Into) -> CandlesRequestBuilder { + pub fn candles(&self, coin: impl Into) -> CandlesRequestBuilder<'_> { CandlesRequestBuilder { provider: self, coin: coin.into(), @@ -302,7 +302,7 @@ impl InfoProvider { } } - pub fn funding_history(&self, coin: impl Into) -> FundingHistoryBuilder { + pub fn funding_history(&self, coin: impl Into) -> FundingHistoryBuilder<'_> { FundingHistoryBuilder { provider: self, coin: coin.into(), diff --git a/src/providers/mod.rs b/src/providers/mod.rs index dc70879..12ac559 100644 --- a/src/providers/mod.rs +++ b/src/providers/mod.rs @@ -1,26 +1,23 @@ +pub mod agent; +pub mod batcher; pub mod exchange; pub mod info; -pub mod websocket; pub mod nonce; -pub mod agent; -pub mod batcher; pub mod order_tracker; +pub mod websocket; // Raw providers (backwards compatibility) +pub use batcher::OrderHandle; +// Common types +pub use exchange::OrderBuilder; pub use exchange::RawExchangeProvider as ExchangeProvider; -pub use info::InfoProvider; -pub use websocket::RawWsProvider as WsProvider; - // Explicit raw exports pub use exchange::RawExchangeProvider; -pub use websocket::RawWsProvider; - // Managed providers -pub use exchange::{ManagedExchangeProvider, ManagedExchangeConfig}; -pub use websocket::{ManagedWsProvider, WsConfig}; - -// Common types -pub use exchange::OrderBuilder; +pub use exchange::{ManagedExchangeConfig, ManagedExchangeProvider}; +pub use info::InfoProvider; pub use info::RateLimiter; +pub use websocket::RawWsProvider as WsProvider; +pub use websocket::RawWsProvider; pub use websocket::SubscriptionId; -pub use batcher::OrderHandle; +pub use websocket::{ManagedWsProvider, WsConfig}; diff --git a/src/providers/nonce.rs b/src/providers/nonce.rs index ac1b448..2935bca 100644 --- a/src/providers/nonce.rs +++ b/src/providers/nonce.rs @@ -2,11 +2,12 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; + use alloy::primitives::Address; use dashmap::DashMap; /// Manages nonces for Hyperliquid's sliding window system -/// +/// /// Hyperliquid stores the 100 highest nonces per address and requires: /// - New nonce > smallest in the set /// - Never reuse a nonce @@ -30,17 +31,16 @@ impl NonceManager { isolate_per_address, } } - + /// Get the next nonce for an optional address pub fn next_nonce(&self, address: Option
) -> u64 { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as u64; - + // Get counter increment - let counter = if self.isolate_per_address && address.is_some() { - let addr = address.unwrap(); + let counter = if let (true, Some(addr)) = (self.isolate_per_address, address) { self.counters .entry(addr) .or_insert_with(|| AtomicU64::new(0)) @@ -48,19 +48,19 @@ impl NonceManager { } else { self.global_counter.fetch_add(1, Ordering::Relaxed) }; - + // Add sub-millisecond offset to ensure uniqueness // This handles rapid-fire orders within the same millisecond now.saturating_add(counter % 1000) } - + /// Reset counter for a specific address (useful after agent rotation) pub fn reset_address(&self, address: Address) { if let Some(counter) = self.counters.get_mut(&address) { counter.store(0, Ordering::Relaxed); } } - + /// Get current counter value for monitoring pub fn get_counter(&self, address: Option
) -> u64 { if let Some(addr) = address { @@ -76,18 +76,18 @@ impl NonceManager { self.global_counter.load(Ordering::Relaxed) } } - + /// Check if a nonce is within valid time bounds pub fn is_valid_nonce(nonce: u64) -> bool { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as u64; - + // Must be within (T - 2 days, T + 1 day) const TWO_DAYS_MS: u64 = 2 * 24 * 60 * 60 * 1000; const ONE_DAY_MS: u64 = 24 * 60 * 60 * 1000; - + nonce > now.saturating_sub(TWO_DAYS_MS) && nonce < now.saturating_add(ONE_DAY_MS) } } @@ -95,56 +95,56 @@ impl NonceManager { #[cfg(test)] mod tests { use super::*; - + #[test] fn test_nonce_uniqueness() { let manager = NonceManager::new(false); - + let nonce1 = manager.next_nonce(None); let nonce2 = manager.next_nonce(None); - + assert_ne!(nonce1, nonce2); assert!(nonce2 > nonce1); } - + #[test] fn test_address_isolation() { let manager = NonceManager::new(true); let addr1 = Address::new([1u8; 20]); let addr2 = Address::new([2u8; 20]); - + // Get nonces for different addresses let n1_1 = manager.next_nonce(Some(addr1)); let n2_1 = manager.next_nonce(Some(addr2)); let n1_2 = manager.next_nonce(Some(addr1)); let n2_2 = manager.next_nonce(Some(addr2)); - + // Each address should have independent counters assert!(n1_2 > n1_1); assert!(n2_2 > n2_1); - + // Counters should be independent assert_eq!(manager.get_counter(Some(addr1)), 2); assert_eq!(manager.get_counter(Some(addr2)), 2); } - + #[test] fn test_nonce_validity() { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as u64; - + // Valid: current time assert!(NonceManager::is_valid_nonce(now)); - + // Valid: 1 day ago assert!(NonceManager::is_valid_nonce(now - 24 * 60 * 60 * 1000)); - + // Invalid: 3 days ago assert!(!NonceManager::is_valid_nonce(now - 3 * 24 * 60 * 60 * 1000)); - + // Invalid: 2 days in future assert!(!NonceManager::is_valid_nonce(now + 2 * 24 * 60 * 60 * 1000)); } -} \ No newline at end of file +} diff --git a/src/providers/order_tracker.rs b/src/providers/order_tracker.rs index d8f719e..533c213 100644 --- a/src/providers/order_tracker.rs +++ b/src/providers/order_tracker.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; + use uuid::Uuid; use crate::types::requests::OrderRequest; @@ -42,7 +43,7 @@ impl OrderTracker { status: OrderStatus::Pending, response: None, }; - + let mut orders = self.orders.write().unwrap(); orders.insert(cloid, tracked); } @@ -126,4 +127,4 @@ impl Default for OrderTracker { fn default() -> Self { Self::new() } -} \ No newline at end of file +} diff --git a/src/providers/websocket.rs b/src/providers/websocket.rs index a18fb99..09af0d5 100644 --- a/src/providers/websocket.rs +++ b/src/providers/websocket.rs @@ -1,22 +1,22 @@ //! WebSocket provider for real-time market data and user events use std::sync::{ - Arc, atomic::{AtomicU32, Ordering}, + Arc, }; use dashmap::DashMap; -use fastwebsockets::{Frame, OpCode, Role, WebSocket, handshake}; +use fastwebsockets::{handshake, Frame, OpCode, Role, WebSocket}; use http_body_util::Empty; -use hyper::{Request, StatusCode, body::Bytes, header, upgrade::Upgraded}; +use hyper::{body::Bytes, header, upgrade::Upgraded, Request, StatusCode}; use hyper_util::rt::TokioIo; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use crate::{ - Network, errors::HyperliquidError, types::ws::{Message, Subscription, WsRequest}, types::Symbol, + Network, }; pub type SubscriptionId = u32; @@ -81,13 +81,13 @@ impl RawWsProvider { let uri = url .parse::() - .map_err(|e| HyperliquidError::WebSocket(format!("Invalid URL: {}", e)))?; + .map_err(|e| HyperliquidError::WebSocket(format!("Invalid URL: {e}")))?; // Create HTTPS connector with proper configuration let https = HttpsConnectorBuilder::new() .with_native_roots() .map_err(|e| { - HyperliquidError::WebSocket(format!("Failed to load native roots: {}", e)) + HyperliquidError::WebSocket(format!("Failed to load native roots: {e}")) })? .https_only() .enable_http1() @@ -111,11 +111,11 @@ impl RawWsProvider { .header(header::SEC_WEBSOCKET_KEY, handshake::generate_key()) .body(Empty::new()) .map_err(|e| { - HyperliquidError::WebSocket(format!("Request build failed: {}", e)) + HyperliquidError::WebSocket(format!("Request build failed: {e}")) })?; let res = client.request(req).await.map_err(|e| { - HyperliquidError::WebSocket(format!("HTTP request failed: {}", e)) + HyperliquidError::WebSocket(format!("HTTP request failed: {e}")) })?; if res.status() != StatusCode::SWITCHING_PROTOCOLS { @@ -127,7 +127,7 @@ impl RawWsProvider { let upgraded = hyper::upgrade::on(res) .await - .map_err(|e| HyperliquidError::WebSocket(format!("Upgrade failed: {}", e)))?; + .map_err(|e| HyperliquidError::WebSocket(format!("Upgrade failed: {e}")))?; Ok(WebSocket::after_handshake( TokioIo::new(upgraded), @@ -184,7 +184,7 @@ impl RawWsProvider { ws.write_frame(Frame::text(payload.into_bytes().into())) .await .map_err(|e| { - HyperliquidError::WebSocket(format!("Failed to send subscription: {}", e)) + HyperliquidError::WebSocket(format!("Failed to send subscription: {e}")) })?; // Create channel for this subscription @@ -215,8 +215,7 @@ impl RawWsProvider { .await .map_err(|e| { HyperliquidError::WebSocket(format!( - "Failed to send unsubscribe: {}", - e + "Failed to send unsubscribe: {e}" )) })?; } @@ -238,7 +237,7 @@ impl RawWsProvider { ws.write_frame(Frame::text(payload.into_bytes().into())) .await .map_err(|e| { - HyperliquidError::WebSocket(format!("Failed to send ping: {}", e)) + HyperliquidError::WebSocket(format!("Failed to send ping: {e}")) })?; Ok(()) @@ -314,6 +313,7 @@ impl Drop for RawWsProvider { // ==================== Enhanced WebSocket Provider ==================== use std::time::{Duration, Instant}; + use tokio::sync::Mutex; use tokio::time::sleep; @@ -355,7 +355,7 @@ struct ManagedSubscription { subscription: Subscription, tx: UnboundedSender, #[allow(dead_code)] - created_at: Instant, // For future use: subscription age tracking + created_at: Instant, // For future use: subscription age tracking } /// Managed WebSocket provider with automatic keep-alive and reconnection @@ -375,10 +375,13 @@ pub struct ManagedWsProvider { impl ManagedWsProvider { /// Connect with custom configuration - pub async fn connect(network: Network, config: WsConfig) -> Result, HyperliquidError> { + pub async fn connect( + network: Network, + config: WsConfig, + ) -> Result, HyperliquidError> { // Create initial connection let raw_provider = RawWsProvider::connect(network).await?; - + let provider = Arc::new(Self { network, inner: Arc::new(Mutex::new(Some(raw_provider))), @@ -386,7 +389,7 @@ impl ManagedWsProvider { config, next_id: Arc::new(AtomicU32::new(1)), }); - + // Start keep-alive task if configured if provider.config.ping_interval > Duration::ZERO { let provider_clone = provider.clone(); @@ -394,7 +397,7 @@ impl ManagedWsProvider { provider_clone.keepalive_loop().await; }); } - + // Start reconnection task if configured if provider.config.auto_reconnect { let provider_clone = provider.clone(); @@ -402,26 +405,31 @@ impl ManagedWsProvider { provider_clone.reconnect_loop().await; }); } - + Ok(provider) } - + /// Connect with default configuration - pub async fn connect_with_defaults(network: Network) -> Result, HyperliquidError> { + pub async fn connect_with_defaults( + network: Network, + ) -> Result, HyperliquidError> { Self::connect(network, WsConfig::default()).await } - + /// Check if currently connected pub async fn is_connected(&self) -> bool { let inner = self.inner.lock().await; inner.as_ref().map(|p| p.is_connected()).unwrap_or(false) } - + /// Get mutable access to the raw provider - pub async fn raw(&self) -> Result>, HyperliquidError> { + pub async fn raw( + &self, + ) -> Result>, HyperliquidError> + { Ok(self.inner.lock().await) } - + /// Subscribe to L2 order book updates with automatic replay on reconnect pub async fn subscribe_l2_book( &self, @@ -433,7 +441,7 @@ impl ManagedWsProvider { }; self.subscribe(subscription).await } - + /// Subscribe to trades with automatic replay on reconnect pub async fn subscribe_trades( &self, @@ -445,33 +453,33 @@ impl ManagedWsProvider { }; self.subscribe(subscription).await } - + /// Subscribe to all mid prices with automatic replay on reconnect pub async fn subscribe_all_mids( &self, ) -> Result<(SubscriptionId, UnboundedReceiver), HyperliquidError> { self.subscribe(Subscription::AllMids).await } - + /// Generic subscription with automatic replay on reconnect pub async fn subscribe( &self, subscription: Subscription, ) -> Result<(SubscriptionId, UnboundedReceiver), HyperliquidError> { let mut inner = self.inner.lock().await; - let raw_provider = inner.as_mut().ok_or_else(|| { - HyperliquidError::WebSocket("Not connected".to_string()) - })?; - + let raw_provider = inner + .as_mut() + .ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?; + // Subscribe using the raw provider let (_raw_id, rx) = raw_provider.subscribe(subscription.clone()).await?; - + // Generate our own ID for tracking let managed_id = self.next_id.fetch_add(1, Ordering::SeqCst); - + // Create channel for managed subscription let (tx, managed_rx) = mpsc::unbounded_channel(); - + // Store subscription for replay self.subscriptions.insert( managed_id, @@ -481,7 +489,7 @@ impl ManagedWsProvider { created_at: Instant::now(), }, ); - + // Forward messages from raw to managed let subscriptions = self.subscriptions.clone(); tokio::spawn(async move { @@ -494,38 +502,38 @@ impl ManagedWsProvider { // Clean up when channel closes subscriptions.remove(&managed_id); }); - + Ok((managed_id, managed_rx)) } - + /// Unsubscribe and stop automatic replay pub async fn unsubscribe(&self, id: SubscriptionId) -> Result<(), HyperliquidError> { // Remove from our tracking self.subscriptions.remove(&id); - + // Note: We can't unsubscribe from the raw provider because we don't // track the mapping between our IDs and raw IDs. This is fine since // the subscription will be cleaned up on reconnect anyway. - + Ok(()) } - + /// Start reading messages (must be called after connecting) pub async fn start_reading(&self) -> Result<(), HyperliquidError> { let mut inner = self.inner.lock().await; - let raw_provider = inner.as_mut().ok_or_else(|| { - HyperliquidError::WebSocket("Not connected".to_string()) - })?; + let raw_provider = inner + .as_mut() + .ok_or_else(|| HyperliquidError::WebSocket("Not connected".to_string()))?; raw_provider.start_reading().await } - + // Keep-alive loop async fn keepalive_loop(self: Arc) { let mut interval = tokio::time::interval(self.config.ping_interval); - + loop { interval.tick().await; - + let mut inner = self.inner.lock().await; if let Some(provider) = inner.as_mut() { if provider.ping().await.is_err() { @@ -536,60 +544,64 @@ impl ManagedWsProvider { } } } - + // Reconnection loop async fn reconnect_loop(self: Arc) { let mut reconnect_attempts = 0u32; let mut current_delay = self.config.reconnect_delay; - + loop { // Wait a bit before checking sleep(Duration::from_secs(1)).await; - + // Check if we need to reconnect if !self.is_connected().await { // Check max attempts if let Some(max) = self.config.max_reconnect_attempts { if reconnect_attempts >= max { - eprintln!("Max reconnection attempts ({}) reached", max); + eprintln!("Max reconnection attempts ({max}) reached"); break; } } - + println!("Attempting reconnection #{}", reconnect_attempts + 1); - + match RawWsProvider::connect(self.network).await { Ok(mut new_provider) => { // Start reading before replaying subscriptions if let Err(e) = new_provider.start_reading().await { - eprintln!("Failed to start reading after reconnect: {}", e); + eprintln!("Failed to start reading after reconnect: {e}"); continue; } - + // Replay all subscriptions let mut replay_errors = 0; for entry in self.subscriptions.iter() { - if let Err(e) = new_provider.subscribe(entry.subscription.clone()).await { - eprintln!("Failed to replay subscription: {}", e); + if let Err(e) = + new_provider.subscribe(entry.subscription.clone()).await + { + eprintln!("Failed to replay subscription: {e}"); replay_errors += 1; } } - + if replay_errors == 0 { // Success! Reset counters *self.inner.lock().await = Some(new_provider); reconnect_attempts = 0; current_delay = self.config.reconnect_delay; - println!("Reconnection successful, {} subscriptions replayed", - self.subscriptions.len()); + println!( + "Reconnection successful, {} subscriptions replayed", + self.subscriptions.len() + ); } } Err(e) => { - eprintln!("Reconnection failed: {}", e); - + eprintln!("Reconnection failed: {e}"); + // Wait before next attempt sleep(current_delay).await; - + // Update delay for next attempt reconnect_attempts += 1; if self.config.exponential_backoff { @@ -603,7 +615,7 @@ impl ManagedWsProvider { } } } - + // Handle disconnection async fn handle_disconnect(&self) { *self.inner.lock().await = None; diff --git a/src/signers/mod.rs b/src/signers/mod.rs index 6d7abfb..660aeed 100644 --- a/src/signers/mod.rs +++ b/src/signers/mod.rs @@ -1,5 +1,5 @@ -pub mod signer; pub mod privy; +pub mod signer; +pub use privy::{PrivyError, PrivySigner}; pub use signer::{AlloySigner, HyperliquidSignature, HyperliquidSigner, SignerError}; -pub use privy::{PrivySigner, PrivyError}; diff --git a/src/signers/privy.rs b/src/signers/privy.rs index deeafdf..4704f77 100644 --- a/src/signers/privy.rs +++ b/src/signers/privy.rs @@ -1,12 +1,11 @@ -use alloy::{ - primitives::{Address, B256}, -}; +use std::{error::Error, fmt, sync::Arc}; + +use alloy::primitives::{Address, B256}; use async_trait::async_trait; use base64::{engine::general_purpose, Engine as _}; use reqwest::{Client, StatusCode}; use serde::Deserialize; use serde_json::{json, Value}; -use std::{error::Error, fmt, sync::Arc}; use crate::signers::{HyperliquidSignature, HyperliquidSigner, SignerError}; @@ -75,9 +74,9 @@ impl PrivySigner { .map_err(|_| PrivyError::MissingEnvVar("PRIVY_APP_ID".to_string()))?; let secret = std::env::var("PRIVY_SECRET") .map_err(|_| PrivyError::MissingEnvVar("PRIVY_SECRET".to_string()))?; - + let creds = general_purpose::STANDARD.encode(format!("{app_id}:{secret}")); - + Ok(Self { client: Arc::new(Client::builder().build()?), wallet_id, @@ -95,7 +94,7 @@ impl PrivySigner { secret: String, ) -> Result { let creds = general_purpose::STANDARD.encode(format!("{app_id}:{secret}")); - + Ok(Self { client: Arc::new(Client::builder().build()?), wallet_id, @@ -126,7 +125,7 @@ impl PrivySigner { let txt = resp.text().await.unwrap_or_default(); return Err(PrivyError::Api(status, txt)); } - + Ok(resp.json::().await?) } } @@ -146,7 +145,7 @@ impl HyperliquidSigner for PrivySigner { async fn sign_hash(&self, hash: B256) -> Result { // Convert hash to hex string with 0x prefix let hash_hex = format!("0x{}", hex::encode(hash)); - + // Use secp256k1_sign for raw hash signing let body = json!({ "method": "secp256k1_sign", @@ -155,38 +154,39 @@ impl HyperliquidSigner for PrivySigner { } }); - let resp: SignResponse = self.rpc(body) + let resp: SignResponse = self + .rpc(body) .await .map_err(|e| SignerError::SigningFailed(e.to_string()))?; - + // Parse the signature string (0x-prefixed hex) - let sig_hex = resp.data.signature + let sig_hex = resp + .data + .signature .strip_prefix("0x") .unwrap_or(&resp.data.signature); - - let sig_bytes = hex::decode(sig_hex) - .map_err(|e| SignerError::SigningFailed(format!("Invalid hex signature: {}", e)))?; - + + let sig_bytes = hex::decode(sig_hex).map_err(|e| { + SignerError::SigningFailed(format!("Invalid hex signature: {e}")) + })?; + if sig_bytes.len() != 65 { - return Err(SignerError::SigningFailed( - format!("Invalid signature length: expected 65, got {}", sig_bytes.len()) - )); + return Err(SignerError::SigningFailed(format!( + "Invalid signature length: expected 65, got {}", + sig_bytes.len() + ))); } - + // Extract r, s, v from the signature bytes let mut r_bytes = [0u8; 32]; let mut s_bytes = [0u8; 32]; r_bytes.copy_from_slice(&sig_bytes[0..32]); s_bytes.copy_from_slice(&sig_bytes[32..64]); let v = sig_bytes[64]; - + // Convert v to EIP-155 format if needed - let v = if v < 27 { - v + 27 - } else { - v - }; - + let v = if v < 27 { v + 27 } else { v }; + Ok(HyperliquidSignature { r: alloy::primitives::U256::from_be_bytes(r_bytes), s: alloy::primitives::U256::from_be_bytes(s_bytes), @@ -201,9 +201,10 @@ impl HyperliquidSigner for PrivySigner { #[cfg(test)] mod tests { - use super::*; use alloy::primitives::address; + use super::*; + #[test] fn test_privy_signer_creation() { // This test would require actual Privy credentials @@ -212,7 +213,7 @@ mod tests { "test-wallet-id".to_string(), address!("0000000000000000000000000000000000000000"), ); - + match result { Err(PrivyError::MissingEnvVar(var)) => { assert!(var == "PRIVY_APP_ID" || var == "PRIVY_SECRET"); @@ -220,4 +221,4 @@ mod tests { _ => panic!("Expected MissingEnvVar error"), } } -} \ No newline at end of file +} diff --git a/src/signers/signer.rs b/src/signers/signer.rs index fbbd144..c1d78ee 100644 --- a/src/signers/signer.rs +++ b/src/signers/signer.rs @@ -1,5 +1,5 @@ use alloy::{ - primitives::{Address, B256, Parity, U256}, + primitives::{Address, Parity, B256, U256}, signers::Signer, }; use async_trait::async_trait; diff --git a/src/types/actions.rs b/src/types/actions.rs index 0dabe0b..de73b5d 100644 --- a/src/types/actions.rs +++ b/src/types/actions.rs @@ -1,10 +1,10 @@ use alloy::primitives::B256; use serde; +use crate::l1_action; use crate::types::requests::{ BuilderInfo, CancelRequest, CancelRequestCloid, ModifyRequest, OrderRequest, }; -use crate::l1_action; // User Actions (with HyperliquidTransaction: prefix) @@ -21,7 +21,8 @@ pub struct UsdSend { } impl crate::types::eip712::HyperliquidAction for UsdSend { - const TYPE_STRING: &'static str = "UsdSend(string hyperliquidChain,string destination,string amount,uint64 time)"; + const TYPE_STRING: &'static str = + "UsdSend(string hyperliquidChain,string destination,string amount,uint64 time)"; const USE_PREFIX: bool = true; fn chain_id(&self) -> Option { @@ -53,7 +54,8 @@ pub struct Withdraw { } impl crate::types::eip712::HyperliquidAction for Withdraw { - const TYPE_STRING: &'static str = "Withdraw(string hyperliquidChain,string destination,string amount,uint64 time)"; + const TYPE_STRING: &'static str = + "Withdraw(string hyperliquidChain,string destination,string amount,uint64 time)"; const USE_PREFIX: bool = true; fn chain_id(&self) -> Option { @@ -119,19 +121,25 @@ pub struct ApproveAgent { pub nonce: u64, } -pub(crate) fn serialize_address(address: &alloy::primitives::Address, serializer: S) -> Result +pub(crate) fn serialize_address( + address: &alloy::primitives::Address, + serializer: S, +) -> Result where S: serde::Serializer, { - serializer.serialize_str(&format!("{:#x}", address)) + serializer.serialize_str(&format!("{address:#x}")) } -pub(crate) fn serialize_chain_id(chain_id: &u64, serializer: S) -> Result +pub(crate) fn serialize_chain_id( + chain_id: &u64, + serializer: S, +) -> Result where S: serde::Serializer, { // Serialize as hex string to match SDK format - serializer.serialize_str(&format!("{:#x}", chain_id)) + serializer.serialize_str(&format!("{chain_id:#x}")) } impl crate::types::eip712::HyperliquidAction for ApproveAgent { diff --git a/src/types/eip712.rs b/src/types/eip712.rs index 8eb2f46..59f8db7 100644 --- a/src/types/eip712.rs +++ b/src/types/eip712.rs @@ -1,4 +1,4 @@ -use alloy::primitives::{Address, B256, U256, keccak256}; +use alloy::primitives::{keccak256, Address, B256, U256}; use alloy::sol_types::Eip712Domain; pub trait HyperliquidAction: Sized + serde::Serialize { @@ -52,13 +52,13 @@ pub trait HyperliquidAction: Sized + serde::Serialize { fn eip712_signing_hash(&self, domain: &Eip712Domain) -> B256 { let domain_separator = domain.separator(); let struct_hash = self.struct_hash(); - + let mut buf = Vec::with_capacity(66); buf.push(0x19); buf.push(0x01); buf.extend_from_slice(&domain_separator[..]); buf.extend_from_slice(&struct_hash[..]); - + keccak256(&buf) } } diff --git a/src/types/mod.rs b/src/types/mod.rs index 15c2d64..9a658e9 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -9,7 +9,7 @@ pub mod ws; // Re-export commonly used types pub use actions::*; -pub use eip712::{EncodeEip712, HyperliquidAction, encode_value}; +pub use eip712::{encode_value, EncodeEip712, HyperliquidAction}; pub use info_types::*; pub use requests::*; pub use responses::*; diff --git a/src/types/symbols.rs b/src/types/symbols.rs index 4ddbfa0..f12da49 100644 --- a/src/types/symbols.rs +++ b/src/types/symbols.rs @@ -1210,6 +1210,8 @@ pub fn symbol(s: impl Into) -> Symbol { /// ``` pub mod prelude { pub use super::{ + // Runtime symbol creation + symbol, // Popular alts APT, ARB, @@ -1246,8 +1248,6 @@ pub mod prelude { USDC, WIF, - // Runtime symbol creation - symbol, }; // Re-export Symbol type for convenience pub use crate::types::symbol::Symbol; diff --git a/tests/approve_agent_test.rs b/tests/approve_agent_test.rs index 7489c03..cf82c44 100644 --- a/tests/approve_agent_test.rs +++ b/tests/approve_agent_test.rs @@ -2,10 +2,10 @@ #[cfg(test)] mod tests { + use alloy::primitives::{address, keccak256}; use ferrofluid::types::actions::ApproveAgent; use ferrofluid::types::eip712::HyperliquidAction; - use alloy::primitives::{address, keccak256}; - + #[test] fn test_approve_agent_type_hash() { let expected = keccak256( @@ -13,7 +13,7 @@ mod tests { ); assert_eq!(ApproveAgent::type_hash(), expected); } - + #[test] fn test_approve_agent_serialization() { let action = ApproveAgent { @@ -23,11 +23,11 @@ mod tests { agent_name: Some("Test Agent".to_string()), nonce: 1234567890, }; - + // Serialize to JSON let json = serde_json::to_string(&action).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); - + // Check that address is serialized as hex string assert_eq!( parsed["agentAddress"].as_str().unwrap(), @@ -37,7 +37,7 @@ mod tests { assert_eq!(parsed["agentName"].as_str().unwrap(), "Test Agent"); assert_eq!(parsed["nonce"].as_u64().unwrap(), 1234567890); } - + #[test] fn test_approve_agent_struct_hash() { let action = ApproveAgent { @@ -47,10 +47,10 @@ mod tests { agent_name: None, nonce: 1690393044548, }; - + // Test that struct hash is computed let struct_hash = action.struct_hash(); // Just verify it's not zero assert_ne!(struct_hash, alloy::primitives::B256::ZERO); } -} \ No newline at end of file +} diff --git a/tests/managed_exchange_test.rs b/tests/managed_exchange_test.rs index e5956e5..dc8bd17 100644 --- a/tests/managed_exchange_test.rs +++ b/tests/managed_exchange_test.rs @@ -1,29 +1,31 @@ //! Tests for ManagedExchangeProvider +use std::time::Duration; + +use alloy::signers::local::PrivateKeySigner; use ferrofluid::{ - providers::{ManagedExchangeProvider, OrderHandle}, - types::requests::{OrderRequest, OrderType, Limit}, constants::*, + providers::{ManagedExchangeProvider, OrderHandle}, + types::requests::{Limit, OrderRequest, OrderType}, }; -use alloy::signers::local::PrivateKeySigner; -use std::time::Duration; #[tokio::test] async fn test_managed_provider_creation() { // Initialize CryptoProvider for rustls rustls::crypto::CryptoProvider::install_default( - rustls::crypto::ring::default_provider() - ).ok(); - + rustls::crypto::ring::default_provider(), + ) + .ok(); + // Create a test signer let signer = PrivateKeySigner::random(); - + // Create managed provider with default config let exchange = ManagedExchangeProvider::builder(signer) .with_network(ferrofluid::Network::Testnet) .build() .await; - + assert!(exchange.is_ok()); } @@ -31,11 +33,12 @@ async fn test_managed_provider_creation() { async fn test_managed_provider_with_batching() { // Initialize CryptoProvider for rustls rustls::crypto::CryptoProvider::install_default( - rustls::crypto::ring::default_provider() - ).ok(); - + rustls::crypto::ring::default_provider(), + ) + .ok(); + let signer = PrivateKeySigner::random(); - + // Create with batching enabled let exchange = ManagedExchangeProvider::builder(signer) .with_network(ferrofluid::Network::Testnet) @@ -44,7 +47,7 @@ async fn test_managed_provider_with_batching() { .build() .await .unwrap(); - + // Create a test order let order = OrderRequest { asset: 0, @@ -52,13 +55,15 @@ async fn test_managed_provider_with_batching() { limit_px: "50000".to_string(), sz: "0.01".to_string(), reduce_only: false, - order_type: OrderType::Limit(Limit { tif: TIF_GTC.to_string() }), + order_type: OrderType::Limit(Limit { + tif: TIF_GTC.to_string(), + }), cloid: None, }; - + // Place order should return pending handle let handle = exchange.place_order(&order).await.unwrap(); - + match handle { OrderHandle::Pending { .. } => { // Expected for batched orders @@ -69,7 +74,7 @@ async fn test_managed_provider_with_batching() { } } -#[tokio::test] +#[tokio::test] async fn test_alo_order_detection() { let order = OrderRequest { asset: 0, @@ -77,34 +82,38 @@ async fn test_alo_order_detection() { limit_px: "50000".to_string(), sz: "0.01".to_string(), reduce_only: false, - order_type: OrderType::Limit(Limit { tif: "Alo".to_string() }), + order_type: OrderType::Limit(Limit { + tif: "Alo".to_string(), + }), cloid: None, }; - + assert!(order.is_alo()); - + let regular_order = OrderRequest { asset: 0, is_buy: true, limit_px: "50000".to_string(), sz: "0.01".to_string(), reduce_only: false, - order_type: OrderType::Limit(Limit { tif: "Gtc".to_string() }), + order_type: OrderType::Limit(Limit { + tif: "Gtc".to_string(), + }), cloid: None, }; - + assert!(!regular_order.is_alo()); } #[test] fn test_nonce_generation() { use ferrofluid::providers::nonce::NonceManager; - + let manager = NonceManager::new(false); - + let nonce1 = manager.next_nonce(None); let nonce2 = manager.next_nonce(None); - + assert!(nonce2 > nonce1); assert!(NonceManager::is_valid_nonce(nonce1)); assert!(NonceManager::is_valid_nonce(nonce2)); @@ -112,13 +121,13 @@ fn test_nonce_generation() { #[test] fn test_nonce_isolation() { - use ferrofluid::providers::nonce::NonceManager; use alloy::primitives::Address; - + use ferrofluid::providers::nonce::NonceManager; + let manager = NonceManager::new(true); let addr1 = Address::new([1u8; 20]); let addr2 = Address::new([2u8; 20]); - + // Get initial nonces - these should have different millisecond timestamps let n1_1 = manager.next_nonce(Some(addr1)); std::thread::sleep(std::time::Duration::from_millis(1)); @@ -127,16 +136,16 @@ fn test_nonce_isolation() { let n1_2 = manager.next_nonce(Some(addr1)); std::thread::sleep(std::time::Duration::from_millis(1)); let n2_2 = manager.next_nonce(Some(addr2)); - + // Each address should have independent, increasing nonces assert!(n1_2 > n1_1, "addr1 nonces should increase"); assert!(n2_2 > n2_1, "addr2 nonces should increase"); - + // Verify counter independence using the manager's get_counter method assert_eq!(manager.get_counter(Some(addr1)), 2); // addr1 has 2 nonces assert_eq!(manager.get_counter(Some(addr2)), 2); // addr2 has 2 nonces - + // The nonces themselves should be unique assert_ne!(n1_1, n2_1); assert_ne!(n1_2, n2_2); -} \ No newline at end of file +} diff --git a/tests/order_tracking_test.rs b/tests/order_tracking_test.rs index 6acf280..dc43ceb 100644 --- a/tests/order_tracking_test.rs +++ b/tests/order_tracking_test.rs @@ -1,13 +1,13 @@ #[cfg(test)] mod tests { + use std::sync::Once; + use alloy::signers::local::PrivateKeySigner; use ferrofluid::{ - constants::TIF_GTC, - types::requests::OrderRequest, - ExchangeProvider, signers::AlloySigner, + constants::TIF_GTC, signers::AlloySigner, types::requests::OrderRequest, + ExchangeProvider, }; use uuid::Uuid; - use std::sync::Once; static INIT: Once = Once::new(); @@ -22,12 +22,12 @@ mod tests { fn create_test_exchange() -> ExchangeProvider> { init_crypto(); - let private_key = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"; + let private_key = + "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"; let signer = private_key.parse::().unwrap(); let alloy_signer = AlloySigner { inner: signer }; - - ExchangeProvider::testnet(alloy_signer) - .with_order_tracking() + + ExchangeProvider::testnet(alloy_signer).with_order_tracking() } #[test] @@ -40,13 +40,13 @@ mod tests { #[test] fn test_order_tracking_methods() { let exchange = create_test_exchange(); - + // Test empty state assert_eq!(exchange.get_all_tracked_orders().len(), 0); assert_eq!(exchange.get_pending_orders().len(), 0); assert_eq!(exchange.get_submitted_orders().len(), 0); assert_eq!(exchange.get_failed_orders().len(), 0); - + // Test get by non-existent cloid let fake_cloid = Uuid::new_v4(); assert!(exchange.get_tracked_order(&fake_cloid).is_none()); @@ -55,7 +55,7 @@ mod tests { #[test] fn test_clear_tracked_orders() { let exchange = create_test_exchange(); - + // Clear should work even when empty exchange.clear_tracked_orders(); assert_eq!(exchange.tracked_order_count(), 0); @@ -64,7 +64,7 @@ mod tests { #[test] fn test_order_builder_with_tracking() { let exchange = create_test_exchange(); - + // Create order using builder let _order_builder = exchange .order(0) @@ -72,7 +72,7 @@ mod tests { .limit_px("45000.0") .size("0.01") .cloid(Uuid::new_v4()); - + // Builder doesn't automatically track until order is placed assert_eq!(exchange.tracked_order_count(), 0); } @@ -80,18 +80,19 @@ mod tests { #[test] fn test_tracking_disabled_by_default() { init_crypto(); - let private_key = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"; + let private_key = + "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"; let signer = private_key.parse::().unwrap(); let alloy_signer = AlloySigner { inner: signer }; - + // Create exchange without tracking let exchange = ExchangeProvider::testnet(alloy_signer); - + // Methods should return empty results assert_eq!(exchange.tracked_order_count(), 0); assert_eq!(exchange.get_all_tracked_orders().len(), 0); assert_eq!(exchange.get_pending_orders().len(), 0); - + // Clear should be safe to call exchange.clear_tracked_orders(); } @@ -99,14 +100,14 @@ mod tests { #[tokio::test] async fn test_order_tracking_with_mock_placement() { let exchange = create_test_exchange(); - + // Create a test order let _order = OrderRequest::limit(0, true, "45000.0", "0.01", TIF_GTC); - + // Before placing, no orders tracked assert_eq!(exchange.tracked_order_count(), 0); - + // Note: Actually placing the order would require a valid connection // This test verifies the tracking infrastructure is in place } -} \ No newline at end of file +}