Skip to content

Commit f9881cc

Browse files
fix: deduplication logic in agent (#3033)
* feat: impl Ord and Eq for FeedUpdate * fix: deduplication logic in agent * update cargo lock * update cargo lock * use sort by key * remove ord and partial ord impls * roll back * fix sort issues * fix dep versions * increase agent version
1 parent f5402df commit f9881cc

File tree

5 files changed

+84
-44
lines changed

5 files changed

+84
-44
lines changed

Cargo.lock

Lines changed: 19 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/pyth-lazer-agent/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
[package]
22
name = "pyth-lazer-agent"
3-
version = "0.4.1"
3+
version = "0.4.2"
44
edition = "2024"
55
description = "Pyth Lazer Agent"
66
license = "Apache-2.0"
77
repository = "https://github.com/pyth-network/pyth-crosschain"
88

99
[dependencies]
10-
pyth-lazer-publisher-sdk = "0.3.0"
11-
pyth-lazer-protocol = "0.10.1"
10+
pyth-lazer-publisher-sdk = "0.10.0"
11+
pyth-lazer-protocol = "0.14.0"
1212

1313
anyhow = "1.0.98"
1414
backoff = "0.4.0"

apps/pyth-lazer-agent/src/jrpc_handle.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,9 +266,9 @@ async fn handle_get_metadata<T: AsyncRead + AsyncWrite + Unpin>(
266266

267267
#[cfg(test)]
268268
pub mod tests {
269+
use pyth_lazer_protocol::{PriceFeedId, SymbolState, api::Channel, time::FixedRate};
270+
269271
use super::*;
270-
use pyth_lazer_protocol::router::{Channel, FixedRate, PriceFeedId};
271-
use pyth_lazer_protocol::symbol_state::SymbolState;
272272
use std::net::SocketAddr;
273273

274274
fn gen_test_symbol(name: String, asset_type: String) -> SymbolMetadata {

apps/pyth-lazer-agent/src/lazer_publisher.rs

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use pyth_lazer_publisher_sdk::transaction::{
1313
Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
1414
};
1515
use solana_keypair::read_keypair_file;
16+
use std::collections::HashMap;
1617
use std::path::PathBuf;
1718
use std::sync::Arc;
1819
use std::sync::atomic::AtomicBool;
@@ -132,9 +133,10 @@ impl LazerPublisherTask {
132133
return Ok(());
133134
}
134135

135-
let mut updates = self.pending_updates.drain(..).collect();
136+
let mut updates: Vec<FeedUpdate> = self.pending_updates.drain(..).collect();
137+
updates.sort_by_key(|u| u.source_timestamp.as_ref().map(|t| (t.seconds, t.nanos)));
136138
if self.config.enable_update_deduplication {
137-
deduplicate_feed_updates(&mut updates);
139+
updates = deduplicate_feed_updates(&updates)?;
138140
}
139141

140142
let publisher_update = PublisherUpdate {
@@ -178,9 +180,17 @@ impl LazerPublisherTask {
178180
}
179181
}
180182

181-
fn deduplicate_feed_updates(feed_updates: &mut Vec<FeedUpdate>) {
182-
// assume that feed_updates is already sorted by timestamp for each feed_update.feed_id
183-
feed_updates.dedup_by_key(|feed_update| (feed_update.feed_id, feed_update.update.clone()));
183+
/// For each feed, keep the latest data. Among updates with the same data, keep the one with the earliest timestamp.
184+
/// Assumes the input is sorted by timestamp ascending.
185+
fn deduplicate_feed_updates(sorted_feed_updates: &Vec<FeedUpdate>) -> Result<Vec<FeedUpdate>> {
186+
let mut deduped_feed_updates = HashMap::new();
187+
for update in sorted_feed_updates {
188+
let entry = deduped_feed_updates.entry(update.feed_id).or_insert(update);
189+
if entry.update != update.update {
190+
*entry = update;
191+
}
192+
}
193+
Ok(deduped_feed_updates.into_values().cloned().collect())
184194
}
185195

186196
#[cfg(test)]
@@ -308,25 +318,27 @@ mod tests {
308318
// - (4, 15)
309319
// - (5, 15)
310320
// - (6, 10)
311-
// we should only return (1, 10), (4, 15), (6, 10)
321+
// - (7, 10)
322+
// we should only return (6, 10)
312323

313-
let updates = &mut vec![
324+
let updates = &vec![
314325
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
315326
test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 10),
316327
test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10),
317328
test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15),
318329
test_feed_update(1, TimestampUs::from_millis(5).unwrap(), 15),
319330
test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10),
331+
test_feed_update(1, TimestampUs::from_millis(7).unwrap(), 10),
320332
];
321333

322-
let expected_updates = vec![
323-
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
324-
test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15),
325-
test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10),
326-
];
334+
let expected_updates = vec![test_feed_update(
335+
1,
336+
TimestampUs::from_millis(6).unwrap(),
337+
10,
338+
)];
327339

328-
deduplicate_feed_updates(updates);
329-
assert_eq!(updates.to_vec(), expected_updates);
340+
let deduped_updates = deduplicate_feed_updates(updates).unwrap();
341+
assert_eq!(deduped_updates, expected_updates);
330342
}
331343

