Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions example/managed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async fn main() {
let Client {
requester,
mut log_rx,
warn_rx: _,
mut event_rx,
} = client;

Expand All @@ -62,10 +63,8 @@ async fn main() {
tokio::select! {
log = log_rx.recv() => {
if let Some(log) = log {
match log {
Log::Dialog(d) => tracing::info!("{d}"),
Log::Warning(e) => tracing::warn!("{e}"),
_ => (),
if let Log::Dialog(log) = log {
tracing::info!("{log}")
}
}
}
Expand Down
25 changes: 14 additions & 11 deletions example/rescan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! blocks.

use kyoto::{chain::checkpoints::HeaderCheckpoint, core::builder::NodeBuilder};
use kyoto::{Address, Client, Event, Log, Network};
use kyoto::{Address, Client, Event, Network};
use std::collections::HashSet;
use std::str::FromStr;

Expand Down Expand Up @@ -45,6 +45,7 @@ async fn main() {
let Client {
requester,
mut log_rx,
mut warn_rx,
mut event_rx,
} = client;
// Sync with the single script added
Expand All @@ -59,11 +60,12 @@ async fn main() {
}
log = log_rx.recv() => {
if let Some(log) = log {
match log {
Log::Dialog(d) => tracing::info!("{d}"),
Log::Warning(warning) => tracing::warn!("{warning}"),
_ => (),
}
tracing::info!("{log}");
}
}
warn = warn_rx.recv() => {
if let Some(warn) = warn {
tracing::warn!("{warn}");
}
}
}
Expand All @@ -89,11 +91,12 @@ async fn main() {
}
log = log_rx.recv() => {
if let Some(log) = log {
match log {
Log::Dialog(d) => tracing::info!("{d}"),
Log::Warning(warning) => tracing::warn!("{warning}"),
_ => (),
}
tracing::info!("{log}");
}
}
warn = warn_rx.recv() => {
if let Some(warn) = warn {
tracing::warn!("{warn}");
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions example/signet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ async fn main() {
let Client {
requester,
mut log_rx,
mut warn_rx,
mut event_rx,
} = client;
// Continually listen for events until the node is synced to its peers.
Expand All @@ -80,21 +81,24 @@ async fn main() {
Event::BlocksDisconnected(_) => {
tracing::warn!("Some blocks were reorganized")
},
_ => (),
}
}
}
log = log_rx.recv() => {
if let Some(log) = log {
match log {
Log::Dialog(d)=> tracing::info!("{d}"),
Log::Warning(warning)=> tracing::warn!("{warning}"),
Log::StateChange(node_state) => tracing::info!("{node_state}"),
Log::ConnectionsMet => tracing::info!("All required connections met"),
_ => (),
}
}
}
warn = warn_rx.recv() => {
if let Some(warn) = warn {
tracing::warn!("{warn}");
}
}
}
}
let _ = requester.shutdown().await;
Expand Down
8 changes: 6 additions & 2 deletions example/testnet4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ async fn main() {
let Client {
requester,
mut log_rx,
mut warn_rx,
mut event_rx,
} = client;
// Continually listen for events until the node is synced to its peers.
Expand All @@ -72,21 +73,24 @@ async fn main() {
Event::BlocksDisconnected(_) => {
tracing::warn!("Some blocks were reorganized")
},
_ => (),
}
}
}
log = log_rx.recv() => {
if let Some(log) = log {
match log {
Log::Dialog(d)=> tracing::info!("{d}"),
Log::Warning(warning)=> tracing::warn!("{warning}"),
Log::StateChange(node_state) => tracing::info!("{node_state}"),
Log::ConnectionsMet => tracing::info!("All required connections met"),
_ => (),
}
}
}
warn = warn_rx.recv() => {
if let Some(warn) = warn {
tracing::warn!("{warn}");
}
}
}
}
let _ = requester.shutdown().await;
Expand Down
75 changes: 31 additions & 44 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ impl<H: HeaderStore> Chain<H> {
warning: format!(
"Unexpected error fetching a header from the header store at height {height}"
),
})
.await;
});
}
header_opt.map_err(HeaderPersistenceError::Database)
}
Expand Down Expand Up @@ -249,11 +248,9 @@ impl<H: HeaderStore> Chain<H> {
.write(self.header_chain.headers())
.await
{
self.dialog
.send_warning(Warning::FailedPersistance {
warning: format!("Could not save headers to disk: {e}"),
})
.await;
self.dialog.send_warning(Warning::FailedPersistance {
warning: format!("Could not save headers to disk: {e}"),
});
}
}

