Commit dc25c75
chore(query): refactor parquet part to make reader work on parallel (#17987)
* chore(query): refactor parquet part to make reader work on parallel
* chore(query): refactor parquet part to make reader work on parallel
* chore(query): refactor parquet part to make reader work on parallel
* increase timeout
* update
* update
* update
* update
1 parent 5fbd4c0 commit dc25c75
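In outline, the change works like this (the sketch below is illustrative Rust, not code from the patch; `hint` stands in for the new `parquet_rowgroup_hint_bytes` setting): at plan time a large Parquet file is split into `ceil(compressed_size / hint)` buckets without reading its footer, and at read time each bucket keeps only the row groups whose index maps to it modulo the bucket count.

// Illustrative sketch of the bucketing scheme; names are not from the patch.
fn bucket_num(compressed_size: u64, hint: u64) -> usize {
    // One bucket per `hint` bytes of compressed file size, rounded up (default hint: 128 MiB).
    compressed_size.div_ceil(hint) as usize
}

// Read-time filter: row group `idx` is read only by the bucket it maps to.
fn should_read(idx: usize, bucket: usize, buckets: usize) -> bool {
    idx % buckets == bucket
}

fn main() {
    // A hypothetical 300 MiB file with the default 128 MiB hint becomes 3 buckets.
    let buckets = bucket_num(300 * 1024 * 1024, 128 * 1024 * 1024);
    assert_eq!(buckets, 3);
    // Row groups are dealt round-robin: 0 -> bucket 0, 1 -> bucket 1, 2 -> bucket 2, 3 -> bucket 0, ...
    assert!(should_read(3, 0, buckets));
    assert!(!should_read(3, 1, buckets));
}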

11 files changed (+86 −17 lines changed)

src/query/settings/src/settings_default.rs

Lines changed: 7 additions & 1 deletion
@@ -661,7 +661,13 @@ impl DefaultSettings {
             scope: SettingScope::Both,
             range: Some(SettingRange::Numeric(0..=u64::MAX)),
         }),
-
+        ("parquet_rowgroup_hint_bytes", DefaultSettingValue {
+            value: UserSettingValue::UInt64(128 * 1024 * 1024),
+            desc: "Parquet file is very large, we will divide it into multiple rowgroups to read, the config is the hint bytes of each rowgroup, Default value: 128MB",
+            mode: SettingMode::Both,
+            scope: SettingScope::Both,
+            range: Some(SettingRange::Numeric(1024 * 1024..=u64::MAX)),
+        }),
         // enterprise license related settings
         ("enterprise_license", DefaultSettingValue {
             value: UserSettingValue::String("".to_owned()),

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
@@ -518,6 +518,10 @@ impl Settings {
         self.try_get_u64("parquet_fast_read_bytes")
     }
 
+    pub fn get_parquet_rowgroup_hint_bytes(&self) -> Result<u64> {
+        self.try_get_u64("parquet_rowgroup_hint_bytes")
+    }
+
     pub fn get_enable_table_lock(&self) -> Result<bool> {
         Ok(self.try_get_u64("enable_table_lock")? != 0)
     }

src/query/storages/delta/src/table.rs

Lines changed: 2 additions & 1 deletion
@@ -344,7 +344,8 @@ impl DeltaTable {
                 file: add.path.clone(),
                 compressed_size: add.size as u64,
                 estimated_uncompressed_size: add.size as u64, // This field is not used here.
-                dedup_key: format!("{}_{}", add.modification_time, add.size)
+                dedup_key: format!("{}_{}", add.modification_time, add.size),
+                bucket_option: None,
             },
         ),
     }) as _))

src/query/storages/iceberg/src/partition.rs

Lines changed: 1 addition & 0 deletions
@@ -33,6 +33,7 @@ pub(crate) fn convert_file_scan_task(task: iceberg::scan::FileScanTask) -> Box<d
         compressed_size: task.length,
         estimated_uncompressed_size: task.length * 5,
         dedup_key: format!("{}_{}", task.data_file_path, task.length),
+        bucket_option: None,
     };
 
     if !task.deletes.is_empty() {

src/query/storages/parquet/src/parquet_part.rs

Lines changed: 30 additions & 12 deletions
@@ -63,15 +63,27 @@ pub struct ParquetFilePart {
     pub file: String,
     pub compressed_size: u64,
     pub estimated_uncompressed_size: u64,
+    // used to cache parquet metadata
     pub dedup_key: String,
+
+    // For large parquet files, we will split the file into multiple parts
+    // But we don't read metadata during plan stage, so we split them by 128MB into buckets
+    // (bucket_idx, bucket_num)
+    pub bucket_option: Option<(usize, usize)>,
 }
 
 impl ParquetFilePart {
     pub fn compressed_size(&self) -> u64 {
-        self.compressed_size
+        match self.bucket_option {
+            Some((_, num)) => self.compressed_size / num as u64,
+            None => self.compressed_size,
+        }
     }
     pub fn uncompressed_size(&self) -> u64 {
-        self.estimated_uncompressed_size
+        match self.bucket_option {
+            Some((_, num)) => self.estimated_uncompressed_size / num as u64,
+            None => self.estimated_uncompressed_size,
+        }
     }
 }
 
@@ -170,6 +182,7 @@ pub(crate) fn collect_small_file_parts(
             compressed_size: size,
             estimated_uncompressed_size: (size as f64 / max_compression_ratio) as u64,
             dedup_key,
+            bucket_option: None,
         })
         .collect::<Vec<_>>();
 
@@ -191,6 +204,7 @@ pub(crate) fn collect_file_parts(
     stats: &mut PartStatistics,
     num_columns_to_read: usize,
     total_columns_to_read: usize,
+    rowgroup_hint_bytes: u64,
 ) {
     for (file, size, dedup_key) in files.into_iter() {
         stats.read_bytes += size as usize;
@@ -199,20 +213,24 @@
             (size as f64) * (num_columns_to_read as f64) / (total_columns_to_read as f64);
 
         let estimated_uncompressed_size = read_bytes * compress_ratio;
+        let bucket_num = size.div_ceil(rowgroup_hint_bytes) as usize;
+        for bucket in 0..bucket_num {
+            partitions
+                .partitions
+                .push(Arc::new(Box::new(ParquetPart::File(ParquetFilePart {
+                    file: file.clone(),
+                    compressed_size: size,
+                    estimated_uncompressed_size: estimated_uncompressed_size as u64,
+                    dedup_key: dedup_key.clone(),
+                    bucket_option: Some((bucket, bucket_num)),
+                })) as Box<dyn PartInfo>));
 
-        partitions
-            .partitions
-            .push(Arc::new(Box::new(ParquetPart::File(ParquetFilePart {
-                file,
-                compressed_size: size,
-                estimated_uncompressed_size: estimated_uncompressed_size as u64,
-                dedup_key,
-            })) as Box<dyn PartInfo>));
+            stats.partitions_scanned += 1;
+            stats.partitions_total += 1;
+        }
 
         stats.read_bytes += read_bytes as usize;
         stats.read_rows += estimated_read_rows as usize;
         stats.is_exact = false;
-        stats.partitions_scanned += 1;
-        stats.partitions_total += 1;
     }
 }
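A consequence of the compressed_size()/uncompressed_size() changes above, shown as a minimal standalone sketch (the Part struct and the sizes are illustrative, not the crate's own types): when a file is split into N buckets, each part reports roughly 1/N of the file's size, so per-part cost estimates stay proportional to what the part will actually read.

// Illustrative mirror of the size apportionment; not the crate's types.
struct Part {
    compressed_size: u64,
    bucket_option: Option<(usize, usize)>, // (bucket_idx, bucket_num)
}

impl Part {
    fn effective_compressed_size(&self) -> u64 {
        // A bucketed part accounts for its share of the file; an unbucketed one for all of it.
        match self.bucket_option {
            Some((_, num)) => self.compressed_size / num as u64,
            None => self.compressed_size,
        }
    }
}

fn main() {
    // A hypothetical 384 MiB file split into 3 buckets: each part accounts for 128 MiB.
    let size: u64 = 384 * 1024 * 1024;
    let parts: Vec<Part> = (0..3)
        .map(|i| Part { compressed_size: size, bucket_option: Some((i, 3)) })
        .collect();
    for p in &parts {
        assert_eq!(p.effective_compressed_size(), 128 * 1024 * 1024);
    }
}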

src/query/storages/parquet/src/parquet_table/partition.rs

Lines changed: 3 additions & 0 deletions
@@ -83,6 +83,8 @@ impl ParquetTable {
         let mut stats = PartStatistics::default();
 
         let fast_read_bytes = ctx.get_settings().get_parquet_fast_read_bytes()?;
+        let rowgroup_hint_bytes = ctx.get_settings().get_parquet_rowgroup_hint_bytes()?;
+
         let mut large_files = vec![];
         let mut small_files = vec![];
         for (location, size, dedup_key) in file_locations.into_iter() {
@@ -100,6 +102,7 @@
             &mut stats,
             num_columns_to_read,
             self.schema().num_fields(),
+            rowgroup_hint_bytes,
         );
 
         if !small_files.is_empty() {

src/query/storages/parquet/src/source.rs

Lines changed: 14 additions & 2 deletions
@@ -407,6 +407,13 @@ impl ParquetSource {
             Cow::Borrowed(&self.row_group_reader)
         };
 
+        let should_read = |rowgroup_idx: usize, bucket_option: Option<(usize, usize)>| -> bool {
+            if let Some((bucket, bucket_num)) = bucket_option {
+                return rowgroup_idx % bucket_num == bucket;
+            }
+            true
+        };
+
         let mut start_row = 0;
         let mut readers = VecDeque::with_capacity(meta.num_row_groups());
         // Deleted files only belong to the same Parquet, so they only need to be loaded once
@@ -415,7 +422,13 @@
             .as_ref()
             .map(|files| (meta.as_ref(), files.as_slice()));
 
-        for rg in meta.row_groups() {
+        for (rowgroup_idx, rg) in meta.row_groups().iter().enumerate() {
+            start_row += rg.num_rows() as u64;
+            // filter by bucket option
+            if !should_read(rowgroup_idx, part.bucket_option) {
+                continue;
+            }
+
             let part = ParquetRowGroupPart {
                 location: part.file.clone(),
                 start_row,
@@ -428,7 +441,6 @@
                 page_locations: None,
                 selectors: None,
             };
-            start_row += rg.num_rows() as u64;
 
             let reader = reader
                 .create_read_policy(
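The modulo filter means that, across the buckets of one file, every row group is claimed by exactly one bucket; a quick standalone check of that property (bucket and row-group counts are arbitrary, the closure mirrors should_read):

fn main() {
    let bucket_num = 3usize; // buckets the file was split into at plan time
    let row_groups = 10usize; // row groups found in the file's metadata at read time
    let should_read = |idx: usize, bucket: usize| idx % bucket_num == bucket;

    // Count how many buckets claim each row group: it must be exactly one.
    for idx in 0..row_groups {
        let claims = (0..bucket_num).filter(|&b| should_read(idx, b)).count();
        assert_eq!(claims, 1);
    }
    println!("every row group is claimed by exactly one bucket");
}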

src/query/storages/result_cache/src/table_function/table.rs

Lines changed: 1 addition & 0 deletions
@@ -126,6 +126,7 @@ impl Table for ResultScan {
             compressed_size: self.file_size,
             estimated_uncompressed_size: self.file_size,
             dedup_key: format!("{}_{}", self.location, self.file_size),
+            bucket_option: None,
         });
 
         let part_info: Box<dyn PartInfo> = Box::new(part);

tests/databend-test

Lines changed: 1 addition & 1 deletion
@@ -728,7 +728,7 @@ if __name__ == "__main__":
         "-t",
         "--timeout",
         type=int,
-        default=600,
+        default=900,
         help="Timeout for each test case in seconds",
     )
     parser.add_argument(

tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.result

Lines changed: 5 additions & 0 deletions
@@ -15,3 +15,8 @@
 2 70
 1 [1,2,3]
 2 {"k":"v"}
+--- large parquet file should be worked on parallel by rowgroups
+5000000 213888890
+├── partitions total: 3
+├── partitions scanned: 3
+5000000 213888890
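Reading the new expected output (a back-of-the-envelope check, assuming the default 128 MiB hint was in effect for this test): "partitions total: 3" implies the staged file was split into three buckets, i.e. its compressed size falls above 256 MiB and at most 384 MiB.

fn main() {
    let hint: u64 = 128 * 1024 * 1024; // assumed default parquet_rowgroup_hint_bytes
    // ceil(size / hint) == 3 exactly when 2 * hint < size <= 3 * hint.
    assert_eq!((2 * hint + 1).div_ceil(hint), 3);
    assert_eq!((3 * hint).div_ceil(hint), 3);
    assert_eq!((2 * hint).div_ceil(hint), 2); // at exactly 2x the hint it would still be two parts
}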

0 commit comments
