Skip to content

Commit e204f11

Browse files
committed
feat: add a p2p session struct and use it to query nakamoto inventory vectors
1 parent 90b98f0 commit e204f11

File tree

1 file changed

+186
-52
lines changed

1 file changed

+186
-52
lines changed

stackslib/src/main.rs

Lines changed: 186 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ use std::collections::{BTreeMap, HashMap, HashSet};
3636
use std::fs::File;
3737
use std::io::prelude::*;
3838
use std::io::BufReader;
39+
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6, TcpStream, ToSocketAddrs};
40+
use std::time::Duration;
3941
use std::{env, fs, io, process, thread};
4042

4143
use blockstack_lib::burnchains::bitcoin::{spv, BitcoinNetworkType};
@@ -62,11 +64,12 @@ use blockstack_lib::clarity::vm::ClarityVersion;
6264
use blockstack_lib::core::{MemPoolDB, *};
6365
use blockstack_lib::cost_estimates::metrics::UnitMetric;
6466
use blockstack_lib::cost_estimates::UnitEstimator;
67+
use blockstack_lib::net::api::getinfo::RPCPeerInfoData;
6568
use blockstack_lib::net::db::LocalPeer;
66-
use blockstack_lib::net::inv::nakamoto::InvGenerator;
69+
use blockstack_lib::net::httpcore::{send_http_request, StacksHttpRequest};
6770
use blockstack_lib::net::p2p::PeerNetwork;
6871
use blockstack_lib::net::relay::Relayer;
69-
use blockstack_lib::net::{NakamotoInvData, StacksMessage};
72+
use blockstack_lib::net::{GetNakamotoInvData, HandshakeData, StacksMessage, StacksMessageType};
7073
use blockstack_lib::util_lib::db::sqlite_open;
7174
use blockstack_lib::util_lib::strings::UrlString;
7275
use blockstack_lib::{clarity_cli, cli};
@@ -77,7 +80,7 @@ use stacks_common::codec::{read_next, StacksMessageCodec};
7780
use stacks_common::types::chainstate::{
7881
BlockHeaderHash, BurnchainHeaderHash, StacksAddress, StacksBlockId,
7982
};
80-
use stacks_common::types::net::PeerAddress;
83+
use stacks_common::types::net::{PeerAddress, PeerHost};
8184
use stacks_common::types::sqlite::NO_PARAMS;
8285
use stacks_common::types::MempoolCollectionBehavior;
8386
use stacks_common::util::hash::{hex_bytes, to_hex, Hash160};
@@ -86,6 +89,170 @@ use stacks_common::util::secp256k1::{Secp256k1PrivateKey, Secp256k1PublicKey};
8689
use stacks_common::util::vrf::VRFProof;
8790
use stacks_common::util::{get_epoch_time_ms, sleep_ms};
8891

