Skip to content

Commit 83a6547

Browse files
authored
fix(query): parquet doesn't need to be sorted by meta info (#10982)
* fix(query): parquet doesn't need to be sorted by meta info * fix(query): parquet doesn't need to be sorted by meta info
1 parent 345f373 commit 83a6547

File tree

2 files changed

+1
-82
lines changed

2 files changed

+1
-82
lines changed

src/query/storages/parquet/src/deserialize_transform.rs

Lines changed: 0 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -77,8 +77,6 @@ pub struct ParquetDeserializeTransform {
7777

7878
// Used for remain reading
7979
remain_reader: Arc<ParquetReader>,
80-
// Used for top k optimization
81-
top_k_finished: bool,
8280
}
8381

8482
impl ParquetDeserializeTransform {
@@ -108,8 +106,6 @@ impl ParquetDeserializeTransform {
108106

109107
prewhere_info,
110108
remain_reader,
111-
112-
top_k_finished: false,
113109
},
114110
)))
115111
}
@@ -127,26 +123,6 @@ impl ParquetDeserializeTransform {
127123
self.output_data = Some(data_block);
128124
Ok(())
129125
}
130-
131-
/// check topk should return finished or not
132-
fn check_topn(&mut self) {
133-
if let Some(ParquetPrewhereInfo {
134-
top_k: Some((_, sorter)),
135-
..
136-
}) = &mut self.prewhere_info.as_mut()
137-
{
138-
if let Some(next_part) = self.parts.front() {
139-
let next_part = next_part
140-
.as_any()
141-
.downcast_ref::<ParquetRowGroupPart>()
142-
.unwrap();
143-
144-
if let Some(sort_min_max) = &next_part.sort_min_max {
145-
self.top_k_finished = sorter.never_match(sort_min_max);
146-
}
147-
}
148-
}
149-
}
150126
}
151127

152128
impl Processor for ParquetDeserializeTransform {
@@ -181,26 +157,12 @@ impl Processor for ParquetDeserializeTransform {
181157
return Ok(Event::Sync);
182158
}
183159

184-
if self.top_k_finished {
185-
self.input.finish();
186-
self.output.finish();
187-
return Ok(Event::Finished);
188-
}
189-
190160
if self.input.has_data() {
191161
let mut data_block = self.input.pull_data().unwrap()?;
192162
let source_meta = data_block.take_meta().unwrap();
193163
let source_meta = ParquetSourceMeta::downcast_from(source_meta).unwrap();
194164

195165
self.parts = VecDeque::from(source_meta.parts);
196-
197-
self.check_topn();
198-
if self.top_k_finished {
199-
self.input.finish();
200-
self.output.finish();
201-
return Ok(Event::Finished);
202-
}
203-
204166
self.data_readers = VecDeque::from(source_meta.readers);
205167
return Ok(Event::Sync);
206168
}

src/query/storages/parquet/src/pruning.rs

Lines changed: 1 addition & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -121,8 +121,6 @@ impl PartitionPruner {
121121
// 2. Use file meta to prune row groups or pages.
122122

123123
// If one row group does not have stats, we cannot use the stats for topk optimization.
124-
let mut all_have_minmax = true;
125-
126124
for (file_id, file_meta) in file_metas.iter().enumerate() {
127125
partitions_total += file_meta.row_groups.len();
128126
let mut row_group_pruned = vec![false; file_meta.row_groups.len()];
@@ -159,9 +157,6 @@ impl PartitionPruner {
159157
None
160158
};
161159

162-
// If one row group does not have stats, we cannot use the stats for topk optimization.
163-
all_have_minmax &= row_group_stats.is_some();
164-
165160
for (rg_idx, rg) in file_meta.row_groups.iter().enumerate() {
166161
if row_group_pruned[rg_idx] {
167162
continue;
@@ -221,45 +216,7 @@ impl PartitionPruner {
221216
}
222217
}
223218

224-
// 3. Check if can conduct topk push down optimization.
225-
// Only all row groups have min/max stats can we use topk optimization.
226-
// If we can use topk optimization, we should use `PartitionsShuffleKind::Seq`.
227-
let partition_kind = if let (Some((top_k, _)), true) = (top_k, all_have_minmax) {
228-
partitions.sort_by(|a, b| {
229-
let (a_min, a_max) = a
230-
.column_metas
231-
.get(&(top_k.column_id as usize))
232-
.unwrap()
233-
.min_max
234-
.as_ref()
235-
.unwrap();
236-
let (b_min, b_max) = b
237-
.column_metas
238-
.get(&(top_k.column_id as usize))
239-
.unwrap()
240-
.min_max
241-
.as_ref()
242-
.unwrap();
243-
244-
if top_k.asc {
245-
(a_min.as_ref(), a_max.as_ref()).cmp(&(b_min.as_ref(), b_max.as_ref()))
246-
} else {
247-
(b_max.as_ref(), b_min.as_ref()).cmp(&(a_max.as_ref(), a_min.as_ref()))
248-
}
249-
});
250-
for part in partitions.iter_mut() {
251-
part.sort_min_max = part
252-
.column_metas
253-
.get(&(top_k.column_id as usize))
254-
.unwrap()
255-
.min_max
256-
.clone();
257-
}
258-
PartitionsShuffleKind::Seq
259-
} else {
260-
PartitionsShuffleKind::Mod
261-
};
262-
219+
let partition_kind = PartitionsShuffleKind::Mod;
263220
let partitions = partitions
264221
.into_iter()
265222
.map(|p| p.convert_to_part_info())

0 commit comments

Comments (0)