@@ -12,7 +12,7 @@ use lightning::chain::Listen;
1212use lightning_block_sync:: http:: HttpEndpoint ;
1313use lightning_block_sync:: http:: JsonResponse ;
1414use lightning_block_sync:: poll:: ValidatedBlockHeader ;
15- use lightning_block_sync:: rpc:: RpcClient ;
15+ use lightning_block_sync:: rpc:: { RpcClient , RpcError } ;
1616use lightning_block_sync:: {
1717 AsyncBlockSourceResult , BlockData , BlockHeaderData , BlockSource , Cache ,
1818} ;
@@ -24,10 +24,12 @@ use bitcoin::{BlockHash, FeeRate, Transaction, Txid};
2424use base64:: prelude:: { Engine , BASE64_STANDARD } ;
2525
2626use std:: collections:: { HashMap , VecDeque } ;
27+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
2728use std:: sync:: Arc ;
2829
2930pub struct BitcoindRpcClient {
3031 rpc_client : Arc < RpcClient > ,
32+ latest_mempool_timestamp : AtomicU64 ,
3133}
3234
3335impl BitcoindRpcClient {
@@ -41,7 +43,9 @@ impl BitcoindRpcClient {
4143 . expect ( "RpcClient::new is actually infallible" ) ,
4244 ) ;
4345
44- Self { rpc_client }
46+ let latest_mempool_timestamp = AtomicU64 :: new ( 0 ) ;
47+
48+ Self { rpc_client, latest_mempool_timestamp }
4549 }
4650
4751 pub ( crate ) async fn broadcast_transaction ( & self , tx : & Transaction ) -> std:: io:: Result < Txid > {
@@ -70,6 +74,99 @@ impl BitcoindRpcClient {
7074 . await
7175 . map ( |resp| resp. 0 )
7276 }
77+
78+ pub ( crate ) async fn get_raw_transaction (
79+ & self , txid : & Txid ,
80+ ) -> std:: io:: Result < Option < Transaction > > {
81+ let txid_hex = bitcoin:: consensus:: encode:: serialize_hex ( txid) ;
82+ let txid_json = serde_json:: json!( txid_hex) ;
83+ match self
84+ . rpc_client
85+ . call_method :: < GetRawTransactionResponse > ( "getrawtransaction" , & vec ! [ txid_json] )
86+ . await
87+ {
88+ Ok ( resp) => Ok ( Some ( resp. 0 ) ) ,
89+ Err ( e) => match e. into_inner ( ) {
90+ Some ( inner) => {
91+ let rpc_error_res: Result < Box < RpcError > , _ > = inner. downcast ( ) ;
92+
93+ match rpc_error_res {
94+ Ok ( rpc_error) => {
95+ // Check if it's the 'not found' error code.
96+ if rpc_error. code == -5 {
97+ Ok ( None )
98+ } else {
99+ Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , rpc_error) )
100+ }
101+ } ,
102+ Err ( _) => Err ( std:: io:: Error :: new (
103+ std:: io:: ErrorKind :: Other ,
104+ "Failed to process getrawtransaction response" ,
105+ ) ) ,
106+ }
107+ } ,
108+ None => Err ( std:: io:: Error :: new (
109+ std:: io:: ErrorKind :: Other ,
110+ "Failed to process getrawtransaction response" ,
111+ ) ) ,
112+ } ,
113+ }
114+ }
115+
116+ pub ( crate ) async fn get_raw_mempool ( & self ) -> std:: io:: Result < Vec < RawMempoolEntry > > {
117+ let verbose_flag_json = serde_json:: json!( true ) ;
118+ self . rpc_client
119+ . call_method :: < GetRawMempoolResponse > ( "getrawmempool" , & vec ! [ verbose_flag_json] )
120+ . await
121+ . map ( |resp| resp. 0 )
122+ }
123+
124+ /// Get mempool transactions, alongside their first-seen unix timestamps.
125+ ///
126+ /// This method is an adapted version of `bdk_bitcoind_rpc::Emitter::mempool`. It emits each
127+ /// transaction only once, unless we cannot assume the transaction's ancestors are already
128+ /// emitted.
129+ pub ( crate ) async fn get_mempool_transactions_and_timestamp_at_height (
130+ & self , best_processed_height : u32 ,
131+ ) -> std:: io:: Result < Vec < ( Transaction , u64 ) > > {
132+ let prev_mempool_time = self . latest_mempool_timestamp . load ( Ordering :: Relaxed ) ;
133+ let mut latest_time = prev_mempool_time;
134+
135+ let mempool_entries = self . get_raw_mempool ( ) . await ?;
136+ let mut txs_to_emit = Vec :: new ( ) ;
137+
138+ for entry in mempool_entries {
139+ if entry. time > latest_time {
140+ latest_time = entry. time ;
141+ }
142+
143+ // Avoid emitting transactions that are already emitted if we can guarantee
144+ // blocks containing ancestors are already emitted. The bitcoind rpc interface
145+ // provides us with the block height that the tx is introduced to the mempool.
146+ // If we have already emitted the block of height, we can assume that all
147+ // ancestor txs have been processed by the receiver.
148+ let ancestor_within_height = entry. height <= best_processed_height;
149+ let is_already_emitted = entry. time <= prev_mempool_time;
150+ if is_already_emitted && ancestor_within_height {
151+ continue ;
152+ }
153+
154+ match self . get_raw_transaction ( & entry. txid ) . await {
155+ Ok ( Some ( tx) ) => {
156+ txs_to_emit. push ( ( tx, entry. time ) ) ;
157+ } ,
158+ Ok ( None ) => {
159+ continue ;
160+ } ,
161+ Err ( e) => return Err ( e) ,
162+ } ;
163+ }
164+
165+ if !txs_to_emit. is_empty ( ) {
166+ self . latest_mempool_timestamp . store ( latest_time, Ordering :: Release ) ;
167+ }
168+ Ok ( txs_to_emit)
169+ }
73170}
74171
75172impl BlockSource for BitcoindRpcClient {
@@ -132,6 +229,91 @@ impl TryInto<MempoolMinFeeResponse> for JsonResponse {
132229 }
133230}
134231
232+ pub struct GetRawTransactionResponse ( pub Transaction ) ;
233+
234+ impl TryInto < GetRawTransactionResponse > for JsonResponse {
235+ type Error = std:: io:: Error ;
236+ fn try_into ( self ) -> std:: io:: Result < GetRawTransactionResponse > {
237+ let tx = self
238+ . 0
239+ . as_str ( )
240+ . ok_or ( std:: io:: Error :: new (
241+ std:: io:: ErrorKind :: Other ,
242+ "Failed to parse getrawtransaction response" ,
243+ ) )
244+ . and_then ( |s| {
245+ bitcoin:: consensus:: encode:: deserialize_hex ( s) . map_err ( |_| {
246+ std:: io:: Error :: new (
247+ std:: io:: ErrorKind :: Other ,
248+ "Failed to parse getrawtransaction response" ,
249+ )
250+ } )
251+ } ) ?;
252+
253+ Ok ( GetRawTransactionResponse ( tx) )
254+ }
255+ }
256+
257+ pub struct GetRawMempoolResponse ( Vec < RawMempoolEntry > ) ;
258+
259+ impl TryInto < GetRawMempoolResponse > for JsonResponse {
260+ type Error = std:: io:: Error ;
261+ fn try_into ( self ) -> std:: io:: Result < GetRawMempoolResponse > {
262+ let mut mempool_transactions = Vec :: new ( ) ;
263+ let res = self . 0 . as_object ( ) . ok_or ( std:: io:: Error :: new (
264+ std:: io:: ErrorKind :: Other ,
265+ "Failed to parse getrawmempool response" ,
266+ ) ) ?;
267+
268+ for ( k, v) in res {
269+ let txid = match bitcoin:: consensus:: encode:: deserialize_hex ( k) {
270+ Ok ( txid) => txid,
271+ Err ( _) => {
272+ return Err ( std:: io:: Error :: new (
273+ std:: io:: ErrorKind :: Other ,
274+ "Failed to parse getrawmempool response" ,
275+ ) ) ;
276+ } ,
277+ } ;
278+
279+ let time = match v[ "time" ] . as_u64 ( ) {
280+ Some ( time) => time,
281+ None => {
282+ return Err ( std:: io:: Error :: new (
283+ std:: io:: ErrorKind :: Other ,
284+ "Failed to parse getrawmempool response" ,
285+ ) ) ;
286+ } ,
287+ } ;
288+
289+ let height = match v[ "height" ] . as_u64 ( ) . and_then ( |h| h. try_into ( ) . ok ( ) ) {
290+ Some ( height) => height,
291+ None => {
292+ return Err ( std:: io:: Error :: new (
293+ std:: io:: ErrorKind :: Other ,
294+ "Failed to parse getrawmempool response" ,
295+ ) ) ;
296+ } ,
297+ } ;
298+ let entry = RawMempoolEntry { txid, time, height } ;
299+
300+ mempool_transactions. push ( entry) ;
301+ }
302+
303+ Ok ( GetRawMempoolResponse ( mempool_transactions) )
304+ }
305+ }
306+
307+ #[ derive( Debug , Clone ) ]
308+ pub ( crate ) struct RawMempoolEntry {
309+ /// The transaction id
310+ txid : Txid ,
311+ /// Local time transaction entered pool in seconds since 1 Jan 1970 GMT
312+ time : u64 ,
313+ /// Block height when transaction entered pool
314+ height : u32 ,
315+ }
316+
135317#[ derive( Debug , Clone , Serialize ) ]
136318#[ serde( rename_all = "UPPERCASE" ) ]
137319pub ( crate ) enum FeeRateEstimationMode {
0 commit comments