Skip to content

Commit d9adfbe

Browse files
committed
Merge #704: Fix rpc::CoreTxIter logic.
74e2c47 Replace `rpc::CoreTxIter` with `list_transactions` fn. (志宇) Pull request description: ### Description This fixes a bug where `CoreTxIter` attempts to call `listtransactions` immediately after a tx result is filtered (instead of being returned), when in fact, the correct logic will be to pop another tx result. The new logic also ensures that tx results are returned in chonological order. The test `test_list_transactions` verifies this. We also now ensure that `page_size` is between the range `[0 to 1000]` otherwise an error is returned. Some needless cloning is removed from `from_config` as well as logging improvements. ### Notes to the reviewers This is an oversight by me (sorry) for PR #683 ### Checklists #### All Submissions: * [x] I've signed all my commits * [x] I followed the [contribution guidelines](https://github.com/bitcoindevkit/bdk/blob/master/CONTRIBUTING.md) * [x] I ran `cargo fmt` and `cargo clippy` before committing #### Bugfixes: ~* [ ] This pull request breaks the existing API~ * [x] I've added tests to reproduce the issue which are now passing * [x] I'm linking the issue being fixed by this PR ACKs for top commit: afilini: ACK 74e2c47 Tree-SHA512: f32314a9947067673d19d95da8cde36b350c0bb0ebe0924405ad50602c14590f7ccb09a3e03cdfdd227f938dccd0f556f3a2b4dd7fdd6eba1591c0f8d3e65182
2 parents 03d3c78 + 74e2c47 commit d9adfbe

File tree

1 file changed

+130
-87
lines changed

1 file changed

+130
-87
lines changed

src/blockchain/rpc.rs

