@@ -60,7 +60,7 @@ use std::sync::Arc;
6060
6161use diesel:: { sql_query, PgConnection , RunQueryDsl as _} ;
6262
63- use diesel:: sql_types:: { Integer , Timestamptz } ;
63+ use diesel:: sql_types:: { Integer , Nullable , Timestamptz } ;
6464use graph:: blockchain:: BlockTime ;
6565use graph:: components:: store:: { BlockNumber , StoreError } ;
6666use graph:: constraint_violation;
@@ -70,6 +70,7 @@ use graph::schema::{
7070} ;
7171use graph:: sqlparser:: ast as p;
7272use graph:: sqlparser:: parser:: ParserError ;
73+ use itertools:: Itertools ;
7374
7475use crate :: relational:: Table ;
7576
@@ -229,6 +230,10 @@ pub(crate) struct Rollup {
229230 #[ allow( dead_code) ]
230231 agg_table : Arc < Table > ,
231232 insert_sql : String ,
233+ /// A query that determines the last time a rollup was done. The query
234+ /// finds the latest timestamp in the aggregation table and adds the
235+ /// length of the aggregation interval to deduce the last rollup time
236+ last_rollup_sql : String ,
232237}
233238
234239impl Rollup {
@@ -256,10 +261,12 @@ impl Rollup {
256261 ) ;
257262 let mut insert_sql = String :: new ( ) ;
258263 sql. insert ( & mut insert_sql) ?;
264+ let last_rollup_sql = sql. last_rollup ( ) ;
259265 Ok ( Self {
260266 interval,
261267 agg_table,
262268 insert_sql,
269+ last_rollup_sql,
263270 } )
264271 }
265272
@@ -275,6 +282,32 @@ impl Rollup {
275282 . bind :: < Integer , _ > ( block) ;
276283 query. execute ( conn)
277284 }
285+
286+ pub ( crate ) fn last_rollup (
287+ rollups : & [ Rollup ] ,
288+ conn : & mut PgConnection ,
289+ ) -> Result < Option < BlockTime > , StoreError > {
290+ #[ derive( QueryableByName ) ]
291+ #[ diesel( check_for_backend( diesel:: pg:: Pg ) ) ]
292+ struct BlockTimeRes {
293+ #[ diesel( sql_type = Nullable <Timestamptz >) ]
294+ last_rollup : Option < BlockTime > ,
295+ }
296+
297+ if rollups. is_empty ( ) {
298+ return Ok ( None ) ;
299+ }
300+
301+ let union_all = rollups
302+ . iter ( )
303+ . map ( |rollup| & rollup. last_rollup_sql )
304+ . join ( " union all " ) ;
305+ let query = format ! ( "select max(last_rollup) as last_rollup from ({union_all}) as a" ) ;
306+ let last_rollup = sql_query ( & query)
307+ . get_result :: < BlockTimeRes > ( conn)
308+ . map ( |res| res. last_rollup ) ?;
309+ Ok ( last_rollup)
310+ }
278311}
279312
280313struct RollupSql < ' a > {
@@ -479,6 +512,19 @@ impl<'a> RollupSql<'a> {
479512 self . insert_bucket ( w)
480513 }
481514 }
515+
516+ /// Generate a query that selects the timestamp of the last rollup
517+ fn last_rollup ( & self ) -> String {
518+ // The timestamp column contains the timestamp of the start of the
519+ // last bucket. The last rollup was therefore at least
520+ // `self.interval` after that. We add 1 second to make sure we are
521+ // well within the next bucket
522+ let secs = self . interval . as_duration ( ) . as_secs ( ) + 1 ;
523+ format ! (
524+ "select max(timestamp) + '{} s'::interval as last_rollup from {}" ,
525+ secs, self . agg_table. qualified_name
526+ )
527+ }
482528}
483529
484530/// Write the elements in `list` separated by commas into `w`. The list
0 commit comments