Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 11 additions & 24 deletions tycho-client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ use std::{collections::HashSet, str::FromStr, time::Duration};
use clap::Parser;
use tracing::{debug, error, info, warn};
use tracing_appender::rolling;
use tycho_common::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};
use tycho_common::dto::{Chain, ExtractorIdentity};

use crate::{
deltas::DeltasClient,
feed::{
component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
BlockSynchronizer,
},
rpc::{HttpRPCClientOptions, RPCClient},
rpc::HttpRPCClientOptions,
stream::ProtocolSystemsInfo,
HttpRPCClient, WsDeltasClient,
};

Expand Down Expand Up @@ -244,30 +245,14 @@ async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) -> Result<
block_sync.max_messages(*mm);
}

let available_protocols_set = rpc_client
.get_protocol_systems(&ProtocolSystemsRequestBody {
chain,
pagination: PaginationParams { page: 0, page_size: 100 },
})
.await
.map_err(|e| format!("Failed to get protocol systems: {e}"))?
.protocol_systems
.into_iter()
.collect::<HashSet<_>>();

let requested_protocol_set = exchanges
let requested_protocol_set: HashSet<_> = exchanges
.iter()
.map(|(name, _)| name.clone())
.collect::<HashSet<_>>();

let not_requested_protocols = available_protocols_set
.difference(&requested_protocol_set)
.cloned()
.collect::<Vec<_>>();

if !not_requested_protocols.is_empty() {
info!("Other available protocols: {}", not_requested_protocols.join(", "));
}
.collect();
let protocol_info =
ProtocolSystemsInfo::fetch(&rpc_client, chain, &requested_protocol_set).await;
protocol_info.log_other_available();
let dci_protocols = protocol_info.dci_protocols;

for (name, address) in exchanges {
debug!("Registering exchange: {}", name);
Expand All @@ -281,6 +266,7 @@ async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) -> Result<
} else {
ComponentFilter::with_tvl_range(args.min_tvl, args.min_tvl)
};
let uses_dci = dci_protocols.contains(&name);
let sync = ProtocolStateSynchronizer::new(
id.clone(),
true,
Expand All @@ -294,6 +280,7 @@ async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) -> Result<
ws_client.clone(),
args.block_time + args.timeout,
)
.with_dci(uses_dci)
.with_partial_blocks(args.partial_blocks);
block_sync = block_sync.register_synchronizer(id, sync);
}
Expand Down
127 changes: 117 additions & 10 deletions tycho-client/src/feed/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ pub struct ProtocolStateSynchronizer<R: RPCClient, D: DeltasClient> {
include_tvl: bool,
compression: bool,
partial_blocks: bool,
uses_dci: bool,
}

#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -254,9 +255,17 @@ where
include_tvl,
compression,
partial_blocks: false,
uses_dci: false,
}
}

/// Sets whether this protocol uses Dynamic Contract Indexing (DCI).
/// When true, entrypoints will be fetched during snapshot retrieval.
pub fn with_dci(mut self, uses_dci: bool) -> Self {
self.uses_dci = uses_dci;
self
}

/// Enables receiving partial block updates.
pub fn with_partial_blocks(mut self, partial_blocks: bool) -> Self {
self.partial_blocks = partial_blocks;
Expand Down Expand Up @@ -285,16 +294,7 @@ where
return Ok(StateSyncMessage { header, ..Default::default() });
}

