File tree Expand file tree Collapse file tree 3 files changed +16
-7
lines changed
tools/heartbeats-processor/src Expand file tree Collapse file tree 3 files changed +16
-7
lines changed Original file line number Diff line number Diff line change @@ -245,9 +245,11 @@ pub async fn process_heartbeats(
245
245
db : & FirestoreDb ,
246
246
pool : & SqlitePool ,
247
247
config : & Config ,
248
- ) -> Result < ( ) > {
248
+ ) -> Result < usize > {
249
249
let last_processed_time = get_last_processed_time ( pool, Some ( config) ) . await ?;
250
250
let now = Utc :: now ( ) ;
251
+ // Don't fetch heartbeats beyond window range end
252
+ let end_time = config. window_range_end . min ( now) ;
251
253
252
254
let mut total_heartbeats = 0 ;
253
255
let mut latest_time = last_processed_time;
@@ -266,7 +268,8 @@ pub async fn process_heartbeats(
266
268
} ;
267
269
268
270
loop {
269
- let heartbeats = crate :: remote_db:: fetch_heartbeat_chunk ( db, & mut chunk_state, now) . await ?;
271
+ let heartbeats =
272
+ crate :: remote_db:: fetch_heartbeat_chunk ( db, & mut chunk_state, end_time) . await ?;
270
273
if heartbeats. is_empty ( ) {
271
274
break ;
272
275
}
@@ -430,7 +433,7 @@ pub async fn process_heartbeats(
430
433
update_last_processed_time ( pool, latest_time) . await ?;
431
434
}
432
435
433
- Ok ( ( ) )
436
+ Ok ( total_heartbeats )
434
437
}
435
438
436
439
pub async fn create_tables_from_file ( pool : & SqlitePool ) -> Result < ( ) > {
Original file line number Diff line number Diff line change @@ -112,10 +112,12 @@ async fn run_process_loop(
112
112
113
113
loop {
114
114
println ! ( "Processing heartbeats..." ) ;
115
- local_db:: process_heartbeats ( db, pool, config) . await ?;
115
+ let count = local_db:: process_heartbeats ( db, pool, config) . await ?;
116
116
117
- println ! ( "Posting scores..." ) ;
118
- post_scores_to_firestore ( pool, db, config) . await ?;
117
+ if count > 0 {
118
+ println ! ( "Posting scores..." ) ;
119
+ post_scores_to_firestore ( pool, db, config) . await ?;
120
+ }
119
121
120
122
println ! ( "Sleeping for {} seconds..." , interval_seconds) ;
121
123
tokio:: time:: sleep ( interval) . await ;
Original file line number Diff line number Diff line change @@ -171,10 +171,14 @@ pub async fn fetch_heartbeat_chunk(
171
171
let chunk_end = ( state. chunk_start + chunk_duration) . min ( end_time) ;
172
172
173
173
if state. chunk_start >= end_time {
174
+ println ! ( "Reached end of testing window: {}" , end_time) ;
174
175
return Ok ( Vec :: new ( ) ) ;
175
176
}
176
177
177
- println ! ( "Fetching heartbeat chunk... {}" , state. chunk_start) ;
178
+ println ! (
179
+ "Fetching heartbeat chunk... {} to {}" ,
180
+ state. chunk_start, chunk_end
181
+ ) ;
178
182
179
183
let query = db
180
184
. fluent ( )
You can’t perform that action at this time.
0 commit comments