Skip to content

Commit 378c6cc

Browse files
authored
feat(query): support shuffle sort (#17853)
* reservoir_sampling Signed-off-by: coldWater <[email protected]> * Simpler Signed-off-by: coldWater <[email protected]> * TransformSortSimple Signed-off-by: coldWater <[email protected]> * rename * fix * trait Spill * execute * move * update * bounds * wait * remove * scalar * exchange * update * test Signed-off-by: coldWater <[email protected]> * x Signed-off-by: coldWater <[email protected]> * route Signed-off-by: coldWater <[email protected]> * builder Signed-off-by: coldWater <[email protected]> * update Signed-off-by: coldWater <[email protected]> * build Signed-off-by: coldWater <[email protected]> * rename Signed-off-by: coldWater <[email protected]> * fix Signed-off-by: coldWater <[email protected]> * fix Signed-off-by: coldWater <[email protected]> * fix Signed-off-by: coldWater <[email protected]> * fix Signed-off-by: coldWater <[email protected]> * fix Signed-off-by: coldWater <[email protected]> * update Signed-off-by: coldWater <[email protected]> * fix Signed-off-by: coldWater <[email protected]> * fix Signed-off-by: coldWater <[email protected]> * fix Signed-off-by: coldWater <[email protected]> * x * logical plan * x * x * x * x * x * x * x * x * x * x * x * format * x * x * allow_adjust_parallelism * InputBoundStream * BoundedMultiSortMergeProcessor * build_bounded_merge_sort * bound_index * SortBoundEdge * route * clean up * enable * fix * fix * fix * test * fix * fix * fix * fix * fix * fix * force_disable_distributed_optimization * trace and refine * default_num_merge --------- Signed-off-by: coldWater <[email protected]>
1 parent 73bfbb9 commit 378c6cc

File tree

97 files changed

+4407
-1018
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

97 files changed

+4407
-1018
lines changed

.github/workflows/reuse.sqllogic.yml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -219,8 +219,8 @@ jobs:
219219
tests:
220220
- { dirs: "query", runner: "4c16g" }
221221
- { dirs: "duckdb", runner: "4c16g" }
222-
- { dirs: "crdb", runner: "2c8g" }
223-
- { dirs: "base", runner: "2c8g" }
222+
- { dirs: "crdb", runner: "2c8g", parallel: "2" }
223+
- { dirs: "base", runner: "2c8g", parallel: "2" }
224224
- { dirs: "ydb", runner: "2c8g" }
225225
- { dirs: "tpcds", runner: "2c8g" }
226226
- { dirs: "tpch", runner: "2c8g" }

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/base/src/base/watch_notify.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,16 @@ impl WatchNotify {
4141
let _ = rx.changed().await;
4242
}
4343

44+
pub fn has_notified(&self) -> bool {
45+
match self.rx.has_changed() {
46+
Ok(b) => b,
47+
Err(_) => {
48+
// `self` owns the sender (`self.tx`), so it cannot have been dropped while `self` is alive
49+
unreachable!()
50+
}
51+
}
52+
}
53+
4454
pub fn notify_waiters(&self) {
4555
let _ = self.tx.send_replace(true);
4656
}
@@ -61,11 +71,18 @@ mod tests {
6171
#[tokio::test]
6272
async fn test_notify_waiters_ahead() {
6373
let notify = WatchNotify::new();
74+
assert!(!notify.has_notified());
75+
let notified1 = notify.notified();
76+
assert!(!notify.has_notified());
77+
6478
// notify_waiters ahead of notified being instantiated and awaited
6579
notify.notify_waiters();
66-
80+
assert!(notify.has_notified());
6781
// this should not await indefinitely
68-
let notified = notify.notified();
69-
notified.await;
82+
let notified2 = notify.notified();
83+
notified2.await;
84+
85+
notified1.await;
86+
assert!(notify.has_notified());
7087
}
7188
}

src/query/expression/src/block.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -332,10 +332,34 @@ pub trait BlockMetaInfoDowncast: Sized + BlockMetaInfo {
332332
let boxed = boxed.as_ref() as &dyn Any;
333333
boxed.downcast_ref()
334334
}
335+
336+
fn downcast_from_err(boxed: BlockMetaInfoPtr) -> std::result::Result<Self, BlockMetaInfoPtr> {
337+
if (boxed.as_ref() as &dyn Any).is::<Self>() {
338+
Ok(*(boxed as Box<dyn Any>).downcast().unwrap())
339+
} else {
340+
Err(boxed)
341+
}
342+
}
343+
344+
fn downcast_mut(boxed: &mut BlockMetaInfoPtr) -> Option<&mut Self> {
345+
let boxed = boxed.as_mut() as &mut dyn Any;
346+
boxed.downcast_mut()
347+
}
335348
}
336349

337350
impl<T: BlockMetaInfo> BlockMetaInfoDowncast for T {}
338351

