Skip to content
44 changes: 5 additions & 39 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions apps/pyth-lazer-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ license = "Apache-2.0"
repository = "https://github.com/pyth-network/pyth-crosschain"

[dependencies]
pyth-lazer-publisher-sdk = "0.3.0"
pyth-lazer-protocol = "0.10.1"
pyth-lazer-publisher-sdk = { path = "../../lazer/publisher_sdk/rust" }
pyth-lazer-protocol = { version = "0.14.0", path = "../../lazer/sdk/rust/protocol" }

anyhow = "1.0.98"
backoff = "0.4.0"
Expand Down
4 changes: 2 additions & 2 deletions apps/pyth-lazer-agent/src/jrpc_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,9 @@ async fn handle_get_metadata<T: AsyncRead + AsyncWrite + Unpin>(

#[cfg(test)]
pub mod tests {
use pyth_lazer_protocol::{PriceFeedId, SymbolState, api::Channel, time::FixedRate};

use super::*;
use pyth_lazer_protocol::router::{Channel, FixedRate, PriceFeedId};
use pyth_lazer_protocol::symbol_state::SymbolState;
use std::net::SocketAddr;

fn gen_test_symbol(name: String, asset_type: String) -> SymbolMetadata {
Expand Down
72 changes: 56 additions & 16 deletions apps/pyth-lazer-agent/src/lazer_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use pyth_lazer_publisher_sdk::transaction::{
Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
};
use solana_keypair::read_keypair_file;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
Expand Down Expand Up @@ -132,9 +133,10 @@ impl LazerPublisherTask {
return Ok(());
}

let mut updates = self.pending_updates.drain(..).collect();
let mut updates: Vec<FeedUpdate> = self.pending_updates.drain(..).collect();
updates.sort();
if self.config.enable_update_deduplication {
deduplicate_feed_updates(&mut updates);
updates = deduplicate_feed_updates(&updates)?;
}

let publisher_update = PublisherUpdate {
Expand Down Expand Up @@ -178,9 +180,17 @@ impl LazerPublisherTask {
}
}

fn deduplicate_feed_updates(feed_updates: &mut Vec<FeedUpdate>) {
// assume that feed_updates is already sorted by timestamp for each feed_update.feed_id
feed_updates.dedup_by_key(|feed_update| (feed_update.feed_id, feed_update.update.clone()));
/// For each feed, keep the latest data. Among consecutive updates carrying the same data,
/// keep the one with the earliest timestamp (i.e. the start of the final run of identical data).
/// Assumes the input is sorted by timestamp ascending.
///
/// The result is re-sorted before returning so that the output order is deterministic
/// (by timestamp, then feed id) instead of depending on `HashMap` iteration order.
fn deduplicate_feed_updates(sorted_feed_updates: &[FeedUpdate]) -> Result<Vec<FeedUpdate>> {
    // Surviving update per feed id.
    let mut latest_by_feed: HashMap<_, &FeedUpdate> = HashMap::new();
    for update in sorted_feed_updates {
        let entry = latest_by_feed.entry(update.feed_id).or_insert(update);
        // A different value starts a new run for this feed; keep the run's first
        // (earliest-timestamped) occurrence. Equal data leaves the earlier entry in place.
        if entry.update != update.update {
            *entry = update;
        }
    }
    let mut deduped: Vec<FeedUpdate> = latest_by_feed.into_values().cloned().collect();
    deduped.sort();
    Ok(deduped)
}

#[cfg(test)]
Expand Down Expand Up @@ -308,25 +318,28 @@ mod tests {
// - (4, 15)
// - (5, 15)
// - (6, 10)
// we should only return (1, 10), (4, 15), (6, 10)
// - (7, 10)
// we should only return (6, 10)

let updates = &mut vec![
let updates = &vec![
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 10),
test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10),
test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15),
test_feed_update(1, TimestampUs::from_millis(5).unwrap(), 15),
test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10),
test_feed_update(1, TimestampUs::from_millis(7).unwrap(), 10),
];

let expected_updates = vec![
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15),
test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10),
];
let expected_updates = vec![test_feed_update(
1,
TimestampUs::from_millis(6).unwrap(),
10,
)];

deduplicate_feed_updates(updates);
assert_eq!(updates.to_vec(), expected_updates);
let mut deduped_updates = deduplicate_feed_updates(updates).unwrap();
deduped_updates.sort();
assert_eq!(deduped_updates, expected_updates);
}

#[test]
Expand All @@ -342,11 +355,38 @@ mod tests {

let expected_updates = vec![
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10),
];

let mut deduped_updates = deduplicate_feed_updates(updates).unwrap();
deduped_updates.sort();
assert_eq!(deduped_updates, expected_updates);
}