Lines changed: 130 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -201,27 +201,26 @@ impl ConfigurableBlockchain for RpcBlockchain {
201201
/// Returns RpcBlockchain backend creating an RPC client to a specific wallet named as the descriptor's checksum
202202
/// if it's the first time it creates the wallet in the node and upon return is granted the wallet is loaded
203203
fn from_config(config: &Self::Config) -> Result<Self, Error> {
204-
let wallet_name = config.wallet_name.clone();
205-
let wallet_url = format!("{}/wallet/{}", config.url, &wallet_name);
206-
debug!("connecting to {} auth:{:?}", wallet_url, config.auth);
204+
let wallet_url = format!("{}/wallet/{}", config.url, &config.wallet_name);
207205

208206
let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?;
209207
let rpc_version = client.version()?;
210208

211-
let loaded_wallets = client.list_wallets()?;
212-
if loaded_wallets.contains(&wallet_name) {
213-
debug!("wallet already loaded {:?}", wallet_name);
214-
} else if list_wallet_dir(&client)?.contains(&wallet_name) {
215-
client.load_wallet(&wallet_name)?;
216-
debug!("wallet loaded {:?}", wallet_name);
209+
info!("connected to '{}' with auth: {:?}", wallet_url, config.auth);
210+
211+
if client.list_wallets()?.contains(&config.wallet_name) {
212+
info!("wallet already loaded: {}", config.wallet_name);
213+
} else if list_wallet_dir(&client)?.contains(&config.wallet_name) {
214+
client.load_wallet(&config.wallet_name)?;
215+
info!("wallet loaded: {}", config.wallet_name);
217216
} else {
218217
// pre-0.21 use legacy wallets
219218
if rpc_version < 210_000 {
220-
client.create_wallet(&wallet_name, Some(true), None, None, None)?;
219+
client.create_wallet(&config.wallet_name, Some(true), None, None, None)?;
221220
} else {
222221
// TODO: move back to api call when https://github.com/rust-bitcoin/rust-bitcoincore-rpc/issues/225 is closed
223222
let args = [
224-
Value::String(wallet_name.clone()),
223+
Value::String(config.wallet_name.clone()),
225224
Value::Bool(true),
226225
Value::Bool(false),
227226
Value::Null,
@@ -231,7 +230,7 @@ impl ConfigurableBlockchain for RpcBlockchain {
231230
let _: Value = client.call("createwallet", &args)?;
232231
}
233232

234-
debug!("wallet created {:?}", wallet_name);
233+
info!("wallet created: {}", config.wallet_name);
235234
}
236235

237236
let is_descriptors = is_wallet_descriptor(&client)?;
@@ -386,9 +385,16 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {
386385
// wait for Core wallet to rescan (TODO: maybe make this async)
387386
await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;
388387

389-
// loop through results of Core RPC method `listtransactions`
390-
for tx_res in CoreTxIter::new(client, 100) {
391-
let tx_res = tx_res?;
388+
// obtain iterator of pagenated `listtransactions` RPC calls
389+
const LIST_TX_PAGE_SIZE: usize = 100; // item count per page
390+
let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| {
391+
// filter out conflicting transactions - only accept transactions that are already
392+
// confirmed, or exists in mempool
393+
item.info.confirmations > 0 || client.get_mempool_entry(&item.info.txid).is_ok()
394+
});
395+
396+
// iterate through chronological results of `listtransactions`
397+
for tx_res in tx_iter {
392398
let mut updated = false;
393399

394400
let db_tx = self.txs.entry(tx_res.info.txid).or_insert_with(|| {
@@ -695,81 +701,53 @@ where
695701
Ok(())
696702
}
697703

698-
/// Iterates through results of multiple `listtransactions` calls.
699-
struct CoreTxIter<'a> {
700-
client: &'a Client,
704+
/// Calls the `listtransactions` RPC method in `page_size`s and returns iterator of the tx results
705+
/// in chronological order.
706+
///
707+
/// `page_size` cannot be less than 1 and cannot be greater than 1000.
708+
fn list_transactions(
709+
client: &Client,
701710
page_size: usize,
702-
page_index: usize,
703-
704-
stack: Vec<ListTransactionResult>,
705-
done: bool,
706-
}
707-
708-
impl<'a> CoreTxIter<'a> {
709-
fn new(client: &'a Client, mut page_size: usize) -> Self {
710-
if page_size > 1000 {
711-
page_size = 1000;
712-
}
713-
714-
Self {
715-
client,
716-
page_size,
717-
page_index: 0,
718-
stack: Vec::with_capacity(page_size),
719-
done: false,
720-
}
721-
}
722-
723-
/// We want to filter out conflicting transactions.
724-
/// Only accept transactions that are already confirmed, or existing in mempool.
725-
fn keep_tx(&self, item: &ListTransactionResult) -> bool {
726-
item.info.confirmations > 0 || self.client.get_mempool_entry(&item.info.txid).is_ok()
711+
) -> Result<impl Iterator<Item = ListTransactionResult>, Error> {
712+
if !(1..=1000).contains(&page_size) {
713+
return Err(Error::Generic(format!(
714+
"Core RPC method `listtransactions` must have `page_size` in range [1 to 1000]: got {}",
715+
page_size
716+
)));
727717
}
728-
}
729-
730-
impl<'a> Iterator for CoreTxIter<'a> {
731-
type Item = Result<ListTransactionResult, Error>;
732-
733-
fn next(&mut self) -> Option<Self::Item> {
734-
loop {
735-
if self.done {
736-
return None;
737-
}
738-
739-
if let Some(item) = self.stack.pop() {
740-
if self.keep_tx(&item) {
741-
return Some(Ok(item));
742-
}
743-
}
744718

745-
let res = self
746-
.client
747-
.list_transactions(
748-
None,
749-
Some(self.page_size),
750-
Some(self.page_size * self.page_index),
751-
Some(true),
752-
)
753-
.map_err(Error::Rpc);
754-
755-
self.page_index += 1;
756-
757-
let list = match res {
758-
Ok(list) => list,
759-
Err(err) => {
760-
self.done = true;
761-
return Some(Err(err));
762-
}
763-
};
764-
765-
if list.is_empty() {
766-
self.done = true;
767-
return None;
719+
// `.take_while` helper to obtain the first error (TODO: remove when we can use `.map_while`)
720+
let mut got_err = false;
721+
722+
// obtain results in batches (of `page_size`)
723+
let nested_list = (0_usize..)
724+
.map(|page_index| {
725+
client.list_transactions(
726+
None,
727+
Some(page_size),
728+
Some(page_size * page_index),
729+
Some(true),
730+
)
731+
})
732+
// take until returned rpc call is empty or until error
733+
// TODO: replace with the following when MSRV is 1.57.0:
734+
// `.map_while(|res| res.map(|l| if l.is_empty() { None } else { Some(l) }).transpose())`
735+
.take_while(|res| {
736+
if got_err || matches!(res, Ok(list) if list.is_empty()) {
737+
// break if last iteration was an error, or if the current result is empty
738+
false
739+
} else {
740+
// record whether result is error or not
741+
got_err = res.is_err();
742+
// continue on non-empty result or first error
743+
true
768744
}
745+
})
746+
.collect::<Result<Vec<_>, _>>()
747+
.map_err(Error::Rpc)?;
769748

770-
self.stack = list;
771-
}
772-
}
749+
// reverse here to have txs in chronological order
750+
Ok(nested_list.into_iter().rev().flatten())
773751
}
774752

775753
fn await_wallet_scan(client: &Client, rate_sec: u64, progress: &dyn Progress) -> Result<(), Error> {
@@ -885,10 +863,16 @@ impl BlockchainFactory for RpcBlockchainFactory {
885863
#[cfg(any(feature = "test-rpc", feature = "test-rpc-legacy"))]
886864
mod test {
887865
use super::*;
888-
use crate::testutils::blockchain_tests::TestClient;
866+
use crate::{
867+
descriptor::{into_wallet_descriptor_checked, AsDerived},
868+
testutils::blockchain_tests::TestClient,
869+
wallet::utils::SecpCtx,
870+
};
889871

890-
use bitcoin::Network;
872+
use bitcoin::{Address, Network};
891873
use bitcoincore_rpc::RpcApi;
874+
use log::LevelFilter;
875+
use miniscript::DescriptorTrait;
892876

893877
crate::bdk_blockchain_tests! {
894878
fn test_instance(test_client: &TestClient) -> RpcBlockchain {
@@ -942,4 +926,63 @@ mod test {
942926
"prefix-bbbbbb"
943927
);
944928
}
929+
930+
/// This test ensures that [list_transactions] always iterates through transactions in
931+
/// chronological order, independent of the `page_size`.
932+
#[test]
933+
fn test_list_transactions() {
934+
let _ = env_logger::builder()
935+
.filter_level(LevelFilter::Info)
936+
.default_format()
937+
.try_init();
938+
939+
const DESC: &'static str = "wpkh(tpubD9zMNV59kgbWgKK55SHJugmKKSt6wQXczxpucGYqNKwGmJp1x7Ar2nrLUXYHDdCctXmyDoSCn2JVMzMUDfib3FaDhwxCEMUELoq19xLSx66/*)";
940+
const AMOUNT_PER_TX: u64 = 10_000;
941+
const TX_COUNT: u32 = 50;
942+
943+
let secp = SecpCtx::default();
944+
let network = Network::Regtest;
945+
let (desc, ..) = into_wallet_descriptor_checked(DESC, &secp, network).unwrap();
946+
947+
let (mut test_client, factory) = get_factory();
948+
let bc = factory.build("itertest", None).unwrap();
949+
950+
// generate scripts (1 tx per script)
951+
let scripts = (0..TX_COUNT)
952+
.map(|index| desc.as_derived(index, &secp).script_pubkey())
953+
.collect::<Vec<_>>();
954+
955+
// import scripts and wait
956+
if bc.is_descriptors {
957+
import_descriptors(&bc.client, 0, scripts.iter()).unwrap();
958+
} else {
959+
import_multi(&bc.client, 0, scripts.iter()).unwrap();
960+
}
961+
await_wallet_scan(&bc.client, 2, &NoopProgress).unwrap();
962+
963+
// create and broadcast txs
964+
let expected_txids = scripts
965+
.iter()
966+
.map(|script| {
967+
let addr = Address::from_script(script, network).unwrap();
968+
let txid =
969+
test_client.receive(testutils! { @tx ( (@addr addr) => AMOUNT_PER_TX ) });
970+
test_client.generate(1, None);
971+
txid
972+
})
973+
.collect::<Vec<_>>();
974+
975+
// iterate through different page sizes - should always return txs in chronological order
976+
[1000, 1, 2, 6, 25, 49, 50].iter().for_each(|page_size| {
977+
println!("trying with page_size: {}", page_size);
978+
979+
let txids = list_transactions(&bc.client, *page_size)
980+
.unwrap()
981+
.map(|res| res.info.txid)
982+
.collect::<Vec<_>>();
983+
984+
assert_eq!(txids.len(), expected_txids.len());
985+
assert_eq!(txids, expected_txids);
986+
});
987+
}
945988
}

0 commit comments

Comments
 (0)