332344
#[test]
@@ -342,11 +354,38 @@ mod tests {
342354

343355
let expected_updates = vec![
344356
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
357+
test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10),
358+
];
359+
360+
let mut deduped_updates = deduplicate_feed_updates(updates).unwrap();
361+
deduped_updates.sort_by_key(|u| u.feed_id);
362+
assert_eq!(deduped_updates, expected_updates);
363+
}
364+
365+
#[test]
366+
fn test_deduplicate_feed_updates_multiple_feeds_random_order() {
367+
let updates = &mut vec![
368+
test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
369+
test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 20),
370+
test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10),
345371
test_feed_update(2, TimestampUs::from_millis(4).unwrap(), 15),
372+
test_feed_update(2, TimestampUs::from_millis(5).unwrap(), 15),
346373
test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10),
374+
test_feed_update(1, TimestampUs::from_millis(7).unwrap(), 20),
375+
test_feed_update(1, TimestampUs::from_millis(8).unwrap(), 10), // last distinct update for feed 1
376+
test_feed_update(1, TimestampUs::from_millis(9).unwrap(), 10),
377+
test_feed_update(2, TimestampUs::from_millis(10).unwrap(), 15),
378+
test_feed_update(2, TimestampUs::from_millis(11).unwrap(), 15),
379+
test_feed_update(2, TimestampUs::from_millis(12).unwrap(), 10), // last distinct update for feed 2
380+
];
381+
382+
let expected_updates = vec![
383+
test_feed_update(1, TimestampUs::from_millis(8).unwrap(), 10),
384+
test_feed_update(2, TimestampUs::from_millis(12).unwrap(), 10),
347385
];
348386

349-
deduplicate_feed_updates(updates);
350-
assert_eq!(updates.to_vec(), expected_updates);
387+
let mut deduped_updates = deduplicate_feed_updates(updates).unwrap();
388+
deduped_updates.sort_by_key(|u| u.feed_id);
389+
assert_eq!(deduped_updates, expected_updates);
351390
}
352391
}

apps/pyth-lazer-agent/src/publisher_handle.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,9 @@ async fn try_handle_publisher(
8989
feed_id: Some(data.price_feed_id.0),
9090
source_timestamp: MessageField::some(data.source_timestamp_us.into()),
9191
update: Some(Update::PriceUpdate(PriceUpdate {
92-
price: data.price.map(|p| p.0.get()),
93-
best_bid_price: data.best_bid_price.map(|p| p.0.get()),
94-
best_ask_price: data.best_ask_price.map(|p| p.0.get()),
92+
price: data.price.map(|p| p.mantissa_i64()),
93+
best_bid_price: data.best_bid_price.map(|p| p.mantissa_i64()),
94+
best_ask_price: data.best_ask_price.map(|p| p.mantissa_i64()),
9595
..PriceUpdate::default()
9696
})),
9797
special_fields: Default::default(),
@@ -125,8 +125,8 @@ async fn try_handle_publisher(
125125
feed_id: Some(data.price_feed_id.0),
126126
source_timestamp: MessageField::some(data.source_timestamp_us.into()),
127127
update: Some(Update::FundingRateUpdate(FundingRateUpdate {
128-
price: data.price.map(|p| p.0.get()),
129-
rate: data.funding_rate.map(|r| r.0),
128+
price: data.price.map(|p| p.mantissa_i64()),
129+
rate: data.funding_rate.map(|r| r.mantissa()),
130130
..FundingRateUpdate::default()
131131
})),
132132
special_fields: Default::default(),

0 commit comments

Comments
 (0)