352+
#[typetag::serde(name = "empty")]
353+
impl BlockMetaInfo for () {
354+
fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
355+
<() as BlockMetaInfoDowncast>::downcast_ref_from(info).is_some()
356+
}
357+
358+
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
359+
Box::new(())
360+
}
361+
}
362+
339363
impl DataBlock {
340364
#[inline]
341365
pub fn new(entries: Vec<BlockEntry>, num_rows: usize) -> Self {
@@ -431,11 +455,6 @@ impl DataBlock {
431455
DataBlock::new_with_meta(vec![], 0, Some(meta))
432456
}
433457

434-
#[inline]
435-
pub fn take_meta(&mut self) -> Option<BlockMetaInfoPtr> {
436-
self.meta.take()
437-
}
438-
439458
#[inline]
440459
pub fn columns(&self) -> &[BlockEntry] {
441460
&self.entries
@@ -702,6 +721,16 @@ impl DataBlock {
702721
})
703722
}
704723

724+
#[inline]
725+
pub fn take_meta(&mut self) -> Option<BlockMetaInfoPtr> {
726+
self.meta.take()
727+
}
728+
729+
#[inline]
730+
pub fn mut_meta(&mut self) -> Option<&mut BlockMetaInfoPtr> {
731+
self.meta.as_mut()
732+
}
733+
705734
#[inline]
706735
pub fn replace_meta(&mut self, meta: BlockMetaInfoPtr) {
707736
self.meta.replace(meta);

src/query/expression/src/types/array.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl<T: AccessType> AccessType for ArrayType<T> {
5454
scalar.clone()
5555
}
5656

57-
fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
57+
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
5858
match scalar {
5959
ScalarRef::Array(array) => T::try_downcast_column(array),
6060
_ => None,

src/query/expression/src/types/boolean.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl AccessType for BooleanType {
5050
*scalar
5151
}
5252

53-
fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
53+
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
5454
match scalar {
5555
ScalarRef::Boolean(scalar) => Some(*scalar),
5656
_ => None,

src/query/expression/src/types/map.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ impl<K: AccessType, V: AccessType> AccessType for MapType<K, V> {
451451
MapInternal::<K, V>::to_scalar_ref(scalar)
452452
}
453453

454-
fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
454+
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
455455
match scalar {
456456
ScalarRef::Map(array) => KvPair::<K, V>::try_downcast_column(array),
457457
_ => None,

src/query/pipeline/core/src/pipeline.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -460,15 +460,29 @@ impl Pipeline {
460460
self.sinks = new_sinks;
461461
}
462462

463-
pub fn exchange<T: Exchange>(&mut self, n: usize, exchange: Arc<T>) {
463+
pub fn exchange<T: Exchange>(&mut self, n: usize, exchange: Arc<T>) -> Result<()> {
464+
self.exchange_with_merge(n, exchange.clone(), |inputs, output| {
465+
Ok(MergePartitionProcessor::create(
466+
inputs,
467+
output,
468+
exchange.clone(),
469+
))
470+
})
471+
}
472+
473+
pub fn exchange_with_merge<T, F>(&mut self, n: usize, exchange: Arc<T>, f: F) -> Result<()>
474+
where
475+
T: Exchange,
476+
F: Fn(Vec<Arc<InputPort>>, Arc<OutputPort>) -> Result<ProcessorPtr>,
477+
{
464478
if self.sinks.is_empty() {
465-
return;
479+
return Ok(());
466480
}
467481

468482
let input_len = self.sinks.len();
469483
let mut items = Vec::with_capacity(input_len);
470484

471-
for _index in 0..input_len {
485+
for _ in 0..input_len {
472486
let input = InputPort::create();
473487
let outputs: Vec<_> = (0..n).map(|_| OutputPort::create()).collect();
474488
items.push(PipeItem::create(
@@ -493,14 +507,15 @@ impl Pipeline {
493507
let output = OutputPort::create();
494508
let inputs: Vec<_> = (0..input_len).map(|_| InputPort::create()).collect();
495509
items.push(PipeItem::create(
496-
MergePartitionProcessor::create(inputs.clone(), output.clone(), exchange.clone()),
510+
f(inputs.clone(), output.clone())?,
497511
inputs,
498512
vec![output],
499513
));
500514
}
501515

502516
// merge partition
503-
self.add_pipe(Pipe::create(input_len * n, n, items))
517+
self.add_pipe(Pipe::create(input_len * n, n, items));
518+
Ok(())
504519
}
505520

506521
#[track_caller]

src/query/pipeline/transforms/src/processors/transforms/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ mod transform_accumulating;
1818
mod transform_accumulating_async;
1919
mod transform_async;
2020
mod transform_blocking;
21+
mod transform_blocking_async;
2122
mod transform_compact_block;
2223
mod transform_compact_builder;
2324
mod transform_compact_no_split_builder;
2425
mod transform_dummy;
26+
mod transform_hook;
2527
mod transform_k_way_merge_sort;
2628
mod transform_multi_sort_merge;
2729
mod transform_pipeline_helper;
@@ -36,12 +38,14 @@ pub use transform_accumulating::*;
3638
pub use transform_accumulating_async::*;
3739
pub use transform_async::*;
3840
pub use transform_blocking::*;
41+
pub use transform_blocking_async::*;
3942
pub use transform_compact_block::*;
4043
pub use transform_compact_builder::*;
4144
pub use transform_compact_no_split_builder::*;
4245
pub use transform_dummy::*;
46+
pub use transform_hook::*;
4347
pub use transform_k_way_merge_sort::*;
44-
pub use transform_multi_sort_merge::try_add_multi_sort_merge;
48+
pub use transform_multi_sort_merge::*;
4549
pub use transform_pipeline_helper::TransformPipelineHelper;
4650
pub use transform_retry_async::*;
4751
pub use transform_sort_merge::sort_merge;

src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,17 @@ impl Rows for BinaryColumn {
5858
fn slice(&self, range: Range<usize>) -> Self {
5959
self.slice(range)
6060
}
61+
62+
fn scalar_as_item<'a>(s: &'a Scalar) -> Self::Item<'a> {
63+
match s {
64+
Scalar::Binary(s) => s,
65+
_ => unreachable!(),
66+
}
67+
}
68+
69+
fn owned_item(item: Self::Item<'_>) -> Scalar {
70+
Scalar::Binary(Vec::from(item))
71+
}
6172
}
6273

6374
impl RowConverter<BinaryColumn> for CommonRowConverter {

0 commit comments

Comments
 (0)