@@ -11,17 +11,21 @@ use {
1111 blockstore:: { Blockstore , CompletedDataSetInfo } ,
1212 deshred_transaction_notifier_interface:: DeshredTransactionNotifierArc ,
1313 } ,
14+ solana_measure:: measure:: Measure ,
15+ solana_message:: { v0:: LoadedAddresses , VersionedMessage } ,
16+ solana_metrics:: * ,
1417 solana_rpc:: { max_slots:: MaxSlots , rpc_subscriptions:: RpcSubscriptions } ,
15- solana_message :: VersionedMessage ,
18+ solana_runtime :: bank_forks :: BankForks ,
1619 solana_signature:: Signature ,
20+ solana_svm_transaction:: message_address_table_lookup:: SVMMessageAddressTableLookup ,
1721 solana_transaction:: {
1822 simple_vote_transaction_checker:: is_simple_vote_transaction_impl,
1923 versioned:: VersionedTransaction ,
2024 } ,
2125 std:: {
2226 sync:: {
2327 atomic:: { AtomicBool , Ordering } ,
24- Arc ,
28+ Arc , RwLock ,
2529 } ,
2630 thread:: { self , Builder , JoinHandle } ,
2731 time:: Duration ,
@@ -45,6 +49,32 @@ fn is_simple_vote_transaction(tx: &VersionedTransaction) -> bool {
4549 is_simple_vote_transaction_impl ( & tx. signatures , is_legacy, instruction_programs)
4650}
4751
52+ /// Load addresses from address lookup tables for a versioned transaction.
53+ /// Returns None for legacy transactions or if address resolution fails.
54+ /// Takes a Bank reference to avoid repeated lock acquisition.
55+ fn load_transaction_addresses (
56+ tx : & VersionedTransaction ,
57+ bank : & solana_runtime:: bank:: Bank ,
58+ ) -> Option < LoadedAddresses > {
59+ let VersionedMessage :: V0 ( message) = & tx. message else {
60+ // Legacy transactions don't have address table lookups
61+ return None ;
62+ } ;
63+
64+ if message. address_table_lookups . is_empty ( ) {
65+ return None ;
66+ }
67+
68+ bank. load_addresses_from_ref (
69+ message
70+ . address_table_lookups
71+ . iter ( )
72+ . map ( SVMMessageAddressTableLookup :: from) ,
73+ )
74+ . ok ( )
75+ . map ( |( addresses, _deactivation_slot) | addresses)
76+ }
77+
4878pub struct CompletedDataSetsService {
4979 thread_hdl : JoinHandle < ( ) > ,
5080}
@@ -57,6 +87,7 @@ impl CompletedDataSetsService {
5787 deshred_transaction_notifier : Option < DeshredTransactionNotifierArc > ,
5888 exit : Arc < AtomicBool > ,
5989 max_slots : Arc < MaxSlots > ,
90+ bank_forks : Arc < RwLock < BankForks > > ,
6091 ) -> Self {
6192 let thread_hdl = Builder :: new ( )
6293 . name ( "solComplDataSet" . to_string ( ) )
@@ -72,6 +103,7 @@ impl CompletedDataSetsService {
72103 & rpc_subscriptions,
73104 & deshred_transaction_notifier,
74105 & max_slots,
106+ & bank_forks,
75107 ) {
76108 break ;
77109 }
@@ -88,21 +120,70 @@ impl CompletedDataSetsService {
88120 rpc_subscriptions : & RpcSubscriptions ,
89121 deshred_transaction_notifier : & Option < DeshredTransactionNotifierArc > ,
90122 max_slots : & Arc < MaxSlots > ,
123+ bank_forks : & RwLock < BankForks > ,
91124 ) -> Result < ( ) , RecvTimeoutError > {
92125 const RECV_TIMEOUT : Duration = Duration :: from_secs ( 1 ) ;
93- let handle_completed_data_set_info = |completed_data_set_info| {
126+
127+ let mut batch_measure = Measure :: start ( "deshred_geyser_batch" ) ;
128+
129+ // Get root bank once per batch to minimize lock contention
130+ let root_bank = deshred_transaction_notifier
131+ . as_ref ( )
132+ . and_then ( |_| bank_forks. read ( ) . ok ( ) )
133+ . map ( |forks| forks. root_bank ( ) ) ;
134+
135+ // Metrics accumulators
136+ let mut total_lut_load_us: u64 = 0 ;
137+ let mut total_notify_us: u64 = 0 ;
138+ let mut total_transactions: u64 = 0 ;
139+ let mut total_entries: u64 = 0 ;
140+ let mut total_data_sets: u64 = 0 ;
141+ let mut lut_transactions: u64 = 0 ;
142+
143+ let handle_completed_data_set_info = |completed_data_set_info : CompletedDataSetInfo ,
144+ total_lut_load_us : & mut u64 ,
145+ total_notify_us : & mut u64 ,
146+ total_transactions : & mut u64 ,
147+ total_entries : & mut u64 ,
148+ total_data_sets : & mut u64 ,
149+ lut_transactions : & mut u64 | {
94150 let CompletedDataSetInfo { slot, indices } = completed_data_set_info;
95151 match blockstore. get_entries_in_data_block ( slot, indices, /*slot_meta:*/ None ) {
96152 Ok ( entries) => {
153+ * total_data_sets += 1 ;
154+ * total_entries += entries. len ( ) as u64 ;
155+
97156 // Notify deshred transactions if notifier is enabled
98157 if let Some ( notifier) = deshred_transaction_notifier {
99158 for entry in entries. iter ( ) {
100159 for tx in & entry. transactions {
101160 if let Some ( signature) = tx. signatures . first ( ) {
161+ * total_transactions += 1 ;
102162 let is_vote = is_simple_vote_transaction ( tx) ;
163+
164+ // Measure LUT loading time
165+ let mut lut_measure = Measure :: start ( "load_lut" ) ;
166+ let loaded_addresses = root_bank
167+ . as_ref ( )
168+ . and_then ( |bank| load_transaction_addresses ( tx, bank) ) ;
169+ lut_measure. stop ( ) ;
170+
171+ if loaded_addresses. is_some ( ) {
172+ * lut_transactions += 1 ;
173+ * total_lut_load_us += lut_measure. as_us ( ) ;
174+ }
175+
176+ // Measure notification time
177+ let mut notify_measure = Measure :: start ( "notify_deshred" ) ;
103178 notifier. notify_deshred_transaction (
104- slot, signature, is_vote, tx,
179+ slot,
180+ signature,
181+ is_vote,
182+ tx,
183+ loaded_addresses. as_ref ( ) ,
105184 ) ;
185+ notify_measure. stop ( ) ;
186+ * total_notify_us += notify_measure. as_us ( ) ;
106187 }
107188 }
108189 }
@@ -118,15 +199,49 @@ impl CompletedDataSetsService {
118199 }
119200 slot
120201 } ;
202+
121203 let slots = completed_sets_receiver
122204 . recv_timeout ( RECV_TIMEOUT )
123205 . map ( std:: iter:: once) ?
124206 . chain ( completed_sets_receiver. try_iter ( ) )
125207 . flatten ( )
126- . map ( handle_completed_data_set_info) ;
208+ . map ( |info| {
209+ handle_completed_data_set_info (
210+ info,
211+ & mut total_lut_load_us,
212+ & mut total_notify_us,
213+ & mut total_transactions,
214+ & mut total_entries,
215+ & mut total_data_sets,
216+ & mut lut_transactions,
217+ )
218+ } ) ;
219+
127220 if let Some ( slot) = slots. max ( ) {
128221 max_slots. shred_insert . fetch_max ( slot, Ordering :: Relaxed ) ;
129222 }
223+
224+ batch_measure. stop ( ) ;
225+
226+ // Report metrics if we processed any transactions
227+ if total_transactions > 0 {
228+ datapoint_info ! (
229+ "deshred_geyser_timing" ,
230+ ( "batch_total_us" , batch_measure. as_us( ) as i64 , i64 ) ,
231+ ( "notify_total_us" , total_notify_us as i64 , i64 ) ,
232+ ( "lut_load_total_us" , total_lut_load_us as i64 , i64 ) ,
233+ ( "transactions_count" , total_transactions as i64 , i64 ) ,
234+ ( "lut_transactions_count" , lut_transactions as i64 , i64 ) ,
235+ ( "entries_count" , total_entries as i64 , i64 ) ,
236+ ( "data_sets_count" , total_data_sets as i64 , i64 ) ,
237+ (
238+ "avg_notify_us" ,
239+ ( total_notify_us / total_transactions) as i64 ,
240+ i64
241+ ) ,
242+ ) ;
243+ }
244+
130245 Ok ( ( ) )
131246 }
132247
0 commit comments