11use std:: future:: IntoFuture ;
22use std:: time:: Duration ;
33
4- use alloy_provider:: Provider ;
54use anyhow:: Result ;
65use futures:: future:: BoxFuture ;
6+ use katana_gateway_types:: BlockId ;
77use katana_primitives:: block:: BlockNumber ;
8- use katana_starknet:: StarknetCore ;
98use tokio:: sync:: watch;
109use tracing:: { error, info} ;
1110
1211pub type TipWatcherFut = BoxFuture < ' static , Result < ( ) > > ;
1312
13+ /// A trait for abstracting the source of the latest block number.
14+ ///
15+ /// This allows the chain tip watcher to work with different sources such as:
16+ /// - Starknet Core Contract on L1 (Ethereum) - tracks the settled/proven tip
17+ /// - Starknet RPC endpoints - tracks the latest L2 tip
18+ /// - Feeder Gateway - tracks the latest L2 tip from the sequencer
19+ pub trait ChainTipProvider : Send + Sync {
20+ /// Retrieves the latest block number from the source.
21+ ///
22+ /// # Returns
23+ ///
24+ /// Returns the latest block number.
25+ fn latest_number ( & self ) -> BoxFuture < ' _ , Result < BlockNumber > > ;
26+ }
27+
1428pub struct ChainTipWatcher < P > {
15- /// The Starknet Core Contract client for fetching the latest block.
16- core_contract : StarknetCore < P > ,
29+ /// The block number provider for fetching the latest block.
30+ tip_provider : P ,
1731 /// Interval for checking the new tip.
1832 watch_interval : Duration ,
1933 /// Watch channel for notifying subscribers of the latest tip.
2034 tip_sender : watch:: Sender < BlockNumber > ,
2135}
2236
23- impl < P : alloy_provider :: Provider > ChainTipWatcher < P > {
24- pub fn new ( core_contract : StarknetCore < P > ) -> Self {
37+ impl < P : ChainTipProvider > ChainTipWatcher < P > {
38+ pub fn new ( provider : P ) -> Self {
2539 let ( tip_tx, _) = watch:: channel ( 0 ) ;
2640 let watch_interval = Duration :: from_secs ( 30 ) ;
27- Self { core_contract , watch_interval, tip_sender : tip_tx }
41+ Self { tip_provider : provider , watch_interval, tip_sender : tip_tx }
2842 }
2943
3044 /// Set the watch interval for checking new tips.
@@ -47,7 +61,7 @@ impl<P: alloy_provider::Provider> ChainTipWatcher<P> {
4761 let mut prev_tip: BlockNumber = 0 ;
4862
4963 loop {
50- let block_number = self . core_contract . state_block_number ( ) . await ? as BlockNumber ;
64+ let block_number = self . tip_provider . latest_number ( ) . await ?;
5165
5266 if prev_tip != block_number {
5367 info ! ( block = %block_number, "New tip found." ) ;
@@ -64,7 +78,7 @@ impl<P: alloy_provider::Provider> ChainTipWatcher<P> {
6478 }
6579}
6680
67- impl < P : Provider + ' static > IntoFuture for ChainTipWatcher < P > {
81+ impl < P : ChainTipProvider + ' static > IntoFuture for ChainTipWatcher < P > {
6882 type Output = Result < ( ) > ;
6983 type IntoFuture = TipWatcherFut ;
7084
@@ -77,10 +91,10 @@ impl<P: Provider + 'static> IntoFuture for ChainTipWatcher<P> {
7791 }
7892}
7993
80- impl < P : std :: fmt :: Debug > std:: fmt:: Debug for ChainTipWatcher < P > {
94+ impl < P > std:: fmt:: Debug for ChainTipWatcher < P > {
8195 fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
8296 f. debug_struct ( "ChainTipWatcher" )
83- . field ( "core_contract " , & self . core_contract )
97+ . field ( "provider " , & "ChainTipProvider" )
8498 . field ( "subscribers" , & self . tip_sender . receiver_count ( ) )
8599 . field ( "watch_interval" , & self . watch_interval )
86100 . finish ( )
@@ -109,3 +123,126 @@ impl std::fmt::Debug for TipSubscription {
109123 f. debug_struct ( "TipSubscription" ) . field ( "current_tip" , & self . tip ( ) ) . finish ( )
110124 }
111125}
126+
127+ /// Implementation of [`ChainTipProvider`] for the feeder gateway client.
128+ ///
129+ /// This fetches the latest L2 block number directly from the Starknet feeder gateway,
130+ /// which may be ahead of the L1 settlement.
131+ impl ChainTipProvider for katana_gateway_client:: Client {
132+ fn latest_number ( & self ) -> BoxFuture < ' _ , Result < BlockNumber > > {
133+ Box :: pin ( async move {
134+ let block = self . get_block ( BlockId :: Latest ) . await ?;
135+ block. block_number . ok_or_else ( || anyhow:: anyhow!( "Block number not available" ) )
136+ } )
137+ }
138+ }
139+
140+ #[ cfg( test) ]
141+ mod tests {
142+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
143+ use std:: sync:: Arc ;
144+
145+ use super :: * ;
146+
147+ /// Mock provider that returns a sequence of block numbers from an atomic counter.
148+ #[ derive( Clone ) ]
149+ struct MockProvider {
150+ counter : Arc < AtomicU64 > ,
151+ }
152+
153+ impl MockProvider {
154+ fn new ( initial : BlockNumber ) -> Self {
155+ Self { counter : Arc :: new ( AtomicU64 :: new ( initial) ) }
156+ }
157+
158+ fn set ( & self , value : BlockNumber ) {
159+ self . counter . store ( value, Ordering :: SeqCst ) ;
160+ }
161+
162+ fn increment ( & self ) {
163+ self . counter . fetch_add ( 1 , Ordering :: SeqCst ) ;
164+ }
165+ }
166+
167+ impl ChainTipProvider for MockProvider {
168+ fn latest_number ( & self ) -> BoxFuture < ' _ , Result < BlockNumber > > {
169+ let value = self . counter . load ( Ordering :: SeqCst ) ;
170+ Box :: pin ( async move { Ok ( value) } )
171+ }
172+ }
173+
174+ #[ tokio:: test]
175+ async fn tip_updates_are_broadcast_to_subscribers ( ) {
176+ let provider = MockProvider :: new ( 100 ) ;
177+ let watcher = ChainTipWatcher :: new ( provider. clone ( ) ) . interval ( Duration :: from_millis ( 10 ) ) ;
178+
179+ let mut sub1 = watcher. subscribe ( ) ;
180+ let sub2 = watcher. subscribe ( ) ;
181+
182+ // Initial value should be 0 (default)
183+ assert_eq ! ( sub1. tip( ) , 0 ) ;
184+ assert_eq ! ( sub2. tip( ) , 0 ) ;
185+
186+ // Spawn the watcher task
187+ let handle = tokio:: spawn ( async move { watcher. run ( ) . await } ) ;
188+
189+ // Wait for first update (block 100)
190+ let tip = sub1. changed ( ) . await . unwrap ( ) ;
191+ assert_eq ! ( tip, 100 ) ;
192+ assert_eq ! ( sub2. tip( ) , 100 ) ;
193+
194+ // Update provider and wait for new tip
195+ provider. set ( 150 ) ;
196+ tokio:: time:: sleep ( Duration :: from_millis ( 20 ) ) . await ;
197+
198+ let tip = sub1. changed ( ) . await . unwrap ( ) ;
199+ assert_eq ! ( tip, 150 ) ;
200+ assert_eq ! ( sub2. tip( ) , 150 ) ;
201+
202+ handle. abort ( ) ;
203+ }
204+
205+ #[ tokio:: test]
206+ async fn duplicate_tips_are_not_rebroadcast ( ) {
207+ let provider = MockProvider :: new ( 100 ) ;
208+ let watcher = ChainTipWatcher :: new ( provider. clone ( ) ) . interval ( Duration :: from_millis ( 10 ) ) ;
209+
210+ let mut sub = watcher. subscribe ( ) ;
211+
212+ let handle = tokio:: spawn ( async move { watcher. run ( ) . await } ) ;
213+
214+ // Wait for first update
215+ let tip = sub. changed ( ) . await . unwrap ( ) ;
216+ assert_eq ! ( tip, 100 ) ;
217+
218+ // Keep the same tip for multiple intervals
219+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
220+
221+ // Should timeout as no new update is expected
222+ let result = tokio:: time:: timeout ( Duration :: from_millis ( 30 ) , sub. changed ( ) ) . await ;
223+ assert ! ( result. is_err( ) , "Should not receive duplicate tip updates" ) ;
224+
225+ handle. abort ( ) ;
226+ }
227+
228+ #[ tokio:: test]
229+ async fn monotonically_increasing_tips ( ) {
230+ let provider = MockProvider :: new ( 1 ) ;
231+ let watcher = ChainTipWatcher :: new ( provider. clone ( ) ) . interval ( Duration :: from_millis ( 10 ) ) ;
232+
233+ let mut sub = watcher. subscribe ( ) ;
234+
235+ let handle = tokio:: spawn ( async move { watcher. run ( ) . await } ) ;
236+
237+ // Verify multiple sequential updates
238+ for expected in 1 ..=5 {
239+ let tip = sub. changed ( ) . await . unwrap ( ) ;
240+ assert_eq ! ( tip, expected) ;
241+
242+ provider. increment ( ) ; // Increment the chain tip
243+ tokio:: time:: sleep ( Duration :: from_millis ( 15 ) ) . await ;
244+ }
245+
246+ handle. abort ( ) ;
247+ }
248+ }
0 commit comments