Skip to content

Commit ffafc83

Browse files
committed
Enforce client semantics with state
Uses the sealed trait hack to only allow for compiler enforced state transitions. The flow is `Idle -> Subscribed -> Actice`, where a user decides what to do with the events, runs the node, then gets access to all the usual client features. I decided to scrap the `Requester` and instead encapsulate the event receivers with a newtype. The only downside I see with this is `&mut self` doesn't seem to be possible, as each impl block is defined for it's `State` type. I need to fiddle with that to see if we can't remove the need to return the client each state transition.
1 parent c7e0e7c commit ffafc83

File tree

7 files changed

+241
-187
lines changed

7 files changed

+241
-187
lines changed

examples/bitcoin.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
44
use bip157::builder::Builder;
55
use bip157::chain::{BlockHeaderChanges, ChainState};
6-
use bip157::{lookup_host, Client, Event, HeaderCheckpoint, Network, ScriptBuf};
6+
use bip157::client::EventListeners;
7+
use bip157::{lookup_host, Event, HeaderCheckpoint, Network, ScriptBuf};
78
use std::collections::HashSet;
89
use tokio::time::Instant;
910

@@ -35,17 +36,17 @@ async fn main() {
3536
// Create the node and client
3637
.build();
3738

38-
let client = client.run();
3939
// Split the client into components that send messages and listen to messages.
4040
// With this construction, different parts of the program can take ownership of
4141
// specific tasks.
42-
let Client {
43-
requester,
42+
let (client, events) = client.subscribe();
43+
let EventListeners {
4444
mut info_rx,
4545
mut warn_rx,
4646
mut event_rx,
47-
..
48-
} = client;
47+
} = events;
48+
let client = client.start();
49+
4950
// Continually listen for events until the node is synced to its peers.
5051
loop {
5152
tokio::select! {
@@ -55,11 +56,11 @@ async fn main() {
5556
Event::FiltersSynced(update) => {
5657
tracing::info!("Chain tip: {}",update.tip().hash);
5758
// Request information from the node
58-
let fee = requester.broadcast_min_feerate().await.unwrap();
59+
let fee = client.broadcast_min_feerate().await.unwrap();
5960
tracing::info!("Minimum transaction broadcast fee rate: {:#}", fee);
6061
let sync_time = now.elapsed().as_secs_f32();
6162
tracing::info!("Total sync time: {sync_time} seconds");
62-
let avg_fee_rate = requester.average_fee_rate(update.tip().hash).await.unwrap();
63+
let avg_fee_rate = client.average_fee_rate(update.tip().hash).await.unwrap();
6364
tracing::info!("Last block average fee rate: {:#}", avg_fee_rate);
6465
break;
6566
},
@@ -82,6 +83,6 @@ async fn main() {
8283
}
8384
}
8485
}
85-
let _ = requester.shutdown();
86+
let _ = client.shutdown();
8687
tracing::info!("Shutting down");
8788
}

examples/signet.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
44
use bip157::chain::{BlockHeaderChanges, ChainState};
55
use bip157::messages::Event;
6-
use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint, Client};
7-
use bip157::{Address, BlockHash, Network};
6+
use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint};
7+
use bip157::{Address, BlockHash, EventListeners, Network};
88
use std::collections::HashSet;
99
use std::str::FromStr;
1010

