Skip to content

Commit 150d6f8

Browse files
committed
feat(example_bitcoind_rpc_polling): add example for RPC polling
1 parent 4f10463 commit 150d6f8

File tree

3 files changed

+379
-0
lines changed

3 files changed

+379
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ members = [
1010
"example-crates/example_cli",
1111
"example-crates/example_electrum",
1212
"example-crates/example_esplora",
13+
"example-crates/example_bitcoind_rpc_polling",
1314
"example-crates/wallet_electrum",
1415
"example-crates/wallet_esplora_blocking",
1516
"example-crates/wallet_esplora_async",
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[package]
2+
name = "example_bitcoind_rpc_polling"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
bdk_chain = { path = "../../crates/chain", features = ["serde"] }
10+
bdk_bitcoind_rpc = { path = "../../crates/bitcoind_rpc" }
11+
example_cli = { path = "../example_cli" }
12+
ctrlc = { version = "^2" }
Lines changed: 366 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,366 @@
1+
use std::{
2+
path::PathBuf,
3+
sync::{
4+
atomic::{AtomicBool, Ordering},
5+
Arc, Mutex,
6+
},
7+
time::{Duration, Instant},
8+
};
9+
10+
use bdk_bitcoind_rpc::{
11+
bitcoincore_rpc::{Auth, Client, RpcApi},
12+
Emitter,
13+
};
14+
use bdk_chain::{
15+
bitcoin::{Block, Transaction},
16+
indexed_tx_graph, keychain,
17+
local_chain::{self, CheckPoint, LocalChain},
18+
ConfirmationTimeAnchor, IndexedTxGraph,
19+
};
20+
use example_cli::{
21+
anyhow,
22+
clap::{self, Args, Subcommand},
23+
Keychain,
24+
};
25+
26+
const DB_MAGIC: &[u8] = b"bdk_example_rpc";
27+
const DB_PATH: &str = ".bdk_example_rpc.db";
28+
29+
const CHANNEL_BOUND: usize = 10;
30+
/// The block depth which we assume no reorgs can happen at.
31+
const ASSUME_FINAL_DEPTH: u32 = 6;
32+
/// Delay for printing status to stdout.
33+
const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6);
34+
/// Delay between mempool emissions.
35+
const MEMPOOL_EMIT_DELAY: Duration = Duration::from_secs(30);
36+
/// Delay for commiting to persistance.
37+
const DB_COMMIT_DELAY: Duration = Duration::from_secs(60);
38+
39+
type ChangeSet = (
40+
local_chain::ChangeSet,
41+
indexed_tx_graph::ChangeSet<ConfirmationTimeAnchor, keychain::ChangeSet<Keychain>>,
42+
);
43+
44+
#[derive(Debug)]
45+
enum Emission {
46+
Block { height: u32, block: Block },
47+
Mempool(Vec<(Transaction, u64)>),
48+
Tip(u32),
49+
}
50+
51+
#[derive(Args, Debug, Clone)]
52+
struct RpcArgs {
53+
/// RPC URL
54+
#[clap(env = "RPC_URL", long, default_value = "127.0.0.1:8332")]
55+
url: String,
56+
/// RPC auth cookie file
57+
#[clap(env = "RPC_COOKIE", long)]
58+
rpc_cookie: Option<PathBuf>,
59+
/// RPC auth username
60+
#[clap(env = "RPC_USER", long)]
61+
rpc_user: Option<String>,
62+
/// RPC auth password
63+
#[clap(env = "RPC_PASS", long)]
64+
rpc_password: Option<String>,
65+
/// Starting block height to fallback to if no point of agreement if found
66+
#[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")]
67+
fallback_height: u32,
68+
/// The unused-scripts lookahead will be kept at this size
69+
#[clap(long, default_value = "10")]
70+
lookahead: u32,
71+
}
72+
73+
impl From<RpcArgs> for Auth {
74+
fn from(args: RpcArgs) -> Self {
75+
match (args.rpc_cookie, args.rpc_user, args.rpc_password) {
76+
(None, None, None) => Self::None,
77+
(Some(path), _, _) => Self::CookieFile(path),
78+
(_, Some(user), Some(pass)) => Self::UserPass(user, pass),
79+
(_, Some(_), None) => panic!("rpc auth: missing rpc_pass"),
80+
(_, None, Some(_)) => panic!("rpc auth: missing rpc_user"),
81+
}
82+
}
83+
}
84+
85+
impl RpcArgs {
86+
fn new_client(&self) -> anyhow::Result<Client> {
87+
Ok(Client::new(
88+
&self.url,
89+
match (&self.rpc_cookie, &self.rpc_user, &self.rpc_password) {
90+
(None, None, None) => Auth::None,
91+
(Some(path), _, _) => Auth::CookieFile(path.clone()),
92+
(_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()),
93+
(_, Some(_), None) => panic!("rpc auth: missing rpc_pass"),
94+
(_, None, Some(_)) => panic!("rpc auth: missing rpc_user"),
95+
},
96+
)?)
97+
}
98+
}
99+
100+
#[derive(Subcommand, Debug, Clone)]
101+
enum RpcCommands {
102+
/// Syncs local state with remote state via RPC (starting from last point of agreement) and
103+
/// stores/indexes relevant transactions
104+
Sync {
105+
#[clap(flatten)]
106+
rpc_args: RpcArgs,
107+
},
108+
/// Sync by having the emitter logic in a separate thread
109+
Live {
110+
#[clap(flatten)]
111+
rpc_args: RpcArgs,
112+
},
113+
}
114+
115+
fn main() -> anyhow::Result<()> {
116+
let (args, keymap, index, db, init_changeset) =
117+
example_cli::init::<RpcCommands, RpcArgs, ChangeSet>(DB_MAGIC, DB_PATH)?;
118+
119+
let graph = Mutex::new({
120+
let mut graph = IndexedTxGraph::new(index);
121+
graph.apply_changeset(init_changeset.1);
122+
graph
123+
});
124+
println!("loaded indexed tx graph from db");
125+
126+
let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0));
127+
println!("loaded local chain from db");
128+
129+
let rpc_cmd = match args.command {
130+
example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd,
131+
general_cmd => {
132+
let res = example_cli::handle_commands(
133+
&graph,
134+
&db,
135+
&chain,
136+
&keymap,
137+
args.network,
138+
|rpc_args, tx| {
139+
let client = rpc_args.new_client()?;
140+
client.send_raw_transaction(tx)?;
141+
Ok(())
142+
},
143+
general_cmd,
144+
);
145+
db.lock().unwrap().commit()?;
146+
return res;
147+
}
148+
};
149+
150+
match rpc_cmd {
151+
RpcCommands::Sync { rpc_args } => {
152+
let RpcArgs {
153+
fallback_height,
154+
lookahead,
155+
..
156+
} = rpc_args;
157+
158+
let mut chain = chain.lock().unwrap();
159+
let mut graph = graph.lock().unwrap();
160+
let mut db = db.lock().unwrap();
161+
162+
graph.index.set_lookahead_for_all(lookahead);
163+
// we start at a height lower than last-seen tip in case of reorgs
164+
let start_height = chain.tip().as_ref().map_or(fallback_height, |cp| {
165+
cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
166+
});
167+
168+
let rpc_client = rpc_args.new_client()?;
169+
let mut emitter = Emitter::new(&rpc_client, start_height);
170+
171+
let mut last_db_commit = Instant::now();
172+
let mut last_print = Instant::now();
173+
174+
while let Some((height, block)) = emitter.next_block()? {
175+
let chain_update =
176+
CheckPoint::from_header(&block.header, height).into_update(false);
177+
let chain_changeset = chain.apply_update(chain_update)?;
178+
let graph_changeset = graph.apply_block_relevant(block, height);
179+
db.stage((chain_changeset, graph_changeset));
180+
181+
// commit staged db changes in intervals
182+
if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
183+
last_db_commit = Instant::now();
184+
db.commit()?;
185+
println!(
186+
"commited to db (took {}s)",
187+
last_db_commit.elapsed().as_secs_f32()
188+
);
189+
}
190+
191+
// print synced-to height and current balance in intervals
192+
if last_print.elapsed() >= STDOUT_PRINT_DELAY {
193+
last_print = Instant::now();
194+
if let Some(synced_to) = chain.tip() {
195+
let balance = {
196+
graph.graph().balance(
197+
&*chain,
198+
synced_to.block_id(),
199+
graph.index.outpoints().iter().cloned(),
200+
|(k, _), _| k == &Keychain::Internal,
201+
)
202+
};
203+
println!(
204+
"synced to {} @ {} | total: {} sats",
205+
synced_to.hash(),
206+
synced_to.height(),
207+
balance.total()
208+
);
209+
}
210+
}
211+
}
212+
213+
// mempool
214+
let mempool_txs = emitter.mempool()?;
215+
let graph_changeset = graph
216+
.batch_insert_unconfirmed(mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))));
217+
db.stage((local_chain::ChangeSet::default(), graph_changeset));
218+
219+
// commit one last time!
220+
db.commit()?;
221+
}
222+
RpcCommands::Live { rpc_args } => {
223+
let RpcArgs {
224+
fallback_height,
225+
lookahead,
226+
..
227+
} = rpc_args;
228+
let sigterm_flag = start_ctrlc_handler();
229+
230+
graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
231+
// we start at a height lower than last-seen tip in case of reorgs
232+
let start_height = chain.lock().unwrap().tip().map_or(fallback_height, |cp| {
233+
cp.height().saturating_sub(ASSUME_FINAL_DEPTH)
234+
});
235+
236+
let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
237+
let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
238+
println!("emitter thread started...");
239+
240+
let rpc_client = rpc_args.new_client()?;
241+
let mut emitter = Emitter::new(&rpc_client, start_height);
242+
243+
let mut block_count = rpc_client.get_block_count()? as u32;
244+
tx.send(Emission::Tip(block_count))?;
245+
246+
loop {
247+
match emitter.next_block()? {
248+
Some((height, block)) => {
249+
if sigterm_flag.load(Ordering::Acquire) {
250+
break;
251+
}
252+
if height > block_count {
253+
block_count = rpc_client.get_block_count()? as u32;
254+
tx.send(Emission::Tip(block_count))?;
255+
}
256+
tx.send(Emission::Block { height, block })?;
257+
}
258+
None => {
259+
if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) {
260+
break;
261+
}
262+
println!("preparing mempool emission...");
263+
let now = Instant::now();
264+
tx.send(Emission::Mempool(emitter.mempool()?))?;
265+
println!("mempool emission prepared in {}s", now.elapsed().as_secs());
266+
continue;
267+
}
268+
};
269+
}
270+
271+
println!("emitter thread shutting down...");
272+
Ok(())
273+
});
274+
275+
let mut db = db.lock().unwrap();
276+
let mut graph = graph.lock().unwrap();
277+
let mut chain = chain.lock().unwrap();
278+
let mut tip_height = 0_u32;
279+
280+
let mut last_db_commit = Instant::now();
281+
let mut last_print = Option::<Instant>::None;
282+
283+
for emission in rx {
284+
let changeset = match emission {
285+
Emission::Block { height, block } => {
286+
let chain_update =
287+
CheckPoint::from_header(&block.header, height).into_update(false);
288+
let chain_changeset = chain.apply_update(chain_update)?;
289+
let graph_changeset = graph.apply_block_relevant(block, height);
290+
(chain_changeset, graph_changeset)
291+
}
292+
Emission::Mempool(mempool_txs) => {
293+
let graph_changeset = graph.batch_insert_relevant_unconfirmed(
294+
mempool_txs.iter().map(|(tx, time)| (tx, Some(*time))),
295+
);
296+
(local_chain::ChangeSet::default(), graph_changeset)
297+
}
298+
Emission::Tip(h) => {
299+
tip_height = h;
300+
continue;
301+
}
302+
};
303+
304+
db.stage(changeset);
305+
306+
if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
307+
last_db_commit = Instant::now();
308+
db.commit()?;
309+
println!(
310+
"commited to db (took {}s)",
311+
last_db_commit.elapsed().as_secs_f32()
312+
);
313+
}
314+
315+
if last_print.map_or(Duration::MAX, |i| i.elapsed()) >= STDOUT_PRINT_DELAY {
316+
last_print = Some(Instant::now());
317+
if let Some(synced_to) = chain.tip() {
318+
let balance = {
319+
graph.graph().balance(
320+
&*chain,
321+
synced_to.block_id(),
322+
graph.index.outpoints().iter().cloned(),
323+
|(k, _), _| k == &Keychain::Internal,
324+
)
325+
};
326+
println!(
327+
"synced to {} @ {} / {} | total: {} sats",
328+
synced_to.hash(),
329+
synced_to.height(),
330+
tip_height,
331+
balance.total()
332+
);
333+
}
334+
}
335+
}
336+
337+
emission_jh.join().expect("must join emitter thread")?;
338+
}
339+
}
340+
341+
Ok(())
342+
}
343+
344+
#[allow(dead_code)]
345+
fn start_ctrlc_handler() -> Arc<AtomicBool> {
346+
let flag = Arc::new(AtomicBool::new(false));
347+
let cloned_flag = flag.clone();
348+
349+
ctrlc::set_handler(move || cloned_flag.store(true, Ordering::Release));
350+
351+
flag
352+
}
353+
354+
#[allow(dead_code)]
355+
fn await_flag(flag: &AtomicBool, duration: Duration) -> bool {
356+
let start = Instant::now();
357+
loop {
358+
if flag.load(Ordering::Acquire) {
359+
return true;
360+
}
361+
if start.elapsed() >= duration {
362+
return false;
363+
}
364+
std::thread::sleep(Duration::from_secs(1));
365+
}
366+
}

0 commit comments

Comments
 (0)