|
1 | 1 | // Copyright 2023-, GraphOps and Semiotic Labs. |
2 | 2 | // SPDX-License-Identifier: Apache-2.0 |
3 | 3 |
|
4 | | -use alloy::signers::Signature; |
5 | 4 | use anyhow::anyhow; |
6 | 5 | use bigdecimal::ToPrimitive; |
7 | 6 | use cost_model::CostModel; |
| 7 | +use sqlx::{postgres::PgListener, PgPool}; |
| 8 | +use tracing::error; |
8 | 9 | use std::{ |
9 | 10 | cmp::min, |
10 | 11 | collections::HashMap, |
@@ -89,6 +90,67 @@ impl MinimumValue { |
89 | 90 | model_handle, |
90 | 91 | } |
91 | 92 | } |
| 93 | + |
| 94 | + async fn cost_models_watcher( |
| 95 | + pgpool: PgPool, |
| 96 | + mut pglistener: PgListener, |
| 97 | + denylist: Arc<Mutex<HashMap<DeploymentId, CostModelCache>>>, |
| 98 | + cancel_token: tokio_util::sync::CancellationToken, |
| 99 | + ) { |
| 100 | + #[derive(serde::Deserialize)] |
| 101 | + struct DenylistNotification { |
| 102 | + tg_op: String, |
| 103 | + deployment: DeploymentId, |
| 104 | + } |
| 105 | + |
| 106 | + loop { |
| 107 | + tokio::select! { |
| 108 | + _ = cancel_token.cancelled() => { |
| 109 | + break; |
| 110 | + } |
| 111 | + |
| 112 | + pg_notification = pglistener.recv() => { |
| 113 | + let pg_notification = pg_notification.expect( |
| 114 | + "should be able to receive Postgres Notify events on the channel \ |
| 115 | + 'scalar_tap_deny_notification'", |
| 116 | + ); |
| 117 | + |
| 118 | + let denylist_notification: DenylistNotification = |
| 119 | + serde_json::from_str(pg_notification.payload()).expect( |
| 120 | + "should be able to deserialize the Postgres Notify event payload as a \ |
| 121 | + DenylistNotification", |
| 122 | + ); |
| 123 | + |
| 124 | + match denylist_notification.tg_op.as_str() { |
| 125 | + "INSERT" => { |
| 126 | + denylist |
| 127 | + .write() |
| 128 | + .unwrap() |
| 129 | + .insert(denylist_notification.sender_address); |
| 130 | + } |
| 131 | + "DELETE" => { |
| 132 | + denylist |
| 133 | + .write() |
| 134 | + .unwrap() |
| 135 | + .remove(&denylist_notification.sender_address); |
| 136 | + } |
| 137 | + // UPDATE and TRUNCATE are not expected to happen. Reload the entire denylist. |
| 138 | + _ => { |
| 139 | + error!( |
| 140 | + "Received an unexpected denylist table notification: {}. Reloading entire \ |
| 141 | + denylist.", |
| 142 | + denylist_notification.tg_op |
| 143 | + ); |
| 144 | + |
| 145 | + Self::sender_denylist_reload(pgpool.clone(), denylist.clone()) |
| 146 | + .await |
| 147 | + .expect("should be able to reload the sender denylist") |
| 148 | + } |
| 149 | + } |
| 150 | + } |
| 151 | + } |
| 152 | + } |
| 153 | + } |
92 | 154 | } |
93 | 155 |
|
94 | 156 | impl Drop for MinimumValue { |
@@ -155,7 +217,6 @@ fn compile_cost_model(src: CostModelSource) -> anyhow::Result<CostModel> { |
155 | 217 | } |
156 | 218 |
|
157 | 219 | pub struct AgoraQuery { |
158 | | - pub signature: Signature, |
159 | 220 | pub deployment_id: DeploymentId, |
160 | 221 | pub query: String, |
161 | 222 | pub variables: String, |
|
0 commit comments