Skip to content

Commit a506b1b

Browse files
committed
refactor: use mutable BlockchainClient references
Update function signatures to accept &mut BlockchainClient instead of taking ownership. This is required after the Kyoto client refactoring to allow the client to be used mutably across multiple operations, including repeated calls to sync_kyoto_client. - Update handle_online_wallet_subcommand signature - Update all PayjoinManager methods to use &mut BlockchainClient - Fix parameter dereferencing in full_scan calls - Update all call sites to pass mutable references
1 parent 35731e1 commit a506b1b

File tree

2 files changed

+49
-67
lines changed

2 files changed

+49
-67
lines changed

src/handlers.rs

Lines changed: 31 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use crate::commands::*;
1414
use crate::error::BDKCliError as Error;
1515
#[cfg(any(feature = "sqlite", feature = "redb"))]
1616
use crate::persister::Persister;
17+
#[cfg(feature = "cbf")]
18+
use crate::utils::BlockchainClient::KyotoClient;
1719
use crate::utils::*;
1820
#[cfg(feature = "redb")]
1921
use bdk_redb::Store as RedbStore;
@@ -45,8 +47,6 @@ use bdk_wallet::{
4547
};
4648
use cli_table::{Cell, CellStruct, Style, Table, format::Justify};
4749
use serde_json::json;
48-
#[cfg(feature = "cbf")]
49-
use {crate::utils::BlockchainClient::KyotoClient, bdk_kyoto::LightClient, tokio::select};
5050

5151
#[cfg(feature = "electrum")]
5252
use crate::utils::BlockchainClient::Electrum;
@@ -602,7 +602,7 @@ pub fn handle_offline_wallet_subcommand(
602602
))]
603603
pub(crate) async fn handle_online_wallet_subcommand(
604604
wallet: &mut Wallet,
605-
client: BlockchainClient,
605+
client: &mut BlockchainClient,
606606
online_subcommand: OnlineWalletSubCommand,
607607
) -> Result<String, Error> {
608608
match online_subcommand {
@@ -629,7 +629,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
629629
client
630630
.populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx));
631631

632-
let update = client.full_scan(request, _stop_gap, batch_size, false)?;
632+
let update = client.full_scan(request, _stop_gap, *batch_size, false)?;
633633
wallet.apply_update(update)?;
634634
}
635635
#[cfg(feature = "esplora")]
@@ -638,7 +638,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
638638
parallel_requests,
639639
} => {
640640
let update = client
641-
.full_scan(request, _stop_gap, parallel_requests)
641+
.full_scan(request, _stop_gap, *parallel_requests)
642642
.await
643643
.map_err(|e| *e)?;
644644
wallet.apply_update(update)?;
@@ -655,7 +655,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
655655
hash: genesis_block.block_hash(),
656656
});
657657
let mut emitter = Emitter::new(
658-
&*client,
658+
&**client,
659659
genesis_cp.clone(),
660660
genesis_cp.height(),
661661
NO_EXPECTED_MEMPOOL_TXS,
@@ -986,11 +986,12 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
986986
};
987987

988988
let mut wallet = new_persisted_wallet(network, &mut persister, wallet_opts)?;
989-
let blockchain_client = new_blockchain_client(wallet_opts, &wallet, database_path)?;
989+
let mut blockchain_client =
990+
new_blockchain_client(wallet_opts, &wallet, database_path)?;
990991

