Skip to content

Commit b39920f

Browse files
authored
fix(cubestore): Large memory consumption while waiting for the queue cache: drop context (#6005)
1 parent 8c029bb commit b39920f

File tree

2 files changed

+54
-40
lines changed

2 files changed

+54
-40
lines changed

rust/cubestore/cubestore/src/sql/cache.rs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::queryplanner::serialized_plan::SerializedPlan;
22
use crate::sql::InlineTables;
3+
use crate::sql::SqlQueryContext;
34
use crate::store::DataFrame;
45
use crate::CubeError;
56
use futures::Future;
@@ -71,17 +72,18 @@ impl SqlResultCache {
7172
}
7273
}
7374

74-
#[tracing::instrument(level = "trace", skip(self, inline_tables, plan, exec))]
75+
#[tracing::instrument(level = "trace", skip(self, context, plan, exec))]
7576
pub async fn get<F>(
7677
&self,
7778
query: &str,
78-
inline_tables: &InlineTables,
79+
context: SqlQueryContext,
7980
plan: SerializedPlan,
8081
exec: impl FnOnce(SerializedPlan) -> F,
8182
) -> Result<Arc<DataFrame>, CubeError>
8283
where
8384
F: Future<Output = Result<DataFrame, CubeError>> + Send + 'static,
8485
{
86+
let inline_tables = &context.inline_tables;
8587
let result_key = SqlResultCacheKey::from_plan(query, inline_tables, &plan);
8688
let cached_result = {
8789
let mut result_cache = self.result_cache.lock().await;
@@ -132,6 +134,7 @@ impl SqlResultCache {
132134

133135
std::mem::drop(plan);
134136
std::mem::drop(result_key);
137+
std::mem::drop(context);
135138

136139
self.wait_for_queue(receiver, query).await
137140
}
@@ -162,7 +165,7 @@ mod tests {
162165
use crate::queryplanner::serialized_plan::SerializedPlan;
163166
use crate::queryplanner::PlanningMeta;
164167
use crate::sql::cache::SqlResultCache;
165-
use crate::sql::InlineTables;
168+
use crate::sql::{InlineTables, SqlQueryContext};
166169
use crate::store::DataFrame;
167170
use crate::table::{Row, TableValue};
168171
use crate::CubeError;
@@ -200,13 +203,33 @@ mod tests {
200203
)])],
201204
))
202205
};
203-
let inline_tables = InlineTables::new();
206+
204207
let futures = vec![
205-
cache.get("SELECT 1", &inline_tables, plan.clone(), exec.clone()),
206-
cache.get("SELECT 2", &inline_tables, plan.clone(), exec.clone()),
207-
cache.get("SELECT 3", &inline_tables, plan.clone(), exec.clone()),
208-
cache.get("SELECT 1", &inline_tables, plan.clone(), exec.clone()),
209-
cache.get("SELECT 1", &inline_tables, plan, exec),
208+
cache.get(
209+
"SELECT 1",
210+
SqlQueryContext::default(),
211+
plan.clone(),
212+
exec.clone(),
213+
),
214+
cache.get(
215+
"SELECT 2",
216+
SqlQueryContext::default(),
217+
plan.clone(),
218+
exec.clone(),
219+
),
220+
cache.get(
221+
"SELECT 3",
222+
SqlQueryContext::default(),
223+
plan.clone(),
224+
exec.clone(),
225+
),
226+
cache.get(
227+
"SELECT 1",
228+
SqlQueryContext::default(),
229+
plan.clone(),
230+
exec.clone(),
231+
),
232+
cache.get("SELECT 1", SqlQueryContext::default(), plan, exec),
210233
];
211234

212235
let res = join_all(futures)

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1183,37 +1183,28 @@ impl SqlService for SqlServiceImpl {
11831183
timeout(
11841184
self.query_timeout,
11851185
self.cache
1186-
.get(
1187-
query,
1188-
&context.inline_tables,
1189-
serialized,
1190-
async move |plan| {
1191-
let records;
1192-
if workers.len() == 0 {
1193-
records = executor
1194-
.execute_router_plan(plan, cluster)
1195-
.await?
1196-
.1;
1197-
} else {
1198-
// Pick one of the workers to run as main for the request.
1199-
let i =
1200-
thread_rng().sample(Uniform::new(0, workers.len()));
1201-
let rs =
1202-
cluster.route_select(&workers[i], plan).await?.1;
1203-
records = rs
1204-
.into_iter()
1205-
.map(|r| r.read())
1206-
.collect::<Result<Vec<_>, _>>()?;
1207-
}
1208-
Ok(cube_ext::spawn_blocking(
1209-
move || -> Result<DataFrame, CubeError> {
1210-
let df = batch_to_dataframe(&records)?;
1211-
Ok(df)
1212-
},
1213-
)
1214-
.await??)
1215-
},
1216-
)
1186+
.get(query, context, serialized, async move |plan| {
1187+
let records;
1188+
if workers.len() == 0 {
1189+
records =
1190+
executor.execute_router_plan(plan, cluster).await?.1;
1191+
} else {
1192+
// Pick one of the workers to run as main for the request.
1193+
let i = thread_rng().sample(Uniform::new(0, workers.len()));
1194+
let rs = cluster.route_select(&workers[i], plan).await?.1;
1195+
records = rs
1196+
.into_iter()
1197+
.map(|r| r.read())
1198+
.collect::<Result<Vec<_>, _>>()?;
1199+
}
1200+
Ok(cube_ext::spawn_blocking(
1201+
move || -> Result<DataFrame, CubeError> {
1202+
let df = batch_to_dataframe(&records)?;
1203+
Ok(df)
1204+
},
1205+
)
1206+
.await??)
1207+
})
12171208
.with_current_subscriber(),
12181209
)
12191210
.await??

0 commit comments

Comments (0)