Skip to content

Commit 12b8bad

Browse files
committed
refactor(twap): update get_twaps_with_update_data to use window_seconds and add LatestTimeEarliestSlot request time
1 parent a0c2567 commit 12b8bad

File tree

4 files changed

+239
-30
lines changed

4 files changed

+239
-30
lines changed

apps/hermes/server/src/api/rest.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ mod tests {
201201
async fn get_twaps_with_update_data(
202202
&self,
203203
_price_ids: &[PriceIdentifier],
204-
_start_time: RequestTime,
204+
_window_seconds: i64,
205205
_end_time: RequestTime,
206206
) -> Result<TwapsWithUpdateData> {
207207
unimplemented!("Not needed for this test")

apps/hermes/server/src/api/rest/v2/latest_twaps.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use {
1313
Json,
1414
},
1515
base64::{engine::general_purpose::STANDARD as base64_standard_engine, Engine as _},
16-
pyth_sdk::{DurationInSeconds, PriceIdentifier, UnixTimestamp},
16+
pyth_sdk::{DurationInSeconds, PriceIdentifier},
1717
serde::Deserialize,
1818
serde_qs::axum::QsQuery,
1919
utoipa::IntoParams,
@@ -105,20 +105,12 @@ where
105105
let price_ids: Vec<PriceIdentifier> =
106106
validate_price_ids(&state, &price_id_inputs, params.ignore_invalid_price_ids).await?;
107107

108-
// Collect start and end bounds for the TWAP window
109-
let window_seconds = path_params.window_seconds as i64;
110-
let current_time = std::time::SystemTime::now()
111-
.duration_since(std::time::UNIX_EPOCH)
112-
.unwrap()
113-
.as_secs() as UnixTimestamp;
114-
let start_time = current_time - window_seconds;
115-
116108
// Calculate the average
117109
let twaps_with_update_data = Aggregates::get_twaps_with_update_data(
118110
&*state.state,
119111
&price_ids,
120-
RequestTime::FirstAfter(start_time),
121-
RequestTime::Latest,
112+
path_params.window_seconds as i64,
113+
RequestTime::LatestTimeEarliestSlot,
122114
)
123115
.await
124116
.map_err(|e| {

apps/hermes/server/src/state/aggregate.rs

Lines changed: 146 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use log::warn;
12
#[cfg(test)]
23
use mock_instant::{SystemTime, UNIX_EPOCH};
34
use pythnet_sdk::messages::TwapMessage;
@@ -60,6 +61,7 @@ pub type UnixTimestamp = i64;
6061
#[derive(Clone, PartialEq, Eq, Debug)]
6162
pub enum RequestTime {
6263
Latest,
64+
LatestTimeEarliestSlot,
6365
FirstAfter(UnixTimestamp),
6466
AtSlot(Slot),
6567
}
@@ -242,7 +244,7 @@ where
242244
async fn get_twaps_with_update_data(
243245
&self,
244246
price_ids: &[PriceIdentifier],
245-
start_time: RequestTime,
247+
window_seconds: i64,
246248
end_time: RequestTime,
247249
) -> Result<TwapsWithUpdateData>;
248250
}
@@ -410,16 +412,11 @@ where
410412
async fn get_twaps_with_update_data(
411413
&self,
412414
price_ids: &[PriceIdentifier],
413-
start_time: RequestTime,
415+
window_seconds: i64,
414416
end_time: RequestTime,
415417
) -> Result<TwapsWithUpdateData> {
416-
match get_verified_twaps_with_update_data(
417-
self,
418-
price_ids,
419-
start_time.clone(),
420-
end_time.clone(),
421-
)
422-
.await
418+
match get_verified_twaps_with_update_data(self, price_ids, window_seconds, end_time.clone())
419+
.await
423420
{
424421
Ok(twaps_with_update_data) => Ok(twaps_with_update_data),
425422
Err(e) => {
@@ -637,33 +634,68 @@ where
637634
async fn get_verified_twaps_with_update_data<S>(
638635
state: &S,
639636
price_ids: &[PriceIdentifier],
640-
start_time: RequestTime,
637+
window_seconds: i64,
641638
end_time: RequestTime,
642639
) -> Result<TwapsWithUpdateData>
643640
where
644641
S: Cache,
645642
{
646-
// Get all start messages for all price IDs
647-
let start_messages = state
643+
// Get all end messages for all price IDs
644+
let end_messages = state
648645
.fetch_message_states(
649646
price_ids.iter().map(|id| id.to_bytes()).collect(),
650-
start_time.clone(),
647+
end_time.clone(),
651648
MessageStateFilter::Only(MessageType::TwapMessage),
652649
)
653650
.await?;
654651

655-
// Get all end messages for all price IDs
656-
let end_messages = state
652+
// Calculate start_time based on the publish time of the end messages
653+
// to guarantee that the start and end messages are window_seconds apart
654+
let start_timestamp = if end_messages.is_empty() {
655+
// If there are no end messages, we can't calculate a TWAP
656+
tracing::warn!(
657+
price_ids = ?price_ids,
658+
time = ?end_time,
659+
"Could not find TWAP messages"
660+
);
661+
return Err(anyhow!(
662+
"Update data not found for the specified timestamps"
663+
));
664+
} else {
665+
// Use the publish time from the first end message
666+
end_messages[0].message.publish_time() - window_seconds
667+
};
668+
let start_time = RequestTime::FirstAfter(start_timestamp);
669+
670+
// Get all start messages for all price IDs
671+
let start_messages = state
657672
.fetch_message_states(
658673
price_ids.iter().map(|id| id.to_bytes()).collect(),
659-
end_time.clone(),
674+
start_time.clone(),
660675
MessageStateFilter::Only(MessageType::TwapMessage),
661676
)
662677
.await?;
663678

679+
if start_messages.is_empty() {
680+
tracing::warn!(
681+
price_ids = ?price_ids,
682+
time = ?start_time,
683+
"Could not find TWAP messages"
684+
);
685+
return Err(anyhow!(
686+
"Update data not found for the specified timestamps"
687+
));
688+
}
689+
664690
// Verify we have matching start and end messages.
665691
// The cache should throw an error earlier, but checking just in case.
666692
if start_messages.len() != end_messages.len() {
693+
tracing::warn!(
694+
price_ids = ?price_ids,
695+
start_message_length = ?price_ids,
696+
end_message_length = ?start_time,
697+
"Start and end messages length mismatch"
698+
);
667699
return Err(anyhow!(
668700
"Update data not found for the specified timestamps"
669701
));
@@ -695,6 +727,11 @@ where
695727
});
696728
}
697729
Err(e) => {
730+
tracing::error!(
731+
feed_id = ?start_twap.feed_id,
732+
error = %e,
733+
"Failed to calculate TWAP for price feed"
734+
);
698735
return Err(anyhow!(
699736
"Failed to calculate TWAP for price feed {:?}: {}",
700737
start_twap.feed_id,
@@ -1295,7 +1332,7 @@ mod test {
12951332
PriceIdentifier::new(feed_id_1),
12961333
PriceIdentifier::new(feed_id_2),
12971334
],
1298-
RequestTime::FirstAfter(100), // Start time
1335+
100, // window seconds
12991336
RequestTime::FirstAfter(200), // End time
13001337
)
13011338
.await
@@ -1329,6 +1366,97 @@ mod test {
13291366
// update_data should have 2 elements, one for the start block and one for the end block.
13301367
assert_eq!(result.update_data.len(), 2);
13311368
}
1369+
1370+
#[tokio::test]
1371+
/// Tests that the TWAP calculation correctly selects TWAP messages that are the first ones
1372+
/// for their timestamp (non-optional prices). This is important because if a message such that
1373+
/// `publish_time == prev_publish_time`is chosen, the TWAP calculation will fail due to the optionality check.
1374+
async fn test_get_verified_twaps_with_update_data_uses_non_optional_prices() {
1375+
let (state, _update_rx) = setup_state(10).await;
1376+
let feed_id = [1u8; 32];
1377+
1378+
// Store start TWAP message
1379+
store_multiple_concurrent_valid_updates(
1380+
state.clone(),
1381+
generate_update(
1382+
vec![create_basic_twap_message(
1383+
feed_id, 100, // cumulative_price
1384+
0, // num_down_slots
1385+
100, // publish_time
1386+
99, // prev_publish_time
1387+
1000, // publish_slot
1388+
)],
1389+
1000,
1390+
20,
1391+
),
1392+
)
1393+
.await;
1394+
1395+
// Store end TWAP messages
1396+
1397+
// This first message has the latest publish_time and earliest slot,
1398+
// so it should be chosen as the end_message to calculate TWAP with.
1399+
store_multiple_concurrent_valid_updates(
1400+
state.clone(),
1401+
generate_update(
1402+
vec![create_basic_twap_message(
1403+
feed_id, 300, // cumulative_price
1404+
50, // num_down_slots
1405+
200, // publish_time
1406+
180, // prev_publish_time
1407+
1100, // publish_slot
1408+
)],
1409+
1100,
1410+
21,
1411+
),
1412+
)
1413+
.await;
1414+
1415+
// This second message has the same publish_time as the previous one and a later slot.
1416+
// It will fail the optionality check since publish_time == prev_publish_time.
1417+
// Thus, it should not be chosen to calculate TWAP with.
1418+
store_multiple_concurrent_valid_updates(
1419+
state.clone(),
1420+
generate_update(
1421+
vec![create_basic_twap_message(
1422+
feed_id, 900, // cumulative_price
1423+
50, // num_down_slots
1424+
200, // publish_time
1425+
200, // prev_publish_time
1426+
1101, // publish_slot
1427+
)],
1428+
1101,
1429+
22,
1430+
),
1431+
)
1432+
.await;
1433+
1434+
// Get TWAPs over timestamp window 100 -> 200
1435+
let result = get_verified_twaps_with_update_data(
1436+
&*state,
1437+
&[PriceIdentifier::new(feed_id)],
1438+
100, // window seconds
1439+
RequestTime::LatestTimeEarliestSlot, // End time
1440+
)
1441+
.await
1442+
.unwrap();
1443+
1444+
// Verify that the first end message was chosen to calculate the TWAP
1445+
// and that the calculation is accurate
1446+
assert_eq!(result.twaps.len(), 1);
1447+
let twap_1 = result
1448+
.twaps
1449+
.iter()
1450+
.find(|t| t.id == PriceIdentifier::new(feed_id))
1451+
.unwrap();
1452+
assert_eq!(twap_1.twap.price, 2); // (300-100)/(1100-1000) = 2
1453+
assert_eq!(twap_1.down_slots_ratio, Decimal::from_f64(0.5).unwrap()); // (50-0)/(1100-1000) = 0.5
1454+
assert_eq!(twap_1.start_timestamp, 100);
1455+
assert_eq!(twap_1.end_timestamp, 200);
1456+
1457+
// update_data should have 2 elements, one for the start block and one for the end block.
1458+
assert_eq!(result.update_data.len(), 2);
1459+
}
13321460
#[tokio::test]
13331461

13341462
async fn test_get_verified_twaps_with_missing_messages_throws_error() {
@@ -1385,7 +1513,7 @@ mod test {
13851513
PriceIdentifier::new(feed_id_1),
13861514
PriceIdentifier::new(feed_id_2),
13871515
],
1388-
RequestTime::FirstAfter(100),
1516+
100,
13891517
RequestTime::FirstAfter(200),
13901518
)
13911519
.await;

apps/hermes/server/src/state/cache.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,28 @@ async fn retrieve_message_state(
285285
Some(key_cache) => {
286286
match request_time {
287287
RequestTime::Latest => key_cache.last_key_value().map(|(_, v)| v).cloned(),
288+
RequestTime::LatestTimeEarliestSlot => {
289+
// Get the latest publish time from the last entry
290+
let last_entry = key_cache.last_key_value()?;
291+
let latest_publish_time = last_entry.0.publish_time;
292+
let mut latest_entry_with_earliest_slot = last_entry;
293+
294+
// Walk backwards through the sorted entries rather than use `range` since we will only
295+
// have 1-2 entries that have the same publish_time.
296+
// We have acquired the RwLock via read() above, so we should be safe to reenter the cache here.
297+
for (k, v) in key_cache.iter().rev() {
298+
if k.publish_time < latest_publish_time {
299+
// We've found an entry with an earlier publish time
300+
break;
301+
}
302+
303+
// Update our tracked entry (the reverse iteration will find entries
304+
// with higher slots first, so we'll end up with the lowest slot)
305+
latest_entry_with_earliest_slot = (k, v);
306+
}
307+
308+
Some(latest_entry_with_earliest_slot.1.clone())
309+
}
288310
RequestTime::FirstAfter(time) => {
289311
// If the requested time is before the first element in the vector, we are
290312
// not sure that the first element is the closest one.
@@ -590,6 +612,73 @@ mod test {
590612
);
591613
}
592614

615+
#[tokio::test]
616+
pub async fn test_latest_time_earliest_slot_request_works() {
617+
// Initialize state with a cache size of 3 per key.
618+
let (state, _) = setup_state(3).await;
619+
620+
// Create and store a message state with feed id [1....] and publish time 10 at slot 7.
621+
create_and_store_dummy_price_feed_message_state(&*state, [1; 32], 10, 7).await;
622+
623+
// Create and store a message state with feed id [1....] and publish time 10 at slot 10.
624+
create_and_store_dummy_price_feed_message_state(&*state, [1; 32], 10, 10).await;
625+
626+
// Create and store a message state with feed id [1....] and publish time 10 at slot 5.
627+
let earliest_slot_message_state =
628+
create_and_store_dummy_price_feed_message_state(&*state, [1; 32], 10, 5).await;
629+
630+
// Create and store a message state with feed id [1....] and publish time 8 at slot 3.
631+
create_and_store_dummy_price_feed_message_state(&*state, [1; 32], 8, 3).await;
632+
633+
// The LatestTimeEarliestSlot should return the message with publish time 10 at slot 5
634+
assert_eq!(
635+
state
636+
.fetch_message_states(
637+
vec![[1; 32]],
638+
RequestTime::LatestTimeEarliestSlot,
639+
MessageStateFilter::Only(MessageType::PriceFeedMessage),
640+
)
641+
.await
642+
.unwrap(),
643+
vec![earliest_slot_message_state]
644+
);
645+
646+
// Create and store a message state with feed id [1....] and publish time 15 at slot 20.
647+
let newer_time_message_state =
648+
create_and_store_dummy_price_feed_message_state(&*state, [1; 32], 15, 20).await;
649+
650+
// The LatestTimeEarliestSlot should now return the message with publish time 15
651+
assert_eq!(
652+
state
653+
.fetch_message_states(
654+
vec![[1; 32]],
655+
RequestTime::LatestTimeEarliestSlot,
656+
MessageStateFilter::Only(MessageType::PriceFeedMessage),
657+
)
658+
.await
659+
.unwrap(),
660+
vec![newer_time_message_state]
661+
);
662+
663+
// Store two messages with even later publish time but different slots
664+
create_and_store_dummy_price_feed_message_state(&*state, [1; 32], 20, 35).await;
665+
let latest_time_earliest_slot_message =
666+
create_and_store_dummy_price_feed_message_state(&*state, [1; 32], 20, 30).await;
667+
668+
// The LatestTimeEarliestSlot should return the message with publish time 20 at slot 30
669+
assert_eq!(
670+
state
671+
.fetch_message_states(
672+
vec![[1; 32]],
673+
RequestTime::LatestTimeEarliestSlot,
674+
MessageStateFilter::Only(MessageType::PriceFeedMessage),
675+
)
676+
.await
677+
.unwrap(),
678+
vec![latest_time_earliest_slot_message]
679+
);
680+
}
681+
593682
#[tokio::test]
594683
pub async fn test_store_and_retrieve_first_after_message_state_fails_for_past_time() {
595684
// Initialize state with a cache size of 2 per key.

0 commit comments

Comments
 (0)