@@ -95,63 +95,69 @@ impl CostModelWatcher {
9595 deployment,
9696 model,
9797 variables,
98- } ) => {
99- let model = compile_cost_model ( model, variables) . unwrap ( ) ;
100-
101- match deployment. as_str ( ) {
102- "global" => {
103- * self . global_model . write ( ) . unwrap ( ) = Some ( model) ;
104- }
105- deployment_id => match DeploymentId :: from_str ( deployment_id) {
106- Ok ( deployment_id) => {
107- let mut cost_model_write = self . cost_models . write ( ) . unwrap ( ) ;
108- cost_model_write. insert ( deployment_id, model) ;
109- }
110- Err ( _) => {
111- error ! (
112- "Received insert request for an invalid deployment_id: {}" ,
113- deployment_id
114- )
115- }
116- } ,
117- } ;
118- }
119- Ok ( CostModelNotification :: Delete { deployment } ) => {
120- match deployment. as_str ( ) {
121- "global" => {
122- * self . global_model . write ( ) . unwrap ( ) = None ;
123- }
124- deployment_id => match DeploymentId :: from_str ( deployment_id) {
125- Ok ( deployment_id) => {
126- self . cost_models . write ( ) . unwrap ( ) . remove ( & deployment_id) ;
127- }
128- Err ( _) => {
129- error ! (
130- "Received delete request for an invalid deployment_id: {}" ,
131- deployment_id
132- )
133- }
134- } ,
135- } ;
136- }
98+ } ) => self . handle_insert ( deployment, model, variables) ,
99+ Ok ( CostModelNotification :: Delete { deployment } ) => self . handle_delete ( deployment) ,
137100 // UPDATE and TRUNCATE are not expected to happen. Reload the entire cost
138101 // model cache.
139- Err ( _) => {
140- error ! (
141- "Received an unexpected cost model table notification: {}. Reloading entire \
142- cost model.",
143- payload
144- ) ;
102+ Err ( _) => self . handle_unexpected_notification ( payload) . await ,
103+ }
104+ }
145105
146- MinimumValue :: value_check_reload (
147- & self . pgpool ,
148- self . cost_models . clone ( ) ,
149- self . global_model . clone ( ) ,
150- )
151- . await
152- . expect ( "should be able to reload cost models" )
106+ fn handle_insert ( & self , deployment : String , model : String , variables : String ) {
107+ let model = compile_cost_model ( model, variables) . unwrap ( ) ;
108+
109+ match deployment. as_str ( ) {
110+ "global" => {
111+ * self . global_model . write ( ) . unwrap ( ) = Some ( model) ;
153112 }
154- }
113+ deployment_id => match DeploymentId :: from_str ( deployment_id) {
114+ Ok ( deployment_id) => {
115+ let mut cost_model_write = self . cost_models . write ( ) . unwrap ( ) ;
116+ cost_model_write. insert ( deployment_id, model) ;
117+ }
118+ Err ( _) => {
119+ error ! (
120+ "Received insert request for an invalid deployment_id: {}" ,
121+ deployment_id
122+ )
123+ }
124+ } ,
125+ } ;
126+ }
127+
128+ fn handle_delete ( & self , deployment : String ) {
129+ match deployment. as_str ( ) {
130+ "global" => {
131+ * self . global_model . write ( ) . unwrap ( ) = None ;
132+ }
133+ deployment_id => match DeploymentId :: from_str ( deployment_id) {
134+ Ok ( deployment_id) => {
135+ self . cost_models . write ( ) . unwrap ( ) . remove ( & deployment_id) ;
136+ }
137+ Err ( _) => {
138+ error ! (
139+ "Received delete request for an invalid deployment_id: {}" ,
140+ deployment_id
141+ )
142+ }
143+ } ,
144+ } ;
145+ }
146+
147+ async fn handle_unexpected_notification ( & self , payload : & str ) {
148+ error ! (
149+ "Received an unexpected cost model table notification: {}. Reloading entire \
150+ cost model.",
151+ payload
152+ ) ;
153+
154+ MinimumValue :: value_check_reload (
155+ & self . pgpool ,
156+ self . cost_models . clone ( ) ,
157+ self . global_model . clone ( ) ,
158+ )
159+ . await
160+ . expect ( "should be able to reload cost models" )
155161 }
156162}
157163
@@ -196,7 +202,7 @@ impl MinimumValue {
196202 }
197203 }
198204
199- fn get_expected_value ( & self , agora_query : & AgoraQuery ) -> anyhow:: Result < u128 > {
205+ fn expected_value ( & self , agora_query : & AgoraQuery ) -> anyhow:: Result < u128 > {
200206 // get agora model for the deployment_id
201207 let model = self . cost_model_map . read ( ) . unwrap ( ) ;
202208 let subgraph_model = model. get ( & agora_query. deployment_id ) ;
@@ -267,7 +273,7 @@ impl Check for MinimumValue {
267273 . ok_or ( CheckError :: Failed ( anyhow ! ( "Could not find agora query" ) ) ) ?;
268274
269275 let expected_value = self
270- . get_expected_value ( agora_query)
276+ . expected_value ( agora_query)
271277 . map_err ( CheckError :: Failed ) ?;
272278
273279 // get value
0 commit comments