Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ evm = [
"dep:foundry-block-explorers", "dep:revm", "dep:revm-inspectors", "dep:alloy", "dep:reqwest"
]
rfq = ["dep:reqwest", "dep:async-trait", "dep:tokio-tungstenite", "dep:async-stream", "dep:http", "dep:prost"]
test-utils = []

[profile.bench]
debug = true
220 changes: 164 additions & 56 deletions src/evm/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,13 @@
use std::{collections::HashMap, sync::Arc, time};

use futures::{Stream, StreamExt};
use tokio::sync::mpsc::Receiver;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, warn};
use tycho_client::{
feed::{
component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
SynchronizerState,
BlockSynchronizerError, FeedMessage, SynchronizerState,
},
stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
};
Expand Down Expand Up @@ -156,13 +157,21 @@ impl StreamEndPolicy {
}
}

/// The source of feed messages for the protocol stream.
enum StreamSource {
/// Live connection to Tycho server
Tycho(TychoStreamBuilder),
/// Pre-built receiver (for testing or custom sources)
Receiver(Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
}

/// Builds and configures the multi protocol stream described in the [module-level docs](self).
///
/// See the module documentation for details on protocols, configuration options, and
/// stream behavior.
pub struct ProtocolStreamBuilder {
decoder: TychoStreamDecoder<BlockHeader>,
stream_builder: TychoStreamBuilder,
source: StreamSource,
stream_end_policy: StreamEndPolicy,
}

Expand All @@ -173,11 +182,53 @@ impl ProtocolStreamBuilder {
pub fn new(tycho_url: &str, chain: Chain) -> Self {
Self {
decoder: TychoStreamDecoder::new(),
stream_builder: TychoStreamBuilder::new(tycho_url, chain.into()),
source: StreamSource::Tycho(TychoStreamBuilder::new(tycho_url, chain.into())),
stream_end_policy: StreamEndPolicy::default(),
}
}

/// Creates a new builder from a pre-built receiver.
///
/// This is useful for testing or custom feed sources. Tycho-client specific configuration
/// methods (like `block_time`, `auth_key`, etc.) will have no effect when using this
/// constructor.
///
/// # Example
/// ```ignore
/// use tokio::sync::mpsc;
/// use tycho_simulation::evm::stream::ProtocolStreamBuilder;
/// use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
/// use tycho_client::feed::component_tracker::ComponentFilter;
///
/// let (tx, rx) = mpsc::channel(100);
/// // Send test messages via tx...
///
/// let stream = ProtocolStreamBuilder::from_receiver(rx)
/// .exchange::<UniswapV2State>("uniswap_v2", ComponentFilter::default(), None)
/// .build()
/// .await?;
/// ```
pub fn from_receiver(
rx: Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>,
) -> Self {
Self {
decoder: TychoStreamDecoder::new(),
source: StreamSource::Receiver(rx),
stream_end_policy: StreamEndPolicy::default(),
}
}

/// Helper to apply configuration only when source is Tycho.
/// Returns self unchanged if source is a Receiver.
fn with_tycho(mut self, f: impl FnOnce(TychoStreamBuilder) -> TychoStreamBuilder) -> Self {
if let StreamSource::Tycho(builder) = self.source {
self.source = StreamSource::Tycho(f(builder));
} else {
warn!("with_tycho() called but source is not Tycho. Ignoring configuration.");
}
self
}

/// Adds a specific exchange to the stream.
///
/// This configures the builder to include a new protocol synchronizer for `name`,
Expand All @@ -189,7 +240,8 @@ impl ProtocolStreamBuilder {
/// # Parameters
///
/// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
/// - `filter`: Defines the set of components to include in the stream.
/// - `filter`: Defines the set of components to include in the stream. Ignored when using
/// `from_receiver()`.
/// - `filter_fn`: Optional custom filter function for client-side filtering of components not
/// expressible in `filter`.
///
Expand All @@ -210,9 +262,12 @@ impl ProtocolStreamBuilder {
+ Send
+ 'static,
{
self.stream_builder = self
.stream_builder
.exchange(name, filter);
// Register with TychoStreamBuilder only for live connections
if let StreamSource::Tycho(builder) = self.source {
self.source = StreamSource::Tycho(builder.exchange(name, filter));
}

// Decoder registration happens regardless of source
self.decoder.register_decoder::<T>(name);
if let Some(predicate) = filter_fn {
self.decoder
Expand All @@ -238,7 +293,8 @@ impl ProtocolStreamBuilder {
/// # Parameters
///
/// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
/// - `filter`: Defines the set of components to include in the stream.
/// - `filter`: Defines the set of components to include in the stream. Ignored when using
/// `from_receiver()`.
/// - `filter_fn`: Optional custom filter function for client-side filtering of components not
/// expressible in `filter`.
/// - `decoder_context`: The decoder context for this exchange
Expand All @@ -261,9 +317,12 @@ impl ProtocolStreamBuilder {
+ Send
+ 'static,
{
self.stream_builder = self
.stream_builder
.exchange(name, filter);
// Register with TychoStreamBuilder only for live connections
if let StreamSource::Tycho(builder) = self.source {
self.source = StreamSource::Tycho(builder.exchange(name, filter));
}

// Decoder registration happens regardless of source
self.decoder
.register_decoder_with_context::<T>(name, decoder_context);
if let Some(predicate) = filter_fn {
Expand All @@ -281,77 +340,80 @@ impl ProtocolStreamBuilder {
/// Sets the block time interval for the stream.
///
/// This controls how often the stream produces updates.
pub fn block_time(mut self, block_time: u64) -> Self {
self.stream_builder = self
.stream_builder
.block_time(block_time);
self
///
/// Note: Has no effect when using `from_receiver()`.
pub fn block_time(self, block_time: u64) -> Self {
self.with_tycho(|b| b.block_time(block_time))
}

/// Sets the network operation timeout (deprecated).
///
/// Use [`latency_buffer`] instead for controlling latency.
/// This method is retained for backwards compatibility.
///
/// Note: Has no effect when using `from_receiver()`.
#[deprecated = "Use latency_buffer instead"]
pub fn timeout(mut self, timeout: u64) -> Self {
self.stream_builder = self.stream_builder.timeout(timeout);
self
pub fn timeout(self, timeout: u64) -> Self {
self.with_tycho(|b| b.timeout(timeout))
}

/// Sets the latency buffer to aggregate same-block messages.
///
/// This allows the supervisor to wait a short interval for all synchronizers to emit
/// before aggregating.
pub fn latency_buffer(mut self, timeout: u64) -> Self {
self.stream_builder = self.stream_builder.timeout(timeout);
self
///
/// Note: Has no effect when using `from_receiver()`.
pub fn latency_buffer(self, timeout: u64) -> Self {
self.with_tycho(|b| b.timeout(timeout))
}

/// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
pub fn max_missed_blocks(mut self, n: u64) -> Self {
self.stream_builder = self.stream_builder.max_missed_blocks(n);
self
///
/// Note: Has no effect when using `from_receiver()`.
pub fn max_missed_blocks(self, n: u64) -> Self {
self.with_tycho(|b| b.max_missed_blocks(n))
}

/// Sets how long a synchronizer may take to process the initial message.
///
/// Useful for data-intensive protocols where startup decoding takes longer.
pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
self.stream_builder = self
.stream_builder
.startup_timeout(timeout);
self
///
/// Note: Has no effect when using `from_receiver()`.
pub fn startup_timeout(self, timeout: time::Duration) -> Self {
self.with_tycho(|b| b.startup_timeout(timeout))
}

/// Configures the stream to exclude state updates.
///
/// This reduces bandwidth and decoding workload if protocol state is not of
/// interest (e.g. only process new tokens).
pub fn no_state(mut self, no_state: bool) -> Self {
self.stream_builder = self.stream_builder.no_state(no_state);
self
///
/// Note: Has no effect when using `from_receiver()`.
pub fn no_state(self, no_state: bool) -> Self {
self.with_tycho(|b| b.no_state(no_state))
}

/// Sets the API key for authenticating with the Tycho server.
pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
self.stream_builder = self.stream_builder.auth_key(auth_key);
self
///
/// Note: Has no effect when using `from_receiver()`.
pub fn auth_key(self, auth_key: Option<String>) -> Self {
self.with_tycho(|b| b.auth_key(auth_key))
}

/// Disables TLS/ SSL for the connection, using http and ws protocols.
///
/// This is not recommended for production use.
pub fn no_tls(mut self, no_tls: bool) -> Self {
self.stream_builder = self.stream_builder.no_tls(no_tls);
self
///
/// Note: Has no effect when using `from_receiver()`.
pub fn no_tls(self, no_tls: bool) -> Self {
self.with_tycho(|b| b.no_tls(no_tls))
}

/// Disable compression for the connection.
pub fn disable_compression(mut self) -> Self {
self.stream_builder = self
.stream_builder
.disable_compression();
self
///
/// Note: Has no effect when using `from_receiver()`.
pub fn disable_compression(self) -> Self {
self.with_tycho(|b| b.disable_compression())
}

/// Sets the stream end policy.
Expand Down Expand Up @@ -388,19 +450,17 @@ impl ProtocolStreamBuilder {
}

/// Configures the retry policy for websocket reconnects.
pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
self.stream_builder = self
.stream_builder
.websockets_retry_config(config);
self
///
/// Note: Has no effect when using `from_receiver()`.
pub fn websocket_retry_config(self, config: &RetryConfiguration) -> Self {
self.with_tycho(|b| b.websockets_retry_config(config))
}

/// Configures the retry policy for state synchronization.
pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
self.stream_builder = self
.stream_builder
.state_synchronizer_retry_config(config);
self
///
/// Note: Has no effect when using `from_receiver()`.
pub fn state_synchronizer_retry_config(self, config: &RetryConfiguration) -> Self {
self.with_tycho(|b| b.state_synchronizer_retry_config(config))
}

pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
Expand All @@ -417,7 +477,15 @@ impl ProtocolStreamBuilder {
initialize_hook_handlers().map_err(|e| {
StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
})?;
let (_, rx) = self.stream_builder.build().await?;

let rx = match self.source {
StreamSource::Tycho(builder) => {
let (_, rx) = builder.build().await?;
rx
}
StreamSource::Receiver(rx) => rx,
};

let decoder = Arc::new(self.decoder);

let stream = Box::pin(
Expand Down Expand Up @@ -448,7 +516,7 @@ impl ProtocolStreamBuilder {
move |msg| {
let decoder = decoder.clone(); // Clone again for the async block
async move {
let msg = msg.expect("Save since stream ends if we receive an error");
let msg = msg.expect("Safe since stream ends if we receive an error");
decoder.decode(&msg).await.map_err(|e| {
debug!(msg=?msg, "Decode error: {}", e);
e
Expand All @@ -460,3 +528,43 @@ impl ProtocolStreamBuilder {
Ok(stream)
}
}

#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils {
use std::{path::Path, time::Duration};

use tokio::sync::mpsc;

use super::*;

/// Creates a mock receiver that emits messages at a fixed interval.
pub fn mock_receiver(
messages: Vec<FeedMessage<BlockHeader>>,
interval: Duration,
) -> Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>> {
let (tx, rx) = mpsc::channel(messages.len().max(1));

tokio::spawn(async move {
for msg in messages {
tokio::time::sleep(interval).await;
if tx.send(Ok(msg)).await.is_err() {
break;
}
}
});

rx
}

/// Creates a mock receiver from a JSON fixture file.
pub fn mock_receiver_from_file(
path: impl AsRef<Path>,
interval: Duration,
) -> Result<Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>, std::io::Error>
{
let data = std::fs::read(path)?;
let messages: Vec<FeedMessage<BlockHeader>> = serde_json::from_slice(&data)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
Ok(mock_receiver(messages, interval))
}
}