Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 88 additions & 63 deletions tests/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use kyoto::{
chain::checkpoints::HeaderCheckpoint,
core::{client::Client, node::Node},
BlockHash, Event, Log, NodeState, ServiceFlags, SqliteHeaderDb, SqlitePeerDb, TrustedPeer,
Warning,
};
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::UnboundedReceiver;
Expand All @@ -39,7 +40,7 @@ fn start_bitcoind(with_v2_transport: bool) -> anyhow::Result<(corepc_node::Node,
Ok((bitcoind, socket_addr))
}

async fn new_node(addrs: HashSet<ScriptBuf>, socket_addr: SocketAddrV4) -> (Node<(), ()>, Client) {
fn new_node(addrs: HashSet<ScriptBuf>, socket_addr: SocketAddrV4) -> (Node<(), ()>, Client) {
let host = (IpAddr::V4(*socket_addr.ip()), Some(socket_addr.port()));
let builder = kyoto::core::builder::NodeBuilder::new(bitcoin::Network::Regtest);
let (node, client) = builder
Expand All @@ -49,7 +50,7 @@ async fn new_node(addrs: HashSet<ScriptBuf>, socket_addr: SocketAddrV4) -> (Node
(node, client)
}

async fn new_node_sql(
fn new_node_sql(
addrs: HashSet<ScriptBuf>,
socket_addr: SocketAddrV4,
tempdir_path: PathBuf,
Expand All @@ -67,7 +68,7 @@ async fn new_node_sql(
(node, client)
}

async fn new_node_anchor_sql(
fn new_node_anchor_sql(
addrs: HashSet<ScriptBuf>,
checkpoint: HeaderCheckpoint,
socket_addr: SocketAddrV4,
Expand Down Expand Up @@ -111,11 +112,7 @@ async fn invalidate_block(rpc: &corepc_node::Client, hash: &bitcoin::BlockHash)
tokio::time::sleep(Duration::from_secs(2)).await;
}

async fn sync_assert(
best: &bitcoin::BlockHash,
channel: &mut UnboundedReceiver<Event>,
log: &mut Receiver<Log>,
) {
async fn sync_assert(best: &bitcoin::BlockHash, channel: &mut UnboundedReceiver<Event>) {
loop {
tokio::select! {
event = channel.recv() => {
Expand All @@ -125,9 +122,21 @@ async fn sync_assert(
break;
};
}
log = log.recv() => {
}
}
}

async fn print_logs(mut log_rx: Receiver<Log>, mut warn_rx: UnboundedReceiver<Warning>) {
loop {
tokio::select! {
log = log_rx.recv() => {
if let Some(log) = log {
println!("{log}");
println!("{log}")
}
}
warn = warn_rx.recv() => {
if let Some(warn) = warn {
println!("{warn}")
}
}
}
Expand All @@ -152,15 +161,16 @@ async fn test_reorg() {
let mut scripts = HashSet::new();
let other = rpc.new_address().unwrap();
scripts.insert(other.into());
let (node, client) = new_node(scripts.clone(), socket_addr).await;
let (node, client) = new_node(scripts.clone(), socket_addr);
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
log_rx: mut log,
warn_rx: _,
log_rx,
warn_rx,
event_rx: mut channel,
} = client;
sync_assert(&best, &mut channel, &mut log).await;
tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await });
sync_assert(&best, &mut channel).await;
// Reorganize the blocks
let old_best = best;
let old_height = num_blocks(rpc);
Expand Down Expand Up @@ -205,15 +215,16 @@ async fn test_mine_after_reorg() {
let mut scripts = HashSet::new();
let other = rpc.new_address().unwrap();
scripts.insert(other.into());
let (node, client) = new_node(scripts.clone(), socket_addr).await;
let (node, client) = new_node(scripts.clone(), socket_addr);
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
log_rx: mut log,
warn_rx: _,
log_rx,
warn_rx,
event_rx: mut channel,
} = client;
sync_assert(&best, &mut channel, &mut log).await;
tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await });
sync_assert(&best, &mut channel).await;
// Reorganize the blocks
let old_best = best;
let old_height = num_blocks(rpc);
Expand All @@ -239,7 +250,7 @@ async fn test_mine_after_reorg() {
}
mine_blocks(rpc, &miner, 2, 1).await;
let best = best_hash(rpc);
sync_assert(&best, &mut channel, &mut log).await;
sync_assert(&best, &mut channel).await;
requester.shutdown().await.unwrap();
rpc.stop().unwrap();
}
Expand All @@ -261,15 +272,16 @@ async fn test_various_client_methods() {
let mut scripts = HashSet::new();
let other = rpc.new_address().unwrap();
scripts.insert(other.into());
let (node, client) = new_node(scripts.clone(), socket_addr).await;
let (node, client) = new_node(scripts.clone(), socket_addr);
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
log_rx: mut log,
warn_rx: _,
log_rx,
warn_rx,
event_rx: mut channel,
} = client;
sync_assert(&best, &mut channel, &mut log).await;
tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await });
sync_assert(&best, &mut channel).await;
let batch = requester.get_header_range(10_000..10_002).await.unwrap();
assert!(batch.is_empty());
let _ = requester.broadcast_min_feerate().await.unwrap();
Expand Down Expand Up @@ -299,15 +311,16 @@ async fn test_sql_reorg() {
let mut scripts = HashSet::new();
let other = rpc.new_address().unwrap();
scripts.insert(other.into());
let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone()).await;
let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone());
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
log_rx: mut log,
warn_rx: _,
log_rx,
warn_rx,
event_rx: mut channel,
} = client;
sync_assert(&best, &mut channel, &mut log).await;
tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await });
sync_assert(&best, &mut channel).await;
let batch = requester.get_header_range(0..10).await.unwrap();
assert!(!batch.is_empty());
requester.shutdown().await.unwrap();
Expand All @@ -318,14 +331,15 @@ async fn test_sql_reorg() {
mine_blocks(rpc, &miner, 2, 1).await;
let best = best_hash(rpc);
// Spin up the node on a cold start
let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone()).await;
let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone());
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
log_rx: _,
warn_rx: _,
log_rx,
warn_rx,
event_rx: mut channel,
} = client;
let handle = tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await });
// Make sure the reorganization is caught after a cold start
while let Some(message) = channel.recv().await {
match message {
Expand All @@ -343,20 +357,22 @@ async fn test_sql_reorg() {
}
}
requester.shutdown().await.unwrap();
drop(handle);
// Mine more blocks
mine_blocks(rpc, &miner, 2, 1).await;
let best = best_hash(rpc);
// Make sure the node does not have any corrupted headers
let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir).await;
let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir);
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
log_rx: mut log,
warn_rx: _,
log_rx,
warn_rx,
event_rx: mut channel,
} = client;
tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await });
// The node properly syncs after persisting a reorg
sync_assert(&best, &mut channel, &mut log).await;
sync_assert(&best, &mut channel).await;
requester.shutdown().await.unwrap();
rpc.stop().unwrap();
}
Expand All @@ -379,15 +395,16 @@ async fn test_two_deep_reorg() {
let mut scripts = HashSet::new();
let other = rpc.new_address().unwrap();
scripts.insert(other.into());
let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone()).await;
let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone());
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
log_rx: mut log,
warn_rx: _,
log_rx,
warn_rx,
event_rx: mut channel,
} = client;
sync_assert(&best, &mut channel, &mut log).await;
let handle = tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await });
sync_assert(&best, &mut channel).await;
requester.shutdown().await.unwrap();
// Reorganize the blocks
let old_height = num_blocks(rpc);
Expand All @@ -397,15 +414,17 @@ async fn test_two_deep_reorg() {
invalidate_block(rpc, &best).await;
mine_blocks(rpc, &miner, 3, 1).await;
let best = best_hash(rpc);
drop(handle);
// Make sure the reorganization is caught after a cold start
let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone()).await;
let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone());
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
log_rx: _,
warn_rx: _,
log_rx,
warn_rx,
event_rx: mut channel,
} = client;
let handle = tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await });
while let Some(message) = channel.recv().await {
match message {
kyoto::core::messages::Event::BlocksDisconnected(blocks) => {
Expand All @@ -421,21 +440,23 @@ async fn test_two_deep_reorg() {
_ => {}
}
}
drop(handle);
requester.shutdown().await.unwrap();
// Mine more blocks
mine_blocks(rpc, &miner, 2, 1).await;
let best = best_hash(rpc);
// Make sure the node does not have any corrupted headers
let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir).await;
let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir);
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
log_rx: mut log,
warn_rx: _,
log_rx,
warn_rx,
event_rx: mut channel,
} = client;
tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await });
// The node properly syncs after persisting a reorg
sync_assert(&best, &mut channel, &mut log).await;
sync_assert(&best, &mut channel).await;
requester.shutdown().await.unwrap();
rpc.stop().unwrap();
}
Expand All @@ -457,15 +478,17 @@ async fn test_sql_stale_anchor() {
let mut scripts = HashSet::new();
let other = rpc.new_address().unwrap();
scripts.insert(other.into());
let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone()).await;
let (node, client) = new_node_sql(scripts.clone(), socket_addr, tempdir.clone());
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
log_rx: mut log,
warn_rx: _,
log_rx,
warn_rx,
event_rx: mut channel,
} = client;
sync_assert(&best, &mut channel, &mut log).await;
let handle = tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await });
sync_assert(&best, &mut channel).await;
drop(handle);
requester.shutdown().await.unwrap();
// Reorganize the blocks
let old_best = best;
Expand All @@ -479,15 +502,15 @@ async fn test_sql_stale_anchor() {
HeaderCheckpoint::new(old_height as u32, old_best),
socket_addr,
tempdir.clone(),
)
.await;
);
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
log_rx: _,
warn_rx: _,
log_rx,
warn_rx,
event_rx: mut channel,
} = client;
let handle = tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await });
// Ensure SQL is able to catch the fork by loading in headers from the database
while let Some(message) = channel.recv().await {
match message {
Expand All @@ -504,6 +527,7 @@ async fn test_sql_stale_anchor() {
_ => {}
}
}
drop(handle);
requester.shutdown().await.unwrap();
// Don't do anything, but reload the node from the checkpoint
let cp = best_hash(rpc);
Expand All @@ -515,17 +539,18 @@ async fn test_sql_stale_anchor() {
HeaderCheckpoint::new(old_height as u32, cp),
socket_addr,
tempdir.clone(),
)
.await;
);
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
log_rx: mut log,
warn_rx: _,
log_rx,
warn_rx,
event_rx: mut channel,
} = client;
let handle = tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await });
// The node properly syncs after persisting a reorg
sync_assert(&best, &mut channel, &mut log).await;
sync_assert(&best, &mut channel).await;
drop(handle);
requester.shutdown().await.unwrap();
// Mine more blocks and reload from the checkpoint
let cp = best_hash(rpc);
Expand All @@ -538,17 +563,17 @@ async fn test_sql_stale_anchor() {
HeaderCheckpoint::new(old_height as u32, cp),
socket_addr,
tempdir,
)
.await;
);
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
log_rx: mut log,
warn_rx: _,
log_rx,
warn_rx,
event_rx: mut channel,
} = client;
tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await });
// The node properly syncs after persisting a reorg
sync_assert(&best, &mut channel, &mut log).await;
sync_assert(&best, &mut channel).await;
requester.shutdown().await.unwrap();
rpc.stop().unwrap();
}
Expand Down
Loading