88 } ,
99 solana_commitment_config:: CommitmentConfig ,
1010 solana_keypair:: Keypair ,
11- solana_pubkey:: Pubkey ,
1211 solana_rpc_client:: http_sender:: HttpSender ,
1312 std:: {
1413 collections:: HashMap ,
@@ -43,11 +42,14 @@ use {
4342 transactions:: SendTransactionRequest ,
4443 util:: WaitShutdown ,
4544 } ,
46- yellowstone_jet_tpu_client:: yellowstone_grpc:: sender:: {
47- Blocklist , Endpoints , NewYellowstoneTpuSender , YellowstoneTpuSender ,
48- create_yellowstone_tpu_sender,
45+ yellowstone_jet_tpu_client:: {
46+ core:: { TpuSenderResponse , TpuSenderResponseCallback } ,
47+ yellowstone_grpc:: sender:: {
48+ Endpoints , NewYellowstoneTpuSender , ShieldBlockList , YellowstoneTpuSender ,
49+ create_yellowstone_tpu_sender_with_callback,
50+ } ,
4951 } ,
50- yellowstone_shield_store:: { PolicyStore , PolicyStoreTrait } ,
52+ yellowstone_shield_store:: PolicyStore ,
5153} ;
5254
5355#[ cfg( not( target_env = "msvc" ) ) ]
@@ -194,18 +196,45 @@ async fn run_jet(
194196 grpc : config. upstream . grpc . endpoint . clone ( ) ,
195197 grpc_x_token : config. upstream . grpc . x_token . clone ( ) ,
196198 } ;
199+ #[ derive( Clone ) ]
200+ struct LoggingCallback ;
201+
202+ impl TpuSenderResponseCallback for LoggingCallback {
203+ fn call ( & self , response : TpuSenderResponse ) {
204+ use std:: io:: Write ;
205+ let mut stdout = std:: io:: stdout ( ) ;
206+ match response {
207+ TpuSenderResponse :: TxSent ( info) => {
208+ writeln ! (
209+ & mut stdout,
210+ "Transaction {} send to {}" ,
211+ info. tx_sig, info. remote_peer_identity
212+ )
213+ . expect ( "writeln" ) ;
214+ }
215+ TpuSenderResponse :: TxFailed ( info) => {
216+ writeln ! ( & mut stdout, "Transaction failed: {}" , info. tx_sig) . expect ( "writeln" ) ;
217+ }
218+ TpuSenderResponse :: TxDrop ( info) => {
219+ for ( txn, _) in info. dropped_tx_vec {
220+ writeln ! ( & mut stdout, "Transaction dropped: {}" , txn. tx_sig)
221+ . expect ( "writeln" ) ;
222+ }
223+ }
224+ }
225+ }
226+ }
197227 let NewYellowstoneTpuSender {
198228 sender,
199229 related_objects_jh,
200- response,
201- } = create_yellowstone_tpu_sender (
230+ } = create_yellowstone_tpu_sender_with_callback (
202231 Default :: default ( ) ,
203232 initial_identity. insecure_clone ( ) ,
204233 tpu_sender_endpoints,
234+ LoggingCallback ,
205235 )
206236 . await
207237 . expect ( "yellowstone-tpu-sender" ) ;
208- drop ( response) ; // drop response handle, we don't need it
209238
210239 let ah = tg. spawn ( async move {
211240 related_objects_jh
@@ -225,7 +254,12 @@ async fn run_jet(
225254 proxy_preflight_check : false ,
226255 } ;
227256
228- let ah = tg. spawn ( tpu_sender_loop ( scheduler_out, sender, shield_policy_store) ) ;
257+ let ah = tg. spawn ( tpu_sender_loop (
258+ scheduler_out,
259+ sender,
260+ shield_policy_store,
261+ jet_cancellation_token. child_token ( ) ,
262+ ) ) ;
229263
230264 tg_name_map. insert ( ah. id ( ) , "tpu_sender_loop" . to_string ( ) ) ;
231265
@@ -386,22 +420,12 @@ pub async fn tpu_sender_loop(
386420 mut incoming : UnboundedReceiver < Arc < SendTransactionRequest > > ,
387421 mut tpu_sender : YellowstoneTpuSender ,
388422 shield : Option < PolicyStore > ,
423+ cancellation_token : CancellationToken ,
389424) {
390- struct ShieldBlocklist < ' a > {
391- shield : & ' a PolicyStore ,
392- policies : & ' a [ Pubkey ] ,
393- }
394-
395- impl Blocklist for ShieldBlocklist < ' _ > {
396- fn is_blocked ( & self , account : & Pubkey ) -> bool {
397- self . shield
398- . snapshot ( )
399- . is_allowed ( self . policies , account)
400- . unwrap_or ( true )
401- }
402- }
403-
404425 while let Some ( request) = incoming. recv ( ) . await {
426+ if cancellation_token. is_cancelled ( ) {
427+ break ;
428+ }
405429 let request = Arc :: unwrap_or_clone ( request) ;
406430 let SendTransactionRequest {
407431 signature,
@@ -411,13 +435,14 @@ pub async fn tpu_sender_loop(
411435 policies,
412436 } = request;
413437
414- let blocklist = shield. as_ref ( ) . map ( |shield| ShieldBlocklist {
415- shield,
416- policies : & policies,
438+ let blocklist = shield. as_ref ( ) . map ( |shield| ShieldBlockList {
439+ policy_store : shield,
440+ shield_policy_addresses : & policies,
441+ default_return_value : false ,
417442 } ) ;
418443
419444 let result = tpu_sender
420- . send_txn_with_blocklist ( signature, wire_transaction, blocklist)
445+ . send_txn_fanout_with_blocklist ( signature, wire_transaction, 2 , blocklist)
421446 . await ;
422447 if let Err ( e) = result {
423448 tracing:: error!(
0 commit comments