92+
struct P2PSession {
93+
pub local_peer: LocalPeer,
94+
peer_info: RPCPeerInfoData,
95+
burn_block_hash: BurnchainHeaderHash,
96+
stable_burn_block_hash: BurnchainHeaderHash,
97+
tcp_socket: TcpStream,
98+
seq: u32,
99+
}
100+
101+
impl P2PSession {
102+
/// Make a StacksMessage. Sign it and set a sequence number.
103+
fn make_peer_message(&mut self, payload: StacksMessageType) -> Result<StacksMessage, String> {
104+
let mut msg = StacksMessage::new(
105+
self.peer_info.peer_version,
106+
self.peer_info.network_id,
107+
self.peer_info.burn_block_height,
108+
&self.burn_block_hash,
109+
self.peer_info.stable_burn_block_height,
110+
&self.stable_burn_block_hash,
111+
payload,
112+
);
113+
114+
msg.sign(self.seq, &self.local_peer.private_key)
115+
.map_err(|e| format!("Failed to sign message {:?}: {:?}", &msg, &e))?;
116+
self.seq = self.seq.wrapping_add(1);
117+
118+
Ok(msg)
119+
}
120+
121+
/// Send a p2p message.
122+
/// Returns error text on failure.
123+
fn send_peer_message(&mut self, msg: StacksMessage) -> Result<(), String> {
124+
msg.consensus_serialize(&mut self.tcp_socket)
125+
.map_err(|e| format!("Failed to send message {:?}: {:?}", &msg, &e))
126+
}
127+
128+
/// Receive a p2p message.
129+
/// Returns error text on failure.
130+
fn recv_peer_message(&mut self) -> Result<StacksMessage, String> {
131+
let msg: StacksMessage = read_next(&mut self.tcp_socket)
132+
.map_err(|e| format!("Failed to receive message: {:?}", &e))?;
133+
Ok(msg)
134+
}
135+
136+
/// Begin a p2p session.
137+
/// Synthesizes a LocalPeer from the remote peer's responses to /v2/info and /v2/pox.
138+
/// Performs the initial handshake for you.
139+
///
140+
/// Returns the session handle on success.
141+
/// Returns error text on failure.
142+
pub fn begin(peer_addr: SocketAddr, data_port: u16) -> Result<Self, String> {
143+
let data_addr = match peer_addr {
144+
SocketAddr::V4(v4addr) => {
145+
SocketAddr::V4(SocketAddrV4::new(v4addr.ip().clone(), data_port))
146+
}
147+
SocketAddr::V6(v6addr) => {
148+
SocketAddr::V6(SocketAddrV6::new(v6addr.ip().clone(), data_port, 0, 0))
149+
}
150+
};
151+
152+
// get /v2/info
153+
let peer_info = send_http_request(
154+
&format!("{}", data_addr.ip()),
155+
data_addr.port(),
156+
StacksHttpRequest::new_getinfo(PeerHost::from(data_addr.clone()), None)
157+
.with_header("Connection".to_string(), "close".to_string()),
158+
Duration::from_secs(60),
159+
)
160+
.map_err(|e| format!("Failed to query /v2/info: {:?}", &e))?
161+
.decode_peer_info()
162+
.map_err(|e| format!("Failed to decode response from /v2/info: {:?}", &e))?;
163+
164+
// convert `pox_consensus` and `stable_pox_consensus` into their respective burn block
165+
// hashes
166+
let sort_info = send_http_request(
167+
&format!("{}", data_addr.ip()),
168+
data_addr.port(),
169+
StacksHttpRequest::new_get_sortition_consensus(
170+
PeerHost::from(data_addr.clone()),
171+
&peer_info.pox_consensus,
172+
)
173+
.with_header("Connection".to_string(), "close".to_string()),
174+
Duration::from_secs(60),
175+
)
176+
.map_err(|e| format!("Failed to query /v3/sortitions: {:?}", &e))?
177+
.decode_sortition_info()
178+
.map_err(|e| format!("Failed to decode response from /v3/sortitions: {:?}", &e))?
179+
.pop()
180+
.ok_or_else(|| format!("No sortition returned for {}", &peer_info.pox_consensus))?;
181+
182+
let stable_sort_info = send_http_request(
183+
&format!("{}", data_addr.ip()),
184+
data_addr.port(),
185+
StacksHttpRequest::new_get_sortition_consensus(
186+
PeerHost::from(data_addr.clone()),
187+
&peer_info.stable_pox_consensus,
188+
)
189+
.with_header("Connection".to_string(), "close".to_string()),
190+
Duration::from_secs(60),
191+
)
192+
.map_err(|e| format!("Failed to query stable /v3/sortitions: {:?}", &e))?
193+
.decode_sortition_info()
194+
.map_err(|e| {
195+
format!(
196+
"Failed to decode response from stable /v3/sortitions: {:?}",
197+
&e
198+
)
199+
})?
200+
.pop()
201+
.ok_or_else(|| {
202+
format!(
203+
"No sortition returned for {}",
204+
&peer_info.stable_pox_consensus
205+
)
206+
})?;
207+
208+
let burn_block_hash = sort_info.burn_block_hash;
209+
let stable_burn_block_hash = stable_sort_info.burn_block_hash;
210+
211+
let local_peer = LocalPeer::new(
212+
peer_info.network_id,
213+
peer_info.parent_network_id,
214+
PeerAddress::from_socketaddr(&peer_addr),
215+
peer_addr.port(),
216+
Some(StacksPrivateKey::new()),
217+
u64::MAX,
218+
UrlString::try_from(format!("http://127.0.0.1:{}", data_port).as_str()).unwrap(),
219+
vec![],
220+
);
221+
222+
let tcp_socket = TcpStream::connect(&peer_addr)
223+
.map_err(|e| format!("Failed to open {:?}: {:?}", &peer_addr, &e))?;
224+
225+
let mut session = Self {
226+
local_peer,
227+
peer_info,
228+
burn_block_hash,
229+
stable_burn_block_hash,
230+
tcp_socket,
231+
seq: 0,
232+
};
233+
234+
// perform the handshake
235+
let handshake_data =
236+
StacksMessageType::Handshake(HandshakeData::from_local_peer(&session.local_peer));
237+
let handshake = session.make_peer_message(handshake_data)?;
238+
session.send_peer_message(handshake)?;
239+
240+
let resp = session.recv_peer_message()?;
241+
match resp.payload {
242+
StacksMessageType::HandshakeAccept(..)
243+
| StacksMessageType::StackerDBHandshakeAccept(..) => {}
244+
x => {
245+
return Err(format!(
246+
"Peer returned unexpected message (expected HandshakeAccept variant): {:?}",
247+
&x
248+
));
249+
}
250+
}
251+
252+
Ok(session)
253+
}
254+
}
255+
89256
#[cfg_attr(test, mutants::skip)]
90257
fn main() {
91258
let mut argv: Vec<String> = env::args().collect();
@@ -975,59 +1142,26 @@ simulating a miner.
9751142
process::exit(1);
9761143
}
9771144

