@@ -12,6 +12,7 @@ use std::{
1212 collections:: HashMap ,
1313 str:: FromStr ,
1414 sync:: { Arc , RwLock } ,
15+ time:: Instant ,
1516} ;
1617use thegraph_core:: DeploymentId ;
1718use tracing:: error;
@@ -39,6 +40,7 @@ pub struct AgoraQuery {
3940
4041type CostModelMap = Arc < RwLock < HashMap < DeploymentId , CostModel > > > ;
4142type GlobalModel = Arc < RwLock < Option < CostModel > > > ;
43+ type GracePeriod = Arc < RwLock < Instant > > ;
4244
4345/// Represents the check for minimum for a receipt
4446///
@@ -48,13 +50,16 @@ pub struct MinimumValue {
4850 cost_model_map : CostModelMap ,
4951 global_model : GlobalModel ,
5052 watcher_cancel_token : tokio_util:: sync:: CancellationToken ,
53+ updated_at : GracePeriod ,
54+ grace_period : u64 ,
5155}
5256
5357struct CostModelWatcher {
5458 pgpool : PgPool ,
5559
5660 cost_models : CostModelMap ,
5761 global_model : GlobalModel ,
62+ updated_at : GracePeriod ,
5863}
5964
6065impl CostModelWatcher {
@@ -64,11 +69,13 @@ impl CostModelWatcher {
6469 cost_models : CostModelMap ,
6570 global_model : GlobalModel ,
6671 cancel_token : tokio_util:: sync:: CancellationToken ,
72+ grace_period : GracePeriod ,
6773 ) {
6874 let cost_model_watcher = CostModelWatcher {
6975 pgpool,
7076 global_model,
7177 cost_models,
78+ updated_at : grace_period,
7279 } ;
7380
7481 loop {
@@ -123,6 +130,8 @@ impl CostModelWatcher {
123130 }
124131 } ,
125132 } ;
133+
134+ * self . updated_at . write ( ) . unwrap ( ) = Instant :: now ( ) ;
126135 }
127136
128137 fn handle_delete ( & self , deployment : String ) {
@@ -142,6 +151,7 @@ impl CostModelWatcher {
142151 }
143152 } ,
144153 } ;
154+ * self . updated_at . write ( ) . unwrap ( ) = Instant :: now ( ) ;
145155 }
146156
147157 async fn handle_unexpected_notification ( & self , payload : & str ) {
@@ -157,7 +167,9 @@ impl CostModelWatcher {
157167 self . global_model . clone ( ) ,
158168 )
159169 . await
160- . expect ( "should be able to reload cost models" )
170+ . expect ( "should be able to reload cost models" ) ;
171+
172+ * self . updated_at . write ( ) . unwrap ( ) = Instant :: now ( ) ;
161173 }
162174}
163175
@@ -170,9 +182,10 @@ impl Drop for MinimumValue {
170182}
171183
172184impl MinimumValue {
173- pub async fn new ( pgpool : PgPool ) -> Self {
185+ pub async fn new ( pgpool : PgPool , grace_period : u64 ) -> Self {
174186 let cost_model_map: CostModelMap = Default :: default ( ) ;
175187 let global_model: GlobalModel = Default :: default ( ) ;
188+ let updated_at: GracePeriod = Arc :: new ( RwLock :: new ( Instant :: now ( ) ) ) ;
176189 Self :: value_check_reload ( & pgpool, cost_model_map. clone ( ) , global_model. clone ( ) )
177190 . await
178191 . expect ( "should be able to reload cost models" ) ;
@@ -193,15 +206,22 @@ impl MinimumValue {
193206 cost_model_map. clone ( ) ,
194207 global_model. clone ( ) ,
195208 watcher_cancel_token. clone ( ) ,
209+ updated_at. clone ( ) ,
196210 ) ) ;
197-
198211 Self {
199212 global_model,
200213 cost_model_map,
201214 watcher_cancel_token,
215+ updated_at,
216+ grace_period,
202217 }
203218 }
204219
220+ fn inside_grace_period ( & self ) -> bool {
221+ let time_elapsed = Instant :: now ( ) . duration_since ( * self . updated_at . read ( ) . unwrap ( ) ) ;
222+ time_elapsed. as_secs ( ) < self . grace_period
223+ }
224+
205225 fn expected_value ( & self , agora_query : & AgoraQuery ) -> anyhow:: Result < u128 > {
206226 // get agora model for the deployment_id
207227 let model = self . cost_model_map . read ( ) . unwrap ( ) ;
@@ -271,14 +291,17 @@ impl Check for MinimumValue {
271291 let agora_query = ctx
272292 . get ( )
273293 . ok_or ( CheckError :: Failed ( anyhow ! ( "Could not find agora query" ) ) ) ?;
294+ // get value
295+ let value = receipt. signed_receipt ( ) . message . value ;
296+
297+ if self . inside_grace_period ( ) && value > MINIMAL_VALUE {
298+ return Ok ( ( ) ) ;
299+ }
274300
275301 let expected_value = self
276302 . expected_value ( agora_query)
277303 . map_err ( CheckError :: Failed ) ?;
278304
279- // get value
280- let value = receipt. signed_receipt ( ) . message . value ;
281-
282305 let should_accept = value >= expected_value;
283306
284307 tracing:: trace!(
@@ -339,7 +362,7 @@ mod tests {
339362
340363 #[ sqlx:: test( migrations = "../migrations" ) ]
341364 async fn initialize_check ( pgpool : PgPool ) {
342- let check = MinimumValue :: new ( pgpool) . await ;
365+ let check = MinimumValue :: new ( pgpool, 0 ) . await ;
343366 assert_eq ! ( check. cost_model_map. read( ) . unwrap( ) . len( ) , 0 ) ;
344367 }
345368
@@ -350,7 +373,7 @@ mod tests {
350373
351374 add_cost_models ( & pgpool, to_db_models ( test_models. clone ( ) ) ) . await ;
352375
353- let check = MinimumValue :: new ( pgpool) . await ;
376+ let check = MinimumValue :: new ( pgpool, 0 ) . await ;
354377 assert_eq ! ( check. cost_model_map. read( ) . unwrap( ) . len( ) , 2 ) ;
355378
356379 // no global model
@@ -359,7 +382,7 @@ mod tests {
359382
360383 #[ sqlx:: test( migrations = "../migrations" ) ]
361384 async fn should_watch_model_insert ( pgpool : PgPool ) {
362- let check = MinimumValue :: new ( pgpool. clone ( ) ) . await ;
385+ let check = MinimumValue :: new ( pgpool. clone ( ) , 0 ) . await ;
363386 assert_eq ! ( check. cost_model_map. read( ) . unwrap( ) . len( ) , 0 ) ;
364387
365388 // insert 2 cost models for different deployment_id
@@ -379,7 +402,7 @@ mod tests {
379402 let test_models = crate :: cost_model:: test:: test_data ( ) ;
380403 add_cost_models ( & pgpool, to_db_models ( test_models. clone ( ) ) ) . await ;
381404
382- let check = MinimumValue :: new ( pgpool. clone ( ) ) . await ;
405+ let check = MinimumValue :: new ( pgpool. clone ( ) , 0 ) . await ;
383406 assert_eq ! ( check. cost_model_map. read( ) . unwrap( ) . len( ) , 2 ) ;
384407
385408 // remove
@@ -398,13 +421,13 @@ mod tests {
398421 let global_model = global_cost_model ( ) ;
399422 add_cost_models ( & pgpool, vec ! [ global_model. clone( ) ] ) . await ;
400423
401- let check = MinimumValue :: new ( pgpool. clone ( ) ) . await ;
424+ let check = MinimumValue :: new ( pgpool. clone ( ) , 0 ) . await ;
402425 assert ! ( check. global_model. read( ) . unwrap( ) . is_some( ) ) ;
403426 }
404427
405428 #[ sqlx:: test( migrations = "../migrations" ) ]
406429 async fn should_watch_global_model ( pgpool : PgPool ) {
407- let check = MinimumValue :: new ( pgpool. clone ( ) ) . await ;
430+ let check = MinimumValue :: new ( pgpool. clone ( ) , 0 ) . await ;
408431
409432 let global_model = global_cost_model ( ) ;
410433 add_cost_models ( & pgpool, vec ! [ global_model. clone( ) ] ) . await ;
@@ -418,7 +441,7 @@ mod tests {
418441 let global_model = global_cost_model ( ) ;
419442 add_cost_models ( & pgpool, vec ! [ global_model. clone( ) ] ) . await ;
420443
421- let check = MinimumValue :: new ( pgpool. clone ( ) ) . await ;
444+ let check = MinimumValue :: new ( pgpool. clone ( ) , 0 ) . await ;
422445 assert ! ( check. global_model. read( ) . unwrap( ) . is_some( ) ) ;
423446
424447 sqlx:: query!( r#"DELETE FROM "CostModels""# )
@@ -440,7 +463,7 @@ mod tests {
440463
441464 add_cost_models ( & pgpool, to_db_models ( test_models. clone ( ) ) ) . await ;
442465
443- let check = MinimumValue :: new ( pgpool) . await ;
466+ let check = MinimumValue :: new ( pgpool, 1 ) . await ;
444467
445468 let deployment_id = test_models[ 0 ] . deployment ;
446469 let mut ctx = Context :: new ( ) ;
@@ -477,8 +500,16 @@ mod tests {
477500 let signed_receipt =
478501 create_signed_receipt ( ALLOCATION_ID , u64:: MAX , u64:: MAX , minimal_value - 1 ) . await ;
479502 let receipt = ReceiptWithState :: new ( signed_receipt) ;
503+
480504 assert ! (
481- check. check( & ctx, & receipt) . await . is_err( ) ,
505+ check. check( & ctx, & receipt) . await . is_ok( ) ,
506+ "Should accept since its inside grace period "
507+ ) ;
508+
509+ sleep ( Duration :: from_millis ( 1010 ) ) . await ;
510+
511+ assert ! (
512+ check. check( & ctx, & receipt) . await . is_ok( ) ,
482513 "Should require minimal value"
483514 ) ;
484515
@@ -508,7 +539,7 @@ mod tests {
508539 add_cost_models ( & pgpool, vec ! [ global_model. clone( ) ] ) . await ;
509540 add_cost_models ( & pgpool, to_db_models ( test_models. clone ( ) ) ) . await ;
510541
511- let check = MinimumValue :: new ( pgpool) . await ;
542+ let check = MinimumValue :: new ( pgpool, 0 ) . await ;
512543
513544 let deployment_id = test_models[ 0 ] . deployment ;
514545 let mut ctx = Context :: new ( ) ;
0 commit comments