1
1
use std:: collections:: { BTreeMap , HashSet } ;
2
2
use std:: io:: Cursor ;
3
- use std:: ops:: { Add , Deref } ;
3
+ use std:: ops:: Deref ;
4
4
use std:: sync:: Arc ;
5
- use std:: time:: { Duration , Instant , SystemTime } ;
5
+ use std:: time:: { Instant , SystemTime , UNIX_EPOCH } ;
6
6
7
7
use lightning:: ln:: msgs:: { ChannelAnnouncement , ChannelUpdate , UnsignedChannelAnnouncement , UnsignedChannelUpdate } ;
8
8
use lightning:: routing:: gossip:: NetworkGraph ;
@@ -79,7 +79,6 @@ pub(super) async fn connect_to_db() -> (Client, Connection<Socket, NoTlsStream>)
79
79
/// after `last_sync_timestamp`
80
80
pub ( super ) async fn fetch_channel_announcements < L : Deref > ( delta_set : & mut DeltaSet , network_graph : Arc < NetworkGraph < L > > , client : & Client , last_sync_timestamp : u32 , logger : L ) where L :: Target : Logger {
81
81
log_info ! ( logger, "Obtaining channel ids from network graph" ) ;
82
- let last_sync_timestamp_object = SystemTime :: UNIX_EPOCH . add ( Duration :: from_secs ( last_sync_timestamp as u64 ) ) ;
83
82
let channel_ids = {
84
83
let read_only_graph = network_graph. read_only ( ) ;
85
84
log_info ! ( logger, "Retrieved read-only network graph copy" ) ;
@@ -89,28 +88,35 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
89
88
. map ( |c| c. 1 . announcement_message . as_ref ( ) . unwrap ( ) . contents . short_channel_id as i64 )
90
89
. collect :: < Vec < _ > > ( )
91
90
} ;
91
+ #[ cfg( test) ]
92
+ log_info ! ( logger, "Channel IDs: {:?}" , channel_ids) ;
93
+ log_info ! ( logger, "Last sync timestamp: {}" , last_sync_timestamp) ;
94
+ let last_sync_timestamp_float = last_sync_timestamp as f64 ;
92
95
93
96
log_info ! ( logger, "Obtaining corresponding database entries" ) ;
94
97
// get all the channel announcements that are currently in the network graph
95
- let announcement_rows = client. query_raw ( "SELECT announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC" , [ & channel_ids] ) . await . unwrap ( ) ;
98
+ let announcement_rows = client. query_raw ( "SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC" , [ & channel_ids] ) . await . unwrap ( ) ;
96
99
let mut pinned_rows = Box :: pin ( announcement_rows) ;
97
100
101
+ let mut announcement_count = 0 ;
98
102
while let Some ( row_res) = pinned_rows. next ( ) . await {
99
103
let current_announcement_row = row_res. unwrap ( ) ;
100
104
let blob: Vec < u8 > = current_announcement_row. get ( "announcement_signed" ) ;
101
105
let mut readable = Cursor :: new ( blob) ;
102
106
let unsigned_announcement = ChannelAnnouncement :: read ( & mut readable) . unwrap ( ) . contents ;
103
107
104
108
let scid = unsigned_announcement. short_channel_id ;
105
- let current_seen_timestamp_object: SystemTime = current_announcement_row. get ( "seen" ) ;
106
- let current_seen_timestamp: u32 = current_seen_timestamp_object. duration_since ( SystemTime :: UNIX_EPOCH ) . unwrap ( ) . as_secs ( ) as u32 ;
109
+ let current_seen_timestamp = current_announcement_row. get :: < _ , i64 > ( "seen" ) as u32 ;
107
110
108
111
let current_channel_delta = delta_set. entry ( scid) . or_insert ( ChannelDelta :: default ( ) ) ;
109
112
( * current_channel_delta) . announcement = Some ( AnnouncementDelta {
110
113
announcement : unsigned_announcement,
111
114
seen : current_seen_timestamp,
112
115
} ) ;
116
+
117
+ announcement_count += 1 ;
113
118
}
119
+ log_info ! ( logger, "Fetched {} announcement rows" , announcement_count) ;
114
120
115
121
{
116
122
// THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
@@ -124,9 +130,9 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
124
130
// here is where the channels whose first update in either direction occurred after
125
131
// `last_seen_timestamp` are added to the selection
126
132
let params: [ & ( dyn tokio_postgres:: types:: ToSql + Sync ) ; 2 ] =
127
- [ & channel_ids, & last_sync_timestamp_object ] ;
133
+ [ & channel_ids, & last_sync_timestamp_float ] ;
128
134
let newer_oldest_directional_updates = client. query_raw ( "
129
- SELECT * FROM (
135
+ SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
130
136
SELECT DISTINCT ON (short_channel_id) *
131
137
FROM (
132
138
SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
@@ -136,22 +142,25 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
136
142
) AS directional_last_seens
137
143
ORDER BY short_channel_id ASC, seen DESC
138
144
) AS distinct_chans
139
- WHERE distinct_chans.seen >= $2
145
+ WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
140
146
" , params) . await . unwrap ( ) ;
141
147
let mut pinned_updates = Box :: pin ( newer_oldest_directional_updates) ;
142
148
149
+ let mut newer_oldest_directional_update_count = 0 ;
143
150
while let Some ( row_res) = pinned_updates. next ( ) . await {
144
151
let current_row = row_res. unwrap ( ) ;
145
152
146
153
let scid: i64 = current_row. get ( "short_channel_id" ) ;
147
- let current_seen_timestamp_object: SystemTime = current_row. get ( "seen" ) ;
148
- let current_seen_timestamp: u32 = current_seen_timestamp_object. duration_since ( SystemTime :: UNIX_EPOCH ) . unwrap ( ) . as_secs ( ) as u32 ;
154
+ let current_seen_timestamp = current_row. get :: < _ , i64 > ( "seen" ) as u32 ;
149
155
150
156
// the newer of the two oldest seen directional updates came after last sync timestamp
151
157
let current_channel_delta = delta_set. entry ( scid as u64 ) . or_insert ( ChannelDelta :: default ( ) ) ;
152
158
// first time a channel was seen in both directions
153
159
( * current_channel_delta) . first_bidirectional_updates_seen = Some ( current_seen_timestamp) ;
160
+
161
+ newer_oldest_directional_update_count += 1 ;
154
162
}
163
+ log_info ! ( logger, "Fetched {} update rows of the first update in a new direction" , newer_oldest_directional_update_count) ;
155
164
}
156
165
157
166
{
@@ -161,7 +170,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
161
170
// Steps:
162
171
// — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
163
172
// — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
164
- let reminder_threshold_timestamp = SystemTime :: now ( ) . checked_sub ( config:: CHANNEL_REMINDER_AGE ) . unwrap ( ) ;
173
+ let reminder_threshold_timestamp = SystemTime :: now ( ) . checked_sub ( config:: CHANNEL_REMINDER_AGE ) . unwrap ( ) . duration_since ( UNIX_EPOCH ) . unwrap ( ) . as_secs ( ) as f64 ;
165
174
166
175
let params: [ & ( dyn tokio_postgres:: types:: ToSql + Sync ) ; 2 ] =
167
176
[ & channel_ids, & reminder_threshold_timestamp] ;
@@ -176,10 +185,11 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
176
185
) AS directional_last_seens
177
186
ORDER BY short_channel_id ASC, seen ASC
178
187
) AS distinct_chans
179
- WHERE distinct_chans.seen <= $2
188
+ WHERE distinct_chans.seen <= TO_TIMESTAMP($2)
180
189
" , params) . await . unwrap ( ) ;
181
190
let mut pinned_updates = Box :: pin ( older_latest_directional_updates) ;
182
191
192
+ let mut older_latest_directional_update_count = 0 ;
183
193
while let Some ( row_res) = pinned_updates. next ( ) . await {
184
194
let current_row = row_res. unwrap ( ) ;
185
195
let scid: i64 = current_row. get ( "short_channel_id" ) ;
@@ -211,13 +221,15 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
211
221
// we don't send reminders if we don't have the channel
212
222
continue ;
213
223
}
224
+ older_latest_directional_update_count += 1 ;
214
225
}
226
+ log_info ! ( logger, "Fetched {} update rows of the latest update in the less recently updated direction" , older_latest_directional_update_count) ;
215
227
}
216
228
}
217
229
218
230
pub ( super ) async fn fetch_channel_updates < L : Deref > ( delta_set : & mut DeltaSet , client : & Client , last_sync_timestamp : u32 , logger : L ) where L :: Target : Logger {
219
231
let start = Instant :: now ( ) ;
220
- let last_sync_timestamp_object = SystemTime :: UNIX_EPOCH . add ( Duration :: from_secs ( last_sync_timestamp as u64 ) ) ;
232
+ let last_sync_timestamp_float = last_sync_timestamp as f64 ;
221
233
222
234
// get the latest channel update in each direction prior to last_sync_timestamp, provided
223
235
// there was an update in either direction that happened after the last sync (to avoid
@@ -227,14 +239,14 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
227
239
WHERE id IN (
228
240
SELECT DISTINCT ON (short_channel_id, direction) id
229
241
FROM channel_updates
230
- WHERE seen < $1 AND short_channel_id IN (
242
+ WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
231
243
SELECT DISTINCT ON (short_channel_id) short_channel_id
232
244
FROM channel_updates
233
- WHERE seen >= $1
245
+ WHERE seen >= TO_TIMESTAMP($1)
234
246
)
235
247
ORDER BY short_channel_id ASC, direction ASC, seen DESC
236
248
)
237
- " , [ last_sync_timestamp_object ] ) . await . unwrap ( ) ;
249
+ " , [ last_sync_timestamp_float ] ) . await . unwrap ( ) ;
238
250
let mut pinned_rows = Box :: pin ( reference_rows) ;
239
251
240
252
log_info ! ( logger, "Fetched reference rows in {:?}" , start. elapsed( ) ) ;
@@ -273,10 +285,10 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
273
285
// have been omitted)
274
286
275
287
let intermediate_updates = client. query_raw ( "
276
- SELECT id, direction, blob_signed, seen
288
+ SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
277
289
FROM channel_updates
278
- WHERE seen >= $1
279
- " , [ last_sync_timestamp_object ] ) . await . unwrap ( ) ;
290
+ WHERE seen >= TO_TIMESTAMP($1)
291
+ " , [ last_sync_timestamp_float ] ) . await . unwrap ( ) ;
280
292
let mut pinned_updates = Box :: pin ( intermediate_updates) ;
281
293
log_info ! ( logger, "Fetched intermediate rows in {:?}" , start. elapsed( ) ) ;
282
294
@@ -294,8 +306,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
294
306
intermediate_update_count += 1 ;
295
307
296
308
let direction: bool = intermediate_update. get ( "direction" ) ;
297
- let current_seen_timestamp_object: SystemTime = intermediate_update. get ( "seen" ) ;
298
- let current_seen_timestamp: u32 = current_seen_timestamp_object. duration_since ( SystemTime :: UNIX_EPOCH ) . unwrap ( ) . as_secs ( ) as u32 ;
309
+ let current_seen_timestamp = intermediate_update. get :: < _ , i64 > ( "seen" ) as u32 ;
299
310
let blob: Vec < u8 > = intermediate_update. get ( "blob_signed" ) ;
300
311
let mut readable = Cursor :: new ( blob) ;
301
312
let unsigned_channel_update = ChannelUpdate :: read ( & mut readable) . unwrap ( ) . contents ;
0 commit comments