diff --git a/Cargo.lock b/Cargo.lock index 76c7708924..678d660608 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5608,7 +5608,7 @@ dependencies = [ [[package]] name = "pyth-lazer-agent" -version = "0.4.0" +version = "0.4.1" dependencies = [ "anyhow", "backoff", diff --git a/apps/pyth-lazer-agent/Cargo.toml b/apps/pyth-lazer-agent/Cargo.toml index 1dcae42e12..c398fd52d7 100644 --- a/apps/pyth-lazer-agent/Cargo.toml +++ b/apps/pyth-lazer-agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-lazer-agent" -version = "0.4.0" +version = "0.4.1" edition = "2024" description = "Pyth Lazer Agent" license = "Apache-2.0" diff --git a/apps/pyth-lazer-agent/README.md b/apps/pyth-lazer-agent/README.md index cd6455e4c7..14ed2576c7 100644 --- a/apps/pyth-lazer-agent/README.md +++ b/apps/pyth-lazer-agent/README.md @@ -49,6 +49,7 @@ publish_keypair_path = "/path/to/keypair.json" authorization_token = "your_token" listen_address = "0.0.0.0:8910" publish_interval_duration = "25ms" +enable_update_deduplication = false ``` - `relayers_urls`: The Lazer team will provide these. @@ -56,3 +57,4 @@ publish_interval_duration = "25ms" - `authorization_token`: The Lazer team will provide this or instruct that it can be omitted. - `listen_address`: The local port the agent will be listening on; can be anything you want. - `publisher_interval`: The agent will batch and send transaction bundles at this interval. The Lazer team will provide guidance here. +- `enable_update_deduplication`: The agent will deduplicate updates inside each batch before sending it to Lazer. 
diff --git a/apps/pyth-lazer-agent/src/config.rs b/apps/pyth-lazer-agent/src/config.rs index 795130aa4f..8e442c5167 100644 --- a/apps/pyth-lazer-agent/src/config.rs +++ b/apps/pyth-lazer-agent/src/config.rs @@ -19,6 +19,8 @@ pub struct Config { #[serde(with = "humantime_serde", default = "default_publish_interval")] pub publish_interval_duration: Duration, pub history_service_url: Option<Url>, + #[serde(default)] + pub enable_update_deduplication: bool, } #[derive(Deserialize, Derivative, Clone, PartialEq)] diff --git a/apps/pyth-lazer-agent/src/jrpc_handle.rs b/apps/pyth-lazer-agent/src/jrpc_handle.rs index dea9d337a0..7fe9401e25 100644 --- a/apps/pyth-lazer-agent/src/jrpc_handle.rs +++ b/apps/pyth-lazer-agent/src/jrpc_handle.rs @@ -299,6 +299,7 @@ pub mod tests { publish_keypair_path: Default::default(), publish_interval_duration: Default::default(), history_service_url: None, + enable_update_deduplication: false, }; println!("{:?}", get_metadata(config).await.unwrap()); diff --git a/apps/pyth-lazer-agent/src/lazer_publisher.rs b/apps/pyth-lazer-agent/src/lazer_publisher.rs index 79bb00b493..7bf7f46ce5 100644 --- a/apps/pyth-lazer-agent/src/lazer_publisher.rs +++ b/apps/pyth-lazer-agent/src/lazer_publisher.rs @@ -132,8 +132,13 @@ impl LazerPublisherTask { return Ok(()); } + let mut updates = self.pending_updates.drain(..).collect(); + if self.config.enable_update_deduplication { + deduplicate_feed_updates(&mut updates); + } + let publisher_update = PublisherUpdate { - updates: self.pending_updates.drain(..).collect(), + updates, publisher_timestamp: MessageField::some(Timestamp::now()), special_fields: Default::default(), }; @@ -173,13 +178,19 @@ impl LazerPublisherTask { } } +fn deduplicate_feed_updates(feed_updates: &mut Vec<FeedUpdate>) { + // assume that feed_updates is already sorted by timestamp for each feed_update.feed_id + feed_updates.dedup_by_key(|feed_update| (feed_update.feed_id, feed_update.update.clone())); +} + #[cfg(test)] mod tests { use 
crate::config::{CHANNEL_CAPACITY, Config}; - use crate::lazer_publisher::LazerPublisherTask; + use crate::lazer_publisher::{LazerPublisherTask, deduplicate_feed_updates}; use ed25519_dalek::SigningKey; use protobuf::well_known_types::timestamp::Timestamp; use protobuf::{Message, MessageField}; + use pyth_lazer_protocol::time::TimestampUs; use pyth_lazer_publisher_sdk::publisher_update::feed_update::Update; use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PriceUpdate}; use pyth_lazer_publisher_sdk::transaction::{LazerTransaction, lazer_transaction}; @@ -212,6 +223,18 @@ mod tests { temp_file } + fn test_feed_update(feed_id: u32, timestamp: TimestampUs, price: i64) -> FeedUpdate { + FeedUpdate { + feed_id: Some(feed_id), + source_timestamp: MessageField::some(timestamp.into()), + update: Some(Update::PriceUpdate(PriceUpdate { + price: Some(price), + ..PriceUpdate::default() + })), + special_fields: Default::default(), + } + } + #[tokio::test] async fn test_lazer_exporter_task() { let signing_key_file = get_private_key_file(); @@ -224,6 +247,7 @@ mod tests { publish_keypair_path: PathBuf::from(signing_key_file.path()), publish_interval_duration: Duration::from_millis(25), history_service_url: None, + enable_update_deduplication: false, }; let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY); @@ -274,4 +298,55 @@ mod tests { _ => panic!("channel should have a transaction waiting"), } } + + #[test] + fn test_deduplicate_feed_updates() { + // let's consider a batch containing updates for a single feed. 
the updates are (ts, price): + // - (1, 10) + // - (2, 10) + // - (3, 10) + // - (4, 15) + // - (5, 15) + // - (6, 10) + // we should only return (1, 10), (4, 15), (6, 10) + + let updates = &mut vec![ + test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10), + test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 10), + test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10), + test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15), + test_feed_update(1, TimestampUs::from_millis(5).unwrap(), 15), + test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10), + ]; + + let expected_updates = vec![ + test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10), + test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15), + test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10), + ]; + + deduplicate_feed_updates(updates); + assert_eq!(updates.to_vec(), expected_updates); + } + + #[test] + fn test_deduplicate_feed_updates_multiple_feeds() { + let updates = &mut vec![ + test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10), + test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 10), + test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10), + test_feed_update(2, TimestampUs::from_millis(4).unwrap(), 15), + test_feed_update(2, TimestampUs::from_millis(5).unwrap(), 15), + test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10), + ]; + + let expected_updates = vec![ + test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10), + test_feed_update(2, TimestampUs::from_millis(4).unwrap(), 15), + test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10), + ]; + + deduplicate_feed_updates(updates); + assert_eq!(updates.to_vec(), expected_updates); + } }