Skip to content

Commit 68ca931

Browse files
Create aggregator_tests.rs
1 parent e5bbfbe commit 68ca931

File tree

1 file changed

+147
-0
lines changed

1 file changed

+147
-0
lines changed
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
use super::*;
2+
use crate::config::Config;
3+
use crate::types::{
4+
ArbitrageOpportunity, Exchange, HealthStatus, Metrics, PriceLevel, PriceLevelUpdate, Summary,
5+
TradingPair,
6+
};
7+
use std::collections::HashMap;
8+
use std::sync::Arc;
9+
use tokio::sync::broadcast;
10+
use tokio::time::timeout;
11+
12+
#[tokio::test]
13+
async fn test_aggregator_new_and_subscriptions() {
14+
let config = Config::default();
15+
let aggregator = Aggregator::new(config);
16+
// Test that subscriptions do not panic
17+
let _ = aggregator.subscribe_summaries();
18+
let _ = aggregator.subscribe_arbitrage();
19+
let _ = aggregator.subscribe_shutdown();
20+
}
21+
22+
#[tokio::test]
23+
async fn test_aggregator_start_and_stop() {
24+
let config = Config::default();
25+
let aggregator = Aggregator::new(config);
26+
let handles = aggregator.start().await;
27+
assert!(handles.is_ok());
28+
let stop_result = aggregator.stop().await;
29+
assert!(stop_result.is_ok());
30+
}
31+
32+
#[tokio::test]
33+
async fn test_summary_and_health_metrics_accessors() {
34+
let config = Config::default();
35+
let aggregator = Aggregator::new(config);
36+
let pair = TradingPair::new("BTCUSDT", "USDT");
37+
let exchange = Exchange::Binance;
38+
// Initially, all should be empty
39+
assert!(aggregator.get_summary(&pair).await.is_none());
40+
assert!(aggregator.get_health_status(&exchange).await.is_none());
41+
assert!(aggregator.get_metrics(&exchange).await.is_none());
42+
assert!(aggregator.get_all_summaries().await.is_empty());
43+
assert!(aggregator.get_all_health_statuses().await.is_empty());
44+
assert!(aggregator.get_all_metrics().await.is_empty());
45+
}
46+
47+
#[tokio::test]
48+
async fn test_initialize_health_status() {
49+
let config = Config::default();
50+
let aggregator = Aggregator::new(config);
51+
aggregator.initialize_health_status().await.unwrap();
52+
let health_statuses = aggregator.get_all_health_statuses().await;
53+
for exchange in Exchange::all() {
54+
assert!(health_statuses.contains_key(&exchange));
55+
let status = health_statuses.get(&exchange).unwrap();
56+
assert!(!status.is_healthy);
57+
assert!(status.error_message.is_none());
58+
}
59+
}
60+
61+
#[tokio::test]
62+
async fn test_process_price_level_update_and_summary_broadcast() {
63+
let config = Config::default();
64+
let aggregator = Aggregator::new(config);
65+
let summary_sender = aggregator.summary_sender.clone();
66+
let price_level_update = PriceLevelUpdate {
67+
id: uuid::Uuid::new_v4(),
68+
symbol: "BTCUSDT".to_string(),
69+
exchange: Exchange::Binance,
70+
bids: vec![PriceLevel {
71+
price: 100.0,
72+
quantity: 1.0,
73+
exchange: Exchange::Binance,
74+
timestamp: chrono::Utc::now(),
75+
}],
76+
asks: vec![PriceLevel {
77+
price: 101.0,
78+
quantity: 1.0,
79+
exchange: Exchange::Binance,
80+
timestamp: chrono::Utc::now(),
81+
}],
82+
timestamp: chrono::Utc::now(),
83+
};
84+
let result = Aggregator::process_price_level_update(price_level_update, &summary_sender).await;
85+
assert!(result.is_ok());
86+
// Check that a summary was broadcast
87+
let mut rx = summary_sender.subscribe();
88+
let summary = timeout(std::time::Duration::from_millis(100), rx.recv()).await;
89+
assert!(summary.is_ok());
90+
}
91+
92+
#[tokio::test]
93+
async fn test_arbitrage_detector_no_opportunity() {
94+
let config = Config::default();
95+
let aggregator = Aggregator::new(config);
96+
let pair = TradingPair::new("BTCUSDT", "USDT");
97+
let summary = Summary {
98+
symbol: "BTCUSDT".to_string(),
99+
spread: 0.0,
100+
bids: vec![],
101+
asks: vec![],
102+
timestamp: chrono::Utc::now(),
103+
};
104+
let result = Aggregator::detect_arbitrage_opportunity(&pair, &summary).await;
105+
assert!(result.is_none());
106+
}
107+
108+
#[tokio::test]
109+
async fn test_health_monitor_marks_unhealthy() {
110+
let config = Config::default();
111+
let aggregator = Aggregator::new(config);
112+
aggregator.initialize_health_status().await.unwrap();
113+
// Simulate old last_update
114+
let mut health_status = aggregator.health_status.write().await;
115+
for status in health_status.values_mut() {
116+
status.last_update = chrono::Utc::now() - chrono::Duration::seconds(31);
117+
}
118+
drop(health_status);
119+
// Run health monitor once
120+
let health_status = aggregator.health_status.clone();
121+
let mut shutdown_rx = aggregator.shutdown_sender.subscribe();
122+
let handle = tokio::spawn(async move {
123+
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(10));
124+
tokio::select! {
125+
_ = interval.tick() => {
126+
let mut health_map = health_status.write().await;
127+
let now = chrono::Utc::now();
128+
for (_exchange, status) in health_map.iter_mut() {
129+
let time_since_update = now - status.last_update;
130+
if time_since_update.num_seconds() > 30 {
131+
status.is_healthy = false;
132+
if status.error_message.is_none() {
133+
status.error_message = Some("No recent updates".to_string());
134+
}
135+
}
136+
}
137+
}
138+
_ = shutdown_rx.recv() => {}
139+
}
140+
});
141+
handle.await.unwrap();
142+
let health_statuses = aggregator.get_all_health_statuses().await;
143+
for status in health_statuses.values() {
144+
assert!(!status.is_healthy);
145+
assert_eq!(status.error_message.as_deref(), Some("No recent updates"));
146+
}
147+
}

0 commit comments

Comments
 (0)