#[test]
fn test_deduplicate_feed_updates_multiple_feeds_random_order() {
    // Two feeds interleaved; timestamps ascending across the whole input.
    // For each feed we expect the earliest update of its final run of identical data.
    // A shared (immutable) borrow is all deduplicate_feed_updates needs — no `mut`.
    let updates = &vec![
        test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
        test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 20),
        test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10),
        test_feed_update(2, TimestampUs::from_millis(4).unwrap(), 15),
        test_feed_update(2, TimestampUs::from_millis(5).unwrap(), 15),
        test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10),
        test_feed_update(1, TimestampUs::from_millis(7).unwrap(), 20),
        test_feed_update(1, TimestampUs::from_millis(8).unwrap(), 10), // last distinct update for feed 1
        test_feed_update(1, TimestampUs::from_millis(9).unwrap(), 10),
        test_feed_update(2, TimestampUs::from_millis(10).unwrap(), 15),
        test_feed_update(2, TimestampUs::from_millis(11).unwrap(), 15),
        test_feed_update(2, TimestampUs::from_millis(12).unwrap(), 10), // last distinct update for feed 2
    ];

    let expected_updates = vec![
        test_feed_update(1, TimestampUs::from_millis(8).unwrap(), 10),
        test_feed_update(2, TimestampUs::from_millis(12).unwrap(), 10),
    ];

    // Sort the output before comparing: the dedup result's ordering is not part of its contract.
    let mut deduped_updates = deduplicate_feed_updates(updates).unwrap();
    deduped_updates.sort();
    assert_eq!(deduped_updates, expected_updates);
}
}
10 changes: 5 additions & 5 deletions apps/pyth-lazer-agent/src/publisher_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ async fn try_handle_publisher(
feed_id: Some(data.price_feed_id.0),
source_timestamp: MessageField::some(data.source_timestamp_us.into()),
update: Some(Update::PriceUpdate(PriceUpdate {
price: data.price.map(|p| p.0.get()),
best_bid_price: data.best_bid_price.map(|p| p.0.get()),
best_ask_price: data.best_ask_price.map(|p| p.0.get()),
price: data.price.map(|p| p.mantissa_i64()),
best_bid_price: data.best_bid_price.map(|p| p.mantissa_i64()),
best_ask_price: data.best_ask_price.map(|p| p.mantissa_i64()),
..PriceUpdate::default()
})),
special_fields: Default::default(),
Expand Down Expand Up @@ -125,8 +125,8 @@ async fn try_handle_publisher(
feed_id: Some(data.price_feed_id.0),
source_timestamp: MessageField::some(data.source_timestamp_us.into()),
update: Some(Update::FundingRateUpdate(FundingRateUpdate {
price: data.price.map(|p| p.0.get()),
rate: data.funding_rate.map(|r| r.0),
price: data.price.map(|p| p.mantissa_i64()),
rate: data.funding_rate.map(|r| r.mantissa()),
..FundingRateUpdate::default()
})),
special_fields: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion lazer/publisher_sdk/rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-lazer-publisher-sdk"
version = "0.10.0"
version = "0.11.0"
edition = "2021"
description = "Pyth Lazer Publisher SDK types."
license = "Apache-2.0"
Expand Down
92 changes: 92 additions & 0 deletions lazer/publisher_sdk/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,95 @@ impl From<pyth_lazer_protocol::api::Channel> for state::Channel {
result
}
}

// NOTE(review): `Eq`/`Ord` here compare only (price, best_bid_price, best_ask_price), while
// the protobuf-derived `PartialEq` presumably compares every message field. If PriceUpdate
// carries any other distinguishing field, two updates could compare `Ordering::Equal` under
// `cmp` while being `!=` under `PartialEq`, which bends the Ord contract — confirm the
// sort/dedup callers only rely on these three fields.
impl Eq for PriceUpdate {}

/// Lexicographic order over (price, best_bid_price, best_ask_price).
/// Each field is an `Option`, so a missing value sorts before any present one.
impl Ord for PriceUpdate {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        (self.price, self.best_bid_price, self.best_ask_price).cmp(&(
            other.price,
            other.best_bid_price,
            other.best_ask_price,
        ))
    }
}

impl Eq for FundingRateUpdate {}

/// Orders funding-rate updates by price, then rate, then the funding-rate interval.
/// The interval is compared as its `(seconds, nanos)` pair; a missing interval
/// sorts before any present one (standard `Option` ordering).
impl Ord for FundingRateUpdate {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Project the optional protobuf Duration onto an orderable key.
        let interval_key = |u: &Self| {
            u.funding_rate_interval
                .as_ref()
                .map(|duration| (duration.seconds, duration.nanos))
        };
        self.price
            .cmp(&other.price)
            .then_with(|| self.rate.cmp(&other.rate))
            .then_with(|| interval_key(self).cmp(&interval_key(other)))
    }
}

// Delegate `partial_cmp` to the total order defined by `Ord` — the canonical pattern,
// guaranteeing the two traits can never disagree.
impl PartialOrd for FundingRateUpdate {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

// Same delegation for PriceUpdate.
impl PartialOrd for PriceUpdate {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for Update {}

/// Total order over the update oneof: every price update sorts before every
/// funding-rate update; within a variant, the payload's own `Ord` decides.
impl Ord for Update {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Variant rank: PriceUpdate (0) < FundingRateUpdate (1).
        fn rank(update: &Update) -> u8 {
            match update {
                Update::PriceUpdate(_) => 0,
                Update::FundingRateUpdate(_) => 1,
            }
        }
        match (self, other) {
            (Update::PriceUpdate(a), Update::PriceUpdate(b)) => a.cmp(b),
            (Update::FundingRateUpdate(a), Update::FundingRateUpdate(b)) => a.cmp(b),
            // Mixed variants: fall back to the rank ordering above.
            _ => rank(self).cmp(&rank(other)),
        }
    }
}

impl PartialOrd for Update {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for FeedUpdate {}

/// FeedUpdates are ordered first by source_timestamp, then by feed_id, then by update.
/// The optional timestamp is compared as its `(seconds, nanos)` pair; a missing
/// timestamp sorts before any present one (standard `Option` ordering).
impl Ord for FeedUpdate {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Project the optional protobuf timestamp onto an orderable key.
        let timestamp_key =
            |u: &Self| u.source_timestamp.as_ref().map(|t| (t.seconds, t.nanos));
        timestamp_key(self)
            .cmp(&timestamp_key(other))
            .then_with(|| self.feed_id.cmp(&other.feed_id))
            .then_with(|| self.update.cmp(&other.update))
    }
}

impl PartialOrd for FeedUpdate {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
Loading