diff --git a/src/bin/info.rs b/src/bin/info.rs index f6df1178..1dc1898b 100644 --- a/src/bin/info.rs +++ b/src/bin/info.rs @@ -95,7 +95,7 @@ async fn meta_example(info_client: &InfoClient) { async fn meta_and_asset_contexts_example(info_client: &InfoClient) { info!( "Meta and asset contexts: {:?}", - info_client.meta_and_asset_contexts().await.unwrap() + info_client.meta_and_asset_contexts(None).await.unwrap() ); } diff --git a/src/bin/ws_all_mids.rs b/src/bin/ws_all_mids.rs index ee780db0..7aeccbcc 100644 --- a/src/bin/ws_all_mids.rs +++ b/src/bin/ws_all_mids.rs @@ -14,7 +14,7 @@ async fn main() { let (sender, mut receiver) = unbounded_channel(); let subscription_id = info_client - .subscribe(Subscription::AllMids, sender) + .subscribe(Subscription::AllMids { dex: Some("xyz".to_string()) }, sender) .await .unwrap(); diff --git a/src/bin/ws_candles.rs b/src/bin/ws_candles.rs index 255b152c..ad013773 100644 --- a/src/bin/ws_candles.rs +++ b/src/bin/ws_candles.rs @@ -17,6 +17,7 @@ async fn main() { Subscription::Candle { coin: "ETH".to_string(), interval: "1m".to_string(), + dex: Some("xyz".to_string()), }, sender, ) diff --git a/src/info/info_client.rs b/src/info/info_client.rs index b3d8ba2a..8f673a3c 100644 --- a/src/info/info_client.rs +++ b/src/info/info_client.rs @@ -26,6 +26,8 @@ pub struct CandleSnapshotRequest { interval: String, start_time: u64, end_time: u64, + #[serde(skip_serializing_if = "Option::is_none")] + dex: Option, } #[derive(Deserialize, Serialize, Debug, Clone)] @@ -55,7 +57,10 @@ pub enum InfoRequest { oid: u64, }, Meta, - MetaAndAssetCtxs, + MetaAndAssetCtxs { + #[serde(skip_serializing_if = "Option::is_none")] + dex: Option, + }, SpotMeta, SpotMetaAndAssetCtxs, AllMids, @@ -212,8 +217,11 @@ impl InfoClient { self.send_info_request(input).await } - pub async fn meta_and_asset_contexts(&self) -> Result<(Meta, Vec)> { - let input = InfoRequest::MetaAndAssetCtxs; + pub async fn meta_and_asset_contexts( + &self, + dex: impl Into>, + ) -> Result<(Meta, Vec)> { + let input = InfoRequest::MetaAndAssetCtxs { dex: dex.into() }; self.send_info_request(input).await } @@ -281,6 +289,17 @@ impl InfoClient { interval: String, start_time: u64, end_time: u64, + ) -> Result> { + self.candles_snapshot_with_dex(coin, interval, start_time, end_time, None).await + } + + pub async fn candles_snapshot_with_dex( + &self, + coin: String, + interval: String, + start_time: u64, + end_time: u64, + dex: Option, ) -> Result> { let input = InfoRequest::CandleSnapshot { req: CandleSnapshotRequest { @@ -288,6 +307,7 @@ impl InfoClient { interval, start_time, end_time, + dex, }, }; self.send_info_request(input).await diff --git a/src/market_maker.rs b/src/market_maker.rs index f5ae2297..9f65ece7 100644 --- a/src/market_maker.rs +++ b/src/market_maker.rs @@ -93,7 +93,7 @@ impl MarketMaker { // Subscribe to AllMids so we can market make around the mid price self.info_client - .subscribe(Subscription::AllMids, sender) + .subscribe(Subscription::AllMids { dex: Some("xyz".to_string()) }, sender) .await .unwrap(); diff --git a/src/ws/ws_manager.rs b/src/ws/ws_manager.rs index 4035baf3..b818d287 100755 --- a/src/ws/ws_manager.rs +++ b/src/ws/ws_manager.rs @@ -54,20 +54,51 @@ pub(crate) struct WsManager { #[serde(tag = "type")] #[serde(rename_all = "camelCase")] pub enum Subscription { - AllMids, - Notification { user: Address }, - WebData2 { user: Address }, - Candle { coin: String, interval: String }, - L2Book { coin: String }, - Trades { coin: String }, - OrderUpdates { user: Address }, - UserEvents { user: Address }, - UserFills { user: Address }, - UserFundings { user: Address }, - UserNonFundingLedgerUpdates { user: Address }, - ActiveAssetCtx { coin: String }, - ActiveAssetData { user: Address, coin: String }, - Bbo { coin: String }, + AllMids { + dex: Option, + }, + Notification { + user: Address, + }, + WebData2 { + user: Address, + }, + Candle { + coin: String, + interval: String, + dex: Option, + }, + L2Book { + coin: String, + }, + Trades { + coin: String, + }, + OrderUpdates { + user: Address, + }, + UserEvents { + user: Address, + }, + UserFills { + user: Address, + }, + UserFundings { + user: Address, + }, + UserNonFundingLedgerUpdates { + user: Address, + }, + ActiveAssetCtx { + coin: String, + }, + ActiveAssetData { + user: Address, + coin: String, + }, + Bbo { + coin: String, + }, } #[derive(Deserialize, Clone, Debug)] @@ -226,9 +257,44 @@ impl WsManager { .0) } + fn get_possible_identifiers(message: &Message) -> Result> { + let mut identifiers = Vec::new(); + + match message { + Message::AllMids(_) => { + // Try both None and common dex values + for dex in [None, Some("xyz".to_string()), Some("hyna".to_string())] { + if let Ok(id) = serde_json::to_string(&Subscription::AllMids { dex }) { + identifiers.push(id); + } + } + } + Message::Candle(candle) => { + // Try both None and common dex values for candles + for dex in [None, Some("xyz".to_string()), Some("hyna".to_string())] { + if let Ok(id) = serde_json::to_string(&Subscription::Candle { + coin: candle.data.coin.clone(), + interval: candle.data.interval.clone(), + dex, + }) { + identifiers.push(id); + } + } + } + _ => { + // For other message types, use the old single identifier logic + if let Ok(id) = WsManager::get_identifier(message) { + identifiers.push(id); + } + } + } + + Ok(identifiers) + } + fn get_identifier(message: &Message) -> Result { match message { - Message::AllMids(_) => serde_json::to_string(&Subscription::AllMids) + Message::AllMids(_) => serde_json::to_string(&Subscription::AllMids { dex: None }) .map_err(|e| Error::JsonParse(e.to_string())), Message::User(_) => Ok("userEvents".to_string()), Message::UserFills(fills) => serde_json::to_string(&Subscription::UserFills { @@ -252,6 +318,7 @@ impl WsManager { Message::Candle(candle) => serde_json::to_string(&Subscription::Candle { coin: candle.data.coin.clone(), interval: candle.data.interval.clone(), + dex: None, }) .map_err(|e| Error::JsonParse(e.to_string())), Message::OrderUpdates(_) => Ok("orderUpdates".to_string()), @@ -311,21 +378,27 @@ impl WsManager { } let message = serde_json::from_str::(&data) .map_err(|e| Error::JsonParse(e.to_string()))?; - let identifier = WsManager::get_identifier(&message)?; - if identifier.is_empty() { + // Try to find matching subscriptions for this message + // We need to try multiple dex values since the message doesn't contain dex info + let possible_identifiers = WsManager::get_possible_identifiers(&message)?; + if possible_identifiers.is_empty() { return Ok(()); } let mut subscriptions = subscriptions.lock().await; let mut res = Ok(()); - if let Some(subscription_datas) = subscriptions.get_mut(&identifier) { - for subscription_data in subscription_datas { - if let Err(e) = subscription_data - .sending_channel - .send(message.clone()) - .map_err(|e| Error::WsSend(e.to_string())) - { - res = Err(e); + + // Try each possible identifier until we find matching subscriptions + for identifier in possible_identifiers { + if let Some(subscription_datas) = subscriptions.get_mut(&identifier) { + for subscription_data in subscription_datas { + if let Err(e) = subscription_data + .sending_channel + .send(message.clone()) + .map_err(|e| Error::WsSend(e.to_string())) + { + res = Err(e); + } } } }