@@ -9,22 +9,29 @@ use plerkle_serialization::serializer::serialize_account;
99use plerkle_snapshot:: append_vec:: StoredMeta ;
1010use solana_sdk:: account:: { Account , AccountSharedData , ReadableAccount } ;
1111use std:: error:: Error ;
12- use std:: sync:: atomic:: AtomicU64 ;
13- use std:: sync:: atomic:: Ordering ;
1412use std:: sync:: Arc ;
1513use tokio:: sync:: Mutex ;
1614
1715use crate :: accounts_selector:: AccountsSelector ;
1816
1917const ACCOUNT_STREAM_KEY : & str = "ACC" ;
18+ // once a checkpoint is hit, processing pauses until the accounts stream length drops below this limit
19+ const MAX_INTERMEDIATE_STREAM_LEN : u64 = 50_000_000 ;
20+ // once stream_counter reaches PROCESSED_CHECKPOINT we poll the stream length (sleeping 1s between polls) until it is below MAX_INTERMEDIATE_STREAM_LEN, then reset stream_counter to 0
21+ const PROCESSED_CHECKPOINT : u64 = 20_000_000 ;
2022
2123#[ derive( Clone ) ]
2224pub ( crate ) struct GeyserDumper {
2325 messenger : Arc < Mutex < RedisMessenger > > ,
2426 throttle_nanos : u64 ,
2527 accounts_selector : AccountsSelector ,
2628 pub accounts_spinner : ProgressBar ,
27- pub accounts_count : Arc < AtomicU64 > ,
29+ /// total number of accounts counted during the snapshot run; starts at 0 and also drives the spinner position.
30+ pub accounts_count : u64 ,
31+ /// intermediate counter used to decide when to check the stream length (XLEN).
32+ /// it is kept separate from accounts_count because it is seeded with the current
33+ /// stream length, which might be non-zero, and is reset to 0 after each passed check.
34+ pub stream_counter : u64 ,
2835}
2936
3037impl GeyserDumper {
@@ -61,13 +68,18 @@ impl GeyserDumper {
6168 messenger
6269 . set_buffer_size ( ACCOUNT_STREAM_KEY , 100_000_000 )
6370 . await ;
71+ let initial_stream_len = messenger
72+ . stream_len ( & ACCOUNT_STREAM_KEY )
73+ . await
74+ . expect ( "get initial stream len of accounts" ) ;
6475
6576 Self {
6677 messenger : Arc :: new ( Mutex :: new ( messenger) ) ,
6778 accounts_spinner,
6879 accounts_selector,
69- accounts_count : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
80+ accounts_count : 0 ,
7081 throttle_nanos,
82+ stream_counter : initial_stream_len,
7183 }
7284 }
7385
@@ -76,6 +88,21 @@ impl GeyserDumper {
7688 ( meta, account) : ( StoredMeta , AccountSharedData ) ,
7789 slot : u64 ,
7890 ) -> Result < ( ) , Box < dyn Error > > {
91+ if self . stream_counter >= PROCESSED_CHECKPOINT {
92+ loop {
93+ let stream_len = self
94+ . messenger
95+ . lock ( )
96+ . await
97+ . stream_len ( ACCOUNT_STREAM_KEY )
98+ . await ?;
99+ if stream_len < MAX_INTERMEDIATE_STREAM_LEN {
100+ self . stream_counter = 0 ;
101+ break ;
102+ }
103+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) . await ;
104+ }
105+ }
79106 if self
80107 . accounts_selector
81108 . is_account_selected ( meta. pubkey . as_ref ( ) , account. owner ( ) . as_ref ( ) )
@@ -116,18 +143,19 @@ impl GeyserDumper {
116143 return Ok ( ( ) ) ;
117144 }
118145
119- let prev = self . accounts_count . fetch_add ( 1 , Ordering :: Relaxed ) ;
120- self . accounts_spinner . set_position ( prev + 1 ) ;
146+ self . accounts_count += 1 ;
147+ self . accounts_spinner . set_position ( self . accounts_count ) ;
148+ self . stream_counter += 1 ;
121149
122150 if self . throttle_nanos > 0 {
123151 tokio:: time:: sleep ( std:: time:: Duration :: from_nanos ( self . throttle_nanos ) ) . await ;
124152 }
153+
125154 Ok ( ( ) )
126155 }
127156
128157 pub async fn force_flush ( self ) {
129- self . accounts_spinner
130- . set_position ( self . accounts_count . load ( Ordering :: Relaxed ) ) ;
158+ self . accounts_spinner . set_position ( self . accounts_count ) ;
131159 self . accounts_spinner
132160 . finish_with_message ( "Finished processing snapshot!" ) ;
133161 let messenger_mutex = Arc :: into_inner ( self . messenger )
0 commit comments