Skip to content

Commit 3e7fc5d

Browse files
apollo_http_server: add dynamic config poller service
1 parent cd54d4c commit 3e7fc5d

File tree

1 file changed

+63
-20
lines changed

1 file changed

+63
-20
lines changed

crates/apollo_http_server/src/http_server.rs

Lines changed: 63 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::clone::Clone;
22
use std::net::SocketAddr;
33
use std::string::String;
4+
use std::time::Duration;
45

56
use apollo_config_manager_types::communication::SharedConfigManagerClient;
67
use apollo_gateway_types::communication::{GatewayClientError, SharedGatewayClient};
@@ -15,19 +16,20 @@ use apollo_gateway_types::gateway_types::{
1516
GatewayOutput,
1617
SUPPORTED_TRANSACTION_VERSIONS,
1718
};
18-
use apollo_http_server_config::config::HttpServerConfig;
19+
use apollo_http_server_config::config::{HttpServerConfig, HttpServerDynamicConfig};
1920
use apollo_infra::component_definitions::ComponentStarter;
2021
use apollo_infra_utils::type_name::short_type_name;
2122
use apollo_proc_macros::sequencer_latency_histogram;
22-
use axum::extract::State;
2323
use axum::http::HeaderMap;
2424
use axum::routing::{get, post};
25-
use axum::{async_trait, Json, Router};
25+
use axum::{async_trait, Extension, Json, Router};
2626
use blockifier_reexecution::serde_utils::deserialize_transaction_json_to_starknet_api_tx;
2727
use serde::de::Error;
2828
use starknet_api::rpc_transaction::RpcTransaction;
2929
use starknet_api::serde_utils::bytes_from_hex_str;
3030
use starknet_api::transaction::fields::ValidResourceBounds;
31+
use tokio::sync::watch::{channel, Receiver, Sender};
32+
use tokio::time;
3133
use tracing::{debug, info, instrument, warn};
3234

3335
use crate::deprecated_gateway_transaction::DeprecatedGatewayTransactionV3;
@@ -51,14 +53,27 @@ pub type HttpServerResult<T> = Result<T, HttpServerError>;
5153
const CLIENT_REGION_HEADER: &str = "X-Client-Region";
5254

5355
pub struct HttpServer {
54-
pub config: HttpServerConfig,
56+
config: HttpServerConfig,
5557
app_state: AppState,
58+
config_manager_client: SharedConfigManagerClient,
59+
dynamic_config_tx: Sender<HttpServerDynamicConfig>,
5660
}
5761

5862
#[derive(Clone)]
5963
pub struct AppState {
60-
config_manager_client: SharedConfigManagerClient,
61-
pub gateway_client: SharedGatewayClient,
64+
gateway_client: SharedGatewayClient,
65+
dynamic_config_rx: Receiver<HttpServerDynamicConfig>,
66+
}
67+
68+
impl AppState {
69+
fn get_dynamic_config(&self) -> HttpServerDynamicConfig {
70+
// `borrow()` returns a reference to the value owned by the channel, hence we clone it.
71+
let config = {
72+
let config = self.dynamic_config_rx.borrow();
73+
config.clone()
74+
};
75+
config
76+
}
6277
}
6378