991992
let result = handle_online_wallet_subcommand(
992993
&mut wallet,
993-
blockchain_client,
994+
&mut blockchain_client,
994995
online_subcommand,
995996
)
996997
.await?;
@@ -1000,11 +1001,15 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
10001001
#[cfg(not(any(feature = "sqlite", feature = "redb")))]
10011002
let result = {
10021003
let wallet = new_wallet(network, wallet_opts)?;
1003-
let blockchain_client =
1004+
let mut blockchain_client =
10041005
crate::utils::new_blockchain_client(wallet_opts, &wallet, database_path)?;
10051006
let mut wallet = new_wallet(network, wallet_opts)?;
1006-
handle_online_wallet_subcommand(&mut wallet, blockchain_client, online_subcommand)
1007-
.await?
1007+
handle_online_wallet_subcommand(
1008+
&mut wallet,
1009+
&mut blockchain_client,
1010+
online_subcommand,
1011+
)
1012+
.await?
10081013
};
10091014
Ok(result)
10101015
}
@@ -1183,9 +1188,9 @@ async fn respond(
11831188
ReplSubCommand::Wallet {
11841189
subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand),
11851190
} => {
1186-
let blockchain =
1191+
let mut blockchain =
11871192
new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?;
1188-
let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand)
1193+
let value = handle_online_wallet_subcommand(wallet, &mut blockchain, online_subcommand)
11891194
.await
11901195
.map_err(|e| e.to_string())?;
11911196
Some(value)
@@ -1228,7 +1233,7 @@ async fn respond(
12281233
feature = "rpc"
12291234
))]
12301235
/// Syncs a given wallet using the blockchain client.
1231-
pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> {
1236+
pub async fn sync_wallet(client: &mut BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> {
12321237
#[cfg(any(feature = "electrum", feature = "esplora"))]
12331238
let request = wallet
12341239
.start_sync_with_revealed_spks()
@@ -1243,7 +1248,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
12431248
// already have.
12441249
client.populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx));
12451250

1246-
let update = client.sync(request, batch_size, false)?;
1251+
let update = client.sync(request, *batch_size, false)?;
12471252
wallet
12481253
.apply_update(update)
12491254
.map_err(|e| Error::Generic(e.to_string()))
@@ -1254,7 +1259,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
12541259
parallel_requests,
12551260
} => {
12561261
let update = client
1257-
.sync(request, parallel_requests)
1262+
.sync(request, *parallel_requests)
12581263
.await
12591264
.map_err(|e| *e)?;
12601265
wallet
@@ -1269,7 +1274,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
12691274
// reload the last 200 blocks in case of a reorg
12701275
let emitter_height = wallet_cp.height().saturating_sub(200);
12711276
let mut emitter = Emitter::new(
1272-
&*client,
1277+
&**client,
12731278
wallet_cp,
12741279
emitter_height,
12751280
wallet
@@ -1320,7 +1325,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
13201325
))]
13211326
/// Broadcasts a given transaction using the blockchain client.
13221327
pub async fn broadcast_transaction(
1323-
client: BlockchainClient,
1328+
client: &mut BlockchainClient,
13241329
tx: Transaction,
13251330
) -> Result<Txid, Error> {
13261331
match client {
@@ -1347,38 +1352,15 @@ pub async fn broadcast_transaction(
13471352

13481353
#[cfg(feature = "cbf")]
13491354
KyotoClient { client } => {
1350-
let LightClient {
1351-
requester,
1352-
mut info_subscriber,
1353-
mut warning_subscriber,
1354-
update_subscriber: _,
1355-
node,
1356-
} = *client;
1357-
1358-
let subscriber = tracing_subscriber::FmtSubscriber::new();
1359-
tracing::subscriber::set_global_default(subscriber)
1360-
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {e}")))?;
1361-
1362-
tokio::task::spawn(async move { node.run().await });
1363-
tokio::task::spawn(async move {
1364-
select! {
1365-
info = info_subscriber.recv() => {
1366-
if let Some(info) = info {
1367-
tracing::info!("{info}");
1368-
}
1369-
},
1370-
warn = warning_subscriber.recv() => {
1371-
if let Some(warn) = warn {
1372-
tracing::warn!("{warn}");
1373-
}
1374-
}
1375-
}
1376-
});
13771355
let txid = tx.compute_txid();
1378-
let wtxid = requester.broadcast_random(tx.clone()).await.map_err(|_| {
1379-
tracing::warn!("Broadcast was unsuccessful");
1380-
Error::Generic("Transaction broadcast timed out after 30 seconds".into())
1381-
})?;
1356+
let wtxid = client
1357+
.requester
1358+
.broadcast_random(tx.clone())
1359+
.await
1360+
.map_err(|_| {
1361+
tracing::warn!("Broadcast was unsuccessful");
1362+
Error::Generic("Transaction broadcast timed out after 30 seconds".into())
1363+
})?;
13821364
tracing::info!("Successfully broadcast WTXID: {wtxid}");
13831365
Ok(txid)
13841366
}

src/payjoin/mod.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl<'a> PayjoinManager<'a> {
5151
directory: String,
5252
max_fee_rate: Option<u64>,
5353
ohttp_relays: Vec<String>,
54-
blockchain_client: BlockchainClient,
54+
blockchain_client: &mut BlockchainClient,
5555
) -> Result<String, Error> {
5656
let address = self
5757
.wallet
@@ -119,7 +119,7 @@ impl<'a> PayjoinManager<'a> {
119119
uri: String,
120120
fee_rate: u64,
121121
ohttp_relays: Vec<String>,
122-
blockchain_client: BlockchainClient,
122+
blockchain_client: &mut BlockchainClient,
123123
) -> Result<String, Error> {
124124
let uri = payjoin::Uri::try_from(uri)
125125
.map_err(|e| Error::Generic(format!("Failed parsing to Payjoin URI: {}", e)))?;
@@ -237,7 +237,7 @@ impl<'a> PayjoinManager<'a> {
237237
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
238238
relay: impl payjoin::IntoUrl,
239239
max_fee_rate: FeeRate,
240-
blockchain_client: BlockchainClient,
240+
blockchain_client: &mut BlockchainClient,
241241
) -> Result<(), Error> {
242242
match session {
243243
ReceiveSession::Initialized(proposal) => {
@@ -306,7 +306,7 @@ impl<'a> PayjoinManager<'a> {
306306
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
307307
relay: impl payjoin::IntoUrl,
308308
max_fee_rate: FeeRate,
309-
blockchain_client: BlockchainClient,
309+
blockchain_client: &mut BlockchainClient,
310310
) -> Result<(), Error> {
311311
let mut current_receiver_typestate = receiver;
312312
let next_receiver_typestate = loop {
@@ -353,7 +353,7 @@ impl<'a> PayjoinManager<'a> {
353353
receiver: Receiver<UncheckedOriginalPayload>,
354354
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
355355
max_fee_rate: FeeRate,
356-
blockchain_client: BlockchainClient,
356+
blockchain_client: &mut BlockchainClient,
357357
) -> Result<(), Error> {
358358
let next_receiver_typestate = receiver
359359
.assume_interactive_receiver()
@@ -386,7 +386,7 @@ impl<'a> PayjoinManager<'a> {
386386
receiver: Receiver<MaybeInputsOwned>,
387387
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
388388
max_fee_rate: FeeRate,
389-
blockchain_client: BlockchainClient,
389+
blockchain_client: &mut BlockchainClient,
390390
) -> Result<(), Error> {
391391
let next_receiver_typestate = receiver
392392
.check_inputs_not_owned(&mut |input| {
@@ -411,7 +411,7 @@ impl<'a> PayjoinManager<'a> {
411411
receiver: Receiver<MaybeInputsSeen>,
412412
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
413413
max_fee_rate: FeeRate,
414-
blockchain_client: BlockchainClient,
414+
blockchain_client: &mut BlockchainClient,
415415
) -> Result<(), Error> {
416416
// This is not supported as there is no persistence of previous Payjoin attempts in BDK CLI
417417
// yet. If there is support either in the BDK persister or Payjoin persister, this can be
@@ -437,7 +437,7 @@ impl<'a> PayjoinManager<'a> {
437437
receiver: Receiver<OutputsUnknown>,
438438
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
439439
max_fee_rate: FeeRate,
440-
blockchain_client: BlockchainClient,
440+
blockchain_client: &mut BlockchainClient,
441441
) -> Result<(), Error> {
442442
let next_receiver_typestate = receiver.identify_receiver_outputs(&mut |output_script| {
443443
Ok(self.wallet.is_mine(output_script.to_owned()))
@@ -459,7 +459,7 @@ impl<'a> PayjoinManager<'a> {
459459
receiver: Receiver<WantsOutputs>,
460460
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
461461
max_fee_rate: FeeRate,
462-
blockchain_client: BlockchainClient,
462+
blockchain_client: &mut BlockchainClient,
463463
) -> Result<(), Error> {
464464
// This is a typestate to modify existing receiver-owned outputs in case the receiver wants
465465
// to do that. This is a very simple implementation of Payjoin so we are just going
@@ -483,7 +483,7 @@ impl<'a> PayjoinManager<'a> {
483483
receiver: Receiver<WantsInputs>,
484484
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
485485
max_fee_rate: FeeRate,
486-
blockchain_client: BlockchainClient,
486+
blockchain_client: &mut BlockchainClient,
487487
) -> Result<(), Error> {
488488
let candidate_inputs: Vec<InputPair> = self
489489
.wallet
@@ -533,7 +533,7 @@ impl<'a> PayjoinManager<'a> {
533533
receiver: Receiver<WantsFeeRange>,
534534
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
535535
max_fee_rate: FeeRate,
536-
blockchain_client: BlockchainClient,
536+
blockchain_client: &mut BlockchainClient,
537537
) -> Result<(), Error> {
538538
let next_receiver_typestate = receiver.apply_fee_range(None, Some(max_fee_rate)).save(persister).map_err(|e| {
539539
Error::Generic(format!("Error occurred when saving after applying the receiver fee range to the transaction: {e}"))
@@ -546,7 +546,7 @@ impl<'a> PayjoinManager<'a> {
546546
&mut self,
547547
receiver: Receiver<ProvisionalProposal>,
548548
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
549-
blockchain_client: BlockchainClient,
549+
blockchain_client: &mut BlockchainClient,
550550
) -> Result<(), Error> {
551551
let next_receiver_typestate = receiver
552552
.finalize_proposal(|psbt| {
@@ -580,7 +580,7 @@ impl<'a> PayjoinManager<'a> {
580580
&mut self,
581581
receiver: Receiver<PayjoinProposal>,
582582
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
583-
blockchain_client: BlockchainClient,
583+
blockchain_client: &mut BlockchainClient,
584584
) -> Result<(), Error> {
585585
let (req, ctx) = receiver.create_post_request(
586586
self.relay_manager
@@ -619,7 +619,7 @@ impl<'a> PayjoinManager<'a> {
619619
&mut self,
620620
receiver: Receiver<Monitor>,
621621
persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
622-
blockchain_client: BlockchainClient,
622+
blockchain_client: &mut BlockchainClient,
623623
) -> Result<(), Error> {
624624
let wait_time_for_sync = 3;
625625
let poll_internal = tokio::time::Duration::from_secs(wait_time_for_sync);
@@ -734,7 +734,7 @@ impl<'a> PayjoinManager<'a> {
734734
session: SendSession,
735735
persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
736736
relay: impl payjoin::IntoUrl,
737-
blockchain_client: BlockchainClient,
737+
blockchain_client: &mut BlockchainClient,
738738
) -> Result<Txid, Error> {
739739
match session {
740740
SendSession::WithReplyKey(context) => {
@@ -757,7 +757,7 @@ impl<'a> PayjoinManager<'a> {
757757
sender: Sender<WithReplyKey>,
758758
relay: impl payjoin::IntoUrl,
759759
persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
760-
blockchain_client: BlockchainClient,
760+
blockchain_client: &mut BlockchainClient,
761761
) -> Result<Txid, Error> {
762762
let (req, ctx) = sender.create_v2_post_request(relay.as_str()).map_err(|e| {
763763
Error::Generic(format!(
@@ -780,7 +780,7 @@ impl<'a> PayjoinManager<'a> {
780780
sender: Sender<PollingForProposal>,
781781
relay: impl payjoin::IntoUrl,
782782
persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
783-
blockchain_client: BlockchainClient,
783+
blockchain_client: &mut BlockchainClient,
784784
) -> Result<Txid, Error> {
785785
let mut sender = sender.clone();
786786
loop {
@@ -815,7 +815,7 @@ impl<'a> PayjoinManager<'a> {
815815
async fn process_payjoin_proposal(
816816
&self,
817817
mut psbt: Psbt,
818-
blockchain_client: BlockchainClient,
818+
blockchain_client: &mut BlockchainClient,
819819
) -> Result<Txid, Error> {
820820
if !self.wallet.sign(&mut psbt, SignOptions::default())? {
821821
return Err(Error::Generic(

0 commit comments

Comments
 (0)