+use std::collections::HashMap;
 use crate::config::{CHANNEL_CAPACITY, Config};
 use crate::relayer_session::RelayerSessionTask;
 use anyhow::{Context, Result, bail};
@@ -132,8 +133,14 @@ impl LazerPublisherTask {
             return Ok(());
         }
 
+        let updates = if self.config.enable_update_deduplication {
+            deduplicate_feed_updates(&self.pending_updates.drain(..).collect())?
+        } else {
+            self.pending_updates.drain(..).collect()
+        };
+
         let publisher_update = PublisherUpdate {
-            updates: self.pending_updates.drain(..).collect(),
+            updates,
             publisher_timestamp: MessageField::some(Timestamp::now()),
             special_fields: Default::default(),
         };
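Note: the branch on `self.config.enable_update_deduplication` relies on a new `Config` field whose definition is outside this hunk. A minimal sketch of what that field could look like (the field name comes from this diff; the serde derive and default-to-false behavior are assumptions):

```rust
use serde::Deserialize;

// Sketch only: the real Config also carries publish_keypair_path,
// publish_interval_duration, history_service_url, etc. (see the test below).
#[derive(Deserialize)]
pub struct Config {
    // Assumed to default to false so existing deployments keep the current
    // "publish every update" behavior unless they opt in.
    #[serde(default)]
    pub enable_update_deduplication: bool,
}
```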
@@ -173,10 +180,33 @@ impl LazerPublisherTask {
     }
 }
 
+fn deduplicate_feed_updates(feed_updates: &Vec<FeedUpdate>) -> Result<Vec<FeedUpdate>> {
+    let mut deduped_feed_updates = Vec::new();
+    let mut last_feed_update = HashMap::new();
+
+    // Assumes feed_updates is already sorted by timestamp within each feed_id group.
+    for feed_update in feed_updates {
+        let feed_id = feed_update.feed_id.context("feed_id is required")?;
+
+        if let Some(update) = feed_update.update.as_ref() {
+            if let Some(last_update) = last_feed_update.get(&feed_id) {
+                if update == last_update {
+                    continue;
+                }
+            }
+
+            deduped_feed_updates.push(feed_update.clone());
+            last_feed_update.insert(feed_id, update.clone());
+        }
+    }
+
+    Ok(deduped_feed_updates)
+}
+
 #[cfg(test)]
 mod tests {
     use crate::config::{CHANNEL_CAPACITY, Config};
-    use crate::lazer_publisher::LazerPublisherTask;
+    use crate::lazer_publisher::{deduplicate_feed_updates, LazerPublisherTask};
     use ed25519_dalek::SigningKey;
     use protobuf::well_known_types::timestamp::Timestamp;
     use protobuf::{Message, MessageField};
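`deduplicate_feed_updates` is a per-feed last-value filter: it keeps the first update of each run of identical payloads (so the earliest timestamp in a run survives), and `last_feed_update` is never cleared between feeds. A self-contained sketch of the same technique on plain tuples (all names here are illustrative, not from the codebase):

```rust
use std::collections::HashMap;

// Illustrative stand-in for FeedUpdate: (feed_id, timestamp, price).
type Update = (u32, u64, i64);

// Keep the first update of each run of identical prices per feed,
// mirroring deduplicate_feed_updates above on a simplified type.
fn dedup_runs(updates: &[Update]) -> Vec<Update> {
    let mut last_price: HashMap<u32, i64> = HashMap::new();
    let mut out = Vec::new();
    for &(feed_id, ts, price) in updates {
        if last_price.get(&feed_id) == Some(&price) {
            continue; // unchanged payload for this feed: drop it
        }
        out.push((feed_id, ts, price));
        last_price.insert(feed_id, price);
    }
    out
}

fn main() {
    let input = [(1, 1, 10), (1, 2, 10), (1, 4, 15), (1, 6, 10)];
    // The first update of each run is kept, so ts 1, 4, and 6 survive.
    assert_eq!(dedup_runs(&input), vec![(1, 1, 10), (1, 4, 15), (1, 6, 10)]);
}
```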
@@ -186,6 +216,7 @@ mod tests {
     use std::io::Write;
     use std::path::PathBuf;
     use std::time::Duration;
+    use pyth_lazer_protocol::time::TimestampUs;
     use tempfile::NamedTempFile;
     use tokio::sync::broadcast::error::TryRecvError;
     use tokio::sync::{broadcast, mpsc};
@@ -212,6 +243,18 @@ mod tests {
         temp_file
     }
 
+    fn test_feed_update(feed_id: u32, timestamp: TimestampUs, price: i64) -> FeedUpdate {
+        FeedUpdate {
+            feed_id: Some(feed_id),
+            source_timestamp: MessageField::some(timestamp.into()),
+            update: Some(Update::PriceUpdate(PriceUpdate {
+                price: Some(price),
+                ..PriceUpdate::default()
+            })),
+            special_fields: Default::default(),
+        }
+    }
+
     #[tokio::test]
     async fn test_lazer_exporter_task() {
         let signing_key_file = get_private_key_file();
@@ -224,6 +267,7 @@ mod tests {
             publish_keypair_path: PathBuf::from(signing_key_file.path()),
             publish_interval_duration: Duration::from_millis(25),
             history_service_url: None,
+            enable_update_deduplication: false,
         };
 
         let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY);
@@ -274,4 +318,53 @@ mod tests {
             _ => panic!("channel should have a transaction waiting"),
         }
     }
+
+    #[test]
+    fn test_deduplicate_feed_updates() {
+        // Consider a batch of updates for a single feed, as (ts, price) pairs:
+        // - (1, 10)
+        // - (2, 10)
+        // - (3, 10)
+        // - (4, 15)
+        // - (5, 15)
+        // - (6, 10)
+        // Only (1, 10), (4, 15), and (6, 10) should be returned.
+
+        let updates = vec![
+            test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15),
+            test_feed_update(1, TimestampUs::from_millis(5).unwrap(), 15),
+            test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10),
+        ];
+
+        let expected_updates = vec![
+            test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(4).unwrap(), 15),
+            test_feed_update(1, TimestampUs::from_millis(6).unwrap(), 10),
+        ];
+
+        assert_eq!(deduplicate_feed_updates(&updates).unwrap(), expected_updates);
+    }
+
+    #[test]
+    fn test_deduplicate_feed_updates_multiple_feeds() {
+        let updates = vec![
+            test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(2).unwrap(), 10),
+            test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10),
+            test_feed_update(2, TimestampUs::from_millis(4).unwrap(), 15),
+            test_feed_update(2, TimestampUs::from_millis(5).unwrap(), 15),
+            test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10),
+        ];
+
+        let expected_updates = vec![
+            test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
+            test_feed_update(2, TimestampUs::from_millis(4).unwrap(), 15),
+            test_feed_update(2, TimestampUs::from_millis(6).unwrap(), 10),
+        ];
+
+        assert_eq!(deduplicate_feed_updates(&updates).unwrap(), expected_updates);
+    }
 }
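One case the tests above leave implicit: because `last_feed_update` is keyed by feed id, a repeated payload is dropped even when updates from other feeds are interleaved between the two occurrences. A hypothetical test sketch using the same helper (not part of this diff):

```rust
#[test]
fn test_deduplicate_feed_updates_interleaved_feeds() {
    // Feeds 1 and 2 interleave; duplicates are tracked per feed, so the
    // second (feed 1, price 10) is dropped despite the feed-2 update between.
    let updates = vec![
        test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
        test_feed_update(2, TimestampUs::from_millis(2).unwrap(), 20),
        test_feed_update(1, TimestampUs::from_millis(3).unwrap(), 10),
        test_feed_update(2, TimestampUs::from_millis(4).unwrap(), 20),
    ];

    let expected_updates = vec![
        test_feed_update(1, TimestampUs::from_millis(1).unwrap(), 10),
        test_feed_update(2, TimestampUs::from_millis(2).unwrap(), 20),
    ];

    assert_eq!(deduplicate_feed_updates(&updates).unwrap(), expected_updates);
}
```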