@@ -30,6 +30,8 @@ use std::sync::Arc;
3030pub struct BitcoindRpcClient {
3131 rpc_client : Arc < RpcClient > ,
3232 latest_mempool_timestamp : AtomicU64 ,
33+ mempool_entries_cache : tokio:: sync:: Mutex < HashMap < Txid , MempoolEntry > > ,
34+ mempool_txs_cache : tokio:: sync:: Mutex < HashMap < Txid , ( Transaction , u64 ) > > ,
3335}
3436
3537impl BitcoindRpcClient {
@@ -42,7 +44,9 @@ impl BitcoindRpcClient {
4244
4345 let latest_mempool_timestamp = AtomicU64 :: new ( 0 ) ;
4446
45- Self { rpc_client, latest_mempool_timestamp }
47+ let mempool_entries_cache = tokio:: sync:: Mutex :: new ( HashMap :: new ( ) ) ;
48+ let mempool_txs_cache = tokio:: sync:: Mutex :: new ( HashMap :: new ( ) ) ;
49+ Self { rpc_client, latest_mempool_timestamp, mempool_entries_cache, mempool_txs_cache }
4650 }
4751
4852 pub ( crate ) fn rpc_client ( & self ) -> Arc < RpcClient > {
@@ -160,16 +164,25 @@ impl BitcoindRpcClient {
160164 }
161165 }
162166
163- pub ( crate ) async fn get_mempool_entries ( & self ) -> std:: io:: Result < Vec < MempoolEntry > > {
167+ pub ( crate ) async fn update_mempool_entries_cache ( & self ) -> std:: io:: Result < ( ) > {
164168 let mempool_txids = self . get_raw_mempool ( ) . await ?;
165- let mut mempool_entries = Vec :: with_capacity ( mempool_txids. len ( ) ) ;
169+
170+ let mut mempool_entries_cache = self . mempool_entries_cache . lock ( ) . await ;
171+ mempool_entries_cache. retain ( |txid, _| mempool_txids. contains ( txid) ) ;
172+
166173 for txid in mempool_txids {
167- // Push any entries that haven't been dropped since `getrawmempool`
174+ if mempool_entries_cache. contains_key ( & txid) {
175+ continue ;
176+ }
177+
168178 if let Some ( entry) = self . get_mempool_entry ( txid) . await ? {
169- mempool_entries . push ( entry) ;
179+ mempool_entries_cache . insert ( txid , entry. clone ( ) ) ;
170180 }
171181 }
172- Ok ( mempool_entries)
182+
183+ mempool_entries_cache. shrink_to_fit ( ) ;
184+
185+ Ok ( ( ) )
173186 }
174187
175188 /// Get mempool transactions, alongside their first-seen unix timestamps.
@@ -183,10 +196,14 @@ impl BitcoindRpcClient {
183196 let prev_mempool_time = self . latest_mempool_timestamp . load ( Ordering :: Relaxed ) ;
184197 let mut latest_time = prev_mempool_time;
185198
186- let mempool_entries = self . get_mempool_entries ( ) . await ?;
187- let mut txs_to_emit = Vec :: new ( ) ;
199+ self . update_mempool_entries_cache ( ) . await ?;
200+
201+ let mempool_entries_cache = self . mempool_entries_cache . lock ( ) . await ;
202+ let mut mempool_txs_cache = self . mempool_txs_cache . lock ( ) . await ;
203+ mempool_txs_cache. retain ( |txid, _| mempool_entries_cache. contains_key ( txid) ) ;
188204
189- for entry in mempool_entries {
205+ let mut txs_to_emit = Vec :: with_capacity ( mempool_entries_cache. len ( ) ) ;
206+ for ( txid, entry) in mempool_entries_cache. iter ( ) {
190207 if entry. time > latest_time {
191208 latest_time = entry. time ;
192209 }
@@ -202,8 +219,14 @@ impl BitcoindRpcClient {
202219 continue ;
203220 }
204221
222+ if let Some ( ( cached_tx, cached_time) ) = mempool_txs_cache. get ( txid) {
223+ txs_to_emit. push ( ( cached_tx. clone ( ) , * cached_time) ) ;
224+ continue ;
225+ }
226+
205227 match self . get_raw_transaction ( & entry. txid ) . await {
206228 Ok ( Some ( tx) ) => {
229+ mempool_txs_cache. insert ( entry. txid , ( tx. clone ( ) , entry. time ) ) ;
207230 txs_to_emit. push ( ( tx, entry. time ) ) ;
208231 } ,
209232 Ok ( None ) => {
0 commit comments