@@ -43,6 +43,10 @@ pub struct Emitter<'c, C> {
4343 /// The last emitted block during our last mempool emission. This is used to determine whether
4444 /// there has been a reorg since our last mempool emission.
4545 last_mempool_tip : Option < u32 > ,
46+
47+ /// Unconfirmed txids that are expected to appear in mempool. This is used to determine if any
48+ /// known txids have been evicted.
49+ expected_mempool_txids : HashSet < Txid > ,
4650}
4751
4852impl < ' c , C : bitcoincore_rpc:: RpcApi > Emitter < ' c , C > {
@@ -53,7 +57,15 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
5357 ///
5458 /// `start_height` starts emission from a given height (if there are no conflicts with the
5559 /// original chain).
56- pub fn new ( client : & ' c C , last_cp : CheckPoint , start_height : u32 ) -> Self {
60+ ///
61+ /// `expected_mempool_txids` is the initial set of unconfirmed txids. Once at tip, any that are
62+ /// no longer in mempool are marked evicted.
63+ pub fn new (
64+ client : & ' c C ,
65+ last_cp : CheckPoint ,
66+ start_height : u32 ,
67+ expected_mempool_txids : HashSet < Txid > ,
68+ ) -> Self {
5769 Self {
5870 client,
5971 start_height,
@@ -64,7 +76,14 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
6476 }
6577 }
6678
67- /// Emit mempool transactions, alongside their first-seen unix timestamps.
79+ /// Emit mempool transactions and any evicted [`Txid`]s. Returns a `latest_update_time` which is
80+ /// used for setting the timestamp for evicted transactions.
81+ ///
82+ /// This method returns a [`MempoolEvent`] containing the full transactions (with their
83+ /// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted_txids`] which are
84+ /// any [`Txid`]s which were previously expected and are now missing from the mempool. Note that
85+ /// [`Txid`]s are only evicted if the emitter is at the chain tip with the same height and hash
86+ /// as the best block from the RPC.
6887 ///
6988 /// This method emits each transaction only once, unless we cannot guarantee the transaction's
7089 /// ancestors are already emitted.
@@ -84,6 +103,32 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
84103 // `start_height` has been emitted.
85104 . unwrap_or ( self . start_height . saturating_sub ( 1 ) ) ;
86105
106+ // Get the raw mempool result from the RPC client which will be used to determine if any
107+ // transactions have been evicted.
108+ let raw_mempool = client. get_raw_mempool_verbose ( ) ?;
109+ let raw_mempool_txids: HashSet < Txid > = raw_mempool. keys ( ) . copied ( ) . collect ( ) ;
110+
111+ // Determine if height and hash matches the best block from the RPC. Evictions are deferred
112+ // if we are not at the best block.
113+ let height = client. get_block_count ( ) ?;
114+ let at_tip = if height != self . last_cp . height ( ) as u64 {
115+ false
116+ } else {
117+ // Verify if block hash matches in case of re-org.
118+ client. get_block_hash ( height) ? == self . last_cp . hash ( )
119+ } ;
120+
121+ // If at tip, any expected txid missing from raw mempool is considered evicted;
122+ // if not at tip, we don't evict anything.
123+ let mut evicted_txids: HashSet < Txid > = if at_tip {
124+ self . expected_mempool_txids
125+ . difference ( & raw_mempool_txids)
126+ . copied ( )
127+ . collect ( )
128+ } else {
129+ HashSet :: new ( )
130+ } ;
131+
87132 // Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
88133 // track of the latest mempool tx's timestamp to determine whether we have seen a tx
89134 // before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
@@ -128,7 +173,20 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
128173 self . last_mempool_time = latest_time;
129174 self . last_mempool_tip = Some ( self . last_cp . height ( ) ) ;
130175
131- Ok ( txs_to_emit)
176+ // If at tip, we replace `expected_mempool_txids` with just the new txids. Otherwise, we’re
177+ // still catching up to the tip and keep accumulating.
178+ if at_tip {
179+ self . expected_mempool_txids = new_txs. iter ( ) . map ( |( tx, _) | tx. compute_txid ( ) ) . collect ( ) ;
180+ } else {
181+ self . expected_mempool_txids
182+ . extend ( new_txs. iter ( ) . map ( |( tx, _) | tx. compute_txid ( ) ) ) ;
183+ }
184+
185+ Ok ( MempoolEvent {
186+ new_txs,
187+ evicted_txids,
188+ latest_update_time : latest_time as u64 ,
189+ } )
132190 }
133191
134192 /// Emit the next block height and header (if any).
@@ -139,11 +197,38 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
139197
140198 /// Emit the next block height and block (if any).
141199 pub fn next_block ( & mut self ) -> Result < Option < BlockEvent < Block > > , bitcoincore_rpc:: Error > {
142- Ok ( poll ( self , |hash| self . client . get_block ( hash) ) ?
143- . map ( |( checkpoint, block) | BlockEvent { block, checkpoint } ) )
200+ if let Some ( ( checkpoint, block) ) = poll ( self , |hash| self . client . get_block ( hash) ) ? {
201+ // Stop tracking unconfirmed transactions that have been confirmed in this block.
202+ for tx in & block. txdata {
203+ self . expected_mempool_txids . remove ( & tx. compute_txid ( ) ) ;
204+ }
205+ return Ok ( Some ( BlockEvent { block, checkpoint } ) ) ;
206+ }
207+ Ok ( None )
144208 }
145209}
146210
211+ /// A new emission from mempool.
212+ #[ derive( Debug ) ]
213+ pub struct MempoolEvent {
214+ /// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
215+ ///
216+ /// To understand the second condition, consider a receiver which filters transactions based on
217+ /// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
218+ /// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to
219+ /// block of height `h-1`, we want to re-emit this transaction until the receiver has seen the
220+ /// block at height `h`.
221+ pub new_txs : Vec < ( Transaction , u64 ) > ,
222+
223+ /// [`Txid`]s of all transactions that have been evicted from mempool.
224+ pub evicted_txids : HashSet < Txid > ,
225+
226+ /// The latest timestamp of when a transaction entered the mempool.
227+ ///
228+ /// This is useful for setting the timestamp for evicted transactions.
229+ pub latest_update_time : u64 ,
230+ }
231+
147232/// A newly emitted block from [`Emitter`].
148233#[ derive( Debug ) ]
149234pub struct BlockEvent < B > {
0 commit comments