Skip to content

Commit 8f6845d

Browse files
committed
impl dedup
1 parent 49ed653 commit 8f6845d

File tree

5 files changed

+44
-1
lines changed

5 files changed

+44
-1
lines changed

Cargo.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lazer/sdk/rust/client/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ tracing = "0.1"
1919
url = "2.4"
2020
derive_more = { version = "1.0.0", features = ["from"] }
2121
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
22+
ttl_cache = "0.5.1"
23+
2224

2325
[dev-dependencies]
2426
bincode = "1.3.3"

lazer/sdk/rust/client/examples/subscribe_price_feeds.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ async fn main() -> anyhow::Result<()> {
3636
let mut client = PythLazerClient::new(
3737
vec!["wss://pyth-lazer.dourolabs.app/v1/stream".to_string()],
3838
get_lazer_access_token(),
39-
1,
39+
2,
4040
ExponentialBackoffBuilder::default()
4141
.with_max_elapsed_time(None) // max_elapsed_time is not supported in Pyth Lazer client
4242
.build(),

lazer/sdk/rust/client/src/client.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
use std::{
2+
hash::{DefaultHasher, Hash},
3+
time::Duration,
4+
};
5+
16
use crate::{
27
resilient_ws_connection::PythLazerResilientWSConnection, ws_connection::AnyResponse,
38
CHANNEL_CAPACITY,
@@ -9,6 +14,10 @@ use pyth_lazer_protocol::subscription::{SubscribeRequest, SubscriptionId};
914
use tokio::sync::mpsc::{self, error::TrySendError};
1015
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
1116
use tracing::{error, warn};
17+
use ttl_cache::TtlCache;
18+
19+
const DEDUP_CACHE_SIZE: usize = 100_000;
20+
const DEDUP_TTL: Duration = Duration::from_secs(10);
1221

1322
pub struct PythLazerClient {
1423
endpoints: Vec<String>,
@@ -63,9 +72,16 @@ impl PythLazerClient {
6372

6473
let streams: Vec<_> = self.receivers.drain(..).map(ReceiverStream::new).collect();
6574
let mut merged_stream = stream::select_all(streams);
75+
let mut seen_updates = TtlCache::new(DEDUP_CACHE_SIZE);
6676

6777
tokio::spawn(async move {
6878
while let Some(response) = merged_stream.next().await {
79+
let cache_key = response.cache_key();
80+
if seen_updates.contains_key(&cache_key) {
81+
continue;
82+
}
83+
seen_updates.insert(cache_key, response.clone(), DEDUP_TTL);
84+
6985
match sender.try_send(response) {
7086
Ok(_) => (),
7187
Err(TrySendError::Full(r)) => {

lazer/sdk/rust/client/src/ws_connection.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::hash::{DefaultHasher, Hash, Hasher};
2+
13
use anyhow::Result;
24
use derive_more::From;
35
use futures_util::{SinkExt, StreamExt, TryStreamExt};
@@ -34,6 +36,13 @@ pub enum AnyResponse {
3436
Binary(BinaryWsUpdate),
3537
}
3638

39+
impl AnyResponse {
40+
pub fn cache_key(&self) -> u64 {
41+
let mut hasher = DefaultHasher::new();
42+
self.hash(&mut hasher);
43+
hasher.finish()
44+
}
45+
}
3746
impl PythLazerWSConnection {
3847
/// Creates a new Lazer client instance
3948
///

0 commit comments

Comments
 (0)