Skip to content

Commit 652b2bc

Browse files
committed
Enforce client semantics with state
Uses the sealed trait hack to only allow for compiler enforced state transitions. The flow is `Idle -> Subscribed -> Actice`, where a user decides what to do with the events, runs the node, then gets access to all the usual client features. I decided to scrap the `Requester` and instead encapsulate the event receivers with a newtype. The only downside I see with this is `&mut self` doesn't seem to be possible, as each impl block is defined for it's `State` type. I need to fiddle with that to see if we can't remove the need to return the client each state transition.
1 parent c7e0e7c commit 652b2bc

File tree

7 files changed

+247
-187
lines changed

7 files changed

+247
-187
lines changed

examples/bitcoin.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
44
use bip157::builder::Builder;
55
use bip157::chain::{BlockHeaderChanges, ChainState};
6-
use bip157::{lookup_host, Client, Event, HeaderCheckpoint, Network, ScriptBuf};
6+
use bip157::client::EventListeners;
7+
use bip157::{lookup_host, Event, HeaderCheckpoint, Network, ScriptBuf};
78
use std::collections::HashSet;
89
use tokio::time::Instant;
910

@@ -35,17 +36,17 @@ async fn main() {
3536
// Create the node and client
3637
.build();
3738

38-
let client = client.run();
3939
// Split the client into components that send messages and listen to messages.
4040
// With this construction, different parts of the program can take ownership of
4141
// specific tasks.
42-
let Client {
43-
requester,
42+
let (client, events) = client.subscribe();
43+
let EventListeners {
4444
mut info_rx,
4545
mut warn_rx,
4646
mut event_rx,
47-
..
48-
} = client;
47+
} = events;
48+
let client = client.start();
49+
4950
// Continually listen for events until the node is synced to its peers.
5051
loop {
5152
tokio::select! {
@@ -55,11 +56,11 @@ async fn main() {
5556
Event::FiltersSynced(update) => {
5657
tracing::info!("Chain tip: {}",update.tip().hash);
5758
// Request information from the node
58-
let fee = requester.broadcast_min_feerate().await.unwrap();
59+
let fee = client.broadcast_min_feerate().await.unwrap();
5960
tracing::info!("Minimum transaction broadcast fee rate: {:#}", fee);
6061
let sync_time = now.elapsed().as_secs_f32();
6162
tracing::info!("Total sync time: {sync_time} seconds");
62-
let avg_fee_rate = requester.average_fee_rate(update.tip().hash).await.unwrap();
63+
let avg_fee_rate = client.average_fee_rate(update.tip().hash).await.unwrap();
6364
tracing::info!("Last block average fee rate: {:#}", avg_fee_rate);
6465
break;
6566
},
@@ -82,6 +83,6 @@ async fn main() {
8283
}
8384
}
8485
}
85-
let _ = requester.shutdown();
86+
let _ = client.shutdown();
8687
tracing::info!("Shutting down");
8788
}

examples/signet.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
44
use bip157::chain::{BlockHeaderChanges, ChainState};
55
use bip157::messages::Event;
6-
use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint, Client};
7-
use bip157::{Address, BlockHash, Network};
6+
use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint};
7+
use bip157::{Address, BlockHash, EventListeners, Network};
88
use std::collections::HashSet;
99
use std::str::FromStr;
1010