// TODO: Find a smarter way to dynamically detect DCI protocols.
const DCI_PROTOCOLS: &[&str] = &[
"uniswap_v4_hooks",
"vm:curve",
"vm:balancer_v2",
"vm:balancer_v3",
"fluid_v1",
"erc4626",
];
let entrypoints_result = if DCI_PROTOCOLS.contains(&self.extractor_id.name.as_str()) {
let entrypoints_result = if self.uses_dci {
let result = self
.rpc_client
.get_traced_entry_points_paginated(
Expand Down Expand Up @@ -3100,4 +3100,111 @@ mod test {
let _ = close_tx.send(());
jh.await.expect("Task should not panic");
}

#[test_log::test(tokio::test)]
async fn test_get_snapshots_skips_entrypoints_when_not_dci() {
let header = BlockHeader::default();
let mut rpc = make_mock_client();
let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };

let component_clone = component.clone();
rpc.expect_get_snapshots()
.returning(move |_request, _chunk_size, _concurrency| {
Ok(Snapshot {
states: [(
"Component1".to_string(),
ComponentWithState {
state: ResponseProtocolState {
component_id: "Component1".to_string(),
..Default::default()
},
component: component_clone.clone(),
entrypoints: vec![],
component_tvl: None,
},
)]
.into_iter()
.collect(),
vm_storage: HashMap::new(),
})
});

// get_traced_entry_points should NOT be called for a non-DCI protocol
rpc.expect_get_traced_entry_points()
.never();

let mut state_sync = with_mocked_clients(true, false, Some(rpc), None);
// uses_dci defaults to false, no .with_dci() call needed
state_sync
.component_tracker
.components
.insert("Component1".to_string(), component);

let components_arg = ["Component1".to_string()];
let snap = state_sync
.get_snapshots(header, Some(&components_arg))
.await
.expect("Retrieving snapshot failed");

assert!(snap
.snapshots
.states
.contains_key("Component1"));
}

#[test_log::test(tokio::test)]
async fn test_get_snapshots_fetches_entrypoints_when_dci() {
let header = BlockHeader::default();
let mut rpc = make_mock_client();
let component = ProtocolComponent { id: "Component1".to_string(), ..Default::default() };

let component_clone = component.clone();
rpc.expect_get_snapshots()
.returning(move |_request, _chunk_size, _concurrency| {
Ok(Snapshot {
states: [(
"Component1".to_string(),
ComponentWithState {
state: ResponseProtocolState {
component_id: "Component1".to_string(),
..Default::default()
},
component: component_clone.clone(),
entrypoints: vec![],
component_tvl: None,
},
)]
.into_iter()
.collect(),
vm_storage: HashMap::new(),
})
});

// get_traced_entry_points SHOULD be called for a DCI protocol
rpc.expect_get_traced_entry_points()
.times(1)
.returning(|_| {
Ok(TracedEntryPointRequestResponse {
traced_entry_points: HashMap::new(),
pagination: PaginationResponse::new(0, 20, 0),
})
});

let mut state_sync = with_mocked_clients(true, false, Some(rpc), None).with_dci(true);
state_sync
.component_tracker
.components
.insert("Component1".to_string(), component);

let components_arg = ["Component1".to_string()];
let snap = state_sync
.get_snapshots(header, Some(&components_arg))
.await
.expect("Retrieving snapshot failed");

assert!(snap
.snapshots
.states
.contains_key("Component1"));
}
}
39 changes: 20 additions & 19 deletions tycho-client/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1735,29 +1735,30 @@ mod tests {
assert_eq!(response.pagination, PaginationResponse { page: 0, page_size: 20, total: 10 });
}

#[rstest]
#[case::with_dci(Some(vec!["system2"]), vec!["system2"])]
#[case::backward_compat(None, vec![])]
#[tokio::test]
async fn test_get_protocol_systems() {
let mut server = Server::new_async().await;
let server_resp = r#"
{
"protocol_systems": [
"system1",
"system2"
],
"pagination": {
"page": 0,
"page_size": 20,
"total": 10
}
async fn test_get_protocol_systems(
#[case] dci_protocols: Option<Vec<&str>>,
#[case] expected_dci: Vec<&str>,
) {
use serde_json::json;

let mut json_value = json!({
"protocol_systems": ["system1", "system2"],
"pagination": { "page": 0, "page_size": 20, "total": 2 }
});
if let Some(dci) = dci_protocols {
json_value["dci_protocols"] = json!(dci);
}
"#;
// test that the response is deserialized correctly
serde_json::from_str::<ProtocolSystemsRequestResponse>(server_resp).expect("deserialize");
let server_resp = serde_json::to_string(&json_value).unwrap();

let mut server = Server::new_async().await;
let mocked_server = server
.mock("POST", "/v1/protocol_systems")
.expect(1)
.with_body(server_resp)
.with_body(&server_resp)
.create_async()
.await;
let client = HttpRPCClient::new(server.url().as_str(), HttpRPCClientOptions::default())
Expand All @@ -1767,10 +1768,10 @@ mod tests {
.get_protocol_systems(&Default::default())
.await
.expect("get protocol systems");
let protocol_systems = response.protocol_systems;

mocked_server.assert();
assert_eq!(protocol_systems, vec!["system1", "system2"]);
assert_eq!(response.protocol_systems, vec!["system1", "system2"]);
assert_eq!(response.dci_protocols, expected_dci);
}

#[tokio::test]
Expand Down
Loading
Loading