Skip to content

Commit d7d588f

Browse files
committed
refactor: use BlockchainClient as references
Update function signatures to accept &BlockchainClient instead of taking ownership. This refactoring to allow the client to be Referenced across multiple operations including repeated calls to sync_kyoto_client. - Update handle_online_wallet_subcommand signature - Update all PayjoinManager methods to use &BlockchainClient - Fix parameter dereferencing in full_scan calls - Update all call sites to pass references
1 parent dd72039 commit d7d588f

File tree

2 files changed

+40
-63
lines changed

2 files changed

+40
-63
lines changed

src/handlers.rs

Lines changed: 22 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use crate::commands::*;
1414
use crate::error::BDKCliError as Error;
1515
#[cfg(any(feature = "sqlite", feature = "redb"))]
1616
use crate::persister::Persister;
17+
#[cfg(feature = "cbf")]
18+
use crate::utils::BlockchainClient::KyotoClient;
1719
use crate::utils::*;
1820
#[cfg(feature = "redb")]
1921
use bdk_redb::Store as RedbStore;
@@ -45,8 +47,6 @@ use bdk_wallet::{
4547
};
4648
use cli_table::{Cell, CellStruct, Style, Table, format::Justify};
4749
use serde_json::json;
48-
#[cfg(feature = "cbf")]
49-
use {crate::utils::BlockchainClient::KyotoClient, bdk_kyoto::LightClient, tokio::select};
5050

5151
#[cfg(feature = "electrum")]
5252
use crate::utils::BlockchainClient::Electrum;
@@ -602,7 +602,7 @@ pub fn handle_offline_wallet_subcommand(
602602
))]
603603
pub(crate) async fn handle_online_wallet_subcommand(
604604
wallet: &mut Wallet,
605-
client: BlockchainClient,
605+
client: &BlockchainClient,
606606
online_subcommand: OnlineWalletSubCommand,
607607
) -> Result<String, Error> {
608608
match online_subcommand {
@@ -629,7 +629,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
629629
client
630630
.populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx));
631631