@@ -40,15 +40,13 @@ async fn main() {
4040
// Create the node and client
4141
.build();
4242

43-
let client = client.run();
44-
45-
let Client {
46-
requester,
43+
let (client, events) = client.subscribe();
44+
let EventListeners {
4745
mut info_rx,
4846
mut warn_rx,
4947
mut event_rx,
50-
..
51-
} = client;
48+
} = events;
49+
let client = client.start();
5250

5351
// Continually listen for events until the node is synced to its peers.
5452
loop {
@@ -72,7 +70,7 @@ async fn main() {
7270
if filter.contains_any(addresses.iter()) {
7371
let hash = filter.block_hash();
7472
tracing::info!("Found script at {}!", hash);
75-
let indexed_block = requester.get_block(hash).await.unwrap();
73+
let indexed_block = client.get_block(hash).await.unwrap();
7674
let coinbase = indexed_block.block.txdata.first().unwrap().compute_txid();
7775
tracing::info!("Coinbase transaction ID: {}", coinbase);
7876
break;
@@ -88,6 +86,6 @@ async fn main() {
8886
}
8987
}
9088
}
91-
let _ = requester.shutdown();
89+
let _ = client.shutdown();
9290
tracing::info!("Shutting down");
9391
}

src/builder.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use bitcoin::Network;
55

66
use super::{client::Client, node::Node};
77
use crate::chain::ChainState;
8+
use crate::client::Idle;
89
use crate::network::ConnectionType;
910
use crate::TrustedPeer;
1011
use crate::{Config, FilterType};
@@ -139,7 +140,7 @@ impl Builder {
139140
}
140141

141142
/// Consume the node builder and receive a [`Client`].
142-
pub fn build(mut self) -> Client {
143+
pub fn build(mut self) -> Client<Idle> {
143144
Node::build(self.network, core::mem::take(&mut self.config))
144145
}
145146
}

src/client.rs

Lines changed: 102 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,58 +12,135 @@ use crate::{Event, Info, TrustedPeer, Warning};
1212
use super::{error::ClientError, messages::ClientMessage};
1313
use super::{error::FetchBlockError, IndexedBlock};
1414

15-
/// A [`Client`] allows for communication with a running node.
15+
/// Client state when idle.
16+
pub struct Idle;
17+
/// Client state when subscribed to events.
18+
pub struct Subscribed;
19+
/// Client state when active.
20+
pub struct Active;
21+
22+
mod sealed {
23+
pub trait Sealed {}
24+
}
25+
26+
impl sealed::Sealed for Idle {}
27+
impl sealed::Sealed for Subscribed {}
28+
impl sealed::Sealed for Active {}
29+
30+
/// State of the client.
31+
pub trait State: sealed::Sealed {}
32+
33+
impl State for Idle {}
34+
impl State for Subscribed {}
35+
impl State for Active {}
36+
37+
/// Wrapper type for the channels that will receive events.
1638
#[derive(Debug)]
17-
pub struct Client {
18-
/// Send events to a node, such as broadcasting a transaction.
19-
pub requester: Requester,
39+
pub struct EventListeners {
2040
/// Receive informational messages from the node.
2141
pub info_rx: mpsc::Receiver<Info>,
2242
/// Receive warning messages from a node.
2343
pub warn_rx: mpsc::UnboundedReceiver<Warning>,
2444
/// Receive [`Event`] from a node to act on.
2545
pub event_rx: mpsc::UnboundedReceiver<Event>,
26-
/// Internal node structure.
27-
node: Option<Node>,
2846
}
2947

30-
impl Client {
31-
pub(crate) fn new(
48+
impl EventListeners {
49+
fn new(
3250
info_rx: mpsc::Receiver<Info>,
3351
warn_rx: mpsc::UnboundedReceiver<Warning>,
3452
event_rx: mpsc::UnboundedReceiver<Event>,
35-
ntx: UnboundedSender<ClientMessage>,
36-
node: Node,
3753
) -> Self {
3854
Self {
39-
requester: Requester::new(ntx),
4055
info_rx,
4156
warn_rx,
4257
event_rx,
58+
}
59+
}
60+
}
61+
62+
/// A [`Client`] allows for communication with a running node.
63+
///
64+
/// The [`Client`] is generic over 3 states:
65+
/// - [`Idle`]: the client is not running and event handling has not been initialized.
66+
/// - [`Subscribed`]: events have been subscribed to in the program, but the client has not started.
67+
/// - [`Active`]: data is actively being fetched and the [`Client`] may perform actions.
68+
///
69+
#[derive(Debug)]
70+
pub struct Client<S: State> {
71+
/// Send events to a node, such as broadcasting a transaction.
72+
ntx: UnboundedSender<ClientMessage>,
73+
/// Receive informational messages from the node.
74+
events: Option<EventListeners>,
75+
/// Internal node structure.
76+
node: Option<Node>,
77+
/// Marker for state.
78+
_marker: core::marker::PhantomData<S>,
79+
}
80+
81+
impl Client<Idle> {
82+
pub(crate) fn new(
83+
info_rx: mpsc::Receiver<Info>,
84+
warn_rx: mpsc::UnboundedReceiver<Warning>,
85+
event_rx: mpsc::UnboundedReceiver<Event>,
86+
ntx: UnboundedSender<ClientMessage>,
87+
node: Node,
88+
) -> Client<Idle> {
89+
Client {
90+
ntx,
91+
events: Some(EventListeners::new(info_rx, warn_rx, event_rx)),
4392
node: Some(node),
93+
_marker: core::marker::PhantomData,
4494
}
4595
}
4696

47-
/// Start the underlying node on a [`tokio::task`]. This assumes there is a runtime present to
48-
/// execute the task.
49-
pub fn run(mut self) -> Self {
50-
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
51-
tokio::task::spawn(async move { node.run().await });
52-
self
97+
/// Subscribe to the events published by the light client. Applications may perform arbitrary behavior
98+
/// when receiving these events, such as logging or applying the effect of a block to a wallet.
99+
/// The client is not yet running after this step.
100+
pub fn subscribe(mut self) -> (Client<Subscribed>, EventListeners) {
101+
let events = core::mem::take(&mut self.events).expect("cannot call run twice.");
102+
(
103+
Client {
104+
ntx: self.ntx,
105+
events: None,
106+
node: self.node,
107+
_marker: core::marker::PhantomData,
108+
},
109+
events,
110+
)
53111
}
54112
}
55113

56-
/// Send messages to a node that is running so the node may complete a task.
57-
#[derive(Debug, Clone)]
58-
pub struct Requester {
59-
ntx: UnboundedSender<ClientMessage>,
60-
}
114+
impl Client<Subscribed> {
115+
/// Start the client, which will begin publishing events to subscribers. This will implicitly
116+
/// spawn a [`tokio::task`] to fetch data for the client.
117+
pub fn start(mut self) -> Client<Active> {
118+
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
119+
tokio::task::spawn(async move { node.run().await });
120+
Client {
121+
ntx: self.ntx,
122+
events: None,
123+
node: None,
124+
_marker: core::marker::PhantomData,
125+
}
126+
}
61127

62-
impl Requester {
63-
fn new(ntx: UnboundedSender<ClientMessage>) -> Self {
64-
Self { ntx }
128+
/// Receive a [`Node`] to run on a dedicated resource, likely with a custom [`tokio::runtime::Runtime`].
129+
pub fn start_managed(mut self) -> (Client<Active>, Node) {
130+
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
131+
(
132+
Client {
133+
ntx: self.ntx,
134+
events: None,
135+
node: None,
136+
_marker: core::marker::PhantomData,
137+
},
138+
node,
139+
)
65140
}
141+
}
66142

143+
impl Client<Active> {
67144
/// Tell the node to shut down.
68145
///
69146
/// # Errors

src/lib.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
//! # Example usage
99
//!
1010
//! ```no_run
11-
//! use bip157::{Builder, Event, Client, Network, BlockHash};
11+
//! use bip157::{Builder, Event, EventListeners, Client, Network, BlockHash};
1212
//!
1313
//! #[tokio::main]
1414
//! async fn main() {
@@ -22,10 +22,12 @@
2222
//! // The number of connections we would like to maintain
2323
//! .required_peers(2)
2424
//! .build();
25-
//! // Run the node and wait for the sync message;
26-
//! let client = client.run();
25+
//! // Start the node
26+
//! let (client, events) = client.subscribe();
27+
//! let client = client.start();
2728
//! // Split the client into components that send messages and listen to messages
28-
//! let Client { requester, info_rx: _, warn_rx: _, mut event_rx, .. } = client;
29+
//! let EventListeners { info_rx: _, warn_rx: _, mut event_rx } = events;
30+
//! // Wait for the sync message;
2931
//! loop {
3032
//! if let Some(event) = event_rx.recv().await {
3133
//! match event {
@@ -37,7 +39,7 @@
3739
//! }
3840
//! }
3941
//! }
40-
//! requester.shutdown();
42+
//! client.shutdown();
4143
//! }
4244
//! ```
4345
@@ -81,7 +83,7 @@ use tokio::sync::mpsc::UnboundedSender;
8183
pub use {
8284
crate::builder::Builder,
8385
crate::chain::ChainState,
84-
crate::client::{Client, Requester},
86+
crate::client::{Client, EventListeners},
8587
crate::error::{ClientError, NodeError},
8688
crate::messages::{Event, Info, Progress, RejectPayload, SyncUpdate, Warning},
8789
crate::node::Node,

src/node.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::{
3131
error::HeaderSyncError,
3232
CFHeaderChanges, ChainState, FilterCheck, HeightMonitor,
3333
},
34+
client::Idle,
3435
error::FetchBlockError,
3536
messages::ClientRequest,
3637
network::{
@@ -66,7 +67,7 @@ pub struct Node {
6667
}
6768

6869
impl Node {
69-
pub(crate) fn build(network: Network, config: Config) -> Client {
70+
pub(crate) fn build(network: Network, config: Config) -> Client<Idle> {
7071
let Config {
7172
required_peers,
7273
white_list,

0 commit comments

Comments
 (0)