Skip to content

Commit 003552c

Browse files
committed
feat(client)!: split Warning into separate receiver
1 parent b09425d commit 003552c

File tree

11 files changed

+79
-42
lines changed

11 files changed

+79
-42
lines changed

example/managed.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ async fn main() {
5454
let Client {
5555
requester,
5656
mut log_rx,
57+
warn_rx: _,
5758
mut event_rx,
5859
} = client;
5960

@@ -62,10 +63,8 @@ async fn main() {
6263
tokio::select! {
6364
log = log_rx.recv() => {
6465
if let Some(log) = log {
65-
match log {
66-
Log::Dialog(d) => tracing::info!("{d}"),
67-
Log::Warning(e) => tracing::warn!("{e}"),
68-
_ => (),
66+
if let Log::Dialog(log) = log {
67+
tracing::info!("{log}")
6968
}
7069
}
7170
}

example/rescan.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//! blocks.
44
55
use kyoto::{chain::checkpoints::HeaderCheckpoint, core::builder::NodeBuilder};
6-
use kyoto::{Address, Client, Event, Log, Network};
6+
use kyoto::{Address, Client, Event, Network};
77
use std::collections::HashSet;
88
use std::str::FromStr;
99

@@ -45,6 +45,7 @@ async fn main() {
4545
let Client {
4646
requester,
4747
mut log_rx,
48+
mut warn_rx,
4849
mut event_rx,
4950
} = client;
5051
// Sync with the single script added
@@ -59,11 +60,12 @@ async fn main() {
5960
}
6061
log = log_rx.recv() => {
6162
if let Some(log) = log {
62-
match log {
63-
Log::Dialog(d) => tracing::info!("{d}"),
64-
Log::Warning(warning) => tracing::warn!("{warning}"),
65-
_ => (),
66-
}
63+
tracing::info!("{log}");
64+
}
65+
}
66+
warn = warn_rx.recv() => {
67+
if let Some(warn) = warn {
68+
tracing::warn!("{warn}");
6769
}
6870
}
6971
}
@@ -89,11 +91,12 @@ async fn main() {
8991
}
9092
log = log_rx.recv() => {
9193
if let Some(log) = log {
92-
match log {
93-
Log::Dialog(d) => tracing::info!("{d}"),
94-
Log::Warning(warning) => tracing::warn!("{warning}"),
95-
_ => (),
96-
}
94+
tracing::info!("{log}");
95+
}
96+
}
97+
warn = warn_rx.recv() => {
98+
if let Some(warn) = warn {
99+
tracing::warn!("{warn}");
97100
}
98101
}
99102
}

example/signet.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ async fn main() {
5757
let Client {
5858
requester,
5959
mut log_rx,
60+
mut warn_rx,
6061
mut event_rx,
6162
} = client;
6263
// Continually listen for events until the node is synced to its peers.
@@ -87,13 +88,17 @@ async fn main() {
8788
if let Some(log) = log {
8889
match log {
8990
Log::Dialog(d)=> tracing::info!("{d}"),
90-
Log::Warning(warning)=> tracing::warn!("{warning}"),
9191
Log::StateChange(node_state) => tracing::info!("{node_state}"),
9292
Log::ConnectionsMet => tracing::info!("All required connections met"),
9393
_ => (),
9494
}
9595
}
9696
}
97+
warn = warn_rx.recv() => {
98+
if let Some(warn) = warn {
99+
tracing::warn!("{warn}");
100+
}
101+
}
97102
}
98103
}
99104
let _ = requester.shutdown().await;

example/testnet4.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ async fn main() {
5252
let Client {
5353
requester,
5454
mut log_rx,
55+
mut warn_rx,
5556
mut event_rx,
5657
} = client;
5758
// Continually listen for events until the node is synced to its peers.
@@ -79,13 +80,17 @@ async fn main() {
7980
if let Some(log) = log {
8081
match log {
8182
Log::Dialog(d)=> tracing::info!("{d}"),
82-
Log::Warning(warning)=> tracing::warn!("{warning}"),
8383
Log::StateChange(node_state) => tracing::info!("{node_state}"),
8484
Log::ConnectionsMet => tracing::info!("All required connections met"),
8585
_ => (),
8686
}
8787
}
8888
}
89+
warn = warn_rx.recv() => {
90+
if let Some(warn) = warn {
91+
tracing::warn!("{warn}");
92+
}
93+
}
8994
}
9095
}
9196
let _ = requester.shutdown().await;

src/chain/chain.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -949,7 +949,7 @@ mod tests {
949949
},
950950
core::{
951951
dialog::Dialog,
952-
messages::{Event, Log},
952+
messages::{Event, Log, Warning},
953953
},
954954
filters::cfheader_chain::AppendAttempt,
955955
};
@@ -958,6 +958,7 @@ mod tests {
958958

959959
fn new_regtest(anchor: HeaderCheckpoint) -> Chain<()> {
960960
let (log_tx, _) = tokio::sync::mpsc::channel::<Log>(1);
961+
let (warn_tx, _) = tokio::sync::mpsc::unbounded_channel::<Warning>();
961962
let (event_tx, _) = tokio::sync::mpsc::unbounded_channel::<Event>();
962963
let mut checkpoints = HeaderCheckpoints::new(&bitcoin::Network::Regtest);
963964
checkpoints.prune_up_to(anchor);
@@ -966,14 +967,15 @@ mod tests {
966967
HashSet::new(),
967968
anchor,
968969
checkpoints,
969-
Dialog::new(log_tx, event_tx),
970+
Dialog::new(log_tx, warn_tx, event_tx),
970971
(),
971972
1,
972973
)
973974
}
974975

975976
fn new_regtest_two_peers(anchor: HeaderCheckpoint) -> Chain<()> {
976977
let (log_tx, _) = tokio::sync::mpsc::channel::<Log>(1);
978+
let (warn_tx, _) = tokio::sync::mpsc::unbounded_channel::<Warning>();
977979
let (event_tx, _) = tokio::sync::mpsc::unbounded_channel::<Event>();
978980
let mut checkpoints = HeaderCheckpoints::new(&bitcoin::Network::Regtest);
979981
checkpoints.prune_up_to(anchor);
@@ -982,7 +984,7 @@ mod tests {
982984
HashSet::new(),
983985
anchor,
984986
checkpoints,
985-
Dialog::new(log_tx, event_tx),
987+
Dialog::new(log_tx, warn_tx, event_tx),
986988
(),
987989
2,
988990
)

src/core/client.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{collections::BTreeMap, ops::Range, time::Duration};
88
use tokio::sync::mpsc;
99
use tokio::sync::mpsc::Sender;
1010

11-
use crate::{Event, Log, TrustedPeer, TxBroadcast};
11+
use crate::{Event, Log, TrustedPeer, TxBroadcast, Warning};
1212

1313
#[cfg(feature = "filter-control")]
1414
use super::{error::FetchBlockError, messages::BlockRequest, BlockReceiver, IndexedBlock};
@@ -24,19 +24,23 @@ pub struct Client {
2424
pub requester: Requester,
2525
/// Receive log messages from a node.
2626
pub log_rx: mpsc::Receiver<Log>,
27+
/// Receive warning messages from a node.
28+
pub warn_rx: mpsc::UnboundedReceiver<Warning>,
2729
/// Receive [`Event`] from a node to act on.
2830
pub event_rx: mpsc::UnboundedReceiver<Event>,
2931
}
3032

3133
impl Client {
3234
pub(crate) fn new(
3335
log_rx: mpsc::Receiver<Log>,
36+
warn_rx: mpsc::UnboundedReceiver<Warning>,
3437
event_rx: mpsc::UnboundedReceiver<Event>,
3538
ntx: Sender<ClientMessage>,
3639
) -> Self {
3740
Self {
3841
requester: Requester::new(ntx),
3942
log_rx,
43+
warn_rx,
4044
event_rx,
4145
}
4246
}
@@ -351,13 +355,15 @@ mod tests {
351355
async fn test_client_works() {
352356
let transaction: Transaction = deserialize(&hex::decode("0200000001aad73931018bd25f84ae400b68848be09db706eac2ac18298babee71ab656f8b0000000048473044022058f6fc7c6a33e1b31548d481c826c015bd30135aad42cd67790dab66d2ad243b02204a1ced2604c6735b6393e5b41691dd78b00f0c5942fb9f751856faa938157dba01feffffff0280f0fa020000000017a9140fb9463421696b82c833af241c78c17ddbde493487d0f20a270100000017a91429ca74f8a08f81999428185c97b5d852e4063f618765000000").unwrap()).unwrap();
353357
let (log_tx, log_rx) = tokio::sync::mpsc::channel::<Log>(1);
358+
let (_, warn_rx) = tokio::sync::mpsc::unbounded_channel::<Warning>();
354359
let (_, event_rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
355360
let (ctx, crx) = mpsc::channel::<ClientMessage>(5);
356361
let Client {
357362
requester,
358363
mut log_rx,
364+
warn_rx: _,
359365
event_rx: _,
360-
} = Client::new(log_rx, event_rx, ctx);
366+
} = Client::new(log_rx, warn_rx, event_rx, ctx);
361367
let send_res = log_tx
362368
.send(Log::Dialog("An important message".into()))
363369
.await;

src/core/dialog.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,21 @@ use super::messages::{Event, Log, Progress, Warning};
55
#[derive(Debug, Clone)]
66
pub(crate) struct Dialog {
77
log_tx: Sender<Log>,
8+
warn_tx: UnboundedSender<Warning>,
89
event_tx: UnboundedSender<Event>,
910
}
1011

1112
impl Dialog {
12-
pub(crate) fn new(log_tx: Sender<Log>, event_tx: UnboundedSender<Event>) -> Self {
13-
Self { log_tx, event_tx }
13+
pub(crate) fn new(
14+
log_tx: Sender<Log>,
15+
warn_tx: UnboundedSender<Warning>,
16+
event_tx: UnboundedSender<Event>,
17+
) -> Self {
18+
Self {
19+
log_tx,
20+
warn_tx,
21+
event_tx,
22+
}
1423
}
1524

1625
pub(crate) async fn send_dialog(&self, dialog: impl Into<String>) {
@@ -40,7 +49,7 @@ impl Dialog {
4049
}
4150

4251
pub(crate) async fn send_warning(&self, warning: Warning) {
43-
let _ = self.log_tx.send(Log::Warning(warning)).await;
52+
let _ = self.warn_tx.send(warning);
4453
}
4554

4655
pub(crate) async fn send_info(&self, info: Log) {

src/core/messages.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ use super::{
2121
pub enum Log {
2222
/// Human readable dialog of what the node is currently doing.
2323
Dialog(String),
24-
/// A warning that may effect the function of the node.
25-
Warning(Warning),
2624
/// The current state of the node in the syncing process.
2725
StateChange(NodeState),
2826
/// The node is connected to all required peers.
@@ -39,7 +37,6 @@ impl core::fmt::Display for Log {
3937
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
4038
match self {
4139
Log::Dialog(d) => write!(f, "{}", d),
42-
Log::Warning(w) => write!(f, "{}", w),
4340
Log::StateChange(s) => write!(f, "{}", s),
4441
Log::TxSent(txid) => write!(f, "Transaction sent: {}", txid),
4542
Log::ConnectionsMet => write!(f, "Required connections met"),

src/core/node.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,12 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
9393
) -> (Self, Client) {
9494
// Set up a communication channel between the node and client
9595
let (log_tx, log_rx) = mpsc::channel::<Log>(32);
96+
let (warn_tx, warn_rx) = mpsc::unbounded_channel::<Warning>();
9697
let (event_tx, event_rx) = mpsc::unbounded_channel::<Event>();
9798
let (ctx, crx) = mpsc::channel::<ClientMessage>(5);
98-
let client = Client::new(log_rx, event_rx, ctx);
99+
let client = Client::new(log_rx, warn_rx, event_rx, ctx);
99100
// A structured way to talk to the client
100-
let dialog = Dialog::new(log_tx, event_tx);
101+
let dialog = Dialog::new(log_tx, warn_tx, event_tx);
101102
// We always assume we are behind
102103
let state = Arc::new(RwLock::new(NodeState::Behind));
103104
// Configure the peer manager

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
//! // Run the node and wait for the sync message;
4343
//! tokio::task::spawn(async move { node.run().await });
4444
//! // Split the client into components that send messages and listen to messages
45-
//! let Client { requester, mut log_rx, mut event_rx } = client;
45+
//! let Client { requester, mut log_rx, warn_rx: _, mut event_rx } = client;
4646
//! // Sync with the single script added
4747
//! loop {
4848
//! tokio::select! {

0 commit comments

Comments
 (0)