11use crate :: tinydancer:: { endpoint, ClientService , ClientStatus , Cluster } ;
22use crate :: { convert_to_websocket, send_rpc_call, try_coerce_shred} ;
3+ use anyhow:: anyhow;
34use async_trait:: async_trait;
45use crossbeam:: channel:: { Receiver , Sender } ;
56use futures:: Sink ;
@@ -43,7 +44,6 @@ use tokio::{
4344} ;
4445use tungstenite:: { connect, Message } ;
4546use url:: Url ;
46- use anyhow:: anyhow;
4747
4848pub const SHRED_CF : & str = "archived_shreds" ;
4949
@@ -57,6 +57,7 @@ pub struct SampleServiceConfig {
5757 pub archive_config : ArchiveConfig ,
5858 pub instance : Arc < rocksdb:: DB > ,
5959 pub status_sampler : Arc < Mutex < ClientStatus > > ,
60+ pub sample_qty : usize ,
6061}
6162
6263#[ derive( Clone , Debug ) ]
@@ -96,6 +97,7 @@ impl ClientService<SampleServiceConfig> for SampleService {
9697 rpc_url,
9798 shred_tx,
9899 status_arc,
100+ config. sample_qty ,
99101 ) ) ) ;
100102
101103 // verify shreds + store in db in shred_archiver
@@ -137,16 +139,16 @@ pub async fn request_shreds(
137139 endpoint : String ,
138140) -> Result < GetShredResponse , serde_json:: Error > {
139141 let request = serde_json:: json!( {
140- "jsonrpc" : "2.0" ,
141- "id" : 1 ,
142- "method" : "getShreds" ,
143- "params" : [
144- slot,
145- indices,
146- { "commitment" : "confirmed" }
147- ]
148- } ) // getting one shred just to get max shreds per slot, can maybe randomize the selection here
149- . to_string ( ) ;
142+ "jsonrpc" : "2.0" ,
143+ "id" : 1 ,
144+ "method" : "getShreds" ,
145+ "params" : [
146+ slot,
147+ indices,
148+ { "commitment" : "confirmed" }
149+ ]
150+ } ) // getting one shred just to get max shreds per slot, can maybe randomize the selection here
151+ . to_string ( ) ;
150152
151153 let res = send_rpc_call ! ( endpoint, request) ;
152154 // info!("{:?}", res);
@@ -165,9 +167,9 @@ async fn slot_update_loop(
165167 * status = ClientStatus :: Crashed ( String :: from ( "Client can't connect to socket" ) ) ;
166168 None
167169 }
168- } ;
169-
170- if result. is_none ( ) {
170+ } ;
171+
172+ if result. is_none ( ) {
171173 return Err ( anyhow ! ( "" ) ) ;
172174 }
173175 let ( mut socket, _response) = result. unwrap ( ) ;
@@ -179,8 +181,7 @@ async fn slot_update_loop(
179181 loop {
180182 match socket. read_message ( ) {
181183 Ok ( msg) => {
182- let res =
183- serde_json:: from_str :: < SlotSubscribeResponse > ( msg. to_string ( ) . as_str ( ) ) ;
184+ let res = serde_json:: from_str :: < SlotSubscribeResponse > ( msg. to_string ( ) . as_str ( ) ) ;
184185
185186 // info!("res: {:?}", msg.to_string().as_str());
186187 if let Ok ( res) = res {
@@ -189,10 +190,7 @@ async fn slot_update_loop(
189190 info ! ( "slot updated: {:?}" , res. params. result. root) ;
190191 }
191192 Err ( e) => {
192- info ! (
193- "error here: {:?} {:?}" ,
194- e, res. params. result. root as u64
195- ) ;
193+ info ! ( "error here: {:?} {:?}" , e, res. params. result. root as u64 ) ;
196194 continue ; // @TODO: we should add retries here incase send fails for some reason
197195 }
198196 }
@@ -205,57 +203,57 @@ async fn slot_update_loop(
205203
206204macro_rules! unwrap_or_return {
207205 ( Result $var: ident) => {
208- if let Err ( e) = $var {
206+ if let Err ( e) = $var {
209207 return Err ( e. into( ) ) ;
210- } else {
208+ } else {
211209 $var. unwrap( )
212210 }
213211 } ;
214212 ( Option $var: ident $err: expr) => {
215- if $var. is_none( ) {
213+ if $var. is_none( ) {
216214 return Err ( anyhow!( $err) ) ;
217- } else {
215+ } else {
218216 $var. unwrap( )
219217 }
220218 } ;
221219 ( OptionRef $var: ident $err: expr) => {
222- if $var. is_none( ) {
220+ if $var. is_none( ) {
223221 return Err ( anyhow!( $err) ) ;
224- } else {
222+ } else {
225223 $var. as_ref( ) . unwrap( )
226224 }
227225 } ;
228226}
229227
230228async fn get_shreds_and_leader_for_slot (
231229 slot : u64 ,
232- endpoint : & String
233- ) -> anyhow :: Result < ( Vec < Option < Shred > > , Pubkey ) > {
234-
230+ endpoint : & String ,
231+ sample_qty : usize ,
232+ ) -> anyhow :: Result < ( Vec < Option < Shred > > , Pubkey ) > {
235233 // get shred length (max_shreds_per_slot)
236234 let first_shred = request_shreds ( slot as usize , vec ! [ 0 ] , endpoint. clone ( ) ) . await ;
237235 let first_shred = unwrap_or_return ! ( Result first_shred) ;
238236
239- let first_shred = & first_shred. result . shreds [ 1 ] ;
237+ let first_shred = & first_shred. result . shreds [ 1 ] ;
240238 let first_shred = unwrap_or_return ! ( OptionRef first_shred "first shred not found" ) ;
241239
242240 let max_shreds_per_slot = {
243- if let Some ( data_shred) = & first_shred. shred_data {
241+ if let Some ( data_shred) = & first_shred. shred_data {
244242 Shred :: ShredData ( data_shred. clone ( ) )
245243 . num_data_shreds ( )
246244 . expect ( "num data shreds error" )
247- } else if let Some ( code_shred) = & first_shred. shred_code {
245+ } else if let Some ( code_shred) = & first_shred. shred_code {
248246 Shred :: ShredCode ( code_shred. clone ( ) )
249- . num_coding_shreds ( )
250- . expect ( "num code shreds error" )
247+ . num_coding_shreds ( )
248+ . expect ( "num code shreds error" )
251249 } else {
252250 // todo
253251 return Err ( anyhow ! ( "shred isnt either data or code type" ) ) ;
254252 }
255253 } ;
256254
257255 // get a random sample of shreds
258- let mut shred_indices_for_slot = gen_random_indices ( max_shreds_per_slot as usize , 10 ) ; // unwrap only temporary
256+ let mut shred_indices_for_slot = gen_random_indices ( max_shreds_per_slot as usize , sample_qty ) ; // unwrap only temporary
259257 shred_indices_for_slot. push ( 0_usize ) ;
260258 info ! ( "indices of: {:?} {:?}" , shred_indices_for_slot, slot) ;
261259
@@ -276,9 +274,7 @@ async fn get_shreds_and_leader_for_slot(
276274 . collect ( ) ;
277275
278276 // info!("before leader");
279- let leader = solana_ledger:: shred:: Pubkey :: from_str (
280- shreds_for_slot. result . leader . as_str ( ) ,
281- ) ?;
277+ let leader = solana_ledger:: shred:: Pubkey :: from_str ( shreds_for_slot. result . leader . as_str ( ) ) ?;
282278
283279 // info!("leader {:?}", leader);
284280 let mut fullfill_count = AtomicU32 :: new ( 0u32 ) ;
@@ -311,8 +307,7 @@ async fn get_shreds_and_leader_for_slot(
311307 }
312308 } ) ;
313309
314- if ( fullfill_count. get_mut ( ) . to_owned ( ) as usize ) < shred_indices_for_slot. len ( )
315- {
310+ if ( fullfill_count. get_mut ( ) . to_owned ( ) as usize ) < shred_indices_for_slot. len ( ) {
316311 info ! ( "Received incomplete number of shreds, requested {:?} shreds for slot {:?} and received {:?}" , shred_indices_for_slot. len( ) , slot, fullfill_count) ;
317312 }
318313
@@ -324,11 +319,12 @@ async fn shred_update_loop(
324319 endpoint : String ,
325320 shred_tx : Sender < ( Vec < Option < Shred > > , solana_ledger:: shred:: Pubkey ) > ,
326321 status_sampler : Arc < Mutex < ClientStatus > > ,
322+ sample_qty : usize ,
327323) -> anyhow:: Result < ( ) > {
328324 loop {
329325 {
330326 let mut status = status_sampler. lock ( ) . unwrap ( ) ;
331- if let ClientStatus :: Crashed ( _) = & * status {
327+ if let ClientStatus :: Crashed ( _) = & * status {
332328 return Err ( anyhow ! ( "Client crashed" ) ) ;
333329 } else {
334330 * status = ClientStatus :: Active ( String :: from (
@@ -338,8 +334,8 @@ async fn shred_update_loop(
338334 }
339335
340336 if let Ok ( slot) = slot_update_rx. recv ( ) {
341- let shreds = get_shreds_and_leader_for_slot ( slot, & endpoint) . await ;
342- if let Err ( e) = shreds {
337+ let shreds = get_shreds_and_leader_for_slot ( slot, & endpoint, sample_qty ) . await ;
338+ if let Err ( e) = shreds {
343339 info ! ( "{}" , e) ;
344340 continue ;
345341 }
@@ -354,7 +350,7 @@ async fn shred_update_loop(
354350
355351// use solana_ledger::shred::dispatch;
356352
357- // verifies the merkle proof of the shread
353+ // verifies the merkle proof of the shread
358354pub fn verify_sample ( shred : & Shred , leader : solana_ledger:: shred:: Pubkey ) -> bool {
359355 // @TODO fix error handling here
360356 let verify_merkle_root = match shred {
@@ -412,7 +408,6 @@ pub async fn shred_verify_loop(
412408 }
413409}
414410
415-
416411// store verified shreds in db
417412pub async fn shred_archiver (
418413 verified_shred_rx : Receiver < ( Shred , solana_ledger:: shred:: Pubkey ) > ,
@@ -455,10 +450,9 @@ pub async fn shred_archiver(
455450 }
456451}
457452
458-
459- pub async fn pull_and_verify_shreds ( slot : usize , endpoint : String ) -> bool {
460- let shreds = get_shreds_and_leader_for_slot ( slot as u64 , & endpoint) . await ;
461- if let Err ( e) = shreds {
453+ pub async fn pull_and_verify_shreds ( slot : usize , endpoint : String , sample_qty : usize ) -> bool {
454+ let shreds = get_shreds_and_leader_for_slot ( slot as u64 , & endpoint, sample_qty) . await ;
455+ if let Err ( e) = shreds {
462456 info ! ( "{}" , e) ;
463457 return false ;
464458 }
0 commit comments