@@ -40,15 +40,13 @@ async fn main() {
4040
// Create the node and client
4141
.build();
4242

43-
let client = client.run();
44-
45-
let Client {
46-
requester,
43+
let (client, events) = client.subscribe();
44+
let EventListeners {
4745
mut info_rx,
4846
mut warn_rx,
4947
mut event_rx,
50-
..
51-
} = client;
48+
} = events;
49+
let client = client.start();
5250

5351
// Continually listen for events until the node is synced to its peers.
5452
loop {
@@ -72,7 +70,7 @@ async fn main() {
7270
if filter.contains_any(addresses.iter()) {
7371
let hash = filter.block_hash();
7472
tracing::info!("Found script at {}!", hash);
75-
let indexed_block = requester.get_block(hash).await.unwrap();
73+
let indexed_block = client.get_block(hash).await.unwrap();
7674
let coinbase = indexed_block.block.txdata.first().unwrap().compute_txid();
7775
tracing::info!("Coinbase transaction ID: {}", coinbase);
7876
break;
@@ -88,6 +86,6 @@ async fn main() {
8886
}
8987
}
9088
}
91-
let _ = requester.shutdown();
89+
let _ = client.shutdown();
9290
tracing::info!("Shutting down");
9391
}

src/builder.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use bitcoin::Network;
55

66
use super::{client::Client, node::Node};
77
use crate::chain::ChainState;
8+
use crate::client::Idle;
89
use crate::network::ConnectionType;
910
use crate::TrustedPeer;
1011
use crate::{Config, FilterType};
@@ -139,7 +140,7 @@ impl Builder {
139140
}
140141

141142
/// Consume the node builder and receive a [`Client`].
142-
pub fn build(mut self) -> Client {
143+
pub fn build(mut self) -> Client<Idle> {
143144
Node::build(self.network, core::mem::take(&mut self.config))
144145
}
145146
}

src/client.rs

Lines changed: 96 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,58 +12,129 @@ use crate::{Event, Info, TrustedPeer, Warning};
1212
use super::{error::ClientError, messages::ClientMessage};
1313
use super::{error::FetchBlockError, IndexedBlock};
1414

15-
/// A [`Client`] allows for communication with a running node.
15+
/// Client state when idle.
16+
pub struct Idle;
17+
/// Client state when subscribed to events.
18+
pub struct Subscribed;
19+
/// Client state when active.
20+
pub struct Active;
21+
22+
mod sealed {
23+
pub trait Sealed {}
24+
}
25+
26+
impl sealed::Sealed for Idle {}
27+
impl sealed::Sealed for Subscribed {}
28+
impl sealed::Sealed for Active {}
29+
30+
/// State of the client
31+
pub trait State: sealed::Sealed {}
32+
33+
impl State for Idle {}
34+
impl State for Subscribed {}
35+
impl State for Active {}
36+
37+
/// Wrapper type for the channels that will receive events.
1638
#[derive(Debug)]
17-
pub struct Client {
18-
/// Send events to a node, such as broadcasting a transaction.
19-
pub requester: Requester,
39+
pub struct EventListeners {
2040
/// Receive informational messages from the node.
2141
pub info_rx: mpsc::Receiver<Info>,
2242
/// Receive warning messages from a node.
2343
pub warn_rx: mpsc::UnboundedReceiver<Warning>,
2444
/// Receive [`Event`] from a node to act on.
2545
pub event_rx: mpsc::UnboundedReceiver<Event>,
26-
/// Internal node structure.
27-
node: Option<Node>,
2846
}
2947

30-
impl Client {
31-
pub(crate) fn new(
48+
impl EventListeners {
49+
fn new(
3250
info_rx: mpsc::Receiver<Info>,
3351
warn_rx: mpsc::UnboundedReceiver<Warning>,
3452
event_rx: mpsc::UnboundedReceiver<Event>,
35-
ntx: UnboundedSender<ClientMessage>,
36-
node: Node,
3753
) -> Self {
3854
Self {
39-
requester: Requester::new(ntx),
4055
info_rx,
4156
warn_rx,
4257
event_rx,
58+
}
59+
}
60+
}
61+
62+
/// A [`Client`] allows for communication with a running node.
63+
#[derive(Debug)]
64+
pub struct Client<S: State> {
65+
/// Send events to a node, such as broadcasting a transaction.
66+
ntx: UnboundedSender<ClientMessage>,
67+
/// Receive informational messages from the node.
68+
events: Option<EventListeners>,
69+
/// Internal node structure.
70+
node: Option<Node>,
71+
/// Marker for state.
72+
_marker: core::marker::PhantomData<S>,
73+
}
74+
75+
impl Client<Idle> {
76+
pub(crate) fn new(
77+
info_rx: mpsc::Receiver<Info>,
78+
warn_rx: mpsc::UnboundedReceiver<Warning>,
79+
event_rx: mpsc::UnboundedReceiver<Event>,
80+
ntx: UnboundedSender<ClientMessage>,
81+
node: Node,
82+
) -> Client<Idle> {
83+
Client {
84+
ntx,
85+
events: Some(EventListeners::new(info_rx, warn_rx, event_rx)),
4386
node: Some(node),
87+
_marker: core::marker::PhantomData,
4488
}
4589
}
4690

47-
/// Start the underlying node on a [`tokio::task`]. This assumes there is a runtime present to
48-
/// execute the task.
49-
pub fn run(mut self) -> Self {
50-
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
51-
tokio::task::spawn(async move { node.run().await });
52-
self
91+
/// Subscribe to the events published by the light client. Applications may perform arbitrary behavior
92+
/// when receiving these events, such as logging or applying the effect of a block to a wallet.
93+
/// The client is not yet running after this step.
94+
pub fn subscribe(mut self) -> (Client<Subscribed>, EventListeners) {
95+
let events = core::mem::take(&mut self.events).expect("cannot call run twice.");
96+
(
97+
Client {
98+
ntx: self.ntx,
99+
events: None,
100+
node: self.node,
101+
_marker: core::marker::PhantomData,
102+
},
103+
events,
104+
)
53105
}
54106
}
55107

56-
/// Send messages to a node that is running so the node may complete a task.
57-
#[derive(Debug, Clone)]
58-
pub struct Requester {
59-
ntx: UnboundedSender<ClientMessage>,
60-
}
108+
impl Client<Subscribed> {
109+
/// Start the client, which will begin publishing events to subscribers. This will implicitly
110+
/// spawn a [`tokio::task`] to fetch data for the client.
111+
pub fn start(mut self) -> Client<Active> {
112+
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
113+
tokio::task::spawn(async move { node.run().await });
114+
Client {
115+
ntx: self.ntx,
116+
events: None,
117+
node: None,
118+
_marker: core::marker::PhantomData,
119+
}
120+
}
61121

62-
impl Requester {
63-
fn new(ntx: UnboundedSender<ClientMessage>) -> Self {
64-
Self { ntx }
122+
/// Receive a [`Node`] to run on a dedicated resource, likely with a custom [`tokio::runtime::Runtime`].
123+
pub fn start_managed(mut self) -> (Client<Active>, Node) {
124+
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
125+
(
126+
Client {
127+
ntx: self.ntx,
128+
events: None,
129+
node: None,
130+
_marker: core::marker::PhantomData,
131+
},
132+
node,
133+
)
65134
}
135+
}
66136

137+
impl Client<Active> {
67138
/// Tell the node to shut down.
68139
///
69140
/// # Errors

src/lib.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
//! # Example usage
99
//!
1010
//! ```no_run
11-
//! use bip157::{Builder, Event, Client, Network, BlockHash};
11+
//! use bip157::{Builder, Event, EventListeners, Client, Network, BlockHash};
1212
//!
1313
//! #[tokio::main]
1414
//! async fn main() {
@@ -22,10 +22,12 @@
2222
//! // The number of connections we would like to maintain
2323
//! .required_peers(2)
2424
//! .build();
25-
//! // Run the node and wait for the sync message;
26-
//! let client = client.run();
25+
//! // Start the node
26+
//! let (client, events) = client.subscribe();
27+
//! let client = client.start();
2728
//! // Split the client into components that send messages and listen to messages
28-
//! let Client { requester, info_rx: _, warn_rx: _, mut event_rx, .. } = client;
29+
//! let EventListeners { info_rx: _, warn_rx: _, mut event_rx } = events;
30+
//! // Wait for the sync message;
2931
//! loop {
3032
//! if let Some(event) = event_rx.recv().await {
3133
//! match event {
@@ -37,7 +39,7 @@
3739
//! }
3840
//! }
3941
//! }
40-
//! requester.shutdown();
42+
//! client.shutdown();
4143
//! }
4244
//! ```
4345
@@ -81,7 +83,7 @@ use tokio::sync::mpsc::UnboundedSender;
8183
pub use {
8284
crate::builder::Builder,
8385
crate::chain::ChainState,
84-
crate::client::{Client, Requester},
86+
crate::client::{Client, EventListeners},
8587
crate::error::{ClientError, NodeError},
8688
crate::messages::{Event, Info, Progress, RejectPayload, SyncUpdate, Warning},
8789
crate::node::Node,

src/node.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::{
3131
error::HeaderSyncError,
3232
CFHeaderChanges, ChainState, FilterCheck, HeightMonitor,
3333
},
34+
client::Idle,
3435
error::FetchBlockError,
3536
messages::ClientRequest,
3637
network::{
@@ -66,7 +67,7 @@ pub struct Node {
6667
}
6768

6869
impl Node {
69-
pub(crate) fn build(network: Network, config: Config) -> Client {
70+
pub(crate) fn build(network: Network, config: Config) -> Client<Idle> {
7071
let Config {
7172
required_peers,
7273
white_list,

0 commit comments

Comments
 (0)