Skip to content

Commit be7d40b

Browse files
authored
feat(cubestore): Allow to configure max message_size/frame_size for transport (#6369)
1 parent 6d0c66a commit be7d40b

File tree

2 files changed

+35
-3
lines changed

2 files changed

+35
-3
lines changed

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,9 @@ pub trait ConfigObj: DIService {
446446
fn max_disk_space_per_worker(&self) -> u64;
447447

448448
fn disk_space_cache_duration_secs(&self) -> u64;
449+
450+
fn transport_max_message_size(&self) -> usize;
451+
fn transport_max_frame_size(&self) -> usize;
449452
}
450453

451454
#[derive(Debug, Clone)]
@@ -508,6 +511,8 @@ pub struct ConfigObjImpl {
508511
pub max_disk_space: u64,
509512
pub max_disk_space_per_worker: u64,
510513
pub disk_space_cache_duration_secs: u64,
514+
pub transport_max_message_size: usize,
515+
pub transport_max_frame_size: usize,
511516
}
512517

513518
crate::di_service!(ConfigObjImpl, [ConfigObj]);
@@ -731,6 +736,14 @@ impl ConfigObj for ConfigObjImpl {
731736
fn disk_space_cache_duration_secs(&self) -> u64 {
732737
self.disk_space_cache_duration_secs
733738
}
739+
740+
fn transport_max_message_size(&self) -> usize {
741+
self.transport_max_message_size
742+
}
743+
744+
fn transport_max_frame_size(&self) -> usize {
745+
self.transport_max_frame_size
746+
}
734747
}
735748

736749
lazy_static! {
@@ -953,6 +966,8 @@ impl Config {
953966
* 1024
954967
* 1024,
955968
disk_space_cache_duration_secs: 300,
969+
transport_max_message_size: env_parse("TRANSPORT_MAX_MESSAGE_SIZE", 64 << 20),
970+
transport_max_frame_size: env_parse("TRANSPORT_MAX_FRAME_SIZE", 16 << 20),
956971
}),
957972
}
958973
}
@@ -1027,6 +1042,8 @@ impl Config {
10271042
max_disk_space: 0,
10281043
max_disk_space_per_worker: 0,
10291044
disk_space_cache_duration_secs: 0,
1045+
transport_max_message_size: 64 << 20,
1046+
transport_max_frame_size: 16 << 20,
10301047
}),
10311048
}
10321049
}
@@ -1596,6 +1613,8 @@ impl Config {
15961613
Duration::from_secs(config.check_ws_orphaned_messages_interval_secs()),
15971614
Duration::from_secs(config.drop_ws_processing_messages_after_secs()),
15981615
Duration::from_secs(config.drop_ws_complete_messages_after_secs()),
1616+
config.transport_max_message_size(),
1617+
config.transport_max_frame_size(),
15991618
)
16001619
})
16011620
.await;

rust/cubestore/cubestore/src/http/mod.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ pub struct HttpServer {
5050
worker_loop: WorkerLoop,
5151
drop_orphaned_messages_loop: WorkerLoop,
5252
cancel_token: CancellationToken,
53+
max_message_size: usize,
54+
max_frame_size: usize,
5355
}
5456

5557
crate::di_service!(HttpServer, []);
@@ -81,6 +83,8 @@ impl HttpServer {
8183
check_orphaned_messages_interval: Duration,
8284
drop_processing_messages_after: Duration,
8385
drop_complete_messages_after: Duration,
86+
max_message_size: usize,
87+
max_frame_size: usize,
8488
) -> Arc<Self> {
8589
Arc::new(Self {
8690
bind_address,
@@ -89,6 +93,8 @@ impl HttpServer {
8993
check_orphaned_messages_interval,
9094
drop_processing_messages_after,
9195
drop_complete_messages_after,
96+
max_message_size,
97+
max_frame_size,
9298
worker_loop: WorkerLoop::new("HttpServer message processing"),
9399
drop_orphaned_messages_loop: WorkerLoop::new("HttpServer drop orphaned messages"),
94100
cancel_token: CancellationToken::new(),
@@ -121,14 +127,16 @@ impl HttpServer {
121127
let context_filter = tx_to_move_filter.and(auth_filter.clone());
122128

123129
let context_filter_to_move = context_filter.clone();
130+
let max_frame_size = self.max_frame_size.clone();
131+
let max_message_size = self.max_message_size.clone();
124132

125133
let query_route = warp::path!("ws")
126134
.and(context_filter_to_move)
127135
.and(warp::ws::ws())
128-
.and_then(|tx: mpsc::Sender<(mpsc::Sender<Arc<HttpMessage>>, SqlQueryContext, HttpMessage)>, sql_query_context: SqlQueryContext, ws: Ws| async move {
136+
.and_then(move |tx: mpsc::Sender<(mpsc::Sender<Arc<HttpMessage>>, SqlQueryContext, HttpMessage)>, sql_query_context: SqlQueryContext, ws: Ws| async move {
129137
let tx_to_move = tx.clone();
130138
let sql_query_context = sql_query_context.clone();
131-
Result::<_, Rejection>::Ok(ws.on_upgrade(async move |mut web_socket| {
139+
Result::<_, Rejection>::Ok(ws.max_frame_size(max_frame_size).max_message_size(max_message_size).on_upgrade(async move |mut web_socket| {
132140
let (response_tx, mut response_rx) = mpsc::channel::<Arc<HttpMessage>>(10000);
133141
loop {
134142
tokio::select! {
@@ -818,7 +826,7 @@ impl HttpMessage {
818826
#[cfg(test)]
819827
mod tests {
820828
use crate::codegen::{HttpMessageArgs, HttpQuery, HttpQueryArgs, HttpTable, HttpTableArgs};
821-
use crate::config::init_test_logger;
829+
use crate::config::{init_test_logger, Config};
822830
use crate::http::{HttpCommand, HttpMessage, HttpServer};
823831
use crate::metastore::{Column, ColumnType};
824832
use crate::mysql::MockSqlAuthService;
@@ -1018,13 +1026,18 @@ mod tests {
10181026
};
10191027
let mut auth = MockSqlAuthService::new();
10201028
auth.expect_authenticate().return_const(Ok(None));
1029+
1030+
let config = Config::test("ws_test").config_obj();
1031+
10211032
let http_server = Arc::new(HttpServer::new(
10221033
"127.0.0.1:53031".to_string(),
10231034
Arc::new(auth),
10241035
Arc::new(sql_service),
10251036
Duration::from_millis(100),
10261037
Duration::from_millis(10000),
10271038
Duration::from_millis(1000),
1039+
config.transport_max_message_size(),
1040+
config.transport_max_frame_size(),
10281041
));
10291042
{
10301043
let http_server = http_server.clone();

0 commit comments

Comments
 (0)