11use std:: collections:: { BTreeMap , HashSet } ;
22use std:: io:: Cursor ;
3- use std:: ops:: { Add , Deref } ;
3+ use std:: ops:: Deref ;
44use std:: sync:: Arc ;
5- use std:: time:: { Duration , Instant , SystemTime } ;
5+ use std:: time:: { Instant , SystemTime , UNIX_EPOCH } ;
66
77use lightning:: ln:: msgs:: { ChannelAnnouncement , ChannelUpdate , UnsignedChannelAnnouncement , UnsignedChannelUpdate } ;
88use lightning:: routing:: gossip:: NetworkGraph ;
@@ -79,7 +79,6 @@ pub(super) async fn connect_to_db() -> (Client, Connection<Socket, NoTlsStream>)
7979/// after `last_sync_timestamp`
8080pub ( super ) async fn fetch_channel_announcements < L : Deref > ( delta_set : & mut DeltaSet , network_graph : Arc < NetworkGraph < L > > , client : & Client , last_sync_timestamp : u32 , logger : L ) where L :: Target : Logger {
8181 log_info ! ( logger, "Obtaining channel ids from network graph" ) ;
82- let last_sync_timestamp_object = SystemTime :: UNIX_EPOCH . add ( Duration :: from_secs ( last_sync_timestamp as u64 ) ) ;
8382 let channel_ids = {
8483 let read_only_graph = network_graph. read_only ( ) ;
8584 log_info ! ( logger, "Retrieved read-only network graph copy" ) ;
@@ -89,12 +88,17 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
8988 . map ( |c| c. 1 . announcement_message . as_ref ( ) . unwrap ( ) . contents . short_channel_id as i64 )
9089 . collect :: < Vec < _ > > ( )
9190 } ;
91+ #[ cfg( test) ]
92+ log_info ! ( logger, "Channel IDs: {:?}" , channel_ids) ;
93+ log_info ! ( logger, "Last sync timestamp: {}" , last_sync_timestamp) ;
94+ let last_sync_timestamp_float = last_sync_timestamp as f64 ;
9295
9396 log_info ! ( logger, "Obtaining corresponding database entries" ) ;
9497 // get all the channel announcements that are currently in the network graph
9598 let announcement_rows = client. query_raw ( "SELECT announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC" , [ & channel_ids] ) . await . unwrap ( ) ;
9699 let mut pinned_rows = Box :: pin ( announcement_rows) ;
97100
101+ let mut announcement_count = 0 ;
98102 while let Some ( row_res) = pinned_rows. next ( ) . await {
99103 let current_announcement_row = row_res. unwrap ( ) ;
100104 let blob: Vec < u8 > = current_announcement_row. get ( "announcement_signed" ) ;
@@ -110,7 +114,10 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
110114 announcement : unsigned_announcement,
111115 seen : current_seen_timestamp,
112116 } ) ;
117+
118+ announcement_count += 1 ;
113119 }
120+ log_info ! ( logger, "Fetched {} announcement rows" , announcement_count) ;
114121
115122 {
116123 // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
@@ -124,7 +131,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
124131 // here is where the channels whose first update in either direction occurred after
125132 // `last_seen_timestamp` are added to the selection
126133 let params: [ & ( dyn tokio_postgres:: types:: ToSql + Sync ) ; 2 ] =
127- [ & channel_ids, & last_sync_timestamp_object ] ;
134+ [ & channel_ids, & last_sync_timestamp_float ] ;
128135 let newer_oldest_directional_updates = client. query_raw ( "
129136 SELECT * FROM (
130137 SELECT DISTINCT ON (short_channel_id) *
@@ -136,10 +143,11 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
136143 ) AS directional_last_seens
137144 ORDER BY short_channel_id ASC, seen DESC
138145 ) AS distinct_chans
139- WHERE distinct_chans.seen >= $2
146+ WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
140147 " , params) . await . unwrap ( ) ;
141148 let mut pinned_updates = Box :: pin ( newer_oldest_directional_updates) ;
142149
150+ let mut newer_oldest_directional_update_count = 0 ;
143151 while let Some ( row_res) = pinned_updates. next ( ) . await {
144152 let current_row = row_res. unwrap ( ) ;
145153
@@ -151,7 +159,10 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
151159 let current_channel_delta = delta_set. entry ( scid as u64 ) . or_insert ( ChannelDelta :: default ( ) ) ;
152160 // first time a channel was seen in both directions
153161 ( * current_channel_delta) . first_bidirectional_updates_seen = Some ( current_seen_timestamp) ;
162+
163+ newer_oldest_directional_update_count += 1 ;
154164 }
165+ log_info ! ( logger, "Fetched {} update rows of the first update in a new direction" , newer_oldest_directional_update_count) ;
155166 }
156167
157168 {
@@ -161,7 +172,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
161172 // Steps:
162173 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
163174 // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
164- let reminder_threshold_timestamp = SystemTime :: now ( ) . checked_sub ( config:: CHANNEL_REMINDER_AGE ) . unwrap ( ) ;
175+ let reminder_threshold_timestamp = SystemTime :: now ( ) . checked_sub ( config:: CHANNEL_REMINDER_AGE ) . unwrap ( ) . duration_since ( UNIX_EPOCH ) . unwrap ( ) . as_secs ( ) as f64 ;
165176
166177 let params: [ & ( dyn tokio_postgres:: types:: ToSql + Sync ) ; 2 ] =
167178 [ & channel_ids, & reminder_threshold_timestamp] ;
@@ -176,10 +187,11 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
176187 ) AS directional_last_seens
177188 ORDER BY short_channel_id ASC, seen ASC
178189 ) AS distinct_chans
179- WHERE distinct_chans.seen <= $2
190+ WHERE distinct_chans.seen <= TO_TIMESTAMP($2)
180191 " , params) . await . unwrap ( ) ;
181192 let mut pinned_updates = Box :: pin ( older_latest_directional_updates) ;
182193
194+ let mut older_latest_directional_update_count = 0 ;
183195 while let Some ( row_res) = pinned_updates. next ( ) . await {
184196 let current_row = row_res. unwrap ( ) ;
185197 let scid: i64 = current_row. get ( "short_channel_id" ) ;
@@ -211,13 +223,15 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
211223 // we don't send reminders if we don't have the channel
212224 continue ;
213225 }
226+ older_latest_directional_update_count += 1 ;
214227 }
228+ log_info ! ( logger, "Fetched {} update rows of the latest update in the less recently updated direction" , older_latest_directional_update_count) ;
215229 }
216230}
217231
218232pub ( super ) async fn fetch_channel_updates < L : Deref > ( delta_set : & mut DeltaSet , client : & Client , last_sync_timestamp : u32 , logger : L ) where L :: Target : Logger {
219233 let start = Instant :: now ( ) ;
220- let last_sync_timestamp_object = SystemTime :: UNIX_EPOCH . add ( Duration :: from_secs ( last_sync_timestamp as u64 ) ) ;
234+ let last_sync_timestamp_float = last_sync_timestamp as f64 ;
221235
222236 // get the latest channel update in each direction prior to last_sync_timestamp, provided
223237 // there was an update in either direction that happened after the last sync (to avoid
@@ -227,14 +241,14 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
227241 WHERE id IN (
228242 SELECT DISTINCT ON (short_channel_id, direction) id
229243 FROM channel_updates
230- WHERE seen < $1 AND short_channel_id IN (
244+ WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
231245 SELECT DISTINCT ON (short_channel_id) short_channel_id
232246 FROM channel_updates
233- WHERE seen >= $1
247+ WHERE seen >= TO_TIMESTAMP($1)
234248 )
235249 ORDER BY short_channel_id ASC, direction ASC, seen DESC
236250 )
237- " , [ last_sync_timestamp_object ] ) . await . unwrap ( ) ;
251+ " , [ last_sync_timestamp_float ] ) . await . unwrap ( ) ;
238252 let mut pinned_rows = Box :: pin ( reference_rows) ;
239253
240254 log_info ! ( logger, "Fetched reference rows in {:?}" , start. elapsed( ) ) ;
@@ -275,8 +289,8 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
275289 let intermediate_updates = client. query_raw ( "
276290 SELECT id, direction, blob_signed, seen
277291 FROM channel_updates
278- WHERE seen >= $1
279- " , [ last_sync_timestamp_object ] ) . await . unwrap ( ) ;
292+ WHERE seen >= TO_TIMESTAMP($1)
293+ " , [ last_sync_timestamp_float ] ) . await . unwrap ( ) ;
280294 let mut pinned_updates = Box :: pin ( intermediate_updates) ;
281295 log_info ! ( logger, "Fetched intermediate rows in {:?}" , start. elapsed( ) ) ;
282296
0 commit comments