6479
impl HttpServer {
@@ -67,8 +82,10 @@ impl HttpServer {
6782
config_manager_client: SharedConfigManagerClient,
6883
gateway_client: SharedGatewayClient,
6984
) -> Self {
70-
let app_state = AppState { config_manager_client, gateway_client };
71-
HttpServer { config, app_state }
85+
let (dynamic_config_tx, dynamic_config_rx) =
86+
channel::<HttpServerDynamicConfig>(config.dynamic_config.clone());
87+
let app_state = AppState { gateway_client, dynamic_config_rx };
88+
HttpServer { config, app_state, config_manager_client, dynamic_config_tx }
7289
}
7390

7491
pub async fn run(&mut self) -> Result<(), HttpServerRunError> {
@@ -80,6 +97,18 @@ impl HttpServer {
8097
let app = self.app();
8198
info!("HttpServer running using socket: {}", addr);
8299

100+
// TODO(Tsabary): make the poll interval part of the config.
101+
let poll_interval = Duration::from_millis(1_000);
102+
tokio::spawn(dynamic_config_poll(
103+
self.dynamic_config_tx.clone(),
104+
self.config_manager_client.clone(),
105+
poll_interval,
106+
));
107+
108+
// TODO(update the http server struct to hold optional fields of the dynamic_config_tx,
109+
// config_manager_client, and a JoinHandle for the polling task. Then, use `set` and `take`
110+
// to move these around as needed.
111+
83112
// Create a server that runs forever.
84113
Ok(axum::Server::bind(&addr).serve(app.into_make_service()).await?)
85114
}
@@ -89,12 +118,8 @@ impl HttpServer {
89118
Router::new()
90119
// Json Rpc endpoint
91120
.route("/gateway/add_rpc_transaction", post(add_rpc_tx))
92-
.with_state(self.app_state.clone())
93121
// Rest api endpoint
94-
.route("/gateway/add_transaction", post({
95-
move |app_state: State<AppState>, headers: HeaderMap, tx: String| add_tx(app_state, headers, tx)
96-
}))
97-
.with_state(self.app_state.clone())
122+
.route("/gateway/add_transaction", post(add_tx))
98123
// TODO(shahak): Remove this once we fix the centralized simulator to not use is_alive
99124
// and is_ready.
100125
.route(
@@ -105,21 +130,22 @@ impl HttpServer {
105130
"/gateway/is_ready",
106131
get(|| futures::future::ready("Gateway is ready".to_owned()))
107132
)
133+
.layer(Extension(self.app_state.clone()))
108134
}
109135
}
110136

111137
// HttpServer handlers.
112138

113139
#[instrument(skip(app_state))]
114140
async fn add_rpc_tx(
115-
State(app_state): State<AppState>,
141+
Extension(app_state): Extension<AppState>,
116142
headers: HeaderMap,
117143
Json(tx): Json<RpcTransaction>,
118144
) -> HttpServerResult<Json<GatewayOutput>> {
119145
debug!("ADD_TX_START: Http server received a new transaction.");
120146

121-
let dynamic_config = app_state.config_manager_client.get_http_server_dynamic_config().await?;
122-
check_new_transactions_are_allowed(dynamic_config.accept_new_txs)?;
147+
let HttpServerDynamicConfig { accept_new_txs, .. } = app_state.get_dynamic_config();
148+
check_new_transactions_are_allowed(accept_new_txs)?;
123149

124150
ADDED_TRANSACTIONS_TOTAL.increment(1);
125151
add_tx_inner(app_state, headers, tx).await
@@ -128,14 +154,15 @@ async fn add_rpc_tx(
128154
#[instrument(skip(app_state))]
129155
#[sequencer_latency_histogram(HTTP_SERVER_ADD_TX_LATENCY, true)]
130156
async fn add_tx(
131-
State(app_state): State<AppState>,
157+
Extension(app_state): Extension<AppState>,
132158
headers: HeaderMap,
133159
tx: String,
134160
) -> HttpServerResult<Json<GatewayOutput>> {
135161
debug!("ADD_TX_START: Http server received a new transaction.");
136162

137-
let dynamic_config = app_state.config_manager_client.get_http_server_dynamic_config().await?;
138-
check_new_transactions_are_allowed(dynamic_config.accept_new_txs)?;
163+
let HttpServerDynamicConfig { accept_new_txs, max_sierra_program_size } =
164+
app_state.get_dynamic_config();
165+
check_new_transactions_are_allowed(accept_new_txs)?;
139166

140167
ADDED_TRANSACTIONS_TOTAL.increment(1);
141168
let tx: DeprecatedGatewayTransactionV3 = match serde_json::from_str(&tx) {
@@ -152,7 +179,7 @@ async fn add_tx(
152179
}
153180
};
154181

155-
let rpc_tx = tx.convert_to_rpc_tx(dynamic_config.max_sierra_program_size).inspect_err(|e| {
182+
let rpc_tx = tx.convert_to_rpc_tx(max_sierra_program_size).inspect_err(|e| {
156183
debug!("Error while converting deprecated gateway transaction into RPC transaction: {}", e);
157184
})?;
158185

@@ -304,3 +331,19 @@ fn increment_failure_metrics(err: &HttpServerError) {
304331
ADDED_TRANSACTIONS_INTERNAL_ERROR.increment(1);
305332
}
306333
}
334+
335+
async fn dynamic_config_poll(
336+
tx: Sender<HttpServerDynamicConfig>,
337+
config_manager_client: SharedConfigManagerClient,
338+
poll_interval: Duration,
339+
) {
340+
let mut interval = time::interval(poll_interval);
341+
loop {
342+
interval.tick().await;
343+
let dynamic_config_result = config_manager_client.get_http_server_dynamic_config().await;
344+
// Make the config available if it was successfully updated.
345+
if let Ok(dynamic_config) = dynamic_config_result {
346+
let _ = tx.send(dynamic_config);
347+
}
348+
}
349+
}

0 commit comments

Comments
 (0)