Skip to content

Commit 15f9a0f

Browse files
Authored by the commit author
fix(cubestore): Reduce memory usage while converting to DataFrame (#8599)
1 parent 604085e commit 15f9a0f

File tree

4 files changed

+14
-13
lines changed

4 files changed

+14
-13
lines changed

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use crate::queryplanner::info_schema::{
4040
use crate::queryplanner::now::MaterializeNow;
4141
use crate::queryplanner::planning::{choose_index_ext, ClusterSendNode};
4242
use crate::queryplanner::query_executor::{
43-
batch_to_dataframe, ClusterSendExec, InlineTableProvider,
43+
batches_to_dataframe, ClusterSendExec, InlineTableProvider,
4444
};
4545
use crate::queryplanner::serialized_plan::SerializedPlan;
4646
use crate::queryplanner::topk::ClusterAggregateTopK;
@@ -168,7 +168,7 @@ impl QueryPlanner for QueryPlannerImpl {
168168
let execution_time = execution_time.elapsed()?;
169169
app_metrics::META_QUERY_TIME_MS.report(execution_time.as_millis() as i64);
170170
debug!("Meta query data processing time: {:?}", execution_time,);
171-
let data_frame = cube_ext::spawn_blocking(move || batch_to_dataframe(&results)).await??;
171+
let data_frame = cube_ext::spawn_blocking(move || batches_to_dataframe(results)).await??;
172172
Ok(data_frame)
173173
}
174174
}

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,25 +1409,26 @@ macro_rules! convert_array {
14091409
}};
14101410
}
14111411

1412-
pub fn batch_to_dataframe(batches: &Vec<RecordBatch>) -> Result<DataFrame, CubeError> {
1412+
pub fn batches_to_dataframe(batches: Vec<RecordBatch>) -> Result<DataFrame, CubeError> {
14131413
let mut cols = vec![];
14141414
let mut all_rows = vec![];
14151415

1416-
for batch in batches.iter() {
1416+
for batch in batches.into_iter() {
14171417
if cols.len() == 0 {
1418-
let schema = batch.schema().clone();
1419-
for (i, field) in schema.fields().iter().enumerate() {
1418+
for (i, field) in batch.schema().fields().iter().enumerate() {
14201419
cols.push(Column::new(
14211420
field.name().clone(),
14221421
arrow_to_column_type(field.data_type().clone())?,
14231422
i,
14241423
));
14251424
}
14261425
}
1426+
14271427
if batch.num_rows() == 0 {
14281428
continue;
14291429
}
1430-
let mut rows = vec![];
1430+
1431+
let mut rows = Vec::with_capacity(batch.num_rows());
14311432

14321433
for _ in 0..batch.num_rows() {
14331434
rows.push(Row::new(Vec::with_capacity(batch.num_columns())));
@@ -1769,7 +1770,7 @@ mod tests {
17691770
Field::new("int32", DataType::Int32, true),
17701771
Field::new("str32", DataType::Utf8, true),
17711772
]));
1772-
let result = batch_to_dataframe(&vec![RecordBatch::try_new(
1773+
let result = batches_to_dataframe(vec![RecordBatch::try_new(
17731774
schema,
17741775
vec![
17751776
Arc::new(UInt32Array::from_iter(vec![Some(1), None])) as ArrayRef,

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use crate::metastore::{
4949
};
5050
use crate::queryplanner::panic::PanicWorkerNode;
5151
use crate::queryplanner::pretty_printers::{pp_phys_plan, pp_plan};
52-
use crate::queryplanner::query_executor::{batch_to_dataframe, ClusterSendExec, QueryExecutor};
52+
use crate::queryplanner::query_executor::{batches_to_dataframe, ClusterSendExec, QueryExecutor};
5353
use crate::queryplanner::serialized_plan::{RowFilter, SerializedPlan};
5454
use crate::queryplanner::{PlanningMeta, QueryPlan, QueryPlanner};
5555
use crate::remotefs::RemoteFs;
@@ -1072,7 +1072,7 @@ impl SqlService for SqlServiceImpl {
10721072
}
10731073
Ok(cube_ext::spawn_blocking(
10741074
move || -> Result<DataFrame, CubeError> {
1075-
let df = batch_to_dataframe(&records)?;
1075+
let df = batches_to_dataframe(records)?;
10761076
Ok(df)
10771077
},
10781078
)

rust/cubestore/cubestore/src/streaming/kafka.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ impl StreamingSource for KafkaStreamingSource {
402402
mod tests {
403403
use super::*;
404404
use crate::metastore::{Column, ColumnType};
405-
use crate::queryplanner::query_executor::batch_to_dataframe;
405+
use crate::queryplanner::query_executor::batches_to_dataframe;
406406
use crate::sql::MySqlDialectWithBackTicks;
407407
use crate::streaming::topic_table_provider::TopicTableProvider;
408408
use arrow::array::StringArray;
@@ -432,7 +432,7 @@ mod tests {
432432
let phys_plan = plan_ctx.create_physical_plan(&logical_plan).unwrap();
433433

434434
let batches = collect(phys_plan).await.unwrap();
435-
let res = batch_to_dataframe(&batches).unwrap();
435+
let res = batches_to_dataframe(batches).unwrap();
436436
res.get_rows()[0].values()[0].clone()
437437
}
438438

@@ -462,7 +462,7 @@ mod tests {
462462
let phys_plan = phys_plan.with_new_children(vec![inp]).unwrap();
463463

464464
let batches = collect(phys_plan).await.unwrap();
465-
let res = batch_to_dataframe(&batches).unwrap();
465+
let res = batches_to_dataframe(batches).unwrap();
466466
res.get_rows().to_vec()
467467
}
468468

0 commit comments

Comments (0)