Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions examples/bitcoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

use bip157::builder::Builder;
use bip157::chain::{BlockHeaderChanges, ChainState};
use bip157::{lookup_host, Client, Event, HeaderCheckpoint, Network, ScriptBuf};
use bip157::client::EventListeners;
use bip157::{lookup_host, Event, HeaderCheckpoint, Network, ScriptBuf};
use std::collections::HashSet;
use tokio::time::Instant;

Expand All @@ -23,7 +24,7 @@ async fn main() {
// Create a new node builder
let builder = Builder::new(NETWORK);
// Add node preferences and build the node/client
let (node, client) = builder
let client = builder
// The number of connections we would like to maintain
.required_peers(2)
// Only scan for taproot scripts
Expand All @@ -34,17 +35,18 @@ async fn main() {
.add_peers(seeds.into_iter().map(From::from))
// Create the node and client
.build();
// Run the node on a separate task
tokio::task::spawn(async move { node.run().await });

// Split the client into components that send messages and listen to messages.
// With this construction, different parts of the program can take ownership of
// specific tasks.
let Client {
requester,
let (client, events) = client.subscribe();
let EventListeners {
mut info_rx,
mut warn_rx,
mut event_rx,
} = client;
} = events;
let client = client.start();

// Continually listen for events until the node is synced to its peers.
loop {
tokio::select! {
Expand All @@ -54,11 +56,11 @@ async fn main() {
Event::FiltersSynced(update) => {
tracing::info!("Chain tip: {}",update.tip().hash);
// Request information from the node
let fee = requester.broadcast_min_feerate().await.unwrap();
let fee = client.broadcast_min_feerate().await.unwrap();
tracing::info!("Minimum transaction broadcast fee rate: {:#}", fee);
let sync_time = now.elapsed().as_secs_f32();
tracing::info!("Total sync time: {sync_time} seconds");
let avg_fee_rate = requester.average_fee_rate(update.tip().hash).await.unwrap();
let avg_fee_rate = client.average_fee_rate(update.tip().hash).await.unwrap();
tracing::info!("Last block average fee rate: {:#}", avg_fee_rate);
break;
},
Expand All @@ -81,6 +83,6 @@ async fn main() {
}
}
}
let _ = requester.shutdown();
let _ = client.shutdown();
tracing::info!("Shutting down");
}
19 changes: 9 additions & 10 deletions examples/signet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

use bip157::chain::{BlockHeaderChanges, ChainState};
use bip157::messages::Event;
use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint, Client};
use bip157::{Address, BlockHash, Network};
use bip157::{builder::Builder, chain::checkpoints::HeaderCheckpoint};
use bip157::{Address, BlockHash, EventListeners, Network};
use std::collections::HashSet;
use std::str::FromStr;

Expand Down Expand Up @@ -32,22 +32,21 @@ async fn main() {
// Create a new node builder
let builder = Builder::new(NETWORK);
// Add node preferences and build the node/client
let (node, client) = builder
let client = builder
// Only scan blocks strictly after a checkpoint
.chain_state(ChainState::Checkpoint(checkpoint))
// The number of connections we would like to maintain
.required_peers(1)
// Create the node and client
.build();

tokio::task::spawn(async move { node.run().await });

let Client {
requester,
let (client, events) = client.subscribe();
let EventListeners {
mut info_rx,
mut warn_rx,
mut event_rx,
} = client;
} = events;
let client = client.start();

// Continually listen for events until the node is synced to its peers.
loop {
Expand All @@ -71,7 +70,7 @@ async fn main() {
if filter.contains_any(addresses.iter()) {
let hash = filter.block_hash();
tracing::info!("Found script at {}!", hash);
let indexed_block = requester.get_block(hash).await.unwrap();
let indexed_block = client.get_block(hash).await.unwrap();
let coinbase = indexed_block.block.txdata.first().unwrap().compute_txid();
tracing::info!("Coinbase transaction ID: {}", coinbase);
break;
Expand All @@ -87,6 +86,6 @@ async fn main() {
}
}
}
let _ = requester.shutdown();
let _ = client.shutdown();
tracing::info!("Shutting down");
}
9 changes: 5 additions & 4 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use bitcoin::Network;

use super::{client::Client, node::Node};
use crate::chain::ChainState;
use crate::client::Idle;
use crate::network::ConnectionType;
use crate::TrustedPeer;
use crate::{Config, FilterType};
Expand All @@ -25,7 +26,7 @@ const MAX_PEERS: u8 = 15;
///
/// let host = (IpAddr::from(Ipv4Addr::new(0, 0, 0, 0)), None);
/// let builder = Builder::new(Network::Regtest);
/// let (node, client) = builder
/// let client = builder
/// .add_peers(vec![host.into()])
/// .build();
/// ```
Expand Down Expand Up @@ -138,8 +139,8 @@ impl Builder {
self
}

/// Consume the node builder and receive a [`Node`] and [`Client`].
pub fn build(mut self) -> (Node, Client) {
Node::new(self.network, core::mem::take(&mut self.config))
/// Consume the node builder and receive a [`Client`].
pub fn build(mut self) -> Client<Idle> {
Node::build(self.network, core::mem::take(&mut self.config))
}
}
118 changes: 104 additions & 14 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,37 @@ use tokio::sync::oneshot;

use crate::chain::block_subsidy;
use crate::messages::ClientRequest;
use crate::node::Node;
use crate::{Event, Info, TrustedPeer, Warning};

use super::{error::ClientError, messages::ClientMessage};
use super::{error::FetchBlockError, IndexedBlock};

/// A [`Client`] allows for communication with a running node.
/// Client state when idle.
pub struct Idle;
/// Client state when subscribed to events.
pub struct Subscribed;
/// Client state when active.
pub struct Active;

mod sealed {
pub trait Sealed {}
}

impl sealed::Sealed for Idle {}
impl sealed::Sealed for Subscribed {}
impl sealed::Sealed for Active {}

/// State of the client.
pub trait State: sealed::Sealed {}

impl State for Idle {}
impl State for Subscribed {}
impl State for Active {}

/// Wrapper type for the channels that will receive events.
#[derive(Debug)]
pub struct Client {
/// Send events to a node, such as broadcasting a transaction.
pub requester: Requester,
pub struct EventListeners {
/// Receive informational messages from the node.
pub info_rx: mpsc::Receiver<Info>,
/// Receive warning messages from a node.
Expand All @@ -24,33 +45,102 @@ pub struct Client {
pub event_rx: mpsc::UnboundedReceiver<Event>,
}

impl Client {
pub(crate) fn new(
impl EventListeners {
fn new(
info_rx: mpsc::Receiver<Info>,
warn_rx: mpsc::UnboundedReceiver<Warning>,
event_rx: mpsc::UnboundedReceiver<Event>,
ntx: UnboundedSender<ClientMessage>,
) -> Self {
Self {
requester: Requester::new(ntx),
info_rx,
warn_rx,
event_rx,
}
}
}

/// Send messages to a node that is running so the node may complete a task.
#[derive(Debug, Clone)]
pub struct Requester {
/// A [`Client`] allows for communication with a running node.
///
/// The [`Client`] is generic over 3 states:
/// - [`Idle`]: the client is not running and event handling has not been initialized.
/// - [`Subscribed`]: events have been subscribed to in the program, but the client has not started.
/// - [`Active`]: data is actively being fetched and the [`Client`] may perform actions.
///
#[derive(Debug)]
pub struct Client<S: State> {
/// Send events to a node, such as broadcasting a transaction.
ntx: UnboundedSender<ClientMessage>,
/// Receive informational messages from the node.
events: Option<EventListeners>,
/// Internal node structure.
node: Option<Node>,
/// Marker for state.
_marker: core::marker::PhantomData<S>,
}

impl Requester {
fn new(ntx: UnboundedSender<ClientMessage>) -> Self {
Self { ntx }
impl Client<Idle> {
pub(crate) fn new(
info_rx: mpsc::Receiver<Info>,
warn_rx: mpsc::UnboundedReceiver<Warning>,
event_rx: mpsc::UnboundedReceiver<Event>,
ntx: UnboundedSender<ClientMessage>,
node: Node,
) -> Client<Idle> {
Client {
ntx,
events: Some(EventListeners::new(info_rx, warn_rx, event_rx)),
node: Some(node),
_marker: core::marker::PhantomData,
}
}

/// Subscribe to the events published by the light client. Applications may perform arbitrary behavior
/// when receiving these events, such as logging or applying the effect of a block to a wallet.
/// The client is not yet running after this step.
pub fn subscribe(mut self) -> (Client<Subscribed>, EventListeners) {
let events = core::mem::take(&mut self.events).expect("cannot call run twice.");
(
Client {
ntx: self.ntx,
events: None,
node: self.node,
_marker: core::marker::PhantomData,
},
events,
)
}
}

impl Client<Subscribed> {
/// Start the client, which will begin publishing events to subscribers. This will implicitly
/// spawn a [`tokio::task`] to fetch data for the client.
pub fn start(mut self) -> Client<Active> {
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
tokio::task::spawn(async move { node.run().await });
Client {
ntx: self.ntx,
events: None,
node: None,
_marker: core::marker::PhantomData,
}
}

/// Receive a [`Node`] to run on a dedicated resource, likely with a custom [`tokio::runtime::Runtime`].
pub fn start_managed(mut self) -> (Client<Active>, Node) {
let node = core::mem::take(&mut self.node).expect("cannot call run twice.");
(
Client {
ntx: self.ntx,
events: None,
node: None,
_marker: core::marker::PhantomData,
},
node,
)
}
}

impl Client<Active> {
/// Tell the node to shut down.
///
/// # Errors
Expand Down
16 changes: 9 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! # Example usage
//!
//! ```no_run
//! use bip157::{Builder, Event, Client, Network, BlockHash};
//! use bip157::{Builder, Event, EventListeners, Client, Network, BlockHash};
//!
//! #[tokio::main]
//! async fn main() {
Expand All @@ -18,14 +18,16 @@
//! // Create a new node builder
//! let builder = Builder::new(Network::Signet);
//! // Add node preferences and build the node/client
//! let (node, client) = builder
//! let client = builder
//! // The number of connections we would like to maintain
//! .required_peers(2)
//! .build();
//! // Run the node and wait for the sync message;
//! tokio::task::spawn(async move { node.run().await });
//! // Start the node
//! let (client, events) = client.subscribe();
//! let client = client.start();
//! // Split the client into components that send messages and listen to messages
//! let Client { requester, info_rx: _, warn_rx: _, mut event_rx } = client;
//! let EventListeners { info_rx: _, warn_rx: _, mut event_rx } = events;
//! // Wait for the sync message;
//! loop {
//! if let Some(event) = event_rx.recv().await {
//! match event {
Expand All @@ -37,7 +39,7 @@
//! }
//! }
//! }
//! requester.shutdown();
//! client.shutdown();
//! }
//! ```

Expand Down Expand Up @@ -81,7 +83,7 @@ use tokio::sync::mpsc::UnboundedSender;
pub use {
crate::builder::Builder,
crate::chain::ChainState,
crate::client::{Client, Requester},
crate::client::{Client, EventListeners},
crate::error::{ClientError, NodeError},
crate::messages::{Event, Info, Progress, RejectPayload, SyncUpdate, Warning},
crate::node::Node,
Expand Down
Loading