Skip to content

Commit b69c13d

Browse files
committed
example_bitcoind_rpc: tweaks
* avoid holding mutex lock over io * document `CHANNEL_BOUND` const * use the `relevant` variant of `batch_insert_unconfirmed` * print elapsed time in stdout for various updates
1 parent 5f34df8 commit b69c13d

File tree

1 file changed

+47
-25
lines changed
  • example-crates/example_bitcoind_rpc_polling/src

1 file changed

+47
-25
lines changed

example-crates/example_bitcoind_rpc_polling/src/main.rs

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@ use example_cli::{
2626
const DB_MAGIC: &[u8] = b"bdk_example_rpc";
2727
const DB_PATH: &str = ".bdk_example_rpc.db";
2828

29+
/// The mpsc channel bound for emissions from [`Emitter`].
2930
const CHANNEL_BOUND: usize = 10;
3031
/// Delay for printing status to stdout.
3132
const STDOUT_PRINT_DELAY: Duration = Duration::from_secs(6);
3233
/// Delay between mempool emissions.
3334
const MEMPOOL_EMIT_DELAY: Duration = Duration::from_secs(30);
34-
/// Delay for commiting to persistance.
35+
/// Delay for committing to persistance.
3536
const DB_COMMIT_DELAY: Duration = Duration::from_secs(60);
3637

3738
type ChangeSet = (
@@ -111,18 +112,30 @@ enum RpcCommands {
111112
}
112113

113114
fn main() -> anyhow::Result<()> {
115+
let start = Instant::now();
116+
114117
let (args, keymap, index, db, init_changeset) =
115118
example_cli::init::<RpcCommands, RpcArgs, ChangeSet>(DB_MAGIC, DB_PATH)?;
119+
println!(
120+
"[{:>10}s] loaded initial changeset from db",
121+
start.elapsed().as_secs_f32()
122+
);
116123

117124
let graph = Mutex::new({
118125
let mut graph = IndexedTxGraph::new(index);
119126
graph.apply_changeset(init_changeset.1);
120127
graph
121128
});
122-
println!("loaded indexed tx graph from db");
129+
println!(
130+
"[{:>10}s] loaded indexed tx graph from changeset",
131+
start.elapsed().as_secs_f32()
132+
);
123133

124134
let chain = Mutex::new(LocalChain::from_changeset(init_changeset.0));
125-
println!("loaded local chain from db");
135+
println!(
136+
"[{:>10}s] loaded local chain from changeset",
137+
start.elapsed().as_secs_f32()
138+
);
126139

127140
let rpc_cmd = match args.command {
128141
example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd,
@@ -153,14 +166,11 @@ fn main() -> anyhow::Result<()> {
153166
..
154167
} = rpc_args;
155168

156-
let mut chain = chain.lock().unwrap();
157-
let mut graph = graph.lock().unwrap();
158-
let mut db = db.lock().unwrap();
159-
160-
graph.index.set_lookahead_for_all(lookahead);
169+
graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
161170

171+
let chain_tip = chain.lock().unwrap().tip();
162172
let rpc_client = rpc_args.new_client()?;
163-
let mut emitter = match chain.tip() {
173+
let mut emitter = match chain_tip {
164174
Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
165175
None => Emitter::from_height(&rpc_client, fallback_height),
166176
};
@@ -169,6 +179,10 @@ fn main() -> anyhow::Result<()> {
169179
let mut last_print = Instant::now();
170180

171181
while let Some((height, block)) = emitter.next_block()? {
182+
let mut chain = chain.lock().unwrap();
183+
let mut graph = graph.lock().unwrap();
184+
let mut db = db.lock().unwrap();
185+
172186
let chain_update =
173187
CheckPoint::from_header(&block.header, height).into_update(false);
174188
let chain_changeset = chain
@@ -182,7 +196,8 @@ fn main() -> anyhow::Result<()> {
182196
last_db_commit = Instant::now();
183197
db.commit()?;
184198
println!(
185-
"commited to db (took {}s)",
199+
"[{:>10}s] commited to db (took {}s)",
200+
start.elapsed().as_secs_f32(),
186201
last_db_commit.elapsed().as_secs_f32()
187202
);
188203
}
@@ -200,7 +215,8 @@ fn main() -> anyhow::Result<()> {
200215
)
201216
};
202217
println!(
203-
"synced to {} @ {} | total: {} sats",
218+
"[{:>10}s] synced to {} @ {} | total: {} sats",
219+
start.elapsed().as_secs_f32(),
204220
synced_to.hash(),
205221
synced_to.height(),
206222
balance.total()
@@ -209,13 +225,15 @@ fn main() -> anyhow::Result<()> {
209225
}
210226
}
211227

212-
// mempool
213228
let mempool_txs = emitter.mempool()?;
214-
let graph_changeset = graph.batch_insert_unconfirmed(mempool_txs);
215-
db.stage((local_chain::ChangeSet::default(), graph_changeset));
216-
217-
// commit one last time!
218-
db.commit()?;
229+
let graph_changeset = graph.lock().unwrap().batch_insert_relevant_unconfirmed(
230+
mempool_txs.iter().map(|(tx, time)| (tx, *time)),
231+
);
232+
{
233+
let mut db = db.lock().unwrap();
234+
db.stage((local_chain::ChangeSet::default(), graph_changeset));
235+
db.commit()?; // commit one last time
236+
}
219237
}
220238
RpcCommands::Live { rpc_args } => {
221239
let RpcArgs {
@@ -228,10 +246,12 @@ fn main() -> anyhow::Result<()> {
228246
graph.lock().unwrap().index.set_lookahead_for_all(lookahead);
229247
let last_cp = chain.lock().unwrap().tip();
230248

249+
println!(
250+
"[{:>10}s] starting emitter thread...",
251+
start.elapsed().as_secs_f32()
252+
);
231253
let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
232254
let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
233-
println!("emitter thread started...");
234-
235255
let rpc_client = rpc_args.new_client()?;
236256
let mut emitter = match last_cp {
237257
Some(cp) => Emitter::from_checkpoint(&rpc_client, cp),
@@ -270,15 +290,15 @@ fn main() -> anyhow::Result<()> {
270290
Ok(())
271291
});
272292

273-
let mut db = db.lock().unwrap();
274-
let mut graph = graph.lock().unwrap();
275-
let mut chain = chain.lock().unwrap();
276293
let mut tip_height = 0_u32;
277-
278294
let mut last_db_commit = Instant::now();
279295
let mut last_print = Option::<Instant>::None;
280296

281297
for emission in rx {
298+
let mut db = db.lock().unwrap();
299+
let mut graph = graph.lock().unwrap();
300+
let mut chain = chain.lock().unwrap();
301+
282302
let changeset = match emission {
283303
Emission::Block { height, block } => {
284304
let chain_update =
@@ -307,7 +327,8 @@ fn main() -> anyhow::Result<()> {
307327
last_db_commit = Instant::now();
308328
db.commit()?;
309329
println!(
310-
"commited to db (took {}s)",
330+
"[{:>10}s] commited to db (took {}s)",
331+
start.elapsed().as_secs_f32(),
311332
last_db_commit.elapsed().as_secs_f32()
312333
);
313334
}
@@ -324,7 +345,8 @@ fn main() -> anyhow::Result<()> {
324345
)
325346
};
326347
println!(
327-
"synced to {} @ {} / {} | total: {} sats",
348+
"[{:>10}s] synced to {} @ {} / {} | total: {} sats",
349+
start.elapsed().as_secs_f32(),
328350
synced_to.hash(),
329351
synced_to.height(),
330352
tip_height,

0 commit comments

Comments
 (0)