Expand All @@ -266,11 +263,9 @@ impl<H: HeaderStore> Chain<H> {
.write_over(self.header_chain.headers(), height)
.await
{
self.dialog
.send_warning(Warning::FailedPersistance {
warning: format!("Could not save headers to disk: {e}"),
})
.await;
self.dialog.send_warning(Warning::FailedPersistance {
warning: format!("Could not save headers to disk: {e}"),
});
}
}

Expand All @@ -285,15 +280,15 @@ impl<H: HeaderStore> Chain<H> {
.map_err(HeaderPersistenceError::Database)?;
if let Some(first) = loaded_headers.values().next() {
if first.prev_blockhash.ne(&self.tip()) {
self.dialog.send_warning(Warning::UnlinkableAnchor).await;
self.dialog.send_warning(Warning::UnlinkableAnchor);
// The header chain did not align, so just start from the anchor
return Err(HeaderPersistenceError::CannotLocateHistory);
} else if loaded_headers
.iter()
.zip(loaded_headers.iter().skip(1))
.any(|(first, second)| first.1.block_hash().ne(&second.1.prev_blockhash))
{
self.dialog.send_warning(Warning::CorruptedHeaders).await;
self.dialog.send_warning(Warning::CorruptedHeaders);
return Err(HeaderPersistenceError::HeadersDoNotLink);
}
loaded_headers.iter().for_each(|header| {
Expand Down Expand Up @@ -418,8 +413,7 @@ impl<H: HeaderStore> Chain<H> {
self.dialog
.send_warning(
Warning::UnexpectedSyncError { warning: "Unmatched checkpoint sent by a peer. Restarting header sync with new peers.".into() }
)
.await;
);
return Err(HeaderSyncError::InvalidCheckpoint);
}
}
Expand All @@ -432,7 +426,7 @@ impl<H: HeaderStore> Chain<H> {
// we only accept it if there is more work provided. otherwise, we disconnect the peer sending
// us this fork
async fn evaluate_fork(&mut self, header_batch: &HeadersBatch) -> Result<(), HeaderSyncError> {
self.dialog.send_warning(Warning::EvaluatingFork).await;
self.dialog.send_warning(Warning::EvaluatingFork);
// We only care about the headers these two chains do not have in common
let uncommon: Vec<Header> = header_batch
.inner()
Expand Down Expand Up @@ -472,11 +466,9 @@ impl<H: HeaderStore> Chain<H> {
self.flush_over_height(stem).await;
Ok(())
} else {
self.dialog
.send_warning(Warning::UnexpectedSyncError {
warning: "Peer sent us a fork with less work than the current chain".into(),
})
.await;
self.dialog.send_warning(Warning::UnexpectedSyncError {
warning: "Peer sent us a fork with less work than the current chain".into(),
});
Err(HeaderSyncError::LessWorkFork)
}
} else {
Expand Down Expand Up @@ -515,8 +507,7 @@ impl<H: HeaderStore> Chain<H> {
warning:
"The remote peer miscalculated the difficulty adjustment when syncing a batch of headers"
.into(),
})
.await;
});
return Err(HeaderSyncError::MiscalculatedDifficulty);
}
}
Expand Down Expand Up @@ -558,8 +549,7 @@ impl<H: HeaderStore> Chain<H> {
warning:
"The remote peer miscalculated the difficulty adjustment when syncing a batch of headers"
.into(),
})
.await;
});
return Err(HeaderSyncError::MiscalculatedDifficulty);
}
}
Expand All @@ -577,13 +567,10 @@ impl<H: HeaderStore> Chain<H> {
}
},
None => {
self.dialog
.send_warning(Warning::UnexpectedSyncError {
warning:
"Unable to audit the difficulty adjustment due to an index overflow"
.into(),
})
.await;
self.dialog.send_warning(Warning::UnexpectedSyncError {
warning: "Unable to audit the difficulty adjustment due to an index overflow"
.into(),
});
}
}
Ok(())
Expand Down Expand Up @@ -853,7 +840,7 @@ impl<H: HeaderStore> Chain<H> {
Some(sender) => {
let send_result = sender.send(Ok(IndexedBlock::new(height, block)));
if send_result.is_err() {
self.dialog.send_warning(Warning::ChannelDropped).await
self.dialog.send_warning(Warning::ChannelDropped)
};
}
None => {
Expand Down Expand Up @@ -885,7 +872,7 @@ impl<H: HeaderStore> Chain<H> {
if height_opt.is_none() {
let err_reponse = request.oneshot.send(Err(FetchBlockError::UnknownHash));
if err_reponse.is_err() {
self.dialog.send_warning(Warning::ChannelDropped).await;
self.dialog.send_warning(Warning::ChannelDropped);
}
} else {
self.dialog
Expand All @@ -902,12 +889,10 @@ impl<H: HeaderStore> Chain<H> {
let mut db = self.db.lock().await;
let range_opt = db.load(range).await;
if range_opt.is_err() {
self.dialog
.send_warning(Warning::FailedPersistance {
warning: "Unexpected error fetching a range of headers from the header store"
.to_string(),
})
.await;
self.dialog.send_warning(Warning::FailedPersistance {
warning: "Unexpected error fetching a range of headers from the header store"
.to_string(),
});
}
range_opt.map_err(HeaderPersistenceError::Database)
}
Expand Down Expand Up @@ -949,7 +934,7 @@ mod tests {
},
core::{
dialog::Dialog,
messages::{Event, Log},
messages::{Event, Log, Warning},
},
filters::cfheader_chain::AppendAttempt,
};
Expand All @@ -958,6 +943,7 @@ mod tests {

fn new_regtest(anchor: HeaderCheckpoint) -> Chain<()> {
let (log_tx, _) = tokio::sync::mpsc::channel::<Log>(1);
let (warn_tx, _) = tokio::sync::mpsc::unbounded_channel::<Warning>();
let (event_tx, _) = tokio::sync::mpsc::unbounded_channel::<Event>();
let mut checkpoints = HeaderCheckpoints::new(&bitcoin::Network::Regtest);
checkpoints.prune_up_to(anchor);
Expand All @@ -966,14 +952,15 @@ mod tests {
HashSet::new(),
anchor,
checkpoints,
Dialog::new(log_tx, event_tx),
Dialog::new(log_tx, warn_tx, event_tx),
(),
1,
)
}

fn new_regtest_two_peers(anchor: HeaderCheckpoint) -> Chain<()> {
let (log_tx, _) = tokio::sync::mpsc::channel::<Log>(1);
let (warn_tx, _) = tokio::sync::mpsc::unbounded_channel::<Warning>();
let (event_tx, _) = tokio::sync::mpsc::unbounded_channel::<Event>();
let mut checkpoints = HeaderCheckpoints::new(&bitcoin::Network::Regtest);
checkpoints.prune_up_to(anchor);
Expand All @@ -982,7 +969,7 @@ mod tests {
HashSet::new(),
anchor,
checkpoints,
Dialog::new(log_tx, event_tx),
Dialog::new(log_tx, warn_tx, event_tx),
(),
2,
)
Expand Down
4 changes: 2 additions & 2 deletions src/core/channel_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use bitcoin::{
Block, BlockHash, FeeRate, Transaction,
};

use crate::core::messages::FailurePayload;
use crate::core::messages::RejectPayload;

#[derive(Debug, Clone)]
pub(crate) enum MainThreadMessage {
Expand Down Expand Up @@ -49,7 +49,7 @@ pub(crate) enum PeerMessage {
Filter(CFilter),
Block(Block),
NewBlocks(Vec<BlockHash>),
Reject(FailurePayload),
Reject(RejectPayload),
Disconnect,
Verack,
Ping(u64),
Expand Down
Loading