Skip to content

Commit e7ba850

Browse files
committed
fix order of batch calls by including the response id in the ChannelMessage
fixes bitcoindevkit#160
1 parent ac1d7c0 commit e7ba850

File tree

3 files changed

+69
-14
lines changed

3 files changed

+69
-14
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ path = "src/lib.rs"
2020

2121
[dependencies]
2222
log = "^0.4"
23+
pretty_env_logger = "0.5"
2324
bitcoin = { version = "0.32", features = ["serde"] }
2425
serde = { version = "^1.0", features = ["derive"] }
2526
serde_json = { version = "^1.0" }

examples/plaintext.rs

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,59 @@
11
extern crate electrum_client;
2+
extern crate log;
3+
extern crate pretty_env_logger;
24

5+
use bitcoin::block::Header;
36
use electrum_client::{Client, ElectrumApi};
47

58
fn main() {
6-
let client = Client::new("tcp://electrum.blockstream.info:50001").unwrap();
7-
let res = client.server_features();
8-
println!("{:#?}", res);
9+
pretty_env_logger::init_timed();
10+
11+
for _ in 0..10 {
12+
// let client = Client::new("electrum.blockstream.info:50001").unwrap();
13+
let client = Client::new("exs.dyshek.org:50001").unwrap();
14+
// let client = Client::new("tcp://bitcoin.aranguren.org:50001").unwrap();
15+
16+
let heights: Vec<u32> = vec![1, 4, 8, 12, 222, 6666];
17+
let headers: Vec<Header> = client.batch_block_header(heights).unwrap();
18+
19+
let mut err_counter = 0;
20+
if headers.get(0).unwrap().time != 1231469665 {
21+
// time of block 1
22+
err_counter += 1;
23+
log::error!("error 0");
24+
}
25+
if headers.get(1).unwrap().time != 1231470988 {
26+
// time of block 4
27+
err_counter += 1;
28+
log::error!("error 1");
29+
}
30+
if headers.get(2).unwrap().time != 1231472743 {
31+
// time of block 8
32+
err_counter += 1;
33+
log::error!("error 2");
34+
}
35+
if headers.get(3).unwrap().time != 1231474888 {
36+
// time of block 12
37+
err_counter += 1;
38+
log::error!("error 3");
39+
}
40+
if headers.get(4).unwrap().time != 1231770653 {
41+
// time of block 222
42+
err_counter += 1;
43+
log::error!("error 4");
44+
}
45+
if headers.get(5).unwrap().time != 1236456633 {
46+
// time of block 6666
47+
err_counter += 1;
48+
log::error!("error 5");
49+
}
50+
if err_counter > 0 {
51+
log::error!(
52+
"Result: {} headers were not at expected index in result vector",
53+
err_counter
54+
);
55+
break;
56+
}
57+
println!("==========================================")
58+
}
959
}

src/raw_client.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ impl RawClient<ElectrumProxyStream> {
511511

512512
#[derive(Debug)]
513513
enum ChannelMessage {
514-
Response(serde_json::Value),
514+
Response((usize, serde_json::Value)),
515515
WakeUp,
516516
Error(Arc<std::io::Error>),
517517
}
@@ -605,7 +605,8 @@ impl<S: Read + Write> RawClient<S> {
605605
trace!("Reader thread received response for {}", resp_id);
606606

607607
if let Some(sender) = self.waiting_map.lock()?.remove(&resp_id) {
608-
sender.send(ChannelMessage::Response(resp))?;
608+
log::debug!("Sending resp_id {resp_id}");
609+
sender.send(ChannelMessage::Response((resp_id, resp)))?;
609610
} else {
610611
warn!("Missing listener for {}", resp_id);
611612
}
@@ -657,12 +658,12 @@ impl<S: Read + Write> RawClient<S> {
657658
self.increment_calls();
658659

659660
let mut resp = match self.recv(&receiver, req.id) {
660-
Ok(resp) => resp,
661-
e @ Err(_) => {
661+
Ok((_, resp)) => resp,
662+
Err(e) => {
662663
// In case of error our sender could still be left in the map, depending on where
663664
// the error happened. Just in case, try to remove it here
664665
self.waiting_map.lock()?.remove(&req.id);
665-
return e;
666+
return Err(e);
666667
}
667668
};
668669
Ok(resp["result"].take())
@@ -672,16 +673,16 @@ impl<S: Read + Write> RawClient<S> {
672673
&self,
673674
receiver: &Receiver<ChannelMessage>,
674675
req_id: usize,
675-
) -> Result<serde_json::Value, Error> {
676+
) -> Result<(usize, serde_json::Value), Error> {
676677
loop {
677678
// Try to take the lock on the reader. If we manage to do so, we'll become the reader
678679
// thread until we get our reponse
679680
match self._reader_thread(Some(req_id)) {
680-
Ok(response) => break Ok(response),
681+
Ok(response) => break Ok((req_id, response)),
681682
Err(Error::CouldntLockReader) => {
682683
match receiver.recv()? {
683-
// Received our response, returning it
684-
ChannelMessage::Response(received) => break Ok(received),
684+
// Received a response, returning it
685+
ChannelMessage::Response((resp_id, resp)) => break Ok((resp_id, resp)),
685686
ChannelMessage::WakeUp => {
686687
// We have been woken up, this means that we should try becoming the
687688
// reader thread ourselves
@@ -696,7 +697,7 @@ impl<S: Read + Write> RawClient<S> {
696697
}
697698
}
698699
}
699-
e @ Err(_) => break e,
700+
Err(e) => break Err(e),
700701
}
701702
}
702703
}
@@ -798,7 +799,10 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
798799

799800
for req_id in missing_responses.iter() {
800801
match self.recv(&receiver, *req_id) {
801-
Ok(mut resp) => answers.insert(req_id, resp["result"].take()),
802+
Ok((resp_id, mut resp)) => {
803+
log::debug!("Received resp_id {resp_id}");
804+
answers.insert(resp_id, resp["result"].take());
805+
}
802806
Err(e) => {
803807
// In case of error our sender could still be left in the map, depending on where
804808
// the error happened. Just in case, try to remove it here

0 commit comments

Comments
 (0)