
feat(pyth-lazer-agent): Allow deduplicating updates within each batch #2944


Merged · 3 commits · Aug 13, 2025
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion apps/pyth-lazer-agent/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "pyth-lazer-agent"
-version = "0.4.0"
+version = "0.4.1"
 edition = "2024"
 description = "Pyth Lazer Agent"
 license = "Apache-2.0"
2 changes: 2 additions & 0 deletions apps/pyth-lazer-agent/README.md
@@ -49,10 +49,12 @@ publish_keypair_path = "/path/to/keypair.json"
 authorization_token = "your_token"
 listen_address = "0.0.0.0:8910"
 publish_interval_duration = "25ms"
+enable_update_deduplication = false
 ```
 
 - `relayers_urls`: The Lazer team will provide these.
 - `publish_keypair_path`: The keypair file generated with `solana-keygen` or similar.
 - `authorization_token`: The Lazer team will provide this or instruct that it can be omitted.
 - `listen_address`: The local port the agent will be listening on; can be anything you want.
 - `publisher_interval`: The agent will batch and send transaction bundles at this interval. The Lazer team will provide guidance here.
+- `enable_update_deduplication`: The agent will deduplicate updates within each batch before sending it to Lazer.
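
Editor's note: the flag ships disabled; to opt in, an operator would set it to true in the same config file. A minimal sketch (other keys as shown above; values are illustrative):

publish_interval_duration = "25ms"
enable_update_deduplication = true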
2 changes: 2 additions & 0 deletions apps/pyth-lazer-agent/src/config.rs
@@ -19,6 +19,8 @@ pub struct Config {
     #[serde(with = "humantime_serde", default = "default_publish_interval")]
     pub publish_interval_duration: Duration,
     pub history_service_url: Option<Url>,
+    #[serde(default)]
+    pub enable_update_deduplication: bool,
 }
 
 #[derive(Deserialize, Derivative, Clone, PartialEq)]
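
Editor's note on the `#[serde(default)]` attribute above: when the key is absent from the config file, serde falls back to `bool::default()`, i.e. `false`, so existing configs keep the current behavior. A standalone sketch of that mechanism, assuming the serde and toml crates (`Demo` is a hypothetical stand-in for `Config`):

use serde::Deserialize;

#[derive(Deserialize)]
struct Demo {
    #[serde(default)]
    enable_update_deduplication: bool,
}

fn main() {
    // The key is missing from the input, so serde uses bool::default() == false.
    let demo: Demo = toml::from_str("").unwrap();
    assert!(!demo.enable_update_deduplication);
}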
1 change: 1 addition & 0 deletions apps/pyth-lazer-agent/src/jrpc_handle.rs
@@ -299,6 +299,7 @@ pub mod tests {
             publish_keypair_path: Default::default(),
             publish_interval_duration: Default::default(),
             history_service_url: None,
+            enable_update_deduplication: false,
         };
 
         println!("{:?}", get_metadata(config).await.unwrap());
Contributor:
Is there a reason this is a println and not an info log?

Contributor Author:

We don't have tracing set up properly in unit tests, which makes println the easiest choice. This is just for one manual test that's only ever run locally during development to grab example data.
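
Editor's note: for anyone who does want log output in unit tests, a minimal sketch of one option, assuming the tracing-subscriber crate (this helper is hypothetical and not part of the PR):

// Hypothetical test helper: route tracing output through the test harness.
fn init_test_tracing() {
    let _ = tracing_subscriber::fmt()
        .with_test_writer() // capture output per test
        .try_init(); // ignore the error if a subscriber is already installed
}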

79 changes: 77 additions & 2 deletions apps/pyth-lazer-agent/src/lazer_publisher.rs
@@ -132,8 +132,13 @@ impl LazerPublisherTask {
             return Ok(());
         }
 
+        let mut updates = self.pending_updates.drain(..).collect();
+        if self.config.enable_update_deduplication {
+            deduplicate_feed_updates(&mut updates);
+        }
+
         let publisher_update = PublisherUpdate {
-            updates: self.pending_updates.drain(..).collect(),
+            updates,
             publisher_timestamp: MessageField::some(Timestamp::now()),
             special_fields: Default::default(),
         };
@@ -173,13 +178,19 @@
     }
 }
 
+fn deduplicate_feed_updates(feed_updates: &mut Vec<FeedUpdate>) {
+    // assume that feed_updates is already sorted by timestamp for each feed_update.feed_id
+    feed_updates.dedup_by_key(|feed_update| (feed_update.feed_id, feed_update.update.clone()));
+}
+
 #[cfg(test)]
 mod tests {
     use crate::config::{CHANNEL_CAPACITY, Config};
-    use crate::lazer_publisher::LazerPublisherTask;
+    use crate::lazer_publisher::{LazerPublisherTask, deduplicate_feed_updates};
     use ed25519_dalek::SigningKey;
     use protobuf::well_known_types::timestamp::Timestamp;
     use protobuf::{Message, MessageField};
+    use pyth_lazer_protocol::time::TimestampUs;
     use pyth_lazer_publisher_sdk::publisher_update::feed_update::Update;
     use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PriceUpdate};
     use pyth_lazer_publisher_sdk::transaction::{LazerTransaction, lazer_transaction};
@@ -212,6 +223,18 @@ mod tests {
         temp_file
     }
 
+    fn test_feed_update(feed_id: u32, timestamp: TimestampUs, price: i64) -> FeedUpdate {
+        FeedUpdate {
+            feed_id: Some(feed_id),
+            source_timestamp: MessageField::some(timestamp.into()),
+            update: Some(Update::PriceUpdate(PriceUpdate {
+                price: Some(price),
+                ..PriceUpdate::default()
+            })),
+            special_fields: Default::default(),
+        }
+    }
+
     #[tokio::test]
     async fn test_lazer_exporter_task() {
         let signing_key_file = get_private_key_file();
@@ -224,6 +247,7 @@
             publish_keypair_path: PathBuf::from(signing_key_file.path()),
             publish_interval_duration: Duration::from_millis(25),
             history_service_url: None,
+            enable_update_deduplication: false,
         };
 
         let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY);
@@ -274,4 +298,55 @@
             _ => panic!("channel should have a transaction waiting"),
         }
     }
+
+    #[test]
+    fn test_deduplicate_feed_updates() {
+        // Consider a batch containing updates for a single feed. The updates are (ts, price):
+        // - (1, 10)
+        // - (2, 10)
+        // - (3, 10)
+        // - (4, 15)
+        // - (5, 15)
+        // - (6, 10)
+        // We should only return (1, 10), (4, 15), (6, 10).
+
+        let updates = &mut vec![
+            test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15),
+            test_feed_update(1, TimestampUs::from_millis(5).unwrap(), 15),
+            test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10),
+        ];
+
+        let expected_updates = vec![
+            test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15),
+            test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10),
+        ];
+
+        deduplicate_feed_updates(updates);
+        assert_eq!(updates.to_vec(), expected_updates);
+    }
+
+    #[test]
+    fn test_deduplicate_feed_updates_multiple_feeds() {
+        let updates = &mut vec![
+            test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10),
+            test_feed_update(2, TimestampUs::from_millis(4).unwrap(), 15),
+            test_feed_update(2, TimestampUs::from_millis(5).unwrap(), 15),
+            test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10),
+        ];
+
+        let expected_updates = vec![
+            test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
+            test_feed_update(2, TimestampUs::from_millis(4).unwrap(), 15),
+            test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10),
+        ];
+
+        deduplicate_feed_updates(updates);
+        assert_eq!(updates.to_vec(), expected_updates);
+    }
 }
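
Editor's note on the std behavior these tests pin down: `Vec::dedup_by_key` only collapses runs of consecutive equal keys, which is exactly why `deduplicate_feed_updates` assumes updates are sorted per feed. A standalone sketch of that semantics:

fn main() {
    // (feed_id, price) pairs for one feed; price 10 reappears after 15, so it survives.
    let mut prices = vec![(1u32, 10i64), (1, 10), (1, 15), (1, 10)];
    prices.dedup_by_key(|p| *p);
    assert_eq!(prices, vec![(1, 10), (1, 15), (1, 10)]);
}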
}