@@ -36,6 +36,8 @@ use std::collections::{BTreeMap, HashMap, HashSet};
36
36
use std:: fs:: File ;
37
37
use std:: io:: prelude:: * ;
38
38
use std:: io:: BufReader ;
39
+ use std:: net:: { SocketAddr , TcpStream , ToSocketAddrs } ;
40
+ use std:: time:: Duration ;
39
41
use std:: { env, fs, io, process, thread} ;
40
42
41
43
use blockstack_lib:: burnchains:: bitcoin:: { spv, BitcoinNetworkType } ;
@@ -62,10 +64,12 @@ use blockstack_lib::clarity::vm::ClarityVersion;
62
64
use blockstack_lib:: core:: { MemPoolDB , * } ;
63
65
use blockstack_lib:: cost_estimates:: metrics:: UnitMetric ;
64
66
use blockstack_lib:: cost_estimates:: UnitEstimator ;
67
+ use blockstack_lib:: net:: api:: getinfo:: RPCPeerInfoData ;
65
68
use blockstack_lib:: net:: db:: LocalPeer ;
69
+ use blockstack_lib:: net:: httpcore:: { send_http_request, StacksHttpRequest } ;
66
70
use blockstack_lib:: net:: p2p:: PeerNetwork ;
67
71
use blockstack_lib:: net:: relay:: Relayer ;
68
- use blockstack_lib:: net:: StacksMessage ;
72
+ use blockstack_lib:: net:: { GetNakamotoInvData , HandshakeData , StacksMessage , StacksMessageType } ;
69
73
use blockstack_lib:: util_lib:: db:: sqlite_open;
70
74
use blockstack_lib:: util_lib:: strings:: UrlString ;
71
75
use blockstack_lib:: { clarity_cli, cli} ;
@@ -76,7 +80,7 @@ use stacks_common::codec::{read_next, StacksMessageCodec};
76
80
use stacks_common:: types:: chainstate:: {
77
81
BlockHeaderHash , BurnchainHeaderHash , StacksAddress , StacksBlockId ,
78
82
} ;
79
- use stacks_common:: types:: net:: PeerAddress ;
83
+ use stacks_common:: types:: net:: { PeerAddress , PeerHost } ;
80
84
use stacks_common:: types:: sqlite:: NO_PARAMS ;
81
85
use stacks_common:: types:: MempoolCollectionBehavior ;
82
86
use stacks_common:: util:: hash:: { hex_bytes, to_hex, Hash160 } ;
@@ -85,6 +89,164 @@ use stacks_common::util::secp256k1::{Secp256k1PrivateKey, Secp256k1PublicKey};
85
89
use stacks_common:: util:: vrf:: VRFProof ;
86
90
use stacks_common:: util:: { get_epoch_time_ms, sleep_ms} ;
87
91
92
+ struct P2PSession {
93
+ pub local_peer : LocalPeer ,
94
+ peer_info : RPCPeerInfoData ,
95
+ burn_block_hash : BurnchainHeaderHash ,
96
+ stable_burn_block_hash : BurnchainHeaderHash ,
97
+ tcp_socket : TcpStream ,
98
+ seq : u32 ,
99
+ }
100
+
101
+ impl P2PSession {
102
+ /// Make a StacksMessage. Sign it and set a sequence number.
103
+ fn make_peer_message ( & mut self , payload : StacksMessageType ) -> Result < StacksMessage , String > {
104
+ let mut msg = StacksMessage :: new (
105
+ self . peer_info . peer_version ,
106
+ self . peer_info . network_id ,
107
+ self . peer_info . burn_block_height ,
108
+ & self . burn_block_hash ,
109
+ self . peer_info . stable_burn_block_height ,
110
+ & self . stable_burn_block_hash ,
111
+ payload,
112
+ ) ;
113
+
114
+ msg. sign ( self . seq , & self . local_peer . private_key )
115
+ . map_err ( |e| format ! ( "Failed to sign message {:?}: {:?}" , & msg, & e) ) ?;
116
+ self . seq = self . seq . wrapping_add ( 1 ) ;
117
+
118
+ Ok ( msg)
119
+ }
120
+
121
+ /// Send a p2p message.
122
+ /// Returns error text on failure.
123
+ fn send_peer_message ( & mut self , msg : StacksMessage ) -> Result < ( ) , String > {
124
+ msg. consensus_serialize ( & mut self . tcp_socket )
125
+ . map_err ( |e| format ! ( "Failed to send message {:?}: {:?}" , & msg, & e) )
126
+ }
127
+
128
+ /// Receive a p2p message.
129
+ /// Returns error text on failure.
130
+ fn recv_peer_message ( & mut self ) -> Result < StacksMessage , String > {
131
+ let msg: StacksMessage = read_next ( & mut self . tcp_socket )
132
+ . map_err ( |e| format ! ( "Failed to receive message: {:?}" , & e) ) ?;
133
+ Ok ( msg)
134
+ }
135
+
136
+ /// Begin a p2p session.
137
+ /// Synthesizes a LocalPeer from the remote peer's responses to /v2/info and /v2/pox.
138
+ /// Performs the initial handshake for you.
139
+ ///
140
+ /// Returns the session handle on success.
141
+ /// Returns error text on failure.
142
+ pub fn begin ( peer_addr : SocketAddr , data_port : u16 ) -> Result < Self , String > {
143
+ let mut data_addr = peer_addr. clone ( ) ;
144
+ data_addr. set_port ( data_port) ;
145
+
146
+ // get /v2/info
147
+ let peer_info = send_http_request (
148
+ & format ! ( "{}" , data_addr. ip( ) ) ,
149
+ data_addr. port ( ) ,
150
+ StacksHttpRequest :: new_getinfo ( PeerHost :: from ( data_addr. clone ( ) ) , None )
151
+ . with_header ( "Connection" . to_string ( ) , "close" . to_string ( ) ) ,
152
+ Duration :: from_secs ( 60 ) ,
153
+ )
154
+ . map_err ( |e| format ! ( "Failed to query /v2/info: {:?}" , & e) ) ?
155
+ . decode_peer_info ( )
156
+ . map_err ( |e| format ! ( "Failed to decode response from /v2/info: {:?}" , & e) ) ?;
157
+
158
+ // convert `pox_consensus` and `stable_pox_consensus` into their respective burn block
159
+ // hashes
160
+ let sort_info = send_http_request (
161
+ & format ! ( "{}" , data_addr. ip( ) ) ,
162
+ data_addr. port ( ) ,
163
+ StacksHttpRequest :: new_get_sortition_consensus (
164
+ PeerHost :: from ( data_addr. clone ( ) ) ,
165
+ & peer_info. pox_consensus ,
166
+ )
167
+ . with_header ( "Connection" . to_string ( ) , "close" . to_string ( ) ) ,
168
+ Duration :: from_secs ( 60 ) ,
169
+ )
170
+ . map_err ( |e| format ! ( "Failed to query /v3/sortitions: {:?}" , & e) ) ?
171
+ . decode_sortition_info ( )
172
+ . map_err ( |e| format ! ( "Failed to decode response from /v3/sortitions: {:?}" , & e) ) ?
173
+ . pop ( )
174
+ . ok_or_else ( || format ! ( "No sortition returned for {}" , & peer_info. pox_consensus) ) ?;
175
+
176
+ let stable_sort_info = send_http_request (
177
+ & format ! ( "{}" , data_addr. ip( ) ) ,
178
+ data_addr. port ( ) ,
179
+ StacksHttpRequest :: new_get_sortition_consensus (
180
+ PeerHost :: from ( data_addr. clone ( ) ) ,
181
+ & peer_info. stable_pox_consensus ,
182
+ )
183
+ . with_header ( "Connection" . to_string ( ) , "close" . to_string ( ) ) ,
184
+ Duration :: from_secs ( 60 ) ,
185
+ )
186
+ . map_err ( |e| format ! ( "Failed to query stable /v3/sortitions: {:?}" , & e) ) ?
187
+ . decode_sortition_info ( )
188
+ . map_err ( |e| {
189
+ format ! (
190
+ "Failed to decode response from stable /v3/sortitions: {:?}" ,
191
+ & e
192
+ )
193
+ } ) ?
194
+ . pop ( )
195
+ . ok_or_else ( || {
196
+ format ! (
197
+ "No sortition returned for {}" ,
198
+ & peer_info. stable_pox_consensus
199
+ )
200
+ } ) ?;
201
+
202
+ let burn_block_hash = sort_info. burn_block_hash ;
203
+ let stable_burn_block_hash = stable_sort_info. burn_block_hash ;
204
+
205
+ let local_peer = LocalPeer :: new (
206
+ peer_info. network_id ,
207
+ peer_info. parent_network_id ,
208
+ PeerAddress :: from_socketaddr ( & peer_addr) ,
209
+ peer_addr. port ( ) ,
210
+ Some ( StacksPrivateKey :: new ( ) ) ,
211
+ u64:: MAX ,
212
+ UrlString :: try_from ( format ! ( "http://127.0.0.1:{}" , data_port) . as_str ( ) ) . unwrap ( ) ,
213
+ vec ! [ ] ,
214
+ ) ;
215
+
216
+ let tcp_socket = TcpStream :: connect ( & peer_addr)
217
+ . map_err ( |e| format ! ( "Failed to open {:?}: {:?}" , & peer_addr, & e) ) ?;
218
+
219
+ let mut session = Self {
220
+ local_peer,
221
+ peer_info,
222
+ burn_block_hash,
223
+ stable_burn_block_hash,
224
+ tcp_socket,
225
+ seq : 0 ,
226
+ } ;
227
+
228
+ // perform the handshake
229
+ let handshake_data =
230
+ StacksMessageType :: Handshake ( HandshakeData :: from_local_peer ( & session. local_peer ) ) ;
231
+ let handshake = session. make_peer_message ( handshake_data) ?;
232
+ session. send_peer_message ( handshake) ?;
233
+
234
+ let resp = session. recv_peer_message ( ) ?;
235
+ match resp. payload {
236
+ StacksMessageType :: HandshakeAccept ( ..)
237
+ | StacksMessageType :: StackerDBHandshakeAccept ( ..) => { }
238
+ x => {
239
+ return Err ( format ! (
240
+ "Peer returned unexpected message (expected HandshakeAccept variant): {:?}" ,
241
+ & x
242
+ ) ) ;
243
+ }
244
+ }
245
+
246
+ Ok ( session)
247
+ }
248
+ }
249
+
88
250
#[ cfg_attr( test, mutants:: skip) ]
89
251
fn main ( ) {
90
252
let mut argv: Vec < String > = env:: args ( ) . collect ( ) ;
@@ -974,6 +1136,36 @@ simulating a miner.
974
1136
process:: exit ( 1 ) ;
975
1137
}
976
1138
1139
+ if argv[ 1 ] == "getnakamotoinv" {
1140
+ if argv. len ( ) < 5 {
1141
+ eprintln ! (
1142
+ "Usage: {} getnakamotoinv HOST:PORT DATA_PORT CONSENSUS_HASH" ,
1143
+ & argv[ 0 ]
1144
+ ) ;
1145
+ process:: exit ( 1 ) ;
1146
+ }
1147
+
1148
+ let peer_addr: SocketAddr = argv[ 2 ] . to_socket_addrs ( ) . unwrap ( ) . next ( ) . unwrap ( ) ;
1149
+ let data_port: u16 = argv[ 3 ] . parse ( ) . unwrap ( ) ;
1150
+ let ch = ConsensusHash :: from_hex ( & argv[ 4 ] ) . unwrap ( ) ;
1151
+
1152
+ let mut session = P2PSession :: begin ( peer_addr, data_port) . unwrap ( ) ;
1153
+
1154
+ // send getnakamotoinv
1155
+ let get_nakamoto_inv =
1156
+ StacksMessageType :: GetNakamotoInv ( GetNakamotoInvData { consensus_hash : ch } ) ;
1157
+
1158
+ let msg = session. make_peer_message ( get_nakamoto_inv) . unwrap ( ) ;
1159
+ session. send_peer_message ( msg) . unwrap ( ) ;
1160
+ let resp = session. recv_peer_message ( ) . unwrap ( ) ;
1161
+
1162
+ let StacksMessageType :: NakamotoInv ( inv) = & resp. payload else {
1163
+ panic ! ( "Got spurious message: {:?}" , & resp) ;
1164
+ } ;
1165
+
1166
+ println ! ( "{:?}" , inv) ;
1167
+ }
1168
+
977
1169
if argv[ 1 ] == "replay-chainstate" {
978
1170
if argv. len ( ) < 7 {
979
1171
eprintln ! ( "Usage: {} OLD_CHAINSTATE_PATH OLD_SORTITION_DB_PATH OLD_BURNCHAIN_DB_PATH NEW_CHAINSTATE_PATH NEW_BURNCHAIN_DB_PATH" , & argv[ 0 ] ) ;
0 commit comments