-
Notifications
You must be signed in to change notification settings - Fork 277
feat(pyth-lazer-agent) Allow deduplicating updates within each batch #2944
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
relayer_urls = ["wss://relayer.pyth-lazer-staging.dourolabs.app/v1/transaction", "wss://relayer-1.pyth-lazer-staging.dourolabs.app/v1/transaction"] | ||
publish_keypair_path = "/path/to/solana/id.json" | ||
relayer_urls = ["ws://localhost:10001/v1/transaction"] | ||
publish_keypair_path = "/tmp/keypair.json" | ||
listen_address = "0.0.0.0:8910" | ||
publish_interval_duration = "25ms" | ||
authorization_token="token1" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -299,6 +299,7 @@ pub mod tests { | |
publish_keypair_path: Default::default(), | ||
publish_interval_duration: Default::default(), | ||
history_service_url: None, | ||
enable_update_deduplication: false, | ||
}; | ||
|
||
println!("{:?}", get_metadata(config).await.unwrap()); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there a reason this is a println and not an info log? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We don't have tracing set up properly in unit tests, which makes println the easiest choice. This is just for that one manual test that's only ever run locally during development to grab example data. |
||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -13,6 +13,7 @@ use pyth_lazer_publisher_sdk::transaction::{ | |||||
Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction, | ||||||
}; | ||||||
use solana_keypair::read_keypair_file; | ||||||
use std::collections::HashMap; | ||||||
use std::path::PathBuf; | ||||||
use std::sync::Arc; | ||||||
use std::sync::atomic::AtomicBool; | ||||||
|
@@ -132,8 +133,14 @@ impl LazerPublisherTask { | |||||
return Ok(()); | ||||||
} | ||||||
|
||||||
let updates = if self.config.enable_update_deduplication { | ||||||
deduplicate_feed_updates(&self.pending_updates.drain(..).collect())? | ||||||
} else { | ||||||
self.pending_updates.drain(..).collect() | ||||||
}; | ||||||
|
||||||
let publisher_update = PublisherUpdate { | ||||||
updates: self.pending_updates.drain(..).collect(), | ||||||
updates, | ||||||
publisher_timestamp: MessageField::some(Timestamp::now()), | ||||||
special_fields: Default::default(), | ||||||
}; | ||||||
|
@@ -173,13 +180,37 @@ impl LazerPublisherTask { | |||||
} | ||||||
} | ||||||
|
||||||
fn deduplicate_feed_updates(feed_updates: &Vec<FeedUpdate>) -> Result<Vec<FeedUpdate>> { | ||||||
let mut deduped_feed_updates = Vec::new(); | ||||||
let mut last_feed_update = HashMap::new(); | ||||||
|
||||||
// assume that feed_updates is already sorted by ts (within feed_update_id groups) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
for feed_update in feed_updates { | ||||||
let feed_id = feed_update.feed_id.context("feed_id is required")?; | ||||||
|
||||||
if let Some(update) = feed_update.update.as_ref() { | ||||||
if let Some(last_update) = last_feed_update.get(&feed_id) { | ||||||
if update == last_update { | ||||||
continue; | ||||||
} | ||||||
} | ||||||
|
||||||
deduped_feed_updates.push(feed_update.clone()); | ||||||
last_feed_update.insert(feed_id, update.clone()); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we are keeping the update with the lowest timestamp, shouldn't we keep the highest timestamp? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would insert all in the map and collect it to Vec at the end. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can also use dedupe_by_key on the std vec. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I don't think we should. My reasoning was that we generally care about lowest-latency so it's more fair to keep the first value we've seen and remove consecutive duplicates - that way we capture the 'earliest' timestamp in the batch (may not affect quality metrics but it's still a better representation of what the publisher does). In contrast keeping "highest" timestamp is kind of like a "most recently seen" cache. If we do that, is there even a point in recording intra-batch history instead f just returning last seen? What's your rationale for keeping the last rather than first? What's the benefit? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that this is intended for real-time consumption, I'd think we want to reflect the latest timestamp that this data is accurate as-of, but I'm not entirely clear. If so, yeah, for a single publisher stream, why not just send the most recently seen per batch. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I don't want to censor data as much as I don't want to invent it. I'm OK with removing truly duplicate values but I'm not comfortable deleting things that actually contain information. Maybe we should just do a simple deduplication on (feed_id, source_timestamp)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO the data is accurate "since" the first time it was seen, "until" the end of the aggregate window (at which point the publisher needs to retransmit unchanged-entries) or, notionally, until they send a changed value (for which we also need to know the earliest occurance). Knowing what the "most recent" occurrence of a value within an agg window doesn't give us any useful information. Knowing the "first time" the value showed up in a batch let's us better understand the timing characteristics of each publisher There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason I was suggesting the latest timestamp was because of price expiry. When we do lowest timestamp, the price will expire faster than it should if they don't send another update. However, I think you are right about keeping the information about the earliest time a publisher sent us a new price is more important. Let's keep the lowest timestamp. |
||||||
} | ||||||
} | ||||||
|
||||||
Ok(deduped_feed_updates) | ||||||
} | ||||||
|
||||||
#[cfg(test)] | ||||||
mod tests { | ||||||
use crate::config::{CHANNEL_CAPACITY, Config}; | ||||||
use crate::lazer_publisher::LazerPublisherTask; | ||||||
use crate::lazer_publisher::{LazerPublisherTask, deduplicate_feed_updates}; | ||||||
use ed25519_dalek::SigningKey; | ||||||
use protobuf::well_known_types::timestamp::Timestamp; | ||||||
use protobuf::{Message, MessageField}; | ||||||
use pyth_lazer_protocol::time::TimestampUs; | ||||||
use pyth_lazer_publisher_sdk::publisher_update::feed_update::Update; | ||||||
use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PriceUpdate}; | ||||||
use pyth_lazer_publisher_sdk::transaction::{LazerTransaction, lazer_transaction}; | ||||||
|
@@ -212,6 +243,18 @@ mod tests { | |||||
temp_file | ||||||
} | ||||||
|
||||||
fn test_feed_update(feed_id: u32, timestamp: TimestampUs, price: i64) -> FeedUpdate { | ||||||
FeedUpdate { | ||||||
feed_id: Some(feed_id), | ||||||
source_timestamp: MessageField::some(timestamp.into()), | ||||||
update: Some(Update::PriceUpdate(PriceUpdate { | ||||||
price: Some(price), | ||||||
..PriceUpdate::default() | ||||||
})), | ||||||
special_fields: Default::default(), | ||||||
} | ||||||
} | ||||||
|
||||||
#[tokio::test] | ||||||
async fn test_lazer_exporter_task() { | ||||||
let signing_key_file = get_private_key_file(); | ||||||
|
@@ -224,6 +267,7 @@ mod tests { | |||||
publish_keypair_path: PathBuf::from(signing_key_file.path()), | ||||||
publish_interval_duration: Duration::from_millis(25), | ||||||
history_service_url: None, | ||||||
enable_update_deduplication: false, | ||||||
}; | ||||||
|
||||||
let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY); | ||||||
|
@@ -274,4 +318,59 @@ mod tests { | |||||
_ => panic!("channel should have a transaction waiting"), | ||||||
} | ||||||
} | ||||||
|
||||||
#[test] | ||||||
fn test_deduplicate_feed_updates() { | ||||||
// let's consider a batch containing updates for a single feed. the updates are (ts, price): | ||||||
// - (1, 10) | ||||||
// - (2, 10) | ||||||
// - (3, 10) | ||||||
// - (4, 15) | ||||||
// - (5, 15) | ||||||
// - (6, 10) | ||||||
// we should only return (1, 10), (4, 15), (6, 10) | ||||||
|
||||||
let updates = vec![ | ||||||
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10), | ||||||
test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 10), | ||||||
test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10), | ||||||
test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15), | ||||||
test_feed_update(1, TimestampUs::from_millis(5).unwrap(), 15), | ||||||
test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10), | ||||||
]; | ||||||
|
||||||
let expected_updates = vec![ | ||||||
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10), | ||||||
test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15), | ||||||
test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10), | ||||||
]; | ||||||
|
||||||
assert_eq!( | ||||||
deduplicate_feed_updates(&updates).unwrap(), | ||||||
expected_updates | ||||||
); | ||||||
} | ||||||
|
||||||
#[test] | ||||||
fn test_deduplicate_feed_updates_multiple_feeds() { | ||||||
let updates = vec![ | ||||||
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10), | ||||||
test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 10), | ||||||
test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10), | ||||||
test_feed_update(2, TimestampUs::from_millis(4).unwrap(), 15), | ||||||
test_feed_update(2, TimestampUs::from_millis(5).unwrap(), 15), | ||||||
test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10), | ||||||
]; | ||||||
|
||||||
let expected_updates = vec![ | ||||||
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10), | ||||||
test_feed_update(2, TimestampUs::from_millis(4).unwrap(), 15), | ||||||
test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10), | ||||||
]; | ||||||
|
||||||
assert_eq!( | ||||||
deduplicate_feed_updates(&updates).unwrap(), | ||||||
expected_updates | ||||||
); | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the future, we should have keys in the repo for local testing.