diff --git a/Cargo.lock b/Cargo.lock index 72e306a390..b4522b9ed6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -701,6 +701,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "assert_float_eq" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10d2119f741b79fe9907f5396d19bffcb46568cfcc315e78677d731972ac7085" + [[package]] name = "async-channel" version = "1.9.0" @@ -5644,7 +5650,7 @@ dependencies = [ [[package]] name = "pyth-lazer-client" -version = "2.0.1" +version = "3.0.0" dependencies = [ "alloy-primitives 0.8.25", "anyhow", @@ -5657,7 +5663,7 @@ dependencies = [ "futures-util", "hex", "libsecp256k1 0.7.2", - "pyth-lazer-protocol 0.10.2", + "pyth-lazer-protocol 0.11.0", "serde", "serde_json", "tokio", @@ -5690,10 +5696,11 @@ dependencies = [ [[package]] name = "pyth-lazer-protocol" -version = "0.10.2" +version = "0.11.0" dependencies = [ "alloy-primitives 0.8.25", "anyhow", + "assert_float_eq", "bincode 1.3.3", "bs58", "byteorder", @@ -5710,6 +5717,7 @@ dependencies = [ "rust_decimal", "serde", "serde_json", + "thiserror 2.0.12", ] [[package]] @@ -5728,13 +5736,13 @@ dependencies = [ [[package]] name = "pyth-lazer-publisher-sdk" -version = "0.5.0" +version = "0.6.0" dependencies = [ "anyhow", "fs-err", "protobuf", "protobuf-codegen", - "pyth-lazer-protocol 0.10.2", + "pyth-lazer-protocol 0.11.0", "serde_json", ] diff --git a/lazer/contracts/solana/programs/pyth-lazer-solana-contract/Cargo.toml b/lazer/contracts/solana/programs/pyth-lazer-solana-contract/Cargo.toml index f0dc8d37f5..cb2de74e6d 100644 --- a/lazer/contracts/solana/programs/pyth-lazer-solana-contract/Cargo.toml +++ b/lazer/contracts/solana/programs/pyth-lazer-solana-contract/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-lazer-solana-contract" -version = "0.5.0" +version = "0.6.0" edition = "2021" description = "Pyth Lazer Solana contract and SDK." license = "Apache-2.0" @@ -19,7 +19,7 @@ no-log-ix-name = [] idl-build = ["anchor-lang/idl-build"] [dependencies] -pyth-lazer-protocol = { path = "../../../../sdk/rust/protocol", version = "0.10.1" } +pyth-lazer-protocol = { path = "../../../../sdk/rust/protocol", version = "0.11.0" } anchor-lang = "0.31.1" bytemuck = { version = "1.20.0", features = ["derive"] } diff --git a/lazer/publisher_sdk/rust/Cargo.toml b/lazer/publisher_sdk/rust/Cargo.toml index 656736ed9b..be22223403 100644 --- a/lazer/publisher_sdk/rust/Cargo.toml +++ b/lazer/publisher_sdk/rust/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "pyth-lazer-publisher-sdk" -version = "0.5.0" +version = "0.6.0" edition = "2021" description = "Pyth Lazer Publisher SDK types." license = "Apache-2.0" repository = "https://github.com/pyth-network/pyth-crosschain" [dependencies] -pyth-lazer-protocol = { version = "0.10.2", path = "../../sdk/rust/protocol" } +pyth-lazer-protocol = { version = "0.11.0", path = "../../sdk/rust/protocol" } anyhow = "1.0.98" protobuf = "3.7.2" serde_json = "1.0.140" diff --git a/lazer/publisher_sdk/rust/src/lib.rs b/lazer/publisher_sdk/rust/src/lib.rs index ae98a79c39..9600c61f64 100644 --- a/lazer/publisher_sdk/rust/src/lib.rs +++ b/lazer/publisher_sdk/rust/src/lib.rs @@ -3,8 +3,8 @@ use crate::publisher_update::{FeedUpdate, FundingRateUpdate, PriceUpdate}; use crate::state::FeedState; use ::protobuf::MessageField; use pyth_lazer_protocol::jrpc::{FeedUpdateParams, UpdateParams}; -use pyth_lazer_protocol::symbol_state::SymbolState; use pyth_lazer_protocol::FeedKind; +use pyth_lazer_protocol::SymbolState; pub mod transaction_envelope { pub use crate::protobuf::transaction_envelope::*; @@ -56,9 +56,9 @@ impl From for Update { best_bid_price, best_ask_price, } => Update::PriceUpdate(PriceUpdate { - price: Some(price.0.into()), - best_bid_price: best_bid_price.map(|p| p.0.into()), - best_ask_price: best_ask_price.map(|p| p.0.into()), + price: Some(price.mantissa_i64()), + best_bid_price: best_bid_price.map(|p| p.mantissa_i64()), + best_ask_price: best_ask_price.map(|p| p.mantissa_i64()), special_fields: Default::default(), }), UpdateParams::FundingRateUpdate { @@ -66,8 +66,8 @@ impl From for Update { rate, funding_rate_interval, } => Update::FundingRateUpdate(FundingRateUpdate { - price: price.map(|p| p.0.into()), - rate: Some(rate.0), + price: price.map(|p| p.mantissa_i64()), + rate: Some(rate.mantissa()), funding_rate_interval: MessageField::from_option( funding_rate_interval.map(|i| i.into()), ), diff --git a/lazer/sdk/rust/client/Cargo.toml b/lazer/sdk/rust/client/Cargo.toml index 36927a49ed..44d595e0c5 100644 --- a/lazer/sdk/rust/client/Cargo.toml +++ b/lazer/sdk/rust/client/Cargo.toml @@ -1,12 +1,12 @@ [package] name = "pyth-lazer-client" -version = "2.0.1" +version = "3.0.0" edition = "2021" description = "A Rust client for Pyth Lazer" license = "Apache-2.0" [dependencies] -pyth-lazer-protocol = { path = "../protocol", version = "0.10.2" } +pyth-lazer-protocol = { path = "../protocol", version = "0.11.0" } tokio = { version = "1", features = ["full"] } tokio-tungstenite = { version = "0.20", features = ["native-tls"] } futures-util = "0.3" diff --git a/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs b/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs index e39c4dc41b..9087450f2b 100644 --- a/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs +++ b/lazer/sdk/rust/client/examples/subscribe_price_feeds.rs @@ -4,15 +4,16 @@ use base64::Engine; use pyth_lazer_client::backoff::PythLazerExponentialBackoffBuilder; use pyth_lazer_client::client::PythLazerClientBuilder; use pyth_lazer_client::ws_connection::AnyResponse; +use pyth_lazer_protocol::api::{ + Channel, DeliveryFormat, Format, JsonBinaryEncoding, SubscriptionParams, SubscriptionParamsRepr, +}; +use pyth_lazer_protocol::api::{SubscribeRequest, SubscriptionId, WsResponse}; use pyth_lazer_protocol::message::{ EvmMessage, LeEcdsaMessage, LeUnsignedMessage, Message, SolanaMessage, }; use pyth_lazer_protocol::payload::PayloadData; -use pyth_lazer_protocol::router::{ - Channel, DeliveryFormat, FixedRate, Format, JsonBinaryEncoding, PriceFeedId, PriceFeedProperty, - SubscriptionParams, SubscriptionParamsRepr, -}; -use pyth_lazer_protocol::subscription::{Response, SubscribeRequest, SubscriptionId}; +use pyth_lazer_protocol::time::FixedRate; +use pyth_lazer_protocol::{PriceFeedId, PriceFeedProperty}; use tokio::pin; use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; @@ -109,7 +110,7 @@ async fn main() -> anyhow::Result<()> { // The stream gives us base64-encoded binary messages. We need to decode, parse, and verify them. match msg { AnyResponse::Json(msg) => match msg { - Response::StreamUpdated(update) => { + WsResponse::StreamUpdated(update) => { println!("Received a JSON update for {:?}", update.subscription_id); if let Some(evm_data) = update.payload.evm { // Decode binary data diff --git a/lazer/sdk/rust/client/src/client.rs b/lazer/sdk/rust/client/src/client.rs index ad19052df9..15755e9483 100644 --- a/lazer/sdk/rust/client/src/client.rs +++ b/lazer/sdk/rust/client/src/client.rs @@ -51,7 +51,7 @@ use crate::{ }; use anyhow::{bail, Result}; use backoff::ExponentialBackoff; -use pyth_lazer_protocol::subscription::{SubscribeRequest, SubscriptionId}; +use pyth_lazer_protocol::api::{SubscribeRequest, SubscriptionId}; use tokio::sync::mpsc::{self, error::TrySendError}; use tracing::{error, warn}; use ttl_cache::TtlCache; diff --git a/lazer/sdk/rust/client/src/resilient_ws_connection.rs b/lazer/sdk/rust/client/src/resilient_ws_connection.rs index 70385d5946..96ee51ec0a 100644 --- a/lazer/sdk/rust/client/src/resilient_ws_connection.rs +++ b/lazer/sdk/rust/client/src/resilient_ws_connection.rs @@ -2,9 +2,7 @@ use std::time::Duration; use backoff::{backoff::Backoff, ExponentialBackoff}; use futures_util::StreamExt; -use pyth_lazer_protocol::subscription::{ - Request, SubscribeRequest, SubscriptionId, UnsubscribeRequest, -}; +use pyth_lazer_protocol::api::{SubscribeRequest, SubscriptionId, UnsubscribeRequest, WsRequest}; use tokio::{pin, select, sync::mpsc, time::Instant}; use tracing::{error, info, warn}; use url::Url; @@ -18,7 +16,7 @@ use anyhow::{bail, Context, Result}; const BACKOFF_RESET_DURATION: Duration = Duration::from_secs(10); pub struct PythLazerResilientWSConnection { - request_sender: mpsc::Sender, + request_sender: mpsc::Sender, } impl PythLazerResilientWSConnection { @@ -53,7 +51,7 @@ impl PythLazerResilientWSConnection { pub async fn subscribe(&mut self, request: SubscribeRequest) -> Result<()> { self.request_sender - .send(Request::Subscribe(request)) + .send(WsRequest::Subscribe(request)) .await .context("Failed to send subscribe request")?; Ok(()) @@ -61,7 +59,9 @@ impl PythLazerResilientWSConnection { pub async fn unsubscribe(&mut self, subscription_id: SubscriptionId) -> Result<()> { self.request_sender - .send(Request::Unsubscribe(UnsubscribeRequest { subscription_id })) + .send(WsRequest::Unsubscribe(UnsubscribeRequest { + subscription_id, + })) .await .context("Failed to send unsubscribe request")?; Ok(()) @@ -95,7 +95,7 @@ impl PythLazerResilientWSConnectionTask { pub async fn run( &mut self, response_sender: mpsc::Sender, - request_receiver: &mut mpsc::Receiver, + request_receiver: &mut mpsc::Receiver, ) -> Result<()> { loop { let start_time = Instant::now(); @@ -128,7 +128,7 @@ impl PythLazerResilientWSConnectionTask { pub async fn start( &mut self, sender: mpsc::Sender, - request_receiver: &mut mpsc::Receiver, + request_receiver: &mut mpsc::Receiver, ) -> Result<()> { let mut ws_connection = PythLazerWSConnection::new(self.endpoint.clone(), self.access_token.clone())?; @@ -137,7 +137,7 @@ impl PythLazerResilientWSConnectionTask { for subscription in self.subscriptions.clone() { ws_connection - .send_request(Request::Subscribe(subscription)) + .send_request(WsRequest::Subscribe(subscription)) .await?; } loop { @@ -167,10 +167,10 @@ impl PythLazerResilientWSConnectionTask { } Some(request) = request_receiver.recv() => { match request { - Request::Subscribe(request) => { + WsRequest::Subscribe(request) => { self.subscribe(&mut ws_connection, request).await?; } - Request::Unsubscribe(request) => { + WsRequest::Unsubscribe(request) => { self.unsubscribe(&mut ws_connection, request).await?; } } diff --git a/lazer/sdk/rust/client/src/ws_connection.rs b/lazer/sdk/rust/client/src/ws_connection.rs index 385bd2efd7..3efbcbd656 100644 --- a/lazer/sdk/rust/client/src/ws_connection.rs +++ b/lazer/sdk/rust/client/src/ws_connection.rs @@ -4,8 +4,8 @@ use anyhow::Result; use derive_more::From; use futures_util::{SinkExt, StreamExt, TryStreamExt}; use pyth_lazer_protocol::{ + api::{ErrorResponse, SubscribeRequest, UnsubscribeRequest, WsRequest, WsResponse}, binary_update::BinaryWsUpdate, - subscription::{ErrorResponse, Request, Response, SubscribeRequest, UnsubscribeRequest}, }; use tokio_tungstenite::{connect_async, tungstenite::Message}; use url::Url; @@ -32,7 +32,7 @@ pub struct PythLazerWSConnection { #[derive(Debug, Clone, PartialEq, Eq, Hash, From)] pub enum AnyResponse { - Json(Response), + Json(WsResponse), Binary(BinaryWsUpdate), } @@ -84,13 +84,13 @@ impl PythLazerWSConnection { .try_filter_map(|msg| async { let r: Result> = match msg { Message::Text(text) => { - Ok(Some(serde_json::from_str::(&text)?.into())) + Ok(Some(serde_json::from_str::(&text)?.into())) } Message::Binary(data) => { Ok(Some(BinaryWsUpdate::deserialize_slice(&data)?.into())) } Message::Close(_) => Ok(Some( - Response::Error(ErrorResponse { + WsResponse::Error(ErrorResponse { error: "WebSocket connection closed".to_string(), }) .into(), @@ -103,7 +103,7 @@ impl PythLazerWSConnection { Ok(response_stream) } - pub async fn send_request(&mut self, request: Request) -> Result<()> { + pub async fn send_request(&mut self, request: WsRequest) -> Result<()> { if let Some(sender) = &mut self.ws_sender { let msg = serde_json::to_string(&request)?; sender.send(Message::Text(msg)).await?; @@ -118,7 +118,7 @@ impl PythLazerWSConnection { /// # Arguments /// * `request` - A subscription request containing feed IDs and parameters pub async fn subscribe(&mut self, request: SubscribeRequest) -> Result<()> { - let request = Request::Subscribe(request); + let request = WsRequest::Subscribe(request); self.send_request(request).await } @@ -127,7 +127,7 @@ impl PythLazerWSConnection { /// # Arguments /// * `subscription_id` - The ID of the subscription to cancel pub async fn unsubscribe(&mut self, request: UnsubscribeRequest) -> Result<()> { - let request = Request::Unsubscribe(request); + let request = WsRequest::Unsubscribe(request); self.send_request(request).await } diff --git a/lazer/sdk/rust/protocol/Cargo.toml b/lazer/sdk/rust/protocol/Cargo.toml index 5865eb27da..e058344eea 100644 --- a/lazer/sdk/rust/protocol/Cargo.toml +++ b/lazer/sdk/rust/protocol/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-lazer-protocol" -version = "0.10.2" +version = "0.11.0" edition = "2021" description = "Pyth Lazer SDK - protocol types." license = "Apache-2.0" @@ -20,6 +20,7 @@ mry = { version = "0.13.0", features = ["serde"], optional = true } chrono = "0.4.41" humantime = "2.2.0" hex = "0.4.3" +thiserror = "2.0.12" [dev-dependencies] bincode = "1.3.3" @@ -28,3 +29,4 @@ hex = "0.4.3" libsecp256k1 = "0.7.1" bs58 = "0.5.1" alloy-primitives = "0.8.19" +assert_float_eq = "1.1.4" diff --git a/lazer/sdk/rust/protocol/src/api.rs b/lazer/sdk/rust/protocol/src/api.rs index 4a059ca255..8423c96cb6 100644 --- a/lazer/sdk/rust/protocol/src/api.rs +++ b/lazer/sdk/rust/protocol/src/api.rs @@ -1,8 +1,16 @@ -use serde::{Deserialize, Serialize}; +use std::{ + fmt::Display, + ops::{Deref, DerefMut}, +}; + +use derive_more::derive::From; +use itertools::Itertools as _; +use serde::{de::Error, Deserialize, Serialize}; use crate::{ - router::{Channel, Format, JsonBinaryEncoding, JsonUpdate, PriceFeedId, PriceFeedProperty}, - time::TimestampUs, + payload::AggregatedPriceFeedData, + time::{DurationUs, FixedRate, TimestampUs}, + ChannelId, Price, PriceFeedId, PriceFeedProperty, Rate, }; #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -52,3 +60,417 @@ pub type PriceResponse = JsonUpdate; pub fn default_parsed() -> bool { true } + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum DeliveryFormat { + /// Deliver stream updates as JSON text messages. + #[default] + Json, + /// Deliver stream updates as binary messages. + Binary, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum Format { + Evm, + Solana, + LeEcdsa, + LeUnsigned, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum JsonBinaryEncoding { + #[default] + Base64, + Hex, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, From)] +pub enum Channel { + FixedRate(FixedRate), +} + +impl Serialize for Channel { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + Channel::FixedRate(fixed_rate) => { + if *fixed_rate == FixedRate::MIN { + return serializer.serialize_str("real_time"); + } + serializer.serialize_str(&format!( + "fixed_rate@{}ms", + fixed_rate.duration().as_millis() + )) + } + } + } +} + +impl Display for Channel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Channel::FixedRate(fixed_rate) => match *fixed_rate { + FixedRate::MIN => write!(f, "real_time"), + rate => write!(f, "fixed_rate@{}ms", rate.duration().as_millis()), + }, + } + } +} + +impl Channel { + pub fn id(&self) -> ChannelId { + match self { + Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() { + 1 => ChannelId::FIXED_RATE_1, + 50 => ChannelId::FIXED_RATE_50, + 200 => ChannelId::FIXED_RATE_200, + _ => panic!("unknown channel: {self:?}"), + }, + } + } +} + +#[test] +fn id_supports_all_fixed_rates() { + for rate in FixedRate::ALL { + Channel::FixedRate(rate).id(); + } +} + +fn parse_channel(value: &str) -> Option { + if value == "real_time" { + Some(Channel::FixedRate(FixedRate::MIN)) + } else if let Some(rest) = value.strip_prefix("fixed_rate@") { + let ms_value = rest.strip_suffix("ms")?; + Some(Channel::FixedRate(FixedRate::from_millis( + ms_value.parse().ok()?, + )?)) + } else { + None + } +} + +impl<'de> Deserialize<'de> for Channel { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let value = ::deserialize(deserializer)?; + parse_channel(&value).ok_or_else(|| Error::custom("unknown channel")) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SubscriptionParamsRepr { + pub price_feed_ids: Vec, + pub properties: Vec, + // "chains" was renamed to "formats". "chains" is still supported for compatibility. + #[serde(alias = "chains")] + pub formats: Vec, + #[serde(default)] + pub delivery_format: DeliveryFormat, + #[serde(default)] + pub json_binary_encoding: JsonBinaryEncoding, + /// If `true`, the stream update will contain a `parsed` JSON field containing + /// all data of the update. + #[serde(default = "default_parsed")] + pub parsed: bool, + pub channel: Channel, + #[serde(default)] + pub ignore_invalid_feed_ids: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct SubscriptionParams(SubscriptionParamsRepr); + +impl<'de> Deserialize<'de> for SubscriptionParams { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let value = SubscriptionParamsRepr::deserialize(deserializer)?; + Self::new(value).map_err(Error::custom) + } +} + +impl SubscriptionParams { + pub fn new(value: SubscriptionParamsRepr) -> Result { + if value.price_feed_ids.is_empty() { + return Err("no price feed ids specified"); + } + if !value.price_feed_ids.iter().all_unique() { + return Err("duplicate price feed ids specified"); + } + if !value.formats.iter().all_unique() { + return Err("duplicate formats or chains specified"); + } + if value.properties.is_empty() { + return Err("no properties specified"); + } + if !value.properties.iter().all_unique() { + return Err("duplicate properties specified"); + } + Ok(Self(value)) + } +} + +impl Deref for SubscriptionParams { + type Target = SubscriptionParamsRepr; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl DerefMut for SubscriptionParams { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct JsonBinaryData { + pub encoding: JsonBinaryEncoding, + pub data: String, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct JsonUpdate { + /// Present unless `parsed = false` is specified in subscription params. + #[serde(skip_serializing_if = "Option::is_none")] + pub parsed: Option, + /// Only present if `Evm` is present in `formats` in subscription params. + #[serde(skip_serializing_if = "Option::is_none")] + pub evm: Option, + /// Only present if `Solana` is present in `formats` in subscription params. + #[serde(skip_serializing_if = "Option::is_none")] + pub solana: Option, + /// Only present if `LeEcdsa` is present in `formats` in subscription params. + #[serde(skip_serializing_if = "Option::is_none")] + pub le_ecdsa: Option, + /// Only present if `LeUnsigned` is present in `formats` in subscription params. + #[serde(skip_serializing_if = "Option::is_none")] + pub le_unsigned: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ParsedPayload { + #[serde(with = "crate::serde_str::timestamp")] + pub timestamp_us: TimestampUs, + pub price_feeds: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ParsedFeedPayload { + pub price_feed_id: PriceFeedId, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "crate::serde_str::option_price")] + #[serde(default)] + pub price: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "crate::serde_str::option_price")] + #[serde(default)] + pub best_bid_price: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "crate::serde_str::option_price")] + #[serde(default)] + pub best_ask_price: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub publisher_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub exponent: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub confidence: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub funding_rate: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub funding_timestamp: Option, + // More fields may be added later. + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub funding_rate_interval: Option, +} + +impl ParsedFeedPayload { + pub fn new( + price_feed_id: PriceFeedId, + exponent: Option, + data: &AggregatedPriceFeedData, + properties: &[PriceFeedProperty], + ) -> Self { + let mut output = Self { + price_feed_id, + price: None, + best_bid_price: None, + best_ask_price: None, + publisher_count: None, + exponent: None, + confidence: None, + funding_rate: None, + funding_timestamp: None, + funding_rate_interval: None, + }; + for &property in properties { + match property { + PriceFeedProperty::Price => { + output.price = data.price; + } + PriceFeedProperty::BestBidPrice => { + output.best_bid_price = data.best_bid_price; + } + PriceFeedProperty::BestAskPrice => { + output.best_ask_price = data.best_ask_price; + } + PriceFeedProperty::PublisherCount => { + output.publisher_count = Some(data.publisher_count); + } + PriceFeedProperty::Exponent => { + output.exponent = exponent; + } + PriceFeedProperty::Confidence => { + output.confidence = data.confidence; + } + PriceFeedProperty::FundingRate => { + output.funding_rate = data.funding_rate; + } + PriceFeedProperty::FundingTimestamp => { + output.funding_timestamp = data.funding_timestamp; + } + PriceFeedProperty::FundingRateInterval => { + output.funding_rate_interval = data.funding_rate_interval; + } + } + } + output + } + + pub fn new_full( + price_feed_id: PriceFeedId, + exponent: Option, + data: &AggregatedPriceFeedData, + ) -> Self { + Self { + price_feed_id, + price: data.price, + best_bid_price: data.best_bid_price, + best_ask_price: data.best_ask_price, + publisher_count: Some(data.publisher_count), + exponent, + confidence: data.confidence, + funding_rate: data.funding_rate, + funding_timestamp: data.funding_timestamp, + funding_rate_interval: data.funding_rate_interval, + } + } +} + +/// A request sent from the client to the server. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(tag = "type")] +#[serde(rename_all = "camelCase")] +pub enum WsRequest { + Subscribe(SubscribeRequest), + Unsubscribe(UnsubscribeRequest), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +pub struct SubscriptionId(pub u64); + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SubscribeRequest { + pub subscription_id: SubscriptionId, + #[serde(flatten)] + pub params: SubscriptionParams, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UnsubscribeRequest { + pub subscription_id: SubscriptionId, +} + +/// A JSON response sent from the server to the client. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, From)] +#[serde(tag = "type")] +#[serde(rename_all = "camelCase")] +pub enum WsResponse { + Error(ErrorResponse), + Subscribed(SubscribedResponse), + SubscribedWithInvalidFeedIdsIgnored(SubscribedWithInvalidFeedIdsIgnoredResponse), + Unsubscribed(UnsubscribedResponse), + SubscriptionError(SubscriptionErrorResponse), + StreamUpdated(StreamUpdatedResponse), +} + +/// Sent from the server after a successul subscription. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SubscribedResponse { + pub subscription_id: SubscriptionId, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct InvalidFeedSubscriptionDetails { + pub unknown_ids: Vec, + pub unsupported_channels: Vec, + pub unstable: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SubscribedWithInvalidFeedIdsIgnoredResponse { + pub subscription_id: SubscriptionId, + pub subscribed_feed_ids: Vec, + pub ignored_invalid_feed_ids: InvalidFeedSubscriptionDetails, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UnsubscribedResponse { + pub subscription_id: SubscriptionId, +} + +/// Sent from the server if the requested subscription or unsubscription request +/// could not be fulfilled. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SubscriptionErrorResponse { + pub subscription_id: SubscriptionId, + pub error: String, +} + +/// Sent from the server if an internal error occured while serving data for an existing subscription, +/// or a client request sent a bad request. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ErrorResponse { + pub error: String, +} + +/// Sent from the server when new data is available for an existing subscription +/// (only if `delivery_format == Json`). +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct StreamUpdatedResponse { + pub subscription_id: SubscriptionId, + #[serde(flatten)] + pub payload: JsonUpdate, +} diff --git a/lazer/sdk/rust/protocol/src/binary_update.rs b/lazer/sdk/rust/protocol/src/binary_update.rs index aab2b5b139..9c030ed667 100644 --- a/lazer/sdk/rust/protocol/src/binary_update.rs +++ b/lazer/sdk/rust/protocol/src/binary_update.rs @@ -1,5 +1,5 @@ use { - crate::{message::Message, subscription::SubscriptionId}, + crate::{api::SubscriptionId, message::Message}, anyhow::{bail, Context}, byteorder::{WriteBytesExt, BE, LE}, }; diff --git a/lazer/sdk/rust/protocol/src/jrpc.rs b/lazer/sdk/rust/protocol/src/jrpc.rs index ceb7704f22..ed5e67870d 100644 --- a/lazer/sdk/rust/protocol/src/jrpc.rs +++ b/lazer/sdk/rust/protocol/src/jrpc.rs @@ -1,6 +1,8 @@ -use crate::router::{Channel, Price, PriceFeedId, Rate}; +use crate::rate::Rate; use crate::symbol_state::SymbolState; use crate::time::TimestampUs; +use crate::PriceFeedId; +use crate::{api::Channel, price::Price}; use serde::{Deserialize, Serialize}; use std::time::Duration; diff --git a/lazer/sdk/rust/protocol/src/lib.rs b/lazer/sdk/rust/protocol/src/lib.rs index 21e94b4a4b..361539e7ba 100644 --- a/lazer/sdk/rust/protocol/src/lib.rs +++ b/lazer/sdk/rust/protocol/src/lib.rs @@ -1,21 +1,93 @@ -//! Protocol types. +//! Lazer type definitions and utilities. +/// Types describing Lazer HTTP and WebSocket APIs. pub mod api; +/// Binary delivery format for WebSocket. pub mod binary_update; mod dynamic_value; mod feed_kind; +/// Lazer Agent JSON-RPC API. pub mod jrpc; +/// Types describing Lazer's verifiable messages containing signature and payload. pub mod message; +/// Types describing Lazer's message payload. pub mod payload; +mod price; +/// Legacy Websocket API for publishers. pub mod publisher; -pub mod router; +mod rate; mod serde_price_as_i64; mod serde_str; -pub mod subscription; -pub mod symbol_state; +mod symbol_state; +/// Lazer's types for time representation. pub mod time; -pub use crate::{dynamic_value::DynamicValue, feed_kind::FeedKind}; +use derive_more::derive::{From, Into}; +use serde::{Deserialize, Serialize}; + +pub use crate::{ + dynamic_value::DynamicValue, + feed_kind::FeedKind, + price::{Price, PriceError}, + rate::{Rate, RateError}, + symbol_state::SymbolState, +}; + +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into, +)] +pub struct PublisherId(pub u16); + +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into, +)] +pub struct PriceFeedId(pub u32); + +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into, +)] +pub struct ChannelId(pub u8); + +impl ChannelId { + pub const FIXED_RATE_1: ChannelId = ChannelId(1); + pub const FIXED_RATE_50: ChannelId = ChannelId(2); + pub const FIXED_RATE_200: ChannelId = ChannelId(3); +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum PriceFeedProperty { + Price, + BestBidPrice, + BestAskPrice, + PublisherCount, + Exponent, + Confidence, + FundingRate, + FundingTimestamp, + FundingRateInterval, + // More fields may be added later. +} + +// Operation and coefficient for converting value to mantissa. +enum ExponentFactor { + // mantissa = value * factor + Mul(i64), + // mantissa = value / factor + Div(i64), +} + +impl ExponentFactor { + fn get(exponent: i16) -> Option { + if exponent >= 0 { + let exponent: u32 = exponent.try_into().ok()?; + Some(ExponentFactor::Div(10_i64.checked_pow(exponent)?)) + } else { + let minus_exponent: u32 = exponent.checked_neg()?.try_into().ok()?; + Some(ExponentFactor::Mul(10_i64.checked_pow(minus_exponent)?)) + } + } +} #[test] fn magics_in_big_endian() { diff --git a/lazer/sdk/rust/protocol/src/message.rs b/lazer/sdk/rust/protocol/src/message.rs index 199e081b56..cfb7832e4e 100644 --- a/lazer/sdk/rust/protocol/src/message.rs +++ b/lazer/sdk/rust/protocol/src/message.rs @@ -1,6 +1,6 @@ use { self::format_magics_le::{EVM_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC}, - crate::router::ParsedPayload, + crate::api::ParsedPayload, anyhow::{bail, Context}, byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE}, derive_more::From, diff --git a/lazer/sdk/rust/protocol/src/payload.rs b/lazer/sdk/rust/protocol/src/payload.rs index 144c566e5f..3fab9673f7 100644 --- a/lazer/sdk/rust/protocol/src/payload.rs +++ b/lazer/sdk/rust/protocol/src/payload.rs @@ -1,12 +1,10 @@ -//! Types representing binary encoding of signable payloads and signature envelopes. - -use crate::time::DurationUs; +use crate::{ + price::Price, + rate::Rate, + time::{DurationUs, TimestampUs}, + ChannelId, PriceFeedId, PriceFeedProperty, +}; use { - super::router::{PriceFeedId, PriceFeedProperty}, - crate::{ - router::{ChannelId, Price, Rate}, - time::TimestampUs, - }, anyhow::bail, byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE}, serde::{Deserialize, Serialize}, @@ -231,12 +229,12 @@ fn write_option_price( mut writer: impl Write, value: Option, ) -> std::io::Result<()> { - writer.write_i64::(value.map_or(0, |v| v.0.get())) + writer.write_i64::(value.map_or(0, |v| v.mantissa_i64())) } fn read_option_price(mut reader: impl Read) -> std::io::Result> { let value = NonZeroI64::new(reader.read_i64::()?); - Ok(value.map(Price)) + Ok(value.map(Price::from_nonzero_mantissa)) } fn write_option_rate( @@ -246,7 +244,7 @@ fn write_option_rate( match value { Some(value) => { writer.write_u8(1)?; - writer.write_i64::(value.0) + writer.write_i64::(value.mantissa()) } None => { writer.write_u8(0)?; @@ -258,7 +256,7 @@ fn write_option_rate( fn read_option_rate(mut reader: impl Read) -> std::io::Result> { let present = reader.read_u8()? != 0; if present { - Ok(Some(Rate(reader.read_i64::()?))) + Ok(Some(Rate::from_mantissa(reader.read_i64::()?))) } else { Ok(None) } diff --git a/lazer/sdk/rust/protocol/src/price.rs b/lazer/sdk/rust/protocol/src/price.rs new file mode 100644 index 0000000000..cbc5a8200e --- /dev/null +++ b/lazer/sdk/rust/protocol/src/price.rs @@ -0,0 +1,155 @@ +#[cfg(test)] +mod tests; + +use { + crate::ExponentFactor, + rust_decimal::{prelude::FromPrimitive, Decimal}, + serde::{Deserialize, Serialize}, + std::num::NonZeroI64, + thiserror::Error, +}; + +#[derive(Debug, Error)] +pub enum PriceError { + #[error("decimal parse error: {0}")] + DecimalParse(#[from] rust_decimal::Error), + #[error("price value is more precise than available exponent")] + TooPrecise, + #[error("zero price is unsupported")] + ZeroPriceUnsupported, + #[error("overflow")] + Overflow, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +#[repr(transparent)] +pub struct Price(NonZeroI64); + +impl Price { + pub fn from_integer(value: i64, exponent: i16) -> Result { + let value = match ExponentFactor::get(exponent).ok_or(PriceError::Overflow)? { + ExponentFactor::Mul(coef) => value.checked_mul(coef).ok_or(PriceError::Overflow)?, + ExponentFactor::Div(coef) => value.checked_div(coef).ok_or(PriceError::Overflow)?, + }; + let value = NonZeroI64::new(value).ok_or(PriceError::ZeroPriceUnsupported)?; + Ok(Self(value)) + } + + pub fn parse_str(value: &str, exponent: i16) -> Result { + let value: Decimal = value.parse()?; + let value = match ExponentFactor::get(exponent).ok_or(PriceError::Overflow)? { + ExponentFactor::Mul(coef) => value + .checked_mul(Decimal::from_i64(coef).ok_or(PriceError::Overflow)?) + .ok_or(PriceError::Overflow)?, + ExponentFactor::Div(coef) => value + .checked_div(Decimal::from_i64(coef).ok_or(PriceError::Overflow)?) + .ok_or(PriceError::Overflow)?, + }; + if !value.is_integer() { + return Err(PriceError::TooPrecise); + } + let value: i64 = value.try_into().map_err(|_| PriceError::Overflow)?; + let value = NonZeroI64::new(value).ok_or(PriceError::Overflow)?; + Ok(Self(value)) + } + + pub const fn from_nonzero_mantissa(mantissa: NonZeroI64) -> Self { + Self(mantissa) + } + + pub const fn from_mantissa(mantissa: i64) -> Result { + if let Some(value) = NonZeroI64::new(mantissa) { + Ok(Self(value)) + } else { + Err(PriceError::ZeroPriceUnsupported) + } + } + + pub fn mantissa(self) -> NonZeroI64 { + self.0 + } + + pub fn mantissa_i64(self) -> i64 { + self.0.get() + } + + pub fn to_f64(self, exponent: i16) -> Result { + match ExponentFactor::get(exponent).ok_or(PriceError::Overflow)? { + // Mul/div is reversed for this conversion + ExponentFactor::Mul(coef) => Ok(self.0.get() as f64 / coef as f64), + ExponentFactor::Div(coef) => Ok(self.0.get() as f64 * coef as f64), + } + } + + pub fn from_f64(value: f64, exponent: i16) -> Result { + let value = Decimal::from_f64(value).ok_or(PriceError::Overflow)?; + let value = match ExponentFactor::get(exponent).ok_or(PriceError::Overflow)? { + ExponentFactor::Mul(coef) => value + .checked_mul(Decimal::from_i64(coef).ok_or(PriceError::Overflow)?) + .ok_or(PriceError::Overflow)?, + ExponentFactor::Div(coef) => value + .checked_div(Decimal::from_i64(coef).ok_or(PriceError::Overflow)?) + .ok_or(PriceError::Overflow)?, + }; + let value: i64 = value.try_into().map_err(|_| PriceError::Overflow)?; + Ok(Self( + NonZeroI64::new(value).ok_or(PriceError::ZeroPriceUnsupported)?, + )) + } + + pub fn add_with_same_mantissa(self, other: Price) -> Result { + let value = self + .0 + .get() + .checked_add(other.0.get()) + .ok_or(PriceError::Overflow)?; + Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported) + } + + pub fn sub_with_same_mantissa(self, other: Price) -> Result { + let value = self + .0 + .get() + .checked_sub(other.0.get()) + .ok_or(PriceError::Overflow)?; + Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported) + } + + pub fn mul_integer(self, factor: i64) -> Result { + let value = self + .0 + .get() + .checked_mul(factor) + .ok_or(PriceError::Overflow)?; + Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported) + } + + pub fn div_integer(self, factor: i64) -> Result { + let value = self + .0 + .get() + .checked_div(factor) + .ok_or(PriceError::Overflow)?; + Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported) + } + + pub fn mul_decimal(self, mantissa: i64, rhs_exponent: i16) -> Result { + let left_value = i128::from(self.0.get()); + let right_value = i128::from(mantissa); + + let value = left_value + .checked_mul(right_value) + .ok_or(PriceError::Overflow)?; + + let value = match ExponentFactor::get(rhs_exponent).ok_or(PriceError::Overflow)? { + ExponentFactor::Mul(coef) => { + value.checked_div(coef.into()).ok_or(PriceError::Overflow)? + } + ExponentFactor::Div(coef) => { + value.checked_mul(coef.into()).ok_or(PriceError::Overflow)? + } + }; + let value: i64 = value.try_into().map_err(|_| PriceError::Overflow)?; + Self::from_mantissa(value).map_err(|_| PriceError::ZeroPriceUnsupported) + } +} diff --git a/lazer/sdk/rust/protocol/src/price/tests.rs b/lazer/sdk/rust/protocol/src/price/tests.rs new file mode 100644 index 0000000000..0abe2dd542 --- /dev/null +++ b/lazer/sdk/rust/protocol/src/price/tests.rs @@ -0,0 +1,149 @@ +use {super::Price, assert_float_eq::assert_float_absolute_eq}; + +#[test] +fn price_constructs() { + let price = Price::parse_str("42.68", -8).unwrap(); + assert_eq!(price.0.get(), 4_268_000_000); + assert_float_absolute_eq!(price.to_f64(-8).unwrap(), 42.68); + + let price2 = Price::from_integer(2, -8).unwrap(); + assert_eq!(price2.0.get(), 200_000_000); + assert_float_absolute_eq!(price2.to_f64(-8).unwrap(), 2.); + + let price3 = Price::from_mantissa(123_456).unwrap(); + assert_eq!(price3.0.get(), 123_456); + assert_float_absolute_eq!(price3.to_f64(-8).unwrap(), 0.001_234_56); + + let price4 = Price::from_f64(42.68, -8).unwrap(); + assert_eq!(price4.0.get(), 4_268_000_000); + assert_float_absolute_eq!(price4.to_f64(-8).unwrap(), 42.68); +} + +#[test] +fn price_constructs_with_negative_mantissa() { + let price = Price::parse_str("-42.68", -8).unwrap(); + assert_eq!(price.0.get(), -4_268_000_000); + assert_float_absolute_eq!(price.to_f64(-8).unwrap(), -42.68); + + let price2 = Price::from_integer(-2, -8).unwrap(); + assert_eq!(price2.0.get(), -200_000_000); + assert_float_absolute_eq!(price2.to_f64(-8).unwrap(), -2.); + + let price3 = Price::from_mantissa(-123_456).unwrap(); + assert_eq!(price3.0.get(), -123_456); + assert_float_absolute_eq!(price3.to_f64(-8).unwrap(), -0.001_234_56); + + let price4 = Price::from_f64(-42.68, -8).unwrap(); + assert_eq!(price4.0.get(), -4_268_000_000); + assert_float_absolute_eq!(price4.to_f64(-8).unwrap(), -42.68); +} + +#[test] +fn price_constructs_with_zero_exponent() { + let price = Price::parse_str("42", 0).unwrap(); + assert_eq!(price.0.get(), 42); + assert_float_absolute_eq!(price.to_f64(0).unwrap(), 42.); + + let price2 = Price::from_integer(2, 0).unwrap(); + assert_eq!(price2.0.get(), 2); + assert_float_absolute_eq!(price2.to_f64(0).unwrap(), 2.); + + let price3 = Price::from_mantissa(123_456).unwrap(); + assert_eq!(price3.0.get(), 123_456); + assert_float_absolute_eq!(price3.to_f64(0).unwrap(), 123_456.); + + let price4 = Price::from_f64(42., 0).unwrap(); + assert_eq!(price4.0.get(), 42); + assert_float_absolute_eq!(price4.to_f64(0).unwrap(), 42.); +} + +#[test] +fn price_constructs_with_positive_exponent() { + let price = Price::parse_str("42_680_000", 3).unwrap(); + assert_eq!(price.0.get(), 42_680); + assert_float_absolute_eq!(price.to_f64(3).unwrap(), 42_680_000.); + + let price2 = Price::from_integer(200_000, 3).unwrap(); + assert_eq!(price2.0.get(), 200); + assert_float_absolute_eq!(price2.to_f64(3).unwrap(), 200_000.); + + let price3 = Price::from_mantissa(123_456).unwrap(); + assert_eq!(price3.0.get(), 123_456); + assert_float_absolute_eq!(price3.to_f64(3).unwrap(), 123_456_000.); + + let price4 = Price::from_f64(42_680_000., 3).unwrap(); + assert_eq!(price4.0.get(), 42_680); + assert_float_absolute_eq!(price4.to_f64(3).unwrap(), 42_680_000.); +} + +#[test] +fn price_rejects_zero_mantissa() { + Price::parse_str("0.0", -8).unwrap_err(); + Price::from_integer(0, -8).unwrap_err(); + Price::from_mantissa(0).unwrap_err(); + Price::from_f64(-0.0, -8).unwrap_err(); + + Price::parse_str("0.0", 8).unwrap_err(); + Price::from_integer(0, 8).unwrap_err(); + Price::from_f64(-0.0, 8).unwrap_err(); +} + +#[test] +fn price_rejects_too_precise() { + Price::parse_str("42.68", 0).unwrap_err(); + Price::parse_str("42.68", -1).unwrap_err(); + Price::parse_str("42.68", -2).unwrap(); + + Price::parse_str("42_680", 3).unwrap_err(); + Price::parse_str("42_600", 3).unwrap_err(); + Price::parse_str("42_000", 3).unwrap(); +} + +#[test] +fn price_ops() { + let price1 = Price::parse_str("12.34", -8).unwrap(); + let price2 = Price::parse_str("23.45", -8).unwrap(); + assert_float_absolute_eq!( + price1 + .add_with_same_mantissa(price2) + .unwrap() + .to_f64(-8) + .unwrap(), + 12.34 + 23.45 + ); + assert_float_absolute_eq!( + price1 + .sub_with_same_mantissa(price2) + .unwrap() + .to_f64(-8) + .unwrap(), + 12.34 - 23.45 + ); + assert_float_absolute_eq!( + price1.mul_integer(2).unwrap().to_f64(-8).unwrap(), + 12.34 * 2. + ); + assert_float_absolute_eq!( + price1.div_integer(2).unwrap().to_f64(-8).unwrap(), + 12.34 / 2. + ); + + assert_float_absolute_eq!( + price1.mul_decimal(3456, -2).unwrap().to_f64(-8).unwrap(), + 12.34 * 34.56 + ); + + let price2 = Price::parse_str("42_000", 3).unwrap(); + assert_float_absolute_eq!( + price2.mul_integer(2).unwrap().to_f64(3).unwrap(), + 42_000. * 2. + ); + assert_float_absolute_eq!( + price2.div_integer(2).unwrap().to_f64(3).unwrap(), + 42_000. / 2. + ); + assert_float_absolute_eq!( + price2.mul_decimal(3456, -2).unwrap().to_f64(3).unwrap(), + (42_000_f64 * 34.56 / 1000.).floor() * 1000. + ); +} diff --git a/lazer/sdk/rust/protocol/src/publisher.rs b/lazer/sdk/rust/protocol/src/publisher.rs index ebcee2c89c..74dc90cbd7 100644 --- a/lazer/sdk/rust/protocol/src/publisher.rs +++ b/lazer/sdk/rust/protocol/src/publisher.rs @@ -1,10 +1,5 @@ -//! WebSocket JSON protocol types for API the publisher provides to the router. -//! Publisher data sourcing may also be implemented in the router process, -//! eliminating WebSocket overhead. - use { - super::router::{Price, PriceFeedId, Rate}, - crate::time::TimestampUs, + crate::{price::Price, rate::Rate, time::TimestampUs, PriceFeedId}, derive_more::derive::From, serde::{Deserialize, Serialize}, }; @@ -104,9 +99,11 @@ fn price_feed_data_v1_serde() { price_feed_id: PriceFeedId(1), source_timestamp_us: TimestampUs::from_micros(2), publisher_timestamp_us: TimestampUs::from_micros(3), - price: Some(Price(4.try_into().unwrap())), - best_bid_price: Some(Price(5.try_into().unwrap())), - best_ask_price: Some(Price((2 * 256 + 6).try_into().unwrap())), + price: Some(Price::from_nonzero_mantissa(4.try_into().unwrap())), + best_bid_price: Some(Price::from_nonzero_mantissa(5.try_into().unwrap())), + best_ask_price: Some(Price::from_nonzero_mantissa( + (2 * 256 + 6).try_into().unwrap(), + )), }; assert_eq!( bincode::deserialize::(&data).unwrap(), @@ -126,7 +123,7 @@ fn price_feed_data_v1_serde() { price_feed_id: PriceFeedId(1), source_timestamp_us: TimestampUs::from_micros(2), publisher_timestamp_us: TimestampUs::from_micros(3), - price: Some(Price(4.try_into().unwrap())), + price: Some(Price::from_nonzero_mantissa(4.try_into().unwrap())), best_bid_price: None, best_ask_price: None, }; @@ -153,9 +150,11 @@ fn price_feed_data_v2_serde() { price_feed_id: PriceFeedId(1), source_timestamp_us: TimestampUs::from_micros(2), publisher_timestamp_us: TimestampUs::from_micros(3), - price: Some(Price(4.try_into().unwrap())), - best_bid_price: Some(Price(5.try_into().unwrap())), - best_ask_price: Some(Price((2 * 256 + 6).try_into().unwrap())), + price: Some(Price::from_nonzero_mantissa(4.try_into().unwrap())), + best_bid_price: Some(Price::from_nonzero_mantissa(5.try_into().unwrap())), + best_ask_price: Some(Price::from_nonzero_mantissa( + (2 * 256 + 6).try_into().unwrap(), + )), funding_rate: None, }; assert_eq!( @@ -177,10 +176,10 @@ fn price_feed_data_v2_serde() { price_feed_id: PriceFeedId(1), source_timestamp_us: TimestampUs::from_micros(2), publisher_timestamp_us: TimestampUs::from_micros(3), - price: Some(Price(4.try_into().unwrap())), + price: Some(Price::from_nonzero_mantissa(4.try_into().unwrap())), best_bid_price: None, best_ask_price: None, - funding_rate: Some(Rate(3 * 256 + 7)), + funding_rate: Some(Rate::from_mantissa(3 * 256 + 7)), }; assert_eq!( bincode::deserialize::(&data2).unwrap(), diff --git a/lazer/sdk/rust/protocol/src/rate.rs b/lazer/sdk/rust/protocol/src/rate.rs new file mode 100644 index 0000000000..43af27a1c7 --- /dev/null +++ b/lazer/sdk/rust/protocol/src/rate.rs @@ -0,0 +1,80 @@ +#[cfg(test)] +mod tests; + +use { + crate::ExponentFactor, + rust_decimal::{prelude::FromPrimitive, Decimal}, + serde::{Deserialize, Serialize}, + thiserror::Error, +}; + +#[derive(Debug, Error)] +pub enum RateError { + #[error("decimal parse error: {0}")] + DecimalParse(#[from] rust_decimal::Error), + #[error("price value is more precise than available exponent")] + TooPrecise, + #[error("overflow")] + Overflow, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +#[repr(transparent)] +pub struct Rate(i64); + +impl Rate { + pub fn from_integer(value: i64, exponent: i16) -> Result { + let value = match ExponentFactor::get(exponent).ok_or(RateError::Overflow)? { + ExponentFactor::Mul(coef) => value.checked_mul(coef).ok_or(RateError::Overflow)?, + ExponentFactor::Div(coef) => value.checked_div(coef).ok_or(RateError::Overflow)?, + }; + Ok(Self(value)) + } + + pub fn parse_str(value: &str, exponent: i16) -> Result { + let value: Decimal = value.parse()?; + let value = match ExponentFactor::get(exponent).ok_or(RateError::Overflow)? { + ExponentFactor::Mul(coef) => value + .checked_mul(Decimal::from_i64(coef).ok_or(RateError::Overflow)?) + .ok_or(RateError::Overflow)?, + ExponentFactor::Div(coef) => value + .checked_div(Decimal::from_i64(coef).ok_or(RateError::Overflow)?) + .ok_or(RateError::Overflow)?, + }; + if !value.is_integer() { + return Err(RateError::TooPrecise); + } + let value: i64 = value.try_into().map_err(|_| RateError::Overflow)?; + Ok(Self(value)) + } + + pub const fn from_mantissa(mantissa: i64) -> Self { + Self(mantissa) + } + + pub fn from_f64(value: f64, exponent: i16) -> Result { + let value = Decimal::from_f64(value).ok_or(RateError::Overflow)?; + let value = match ExponentFactor::get(exponent).ok_or(RateError::Overflow)? { + ExponentFactor::Mul(coef) => value + .checked_mul(Decimal::from_i64(coef).ok_or(RateError::Overflow)?) + .ok_or(RateError::Overflow)?, + ExponentFactor::Div(coef) => value + .checked_div(Decimal::from_i64(coef).ok_or(RateError::Overflow)?) + .ok_or(RateError::Overflow)?, + }; + let value: i64 = value.try_into().map_err(|_| RateError::Overflow)?; + Ok(Self(value)) + } + + pub fn mantissa(self) -> i64 { + self.0 + } + + pub fn to_f64(self, exponent: i16) -> Result { + match ExponentFactor::get(exponent).ok_or(RateError::Overflow)? { + // Mul/div is reversed for this conversion + ExponentFactor::Mul(coef) => Ok(self.0 as f64 / coef as f64), + ExponentFactor::Div(coef) => Ok(self.0 as f64 * coef as f64), + } + } +} diff --git a/lazer/sdk/rust/protocol/src/rate/tests.rs b/lazer/sdk/rust/protocol/src/rate/tests.rs new file mode 100644 index 0000000000..26b49528ec --- /dev/null +++ b/lazer/sdk/rust/protocol/src/rate/tests.rs @@ -0,0 +1,107 @@ +use {crate::rate::Rate, assert_float_eq::assert_float_absolute_eq}; + +#[test] +fn rate_constructs() { + let rate = Rate::parse_str("42.68", -8).unwrap(); + assert_eq!(rate.0, 4_268_000_000); + assert_float_absolute_eq!(rate.to_f64(-8).unwrap(), 42.68); + + let rate2 = Rate::from_integer(2, -8).unwrap(); + assert_eq!(rate2.0, 200_000_000); + assert_float_absolute_eq!(rate2.to_f64(-8).unwrap(), 2.); + + let rate3 = Rate::from_mantissa(123_456); + assert_eq!(rate3.0, 123_456); + assert_float_absolute_eq!(rate3.to_f64(-8).unwrap(), 0.001_234_56); + + let rate4 = Rate::from_f64(42.68, -8).unwrap(); + assert_eq!(rate4.0, 4_268_000_000); + assert_float_absolute_eq!(rate4.to_f64(-8).unwrap(), 42.68); +} + +#[test] +fn rate_constructs_with_negative_mantissa() { + let rate = Rate::parse_str("-42.68", -8).unwrap(); + assert_eq!(rate.0, -4_268_000_000); + assert_float_absolute_eq!(rate.to_f64(-8).unwrap(), -42.68); + + let rate2 = Rate::from_integer(-2, -8).unwrap(); + assert_eq!(rate2.0, -200_000_000); + assert_float_absolute_eq!(rate2.to_f64(-8).unwrap(), -2.); + + let rate3 = Rate::from_mantissa(-123_456); + assert_eq!(rate3.0, -123_456); + assert_float_absolute_eq!(rate3.to_f64(-8).unwrap(), -0.001_234_56); + + let rate4 = Rate::from_f64(-42.68, -8).unwrap(); + assert_eq!(rate4.0, -4_268_000_000); + assert_float_absolute_eq!(rate4.to_f64(-8).unwrap(), -42.68); +} + +#[test] +fn rate_constructs_with_zero_exponent() { + let rate = Rate::parse_str("42", 0).unwrap(); + assert_eq!(rate.0, 42); + assert_float_absolute_eq!(rate.to_f64(0).unwrap(), 42.); + + let rate2 = Rate::from_integer(2, 0).unwrap(); + assert_eq!(rate2.0, 2); + assert_float_absolute_eq!(rate2.to_f64(0).unwrap(), 2.); + + let rate3 = Rate::from_mantissa(123_456); + assert_eq!(rate3.0, 123_456); + assert_float_absolute_eq!(rate3.to_f64(0).unwrap(), 123_456.); + + let rate4 = Rate::from_f64(42., 0).unwrap(); + assert_eq!(rate4.0, 42); + assert_float_absolute_eq!(rate4.to_f64(0).unwrap(), 42.); +} + +#[test] +fn rate_constructs_with_zero_mantissa() { + let rate1 = Rate::parse_str("0.0", -8).unwrap(); + assert_eq!(rate1.0, 0); + let rate2 = Rate::from_integer(0, -8).unwrap(); + assert_eq!(rate2.0, 0); + let rate3 = Rate::from_mantissa(0); + assert_eq!(rate3.0, 0); + let rate4 = Rate::from_f64(-0.0, -8).unwrap(); + assert_eq!(rate4.0, 0); + + let rate1 = Rate::parse_str("0.0", 8).unwrap(); + assert_eq!(rate1.0, 0); + let rate2 = Rate::from_integer(0, 8).unwrap(); + assert_eq!(rate2.0, 0); + let rate4 = Rate::from_f64(-0.0, 8).unwrap(); + assert_eq!(rate4.0, 0); +} + +#[test] +fn rate_constructs_with_positive_exponent() { + let rate = Rate::parse_str("42_680_000", 3).unwrap(); + assert_eq!(rate.0, 42_680); + assert_float_absolute_eq!(rate.to_f64(3).unwrap(), 42_680_000.); + + let rate2 = Rate::from_integer(200_000, 3).unwrap(); + assert_eq!(rate2.0, 200); + assert_float_absolute_eq!(rate2.to_f64(3).unwrap(), 200_000.); + + let rate3 = Rate::from_mantissa(123_456); + assert_eq!(rate3.0, 123_456); + assert_float_absolute_eq!(rate3.to_f64(3).unwrap(), 123_456_000.); + + let rate4 = Rate::from_f64(42_680_000., 3).unwrap(); + assert_eq!(rate4.0, 42_680); + assert_float_absolute_eq!(rate4.to_f64(3).unwrap(), 42_680_000.); +} + +#[test] +fn rate_rejects_too_precise() { + Rate::parse_str("42.68", 0).unwrap_err(); + Rate::parse_str("42.68", -1).unwrap_err(); + Rate::parse_str("42.68", -2).unwrap(); + + Rate::parse_str("42_680", 3).unwrap_err(); + Rate::parse_str("42_600", 3).unwrap_err(); + Rate::parse_str("42_000", 3).unwrap(); +} diff --git a/lazer/sdk/rust/protocol/src/router.rs b/lazer/sdk/rust/protocol/src/router.rs deleted file mode 100644 index e995380e8b..0000000000 --- a/lazer/sdk/rust/protocol/src/router.rs +++ /dev/null @@ -1,605 +0,0 @@ -//! WebSocket JSON protocol types for the API the router provides to consumers and publishers. - -use { - crate::{ - payload::AggregatedPriceFeedData, - time::{DurationUs, TimestampUs}, - }, - anyhow::{bail, Context}, - derive_more::derive::{From, Into}, - itertools::Itertools, - protobuf::well_known_types::duration::Duration as ProtobufDuration, - rust_decimal::{prelude::FromPrimitive, Decimal}, - serde::{de::Error, Deserialize, Serialize}, - std::{ - fmt::Display, - num::NonZeroI64, - ops::{Add, Deref, DerefMut, Div, Sub}, - }, -}; - -#[derive( - Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into, -)] -pub struct PublisherId(pub u16); - -#[derive( - Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into, -)] -pub struct PriceFeedId(pub u32); - -#[derive( - Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into, -)] -pub struct ChannelId(pub u8); - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] -#[repr(transparent)] -pub struct Rate(pub i64); - -impl Rate { - pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result { - let value: Decimal = value.parse()?; - let coef = 10i64.checked_pow(exponent).context("overflow")?; - let coef = Decimal::from_i64(coef).context("overflow")?; - let value = value.checked_mul(coef).context("overflow")?; - if !value.is_integer() { - bail!("price value is more precise than available exponent"); - } - let value: i64 = value.try_into().context("overflow")?; - Ok(Self(value)) - } - - pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result { - let value = Decimal::from_f64(value).context("overflow")?; - let coef = 10i64.checked_pow(exponent).context("overflow")?; - let coef = Decimal::from_i64(coef).context("overflow")?; - let value = value.checked_mul(coef).context("overflow")?; - let value: i64 = value.try_into().context("overflow")?; - Ok(Self(value)) - } - - pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result { - let coef = 10i64.checked_pow(exponent).context("overflow")?; - let value = value.checked_mul(coef).context("overflow")?; - Ok(Self(value)) - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] -#[repr(transparent)] -pub struct Price(pub NonZeroI64); - -impl Price { - pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result { - let coef = 10i64.checked_pow(exponent).context("overflow")?; - let value = value.checked_mul(coef).context("overflow")?; - let value = NonZeroI64::new(value).context("zero price is unsupported")?; - Ok(Self(value)) - } - - pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result { - let value: Decimal = value.parse()?; - let coef = 10i64.checked_pow(exponent).context("overflow")?; - let coef = Decimal::from_i64(coef).context("overflow")?; - let value = value.checked_mul(coef).context("overflow")?; - if !value.is_integer() { - bail!("price value is more precise than available exponent"); - } - let value: i64 = value.try_into().context("overflow")?; - let value = NonZeroI64::new(value).context("zero price is unsupported")?; - Ok(Self(value)) - } - - pub fn new(value: i64) -> anyhow::Result { - let value = NonZeroI64::new(value).context("zero price is unsupported")?; - Ok(Self(value)) - } - - pub fn into_inner(self) -> NonZeroI64 { - self.0 - } - - pub fn to_f64(self, exponent: u32) -> anyhow::Result { - Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64) - } - - pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result { - let value = (value * 10f64.powi(exponent as i32)) as i64; - let value = NonZeroI64::new(value).context("zero price is unsupported")?; - Ok(Self(value)) - } - - pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result { - let left_value = i128::from(self.0.get()); - let right_value = i128::from(rhs.0.get()); - - let value = left_value * right_value / 10i128.pow(rhs_exponent); - let value = value.try_into()?; - NonZeroI64::new(value) - .context("zero price is unsupported") - .map(Self) - } -} - -impl Sub for Price { - type Output = Option; - - fn sub(self, rhs: i64) -> Self::Output { - let value = self.0.get().saturating_sub(rhs); - NonZeroI64::new(value).map(Self) - } -} - -impl Add for Price { - type Output = Option; - - fn add(self, rhs: i64) -> Self::Output { - let value = self.0.get().saturating_add(rhs); - NonZeroI64::new(value).map(Self) - } -} - -impl Add for Price { - type Output = Option; - fn add(self, rhs: Price) -> Self::Output { - let value = self.0.get().saturating_add(rhs.0.get()); - NonZeroI64::new(value).map(Self) - } -} - -impl Sub for Price { - type Output = Option; - fn sub(self, rhs: Price) -> Self::Output { - let value = self.0.get().saturating_sub(rhs.0.get()); - NonZeroI64::new(value).map(Self) - } -} - -impl Div for Price { - type Output = Option; - fn div(self, rhs: i64) -> Self::Output { - let value = self.0.get().saturating_div(rhs); - NonZeroI64::new(value).map(Self) - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum PriceFeedProperty { - Price, - BestBidPrice, - BestAskPrice, - PublisherCount, - Exponent, - Confidence, - FundingRate, - FundingTimestamp, - FundingRateInterval, - // More fields may be added later. -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum DeliveryFormat { - /// Deliver stream updates as JSON text messages. - #[default] - Json, - /// Deliver stream updates as binary messages. - Binary, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum Format { - Evm, - Solana, - LeEcdsa, - LeUnsigned, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum JsonBinaryEncoding { - #[default] - Base64, - Hex, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, From)] -pub enum Channel { - FixedRate(FixedRate), -} - -impl Serialize for Channel { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - match self { - Channel::FixedRate(fixed_rate) => { - if *fixed_rate == FixedRate::MIN { - return serializer.serialize_str("real_time"); - } - serializer.serialize_str(&format!( - "fixed_rate@{}ms", - fixed_rate.duration().as_millis() - )) - } - } - } -} - -pub mod channel_ids { - use super::ChannelId; - - pub const FIXED_RATE_1: ChannelId = ChannelId(1); - pub const FIXED_RATE_50: ChannelId = ChannelId(2); - pub const FIXED_RATE_200: ChannelId = ChannelId(3); -} - -impl Display for Channel { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Channel::FixedRate(fixed_rate) => match *fixed_rate { - FixedRate::MIN => write!(f, "real_time"), - rate => write!(f, "fixed_rate@{}ms", rate.duration().as_millis()), - }, - } - } -} - -impl Channel { - pub fn id(&self) -> ChannelId { - match self { - Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() { - 1 => channel_ids::FIXED_RATE_1, - 50 => channel_ids::FIXED_RATE_50, - 200 => channel_ids::FIXED_RATE_200, - _ => panic!("unknown channel: {self:?}"), - }, - } - } -} - -#[test] -fn id_supports_all_fixed_rates() { - for rate in FixedRate::ALL { - Channel::FixedRate(rate).id(); - } -} - -fn parse_channel(value: &str) -> Option { - if value == "real_time" { - Some(Channel::FixedRate(FixedRate::MIN)) - } else if let Some(rest) = value.strip_prefix("fixed_rate@") { - let ms_value = rest.strip_suffix("ms")?; - Some(Channel::FixedRate(FixedRate::from_millis( - ms_value.parse().ok()?, - )?)) - } else { - None - } -} - -impl<'de> Deserialize<'de> for Channel { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let value = ::deserialize(deserializer)?; - parse_channel(&value).ok_or_else(|| Error::custom("unknown channel")) - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct FixedRate { - rate: DurationUs, -} - -impl FixedRate { - pub const RATE_1_MS: Self = Self { - rate: DurationUs::from_millis_u32(1), - }; - pub const RATE_50_MS: Self = Self { - rate: DurationUs::from_millis_u32(50), - }; - pub const RATE_200_MS: Self = Self { - rate: DurationUs::from_millis_u32(200), - }; - - // Assumptions (tested below): - // - Values are sorted. - // - 1 second contains a whole number of each interval. - // - all intervals are divisable by the smallest interval. - pub const ALL: [Self; 3] = [Self::RATE_1_MS, Self::RATE_50_MS, Self::RATE_200_MS]; - pub const MIN: Self = Self::ALL[0]; - - pub fn from_millis(millis: u32) -> Option { - Self::ALL - .into_iter() - .find(|v| v.rate.as_millis() == u64::from(millis)) - } - - pub fn duration(self) -> DurationUs { - self.rate - } -} - -impl TryFrom for FixedRate { - type Error = anyhow::Error; - - fn try_from(value: DurationUs) -> Result { - Self::ALL - .into_iter() - .find(|v| v.rate == value) - .with_context(|| format!("unsupported rate: {value:?}")) - } -} - -impl TryFrom<&ProtobufDuration> for FixedRate { - type Error = anyhow::Error; - - fn try_from(value: &ProtobufDuration) -> Result { - let duration = DurationUs::try_from(value)?; - Self::try_from(duration) - } -} - -impl TryFrom for FixedRate { - type Error = anyhow::Error; - - fn try_from(duration: ProtobufDuration) -> anyhow::Result { - TryFrom::<&ProtobufDuration>::try_from(&duration) - } -} - -impl From for DurationUs { - fn from(value: FixedRate) -> Self { - value.rate - } -} - -impl From for ProtobufDuration { - fn from(value: FixedRate) -> Self { - value.rate.into() - } -} - -#[test] -fn fixed_rate_values() { - assert!( - FixedRate::ALL.windows(2).all(|w| w[0] < w[1]), - "values must be unique and sorted" - ); - for value in FixedRate::ALL { - assert_eq!( - 1_000_000 % value.duration().as_micros(), - 0, - "1 s must contain whole number of intervals" - ); - assert_eq!( - value.duration().as_micros() % FixedRate::MIN.duration().as_micros(), - 0, - "the interval's borders must be a subset of the minimal interval's borders" - ); - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SubscriptionParamsRepr { - pub price_feed_ids: Vec, - pub properties: Vec, - // "chains" was renamed to "formats". "chains" is still supported for compatibility. - #[serde(alias = "chains")] - pub formats: Vec, - #[serde(default)] - pub delivery_format: DeliveryFormat, - #[serde(default)] - pub json_binary_encoding: JsonBinaryEncoding, - /// If `true`, the stream update will contain a `parsed` JSON field containing - /// all data of the update. - #[serde(default = "default_parsed")] - pub parsed: bool, - pub channel: Channel, - #[serde(default)] - pub ignore_invalid_feed_ids: bool, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct SubscriptionParams(SubscriptionParamsRepr); - -impl<'de> Deserialize<'de> for SubscriptionParams { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let value = SubscriptionParamsRepr::deserialize(deserializer)?; - Self::new(value).map_err(Error::custom) - } -} - -impl SubscriptionParams { - pub fn new(value: SubscriptionParamsRepr) -> Result { - if value.price_feed_ids.is_empty() { - return Err("no price feed ids specified"); - } - if !value.price_feed_ids.iter().all_unique() { - return Err("duplicate price feed ids specified"); - } - if !value.formats.iter().all_unique() { - return Err("duplicate formats or chains specified"); - } - if value.properties.is_empty() { - return Err("no properties specified"); - } - if !value.properties.iter().all_unique() { - return Err("duplicate properties specified"); - } - Ok(Self(value)) - } -} - -impl Deref for SubscriptionParams { - type Target = SubscriptionParamsRepr; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} -impl DerefMut for SubscriptionParams { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -pub fn default_parsed() -> bool { - true -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct JsonBinaryData { - pub encoding: JsonBinaryEncoding, - pub data: String, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct JsonUpdate { - /// Present unless `parsed = false` is specified in subscription params. - #[serde(skip_serializing_if = "Option::is_none")] - pub parsed: Option, - /// Only present if `Evm` is present in `formats` in subscription params. - #[serde(skip_serializing_if = "Option::is_none")] - pub evm: Option, - /// Only present if `Solana` is present in `formats` in subscription params. - #[serde(skip_serializing_if = "Option::is_none")] - pub solana: Option, - /// Only present if `LeEcdsa` is present in `formats` in subscription params. - #[serde(skip_serializing_if = "Option::is_none")] - pub le_ecdsa: Option, - /// Only present if `LeUnsigned` is present in `formats` in subscription params. - #[serde(skip_serializing_if = "Option::is_none")] - pub le_unsigned: Option, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ParsedPayload { - #[serde(with = "crate::serde_str::timestamp")] - pub timestamp_us: TimestampUs, - pub price_feeds: Vec, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ParsedFeedPayload { - pub price_feed_id: PriceFeedId, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(with = "crate::serde_str::option_price")] - #[serde(default)] - pub price: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(with = "crate::serde_str::option_price")] - #[serde(default)] - pub best_bid_price: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(with = "crate::serde_str::option_price")] - #[serde(default)] - pub best_ask_price: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(default)] - pub publisher_count: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(default)] - pub exponent: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(default)] - pub confidence: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(default)] - pub funding_rate: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(default)] - pub funding_timestamp: Option, - // More fields may be added later. - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(default)] - pub funding_rate_interval: Option, -} - -impl ParsedFeedPayload { - pub fn new( - price_feed_id: PriceFeedId, - exponent: Option, - data: &AggregatedPriceFeedData, - properties: &[PriceFeedProperty], - ) -> Self { - let mut output = Self { - price_feed_id, - price: None, - best_bid_price: None, - best_ask_price: None, - publisher_count: None, - exponent: None, - confidence: None, - funding_rate: None, - funding_timestamp: None, - funding_rate_interval: None, - }; - for &property in properties { - match property { - PriceFeedProperty::Price => { - output.price = data.price; - } - PriceFeedProperty::BestBidPrice => { - output.best_bid_price = data.best_bid_price; - } - PriceFeedProperty::BestAskPrice => { - output.best_ask_price = data.best_ask_price; - } - PriceFeedProperty::PublisherCount => { - output.publisher_count = Some(data.publisher_count); - } - PriceFeedProperty::Exponent => { - output.exponent = exponent; - } - PriceFeedProperty::Confidence => { - output.confidence = data.confidence; - } - PriceFeedProperty::FundingRate => { - output.funding_rate = data.funding_rate; - } - PriceFeedProperty::FundingTimestamp => { - output.funding_timestamp = data.funding_timestamp; - } - PriceFeedProperty::FundingRateInterval => { - output.funding_rate_interval = data.funding_rate_interval; - } - } - } - output - } - - pub fn new_full( - price_feed_id: PriceFeedId, - exponent: Option, - data: &AggregatedPriceFeedData, - ) -> Self { - Self { - price_feed_id, - price: data.price, - best_bid_price: data.best_bid_price, - best_ask_price: data.best_ask_price, - publisher_count: Some(data.publisher_count), - exponent, - confidence: data.confidence, - funding_rate: data.funding_rate, - funding_timestamp: data.funding_timestamp, - funding_rate_interval: data.funding_rate_interval, - } - } -} diff --git a/lazer/sdk/rust/protocol/src/serde_price_as_i64.rs b/lazer/sdk/rust/protocol/src/serde_price_as_i64.rs index b2830c6d9f..1fea60cc8a 100644 --- a/lazer/sdk/rust/protocol/src/serde_price_as_i64.rs +++ b/lazer/sdk/rust/protocol/src/serde_price_as_i64.rs @@ -1,5 +1,5 @@ use { - crate::router::Price, + crate::price::Price, serde::{Deserialize, Deserializer, Serialize, Serializer}, std::num::NonZeroI64, }; @@ -9,7 +9,7 @@ where S: Serializer, { value - .map_or(0i64, |price| price.0.get()) + .map_or(0i64, |price| price.mantissa_i64()) .serialize(serializer) } @@ -18,5 +18,5 @@ where D: Deserializer<'de>, { let value = i64::deserialize(deserializer)?; - Ok(NonZeroI64::new(value).map(Price)) + Ok(NonZeroI64::new(value).map(Price::from_nonzero_mantissa)) } diff --git a/lazer/sdk/rust/protocol/src/serde_str.rs b/lazer/sdk/rust/protocol/src/serde_str.rs index 1446fb4332..cb3b27af52 100644 --- a/lazer/sdk/rust/protocol/src/serde_str.rs +++ b/lazer/sdk/rust/protocol/src/serde_str.rs @@ -1,6 +1,6 @@ pub mod option_price { use { - crate::router::Price, + crate::price::Price, serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}, std::num::NonZeroI64, }; @@ -10,7 +10,7 @@ pub mod option_price { S: Serializer, { value - .map(|price| price.0.get().to_string()) + .map(|price| price.mantissa_i64().to_string()) .serialize(serializer) } @@ -22,7 +22,7 @@ pub mod option_price { if let Some(value) = value { let value: i64 = value.parse().map_err(D::Error::custom)?; let value = NonZeroI64::new(value).ok_or_else(|| D::Error::custom("zero price"))?; - Ok(Some(Price(value))) + Ok(Some(Price::from_nonzero_mantissa(value))) } else { Ok(None) } diff --git a/lazer/sdk/rust/protocol/src/subscription.rs b/lazer/sdk/rust/protocol/src/subscription.rs deleted file mode 100644 index c773283839..0000000000 --- a/lazer/sdk/rust/protocol/src/subscription.rs +++ /dev/null @@ -1,103 +0,0 @@ -//! Types descibing general WebSocket subscription/unsubscription JSON messages -//! used across publishers, agents and routers. - -use { - crate::router::{JsonUpdate, PriceFeedId, SubscriptionParams}, - derive_more::From, - serde::{Deserialize, Serialize}, -}; - -/// A request sent from the client to the server. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(tag = "type")] -#[serde(rename_all = "camelCase")] -pub enum Request { - Subscribe(SubscribeRequest), - Unsubscribe(UnsubscribeRequest), -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] -pub struct SubscriptionId(pub u64); - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SubscribeRequest { - pub subscription_id: SubscriptionId, - #[serde(flatten)] - pub params: SubscriptionParams, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct UnsubscribeRequest { - pub subscription_id: SubscriptionId, -} - -/// A JSON response sent from the server to the client. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, From)] -#[serde(tag = "type")] -#[serde(rename_all = "camelCase")] -pub enum Response { - Error(ErrorResponse), - Subscribed(SubscribedResponse), - SubscribedWithInvalidFeedIdsIgnored(SubscribedWithInvalidFeedIdsIgnoredResponse), - Unsubscribed(UnsubscribedResponse), - SubscriptionError(SubscriptionErrorResponse), - StreamUpdated(StreamUpdatedResponse), -} - -/// Sent from the server after a successul subscription. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SubscribedResponse { - pub subscription_id: SubscriptionId, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct InvalidFeedSubscriptionDetails { - pub unknown_ids: Vec, - pub unsupported_channels: Vec, - pub unstable: Vec, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SubscribedWithInvalidFeedIdsIgnoredResponse { - pub subscription_id: SubscriptionId, - pub subscribed_feed_ids: Vec, - pub ignored_invalid_feed_ids: InvalidFeedSubscriptionDetails, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct UnsubscribedResponse { - pub subscription_id: SubscriptionId, -} - -/// Sent from the server if the requested subscription or unsubscription request -/// could not be fulfilled. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SubscriptionErrorResponse { - pub subscription_id: SubscriptionId, - pub error: String, -} - -/// Sent from the server if an internal error occured while serving data for an existing subscription, -/// or a client request sent a bad request. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ErrorResponse { - pub error: String, -} - -/// Sent from the server when new data is available for an existing subscription -/// (only if `delivery_format == Json`). -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct StreamUpdatedResponse { - pub subscription_id: SubscriptionId, - #[serde(flatten)] - pub payload: JsonUpdate, -} diff --git a/lazer/sdk/rust/protocol/src/time.rs b/lazer/sdk/rust/protocol/src/time.rs index 5fbc6956c4..8362d5a6c9 100644 --- a/lazer/sdk/rust/protocol/src/time.rs +++ b/lazer/sdk/rust/protocol/src/time.rs @@ -486,3 +486,97 @@ pub mod duration_us_serde_humantime { value.into_inner().try_into().map_err(D::Error::custom) } } + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct FixedRate { + rate: DurationUs, +} + +impl FixedRate { + pub const RATE_1_MS: Self = Self { + rate: DurationUs::from_millis_u32(1), + }; + pub const RATE_50_MS: Self = Self { + rate: DurationUs::from_millis_u32(50), + }; + pub const RATE_200_MS: Self = Self { + rate: DurationUs::from_millis_u32(200), + }; + + // Assumptions (tested below): + // - Values are sorted. + // - 1 second contains a whole number of each interval. + // - all intervals are divisable by the smallest interval. + pub const ALL: [Self; 3] = [Self::RATE_1_MS, Self::RATE_50_MS, Self::RATE_200_MS]; + pub const MIN: Self = Self::ALL[0]; + + pub fn from_millis(millis: u32) -> Option { + Self::ALL + .into_iter() + .find(|v| v.rate.as_millis() == u64::from(millis)) + } + + pub fn duration(self) -> DurationUs { + self.rate + } +} + +impl TryFrom for FixedRate { + type Error = anyhow::Error; + + fn try_from(value: DurationUs) -> Result { + Self::ALL + .into_iter() + .find(|v| v.rate == value) + .with_context(|| format!("unsupported rate: {value:?}")) + } +} + +impl TryFrom<&ProtobufDuration> for FixedRate { + type Error = anyhow::Error; + + fn try_from(value: &ProtobufDuration) -> Result { + let duration = DurationUs::try_from(value)?; + Self::try_from(duration) + } +} + +impl TryFrom for FixedRate { + type Error = anyhow::Error; + + fn try_from(duration: ProtobufDuration) -> anyhow::Result { + TryFrom::<&ProtobufDuration>::try_from(&duration) + } +} + +impl From for DurationUs { + fn from(value: FixedRate) -> Self { + value.rate + } +} + +impl From for ProtobufDuration { + fn from(value: FixedRate) -> Self { + value.rate.into() + } +} + +#[test] +fn fixed_rate_values() { + assert!( + FixedRate::ALL.windows(2).all(|w| w[0] < w[1]), + "values must be unique and sorted" + ); + for value in FixedRate::ALL { + assert_eq!( + 1_000_000 % value.duration().as_micros(), + 0, + "1 s must contain whole number of intervals" + ); + assert_eq!( + value.duration().as_micros() % FixedRate::MIN.duration().as_micros(), + 0, + "the interval's borders must be a subset of the minimal interval's borders" + ); + } +}