978-
if argv[1] == "get-tenure-inv" {
979-
let chainstate_root_path = &argv[2];
980-
let tip_block_ids = &argv[3..];
981-
let chainstate_path = format!("{}/chainstate", &chainstate_root_path);
982-
let sortition_path = format!("{}/burnchain/sortition", &chainstate_root_path);
1145+
if argv[1] == "getnakamotoinv" {
1146+
let peer_addr: SocketAddr = argv[2].to_socket_addrs().unwrap().next().unwrap();
1147+
let data_port: u16 = argv[3].parse().unwrap();
1148+
let ch = ConsensusHash::from_hex(&argv[4]).unwrap();
9831149

984-
let (chainstate, _) =
985-
StacksChainState::open(false, 0x80000000, &chainstate_path, None).unwrap();
986-
let pox_consts =
987-
PoxConstants::new(900, 100, 80, 0, 0, u64::MAX, u64::MAX, 240, 241, 242, 242);
988-
let sortition_db = SortitionDB::open(&sortition_path, true, pox_consts).unwrap();
989-
990-
let mut invgen = InvGenerator::new();
991-
let tip = SortitionDB::get_canonical_burn_chain_tip(sortition_db.conn()).unwrap();
992-
993-
for tip_block_id in tip_block_ids.iter() {
994-
let tip_block_id = StacksBlockId::from_hex(tip_block_id).unwrap();
995-
let header =
996-
NakamotoChainState::get_block_header_nakamoto(chainstate.db(), &tip_block_id)
997-
.unwrap()
998-
.unwrap();
999-
let sn = SortitionDB::get_block_snapshot_consensus(
1000-
sortition_db.conn(),
1001-
&header.consensus_hash,
1002-
)
1003-
.unwrap()
1004-
.unwrap();
1150+
let mut session = P2PSession::begin(peer_addr, data_port).unwrap();
10051151

1006-
let reward_cycle = sortition_db
1007-
.pox_constants
1008-
.block_height_to_reward_cycle(230, sn.block_height)
1009-
.unwrap();
1152+
// send getnakamotoinv
1153+
let get_nakamoto_inv =
1154+
StacksMessageType::GetNakamotoInv(GetNakamotoInvData { consensus_hash: ch });
10101155

1011-
let bitvec_bools = invgen
1012-
.make_tenure_bitvector(
1013-
&tip,
1014-
&sortition_db,
1015-
&chainstate,
1016-
&header.consensus_hash,
1017-
&header.anchored_header.block_hash(),
1018-
reward_cycle,
1019-
)
1020-
.unwrap();
1021-
let nakamoto_inv = NakamotoInvData::try_from(&bitvec_bools)
1022-
.map_err(|e| {
1023-
warn!("Failed to create a NakamotoInv response: {:?}", &e);
1024-
e
1025-
})
1026-
.unwrap();
1156+
let msg = session.make_peer_message(get_nakamoto_inv).unwrap();
1157+
session.send_peer_message(msg).unwrap();
1158+
let resp = session.recv_peer_message().unwrap();
10271159

1028-
println!("{}: {:?}", tip_block_id, &nakamoto_inv);
1029-
}
1030-
process::exit(0);
1160+
let StacksMessageType::NakamotoInv(inv) = &resp.payload else {
1161+
panic!("Got spurious message: {:?}", &resp);
1162+
};
1163+
1164+
println!("{:?}", inv);
10311165
}
10321166

10331167
if argv[1] == "replay-chainstate" {

0 commit comments

Comments
 (0)