
Commit 082e3b7

refactor(query): refactor row fetcher for avoid oom (#18440)
1 parent d48b7bc commit 082e3b7

File tree

15 files changed: +539 -608 lines changed


src/query/catalog/src/table.rs

Lines changed: 4 additions & 0 deletions
@@ -76,6 +76,10 @@ pub trait Table: Sync + Send {
         false
     }
 
+    fn supported_lazy_materialize(&self) -> bool {
+        false
+    }
+
     fn schema(&self) -> Arc<TableSchema> {
         self.get_table_info().schema()
     }
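
The new trait hook defaults to false, so engines must opt in explicitly. A minimal, self-contained sketch of this default-method pattern (the Table trait here is pared down and DemoTable is hypothetical, not part of the commit):

// Pared-down sketch: a trait default of `false`, overridden per engine.
trait Table {
    fn supported_lazy_materialize(&self) -> bool {
        false // conservative default: engines must opt in explicitly
    }
}

struct DemoTable; // hypothetical engine that supports lazy materialization

impl Table for DemoTable {
    fn supported_lazy_materialize(&self) -> bool {
        true
    }
}

fn main() {
    assert!(DemoTable.supported_lazy_materialize());
}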

src/query/service/src/pipelines/builders/builder_row_fetch.rs

Lines changed: 1 addition & 10 deletions
@@ -12,9 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::sync::Arc;
-
-use databend_common_base::runtime::GlobalIORuntime;
 use databend_common_exception::Result;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
@@ -24,24 +21,18 @@ use databend_common_pipeline_transforms::processors::create_dummy_item;
 use databend_common_sql::executor::physical_plans::RowFetch;
 use databend_common_sql::executor::PhysicalPlan;
 use databend_common_storages_fuse::operations::row_fetch_processor;
-use databend_common_storages_fuse::TableContext;
-use tokio::sync::Semaphore;
 
 use crate::pipelines::PipelineBuilder;
 impl PipelineBuilder {
     pub(crate) fn build_row_fetch(&mut self, row_fetch: &RowFetch) -> Result<()> {
         self.build_pipeline(&row_fetch.input)?;
-        let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
-        let row_fetch_runtime = GlobalIORuntime::instance();
-        let row_fetch_semaphore = Arc::new(Semaphore::new(max_io_requests));
+
         let processor = row_fetch_processor(
             self.ctx.clone(),
             row_fetch.row_id_col_offset,
             &row_fetch.source,
             row_fetch.cols_to_fetch.clone(),
             row_fetch.need_wrap_nullable,
-            row_fetch_semaphore,
-            row_fetch_runtime,
         )?;
         if !matches!(&*row_fetch.input, PhysicalPlan::MutationSplit(_)) {
             self.main_pipeline.add_transform(processor)?;
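
Before this change the builder throttled fetch IO itself: it sized a tokio::sync::Semaphore from max_storage_io_requests and handed it, plus the global IO runtime, to row_fetch_processor. That concurrency control now lives behind row_fetch_processor. A minimal sketch of the semaphore-throttling pattern that was removed here (workload and limit are illustrative; assumes the tokio crate with its default runtime features):

use std::sync::Arc;

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // At most 4 simulated IO requests in flight, mirroring how the builder
    // previously sized its semaphore from max_storage_io_requests.
    let semaphore = Arc::new(Semaphore::new(4));
    let mut handles = Vec::new();

    for i in 0..16 {
        // Wait for a permit before spawning; the permit rides along with
        // the task and is released when dropped.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            println!("fetching part {i}"); // stand-in for a merge-IO read
            drop(permit);
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
}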

src/query/settings/src/settings_default.rs

Lines changed: 8 additions & 1 deletion
@@ -145,11 +145,18 @@ impl DefaultSettings {
             }),
             ("max_block_size", DefaultSettingValue {
                 value: UserSettingValue::UInt64(65536),
-                desc: "Sets the maximum byte size of a single data block that can be read.",
+                desc: "Sets the maximum rows size of a single data block that can be read.",
                 mode: SettingMode::Both,
                 scope: SettingScope::Both,
                 range: Some(SettingRange::Numeric(1..=u64::MAX)),
             }),
+            ("max_block_bytes", DefaultSettingValue {
+                value: UserSettingValue::UInt64(50 * 1024 * 1024),
+                desc: "Sets the maximum byte size of a single data block that can be read.",
+                mode: SettingMode::Both,
+                scope: SettingScope::Both,
+                range: Some(SettingRange::Numeric(1024 * 1024..=u64::MAX)),
+            }),
             ("sequence_step_size", DefaultSettingValue {
                 value: UserSettingValue::UInt64(65536),
                 desc: "Sets the sequence step size for nextval function.",

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
@@ -188,6 +188,10 @@ impl Settings {
         self.try_get_u64("max_block_size")
     }
 
+    pub fn get_max_block_bytes(&self) -> Result<u64> {
+        self.try_get_u64("max_block_bytes")
+    }
+
     // Set max_block_size.
     pub fn set_max_block_size(&self, val: u64) -> Result<()> {
         self.try_set_u64("max_block_size", val)

src/query/sql/src/executor/physical_plans/physical_table_scan.rs

Lines changed: 10 additions & 1 deletion
@@ -147,7 +147,16 @@ impl PhysicalPlanBuilder {
 
         let mut prewhere = scan.prewhere.clone();
         let mut used: ColumnSet = required.intersection(&columns).cloned().collect();
-        if scan.is_lazy_table {
+
+        let supported_lazy_materialize = {
+            self.metadata
+                .read()
+                .table(scan.table_index)
+                .table()
+                .supported_lazy_materialize()
+        };
+
+        if scan.is_lazy_table && supported_lazy_materialize {
             let lazy_columns = columns.difference(&used).cloned().collect();
             let mut metadata = self.metadata.write();
             metadata.set_table_lazy_columns(scan.table_index, lazy_columns);
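
Note the extra { ... } scope around the metadata read: it drops the read guard before metadata.write() is taken a few lines later, presumably because holding both guards on the same lock at once risks a deadlock. A self-contained sketch of that scoping pattern with std's RwLock (contents are illustrative):

use std::sync::RwLock;

fn main() {
    let metadata = RwLock::new(vec!["t1".to_string()]);

    // Scope the read so its guard drops before the write lock is taken;
    // holding both guards on one RwLock at the same time can deadlock.
    let supported = {
        let meta = metadata.read().unwrap();
        meta.contains(&"t1".to_string())
    };

    if supported {
        let mut meta = metadata.write().unwrap(); // safe: read guard is gone
        meta.push("lazy_columns".to_string());
    }
}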

src/query/sql/src/planner/binder/bind_mutation/mutation_expression.rs

Lines changed: 29 additions & 2 deletions
@@ -151,7 +151,14 @@ impl MutationExpression {
         if *mutation_strategy == MutationStrategy::NotMatchedOnly {
             update_stream_columns = false;
         }
-        let is_lazy_table = *mutation_strategy != MutationStrategy::NotMatchedOnly;
+        let is_lazy_table = {
+            let metadata = binder.metadata.read();
+            *mutation_strategy != MutationStrategy::NotMatchedOnly
+                && metadata
+                    .table(target_table_index)
+                    .table()
+                    .supported_lazy_materialize()
+        };
         target_s_expr =
             Self::update_target_scan(&target_s_expr, is_lazy_table, update_stream_columns)?;
 
@@ -173,6 +180,12 @@ impl MutationExpression {
         )
         .await?;
 
+        if !is_lazy_table {
+            for column_index in bind_context.column_set().iter() {
+                required_columns.insert(*column_index);
+            }
+        }
+
         Ok(MutationExpressionBindResult {
             input: join_s_expr,
             mutation_type,
@@ -276,7 +289,14 @@ impl MutationExpression {
                 target_table_row_id_index: DUMMY_COLUMN_INDEX,
             })
         } else {
-            let is_lazy_table = mutation_type != MutationType::Delete;
+            let is_lazy_table = {
+                let metadata = binder.metadata.read();
+                mutation_type != MutationType::Delete
+                    && metadata
+                        .table(target_table_index)
+                        .table()
+                        .supported_lazy_materialize()
+            };
             s_expr =
                 Self::update_target_scan(&s_expr, is_lazy_table, update_stream_columns)?;
 
@@ -321,6 +341,13 @@ impl MutationExpression {
             let mut rewriter = SubqueryDecorrelatorOptimizer::new(opt_ctx, None);
             let s_expr = rewriter.optimize_sync(&s_expr)?;
 
+            // The delete operation only requires the row ID to locate the row to be deleted and does not need to extract any other columns.
+            if !is_lazy_table && mutation_type != MutationType::Delete {
+                for column_index in bind_context.column_set().iter() {
+                    required_columns.insert(*column_index);
+                }
+            }
+
             Ok(MutationExpressionBindResult {
                 input: s_expr,
                 mutation_type,
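
The added blocks encode one rule: when the target cannot lazily materialize (and the statement is not a pure delete, which only needs the row ID), every column in the bind context must be read eagerly, so it is folded into required_columns. A toy illustration with plain HashSets (indices are made up):

use std::collections::HashSet;

fn main() {
    let mut required_columns: HashSet<usize> = HashSet::from([0]); // row id
    let bind_context_columns: HashSet<usize> = HashSet::from([1, 2, 3]);
    let is_lazy_table = false;

    // No lazy row fetch available: every referenced column is read eagerly.
    if !is_lazy_table {
        required_columns.extend(bind_context_columns.iter().copied());
    }
    assert_eq!(required_columns.len(), 4);
}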

src/query/sql/src/planner/binder/bind_mutation/update.rs

Lines changed: 5 additions & 5 deletions
@@ -313,11 +313,11 @@ impl Binder {
             };
         }
 
-        mutation.required_columns = Box::new(
-            std::iter::once(mutation.row_id_index)
-                .chain(any_columns.into_iter().map(|c| c.new))
-                .collect(),
-        );
+        // update required columns
+        for any_column in any_columns {
+            mutation.required_columns.remove(&any_column.old);
+            mutation.required_columns.insert(any_column.new);
+        }
 
         let aggr_expr =
             self.bind_aggregate(&mut mutation.bind_context, s_expr.unary_child().clone())?;
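
Rather than rebuilding required_columns from the row id plus the rewritten columns, the set is now patched in place: each rewritten column swaps its old index for its new one, and everything else already required is preserved. A toy version of that remap (the struct and indices are illustrative):

use std::collections::HashSet;

struct Remap {
    old: usize,
    new: usize,
}

fn main() {
    // Already holds the row id (0) and another needed column (7).
    let mut required_columns: HashSet<usize> = HashSet::from([0, 5, 7]);
    let any_columns = vec![Remap { old: 5, new: 9 }];

    // Swap each rewritten column's old index for its new one, in place.
    for c in &any_columns {
        required_columns.remove(&c.old);
        required_columns.insert(c.new);
    }
    assert_eq!(required_columns, HashSet::from([0, 7, 9]));
}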

src/query/sql/src/planner/binder/select.rs

Lines changed: 4 additions & 0 deletions
@@ -574,6 +574,10 @@ impl Binder {
             return Ok(());
         }
 
+        if !metadata.table(0).table().supported_lazy_materialize() {
+            return Ok(());
+        }
+
         let cols = metadata.columns();
         let virtual_cols = cols
             .iter()

src/query/storages/fuse/src/fuse_table.rs

Lines changed: 4 additions & 0 deletions
@@ -769,6 +769,10 @@ impl Table for FuseTable {
         column_id >= VECTOR_SCORE_COLUMN_ID
     }
 
+    fn supported_lazy_materialize(&self) -> bool {
+        !matches!(self.storage_format, FuseStorageFormat::Native)
+    }
+
     fn support_column_projection(&self) -> bool {
         true
    }

src/query/storages/fuse/src/io/read/block/block_reader_merge_io_sync.rs

Lines changed: 25 additions & 8 deletions
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::collections::HashSet;
 
 use databend_common_catalog::plan::PartInfoPtr;
@@ -22,6 +23,7 @@ use databend_storages_common_cache::CacheManager;
 use databend_storages_common_cache::TableDataCacheKey;
 use databend_storages_common_io::MergeIOReader;
 use databend_storages_common_io::ReadSettings;
+use databend_storages_common_table_meta::meta::ColumnMeta;
 
 use crate::fuse_part::FuseBlockPartInfo;
 use crate::io::BlockReader;
@@ -35,6 +37,24 @@ impl BlockReader {
         ignore_column_ids: &Option<HashSet<ColumnId>>,
     ) -> Result<BlockReadResult> {
         let part = FuseBlockPartInfo::from_part(part)?;
+        let location = &part.location;
+        let columns_meta = &part.columns_meta;
+
+        self.sync_read_columns_data_by_merge_io_2(
+            settings,
+            location,
+            columns_meta,
+            ignore_column_ids,
+        )
+    }
+
+    pub fn sync_read_columns_data_by_merge_io_2(
+        &self,
+        settings: &ReadSettings,
+        location: &str,
+        columns_meta: &HashMap<ColumnId, ColumnMeta>,
+        ignore_column_ids: &Option<HashSet<ColumnId>>,
+    ) -> Result<BlockReadResult> {
         let column_array_cache = CacheManager::instance().get_table_data_array_cache();
 
         let mut ranges = vec![];
@@ -45,9 +65,10 @@ impl BlockReader {
                 continue;
             }
         }
-        let block_path = &part.location;
 
-        if let Some(column_meta) = part.columns_meta.get(column_id) {
+        let block_path = location;
+
+        if let Some(column_meta) = columns_meta.get(column_id) {
             // first, check column array object cache
             let (offset, len) = column_meta.offset_length();
             let column_cache_key = TableDataCacheKey::new(block_path, *column_id, offset, len);
@@ -59,12 +80,8 @@ impl BlockReader {
             }
         }
 
-        let merge_io_result = MergeIOReader::sync_merge_io_read(
-            settings,
-            self.operator.clone(),
-            &part.location,
-            &ranges,
-        )?;
+        let merge_io_result =
+            MergeIOReader::sync_merge_io_read(settings, self.operator.clone(), location, &ranges)?;
 
         // for sync read, we disable table data cache
        let cached_column_data = vec![];
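
The shape of this refactor is a plain extraction: the part-based entry point now unpacks location and columns_meta and delegates to sync_read_columns_data_by_merge_io_2, which callers without a full PartInfoPtr (such as the reworked row fetcher) can use directly. A reduced sketch of the pattern with stand-in types (not the actual reader API):

use std::collections::HashMap;

struct Part {
    location: String,
    columns_meta: HashMap<u32, (u64, u64)>, // column id -> (offset, len)
}

struct Reader;

impl Reader {
    // Original entry point: unpack the part, then delegate.
    fn read_by_part(&self, part: &Part) -> Vec<(u64, u64)> {
        self.read_by_meta(&part.location, &part.columns_meta)
    }

    // Extracted variant: callable with just a location and column metadata,
    // no full part required.
    fn read_by_meta(
        &self,
        location: &str,
        columns_meta: &HashMap<u32, (u64, u64)>,
    ) -> Vec<(u64, u64)> {
        let _ = location; // a real reader would issue merge IO against it
        columns_meta.values().copied().collect()
    }
}

fn main() {
    let part = Part {
        location: "block_path".into(),
        columns_meta: HashMap::from([(0, (0, 16))]),
    };
    assert_eq!(Reader.read_by_part(&part), vec![(0, 16)]);
}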