632-
let update = client.full_scan(request, _stop_gap, batch_size, false)?;
632+
let update = client.full_scan(request, _stop_gap, *batch_size, false)?;
633633
wallet.apply_update(update)?;
634634
}
635635
#[cfg(feature = "esplora")]
@@ -638,7 +638,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
638638
parallel_requests,
639639
} => {
640640
let update = client
641-
.full_scan(request, _stop_gap, parallel_requests)
641+
.full_scan(request, _stop_gap, *parallel_requests)
642642
.await
643643
.map_err(|e| *e)?;
644644
wallet.apply_update(update)?;
@@ -655,7 +655,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
655655
hash: genesis_block.block_hash(),
656656
});
657657
let mut emitter = Emitter::new(
658-
&*client,
658+
&**client,
659659
genesis_cp.clone(),
660660
genesis_cp.height(),
661661
NO_EXPECTED_MEMPOOL_TXS,
@@ -990,7 +990,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
990990

991991
let result = handle_online_wallet_subcommand(
992992
&mut wallet,
993-
blockchain_client,
993+
&blockchain_client,
994994
online_subcommand,
995995
)
996996
.await?;
@@ -1002,7 +1002,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
10021002
let mut wallet = new_wallet(network, wallet_opts)?;
10031003
let blockchain_client =
10041004
crate::utils::new_blockchain_client(wallet_opts, &wallet, database_path)?;
1005-
handle_online_wallet_subcommand(&mut wallet, blockchain_client, online_subcommand)
1005+
handle_online_wallet_subcommand(&mut wallet, &blockchain_client, online_subcommand)
10061006
.await?
10071007
};
10081008
Ok(result)
@@ -1184,7 +1184,7 @@ async fn respond(
11841184
} => {
11851185
let blockchain =
11861186
new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?;
1187-
let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand)
1187+
let value = handle_online_wallet_subcommand(wallet, &blockchain, online_subcommand)
11881188
.await
11891189
.map_err(|e| e.to_string())?;
11901190
Some(value)
@@ -1227,7 +1227,7 @@ async fn respond(
12271227
feature = "rpc"
12281228
))]
12291229
/// Syncs a given wallet using the blockchain client.
1230-
pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> {
1230+
pub async fn sync_wallet(client: &BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> {
12311231
#[cfg(any(feature = "electrum", feature = "esplora"))]
12321232
let request = wallet
12331233
.start_sync_with_revealed_spks()
@@ -1242,7 +1242,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
12421242
// already have.
12431243
client.populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx));
12441244

1245-
let update = client.sync(request, batch_size, false)?;
1245+
let update = client.sync(request, *batch_size, false)?;
12461246
wallet
12471247
.apply_update(update)
12481248
.map_err(|e| Error::Generic(e.to_string()))
@@ -1253,7 +1253,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
12531253
parallel_requests,
12541254
} => {
12551255
let update = client
1256-
.sync(request, parallel_requests)
1256+
.sync(request, *parallel_requests)
12571257
.await
12581258
.map_err(|e| *e)?;
12591259
wallet
@@ -1268,7 +1268,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
12681268
// reload the last 200 blocks in case of a reorg
12691269
let emitter_height = wallet_cp.height().saturating_sub(200);
12701270
let mut emitter = Emitter::new(
1271-
&*client,
1271+
&**client,
12721272
wallet_cp,
12731273
emitter_height,
12741274
wallet
@@ -1319,7 +1319,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
13191319
))]
13201320
/// Broadcasts a given transaction using the blockchain client.
13211321
pub async fn broadcast_transaction(
1322-
client: BlockchainClient,
1322+
client: &BlockchainClient,
13231323
tx: Transaction,
13241324
) -> Result<Txid, Error> {
13251325
match client {
@@ -1346,38 +1346,15 @@ pub async fn broadcast_transaction(
13461346

13471347
#[cfg(feature = "cbf")]
13481348
KyotoClient { client } => {
1349-
let LightClient {
1350-
requester,
1351-
mut info_subscriber,
1352-
mut warning_subscriber,
1353-
update_subscriber: _,
1354-
node,
1355-
} = *client;
1356-
1357-
let subscriber = tracing_subscriber::FmtSubscriber::new();
1358-
tracing::subscriber::set_global_default(subscriber)
1359-
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {e}")))?;
1360-
1361-
tokio::task::spawn(async move { node.run().await });
1362-
tokio::task::spawn(async move {
1363-
select! {
1364-
info = info_subscriber.recv() => {
1365-
if let Some(info) = info {
1366-
tracing::info!("{info}");
1367-
}
1368-
},
1369-
warn = warning_subscriber.recv() => {
1370-
if let Some(warn) = warn {
1371-
tracing::warn!("{warn}");
1372-
}
1373-
}
1374-
}
1375-
});
13761349
let txid = tx.compute_txid();
1377-
let wtxid = requester.broadcast_random(tx.clone()).await.map_err(|_| {
1378-
tracing::warn!("Broadcast was unsuccessful");
1379-
Error::Generic("Transaction broadcast timed out after 30 seconds".into())
1380-
})?;
1350+
let wtxid = client
1351+
.requester
1352+
.broadcast_random(tx.clone())
1353+
.await
1354+
.map_err(|_| {
1355+
tracing::warn!("Broadcast was unsuccessful");
1356+
Error::Generic("Transaction broadcast timed out after 30 seconds".into())
1357+
})?;
13811358
tracing::info!("Successfully broadcast WTXID: {wtxid}");
13821359
Ok(txid)
13831360
}

src/payjoin/mod.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl<'a> PayjoinManager<'a> {
5151
directory: String,
5252
max_fee_rate: Option<u64>,
5353
ohttp_relays: Vec<String>,
54-
blockchain_client: BlockchainClient,
54+
blockchain_client: &BlockchainClient,
5555
) -> Result<String, Error> {
5656
let address = self
5757
.wallet
@@ -119,7 +119,7 @@ impl<'a> PayjoinManager<'a> {
119119
uri: String,
120120
fee_rate: u64,
121121
ohttp_relays: Vec<String>,
122-
blockchain_client: BlockchainClient,
122+
blockchain_client: &BlockchainClient,
123123
) -> Result<String, Error> {
124124
let uri = payjoin::Uri::try_from(uri)
125125
.map_err(|e| Error::Generic(format!("Failed parsing to Payjoin URI: {}", e)))?;
@@ -237,7 +237,7 @@ impl<'a> PayjoinManager<'a> {
237237
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
238238
relay: impl payjoin::IntoUrl,
239239
max_fee_rate: FeeRate,
240-
blockchain_client: BlockchainClient,
240+
blockchain_client: &BlockchainClient,
241241
) -> Result<(), Error> {
242242
match session {
243243
ReceiveSession::Initialized(proposal) => {
@@ -306,7 +306,7 @@ impl<'a> PayjoinManager<'a> {
306306
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
307307
relay: impl payjoin::IntoUrl,
308308
max_fee_rate: FeeRate,
309-
blockchain_client: BlockchainClient,
309+
blockchain_client: &BlockchainClient,
310310
) -> Result<(), Error> {
311311
let mut current_receiver_typestate = receiver;
312312
let next_receiver_typestate = loop {
@@ -353,7 +353,7 @@ impl<'a> PayjoinManager<'a> {
353353
receiver: Receiver<UncheckedOriginalPayload>,
354354
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
355355
max_fee_rate: FeeRate,
356-
blockchain_client: BlockchainClient,
356+
blockchain_client: &BlockchainClient,
357357
) -> Result<(), Error> {
358358
let next_receiver_typestate = receiver
359359
.assume_interactive_receiver()
@@ -386,7 +386,7 @@ impl<'a> PayjoinManager<'a> {
386386
receiver: Receiver<MaybeInputsOwned>,
387387
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
388388
max_fee_rate: FeeRate,
389-
blockchain_client: BlockchainClient,
389+
blockchain_client: &BlockchainClient,
390390
) -> Result<(), Error> {
391391
let next_receiver_typestate = receiver
392392
.check_inputs_not_owned(&mut |input| {
@@ -411,7 +411,7 @@ impl<'a> PayjoinManager<'a> {
411411
receiver: Receiver<MaybeInputsSeen>,
412412
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
413413
max_fee_rate: FeeRate,
414-
blockchain_client: BlockchainClient,
414+
blockchain_client: &BlockchainClient,
415415
) -> Result<(), Error> {
416416
// This is not supported as there is no persistence of previous Payjoin attempts in BDK CLI
417417
// yet. If there is support either in the BDK persister or Payjoin persister, this can be
@@ -437,7 +437,7 @@ impl<'a> PayjoinManager<'a> {
437437
receiver: Receiver<OutputsUnknown>,
438438
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
439439
max_fee_rate: FeeRate,
440-
blockchain_client: BlockchainClient,
440+
blockchain_client: &BlockchainClient,
441441
) -> Result<(), Error> {
442442
let next_receiver_typestate = receiver.identify_receiver_outputs(&mut |output_script| {
443443
Ok(self.wallet.is_mine(output_script.to_owned()))
@@ -459,7 +459,7 @@ impl<'a> PayjoinManager<'a> {
459459
receiver: Receiver<WantsOutputs>,
460460
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
461461
max_fee_rate: FeeRate,
462-
blockchain_client: BlockchainClient,
462+
blockchain_client: &BlockchainClient,
463463
) -> Result<(), Error> {
464464
// This is a typestate to modify existing receiver-owned outputs in case the receiver wants
465465
// to do that. This is a very simple implementation of Payjoin so we are just going
@@ -483,7 +483,7 @@ impl<'a> PayjoinManager<'a> {
483483
receiver: Receiver<WantsInputs>,
484484
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
485485
max_fee_rate: FeeRate,
486-
blockchain_client: BlockchainClient,
486+
blockchain_client: &BlockchainClient,
487487
) -> Result<(), Error> {
488488
let candidate_inputs: Vec<InputPair> = self
489489
.wallet
@@ -533,7 +533,7 @@ impl<'a> PayjoinManager<'a> {
533533
receiver: Receiver<WantsFeeRange>,
534534
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
535535
max_fee_rate: FeeRate,
536-
blockchain_client: BlockchainClient,
536+
blockchain_client: &BlockchainClient,
537537
) -> Result<(), Error> {
538538
let next_receiver_typestate = receiver.apply_fee_range(None, Some(max_fee_rate)).save(persister).map_err(|e| {
539539
Error::Generic(format!("Error occurred when saving after applying the receiver fee range to the transaction: {e}"))
@@ -546,7 +546,7 @@ impl<'a> PayjoinManager<'a> {
546546
&mut self,
547547
receiver: Receiver<ProvisionalProposal>,
548548
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
549-
blockchain_client: BlockchainClient,
549+
blockchain_client: &BlockchainClient,
550550
) -> Result<(), Error> {
551551
let next_receiver_typestate = receiver
552552
.finalize_proposal(|psbt| {
@@ -580,7 +580,7 @@ impl<'a> PayjoinManager<'a> {
580580
&mut self,
581581
receiver: Receiver<PayjoinProposal>,
582582
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
583-
blockchain_client: BlockchainClient,
583+
blockchain_client: &BlockchainClient,
584584
) -> Result<(), Error> {
585585
let (req, ctx) = receiver.create_post_request(
586586
self.relay_manager
@@ -619,7 +619,7 @@ impl<'a> PayjoinManager<'a> {
619619
&mut self,
620620
receiver: Receiver<Monitor>,
621621
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
622-
blockchain_client: BlockchainClient,
622+
blockchain_client: &BlockchainClient,
623623
) -> Result<(), Error> {
624624
let wait_time_for_sync = 3;
625625
let poll_internal = tokio::time::Duration::from_secs(wait_time_for_sync);
@@ -734,7 +734,7 @@ impl<'a> PayjoinManager<'a> {
734734
session: SendSession,
735735
persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
736736
relay: impl payjoin::IntoUrl,
737-
blockchain_client: BlockchainClient,
737+
blockchain_client: &BlockchainClient,
738738
) -> Result<Txid, Error> {
739739
match session {
740740
SendSession::WithReplyKey(context) => {
@@ -757,7 +757,7 @@ impl<'a> PayjoinManager<'a> {
757757
sender: Sender<WithReplyKey>,
758758
relay: impl payjoin::IntoUrl,
759759
persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
760-
blockchain_client: BlockchainClient,
760+
blockchain_client: &BlockchainClient,
761761
) -> Result<Txid, Error> {
762762
let (req, ctx) = sender.create_v2_post_request(relay.as_str()).map_err(|e| {
763763
Error::Generic(format!(
@@ -780,7 +780,7 @@ impl<'a> PayjoinManager<'a> {
780780
sender: Sender<PollingForProposal>,
781781
relay: impl payjoin::IntoUrl,
782782
persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
783-
blockchain_client: BlockchainClient,
783+
blockchain_client: &BlockchainClient,
784784
) -> Result<Txid, Error> {
785785
let mut sender = sender.clone();
786786
loop {
@@ -815,7 +815,7 @@ impl<'a> PayjoinManager<'a> {
815815
async fn process_payjoin_proposal(
816816
&self,
817817
mut psbt: Psbt,
818-
blockchain_client: BlockchainClient,
818+
blockchain_client: &BlockchainClient,
819819
) -> Result<Txid, Error> {
820820
if !self.wallet.sign(&mut psbt, SignOptions::default())? {
821821
return Err(Error::Generic(

0 commit comments

Comments
 (0)