Skip to content
Merged
4 changes: 2 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ Unit tests stay close to the affected crate (`#[cfg(test)]` modules), and integr

## Commit & Pull Request Guidelines

History follows a Conventional-style subject such as `fix(storage): avoid stale snapshot (#19174)` or `feat: support self join elimination (#19169)`; keep the first line imperative and under 72 characters. Commits should stay scoped to a logical change set and include formatting/linting updates in the same patch. PRs must outline motivation, implementation notes, and validation commands, plus link issues or RFCs, and the final description should follow `PULL_REQUEST_TEMPLATE.md` (checkboxes, verification, screenshots when needed). Attach screenshots or sample queries when UI, SQL plans, or system tables change, and call out rollout risks (migrations, config toggles, backfills) so reviewers can plan accordingly.
History follows a Conventional-style subject such as `fix(storage): avoid stale snapshot (#19174)` or `feat(storage): support self join elimination (#19169)`; keep the first line imperative and under 72 characters. Commits should stay scoped to a logical change set and include formatting/linting updates in the same patch. PRs must outline motivation, implementation notes, and validation commands, plus link issues or RFCs, and the final description should follow `PULL_REQUEST_TEMPLATE.md` (checkboxes, verification, screenshots when needed). Attach screenshots or sample queries when UI, SQL plans, or system tables change, and call out rollout risks (migrations, config toggles, backfills) so reviewers can plan accordingly.

There is the example of pull requests:
Here is an example of a pull request; you must strictly follow its style:

````
I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/
Expand Down
125 changes: 110 additions & 15 deletions src/query/storages/common/pruner/src/topn_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::sync::Arc;

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::RemoteExpr;
use databend_common_expression::SEARCH_SCORE_COL_NAME;
use databend_common_expression::Scalar;
use databend_common_expression::TableDataType;
use databend_common_expression::TableSchemaRef;
use databend_common_expression::types::number::F32;
use databend_storages_common_table_meta::meta::BlockMeta;
use databend_storages_common_table_meta::meta::ColumnStatistics;

use crate::BlockMetaIndex;

Expand Down Expand Up @@ -205,17 +208,7 @@ impl TopNPruner {
}
Ok(pruned_metas)
} else {
id_stats.sort_by(|a, b| {
if a.1.null_count + b.1.null_count != 0 && *nulls_first {
return a.1.null_count.cmp(&b.1.null_count).reverse();
}
// no nulls
if *asc {
a.1.min().cmp(b.1.min())
} else {
a.1.max().cmp(b.1.max()).reverse()
}
});
id_stats.sort_by(|a, b| compare_block_stats(&a.1, &b.1, *asc, *nulls_first));

let pruned_metas = id_stats
.into_iter()
Expand Down Expand Up @@ -310,6 +303,61 @@ fn block_score_range(scores: &[F32]) -> Option<(F32, F32)> {
Some((min_score, max_score))
}

/// Orders two statistic scalars for TopN pruning.
///
/// Null placement follows SQL `NULLS FIRST` / `NULLS LAST` semantics and is
/// independent of the sort direction: a null side is positioned purely by
/// `nulls_first`. Two non-null scalars use their natural ordering, reversed
/// when `asc` is false.
fn compare_scalar_for_sorting(
    left: &Scalar,
    right: &Scalar,
    asc: bool,
    nulls_first: bool,
) -> Ordering {
    match (matches!(left, Scalar::Null), matches!(right, Scalar::Null)) {
        // Two nulls are indistinguishable for ordering purposes.
        (true, true) => Ordering::Equal,
        // Only the left side is null: it goes first iff nulls sort first.
        (true, false) => {
            if nulls_first {
                Ordering::Less
            } else {
                Ordering::Greater
            }
        }
        // Only the right side is null: mirror of the case above.
        (false, true) => {
            if nulls_first {
                Ordering::Greater
            } else {
                Ordering::Less
            }
        }
        // No nulls: plain comparison, flipped for descending sorts.
        (false, false) => {
            let ordering = left.cmp(right);
            if asc { ordering } else { ordering.reverse() }
        }
    }
}

/// Orders two blocks by their column statistics for TopN pruning.
///
/// When `nulls_first` is set and at least one side contains nulls, the block
/// with the larger null count sorts earlier (nulls are wanted up front).
/// Otherwise blocks are ranked by their min statistic for ascending sorts and
/// by their max statistic for descending sorts, with null min/max values
/// delegated to `compare_scalar_for_sorting`.
fn compare_block_stats(
    left: &ColumnStatistics,
    right: &ColumnStatistics,
    asc: bool,
    nulls_first: bool,
) -> Ordering {
    if nulls_first && left.null_count + right.null_count != 0 {
        // Descending by null count == ascending comparison with sides swapped.
        return right.null_count.cmp(&left.null_count);
    }

    // Pick the statistic that bounds the "best" value for this direction.
    let (lhs, rhs) = if asc {
        (left.min(), right.min())
    } else {
        (left.max(), right.max())
    };
    compare_scalar_for_sorting(lhs, rhs, asc, nulls_first)
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down Expand Up @@ -407,18 +455,65 @@ mod tests {
assert_eq!(kept_blocks, vec![0, 1]);
}

/// With `ORDER BY c DESC ... NULLS LAST` and `limit 1`, the pruner must keep
/// the block that holds real values rather than the all-null block.
#[test]
fn test_prune_topn_respects_nulls_last_desc() {
    let schema = Arc::new(TableSchema::new(vec![TableField::new(
        "c",
        TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int64))),
    )]));
    let column_id = schema.column_id_of("c").unwrap();
    let sort_expr = RemoteExpr::ColumnRef {
        span: None,
        id: "c".to_string(),
        data_type: DataType::Nullable(Box::new(DataType::Number(NumberDataType::Int64))),
        display_name: "c".to_string(),
    };

    // Block 0 is entirely null; block 1 carries values in [100, 200].
    let metas = vec![
        build_null_block(column_id, 0, 5),
        build_block(column_id, 1, 100, 200, 5),
    ];

    // desc (asc = false) with nulls last (nulls_first = false), keep one block.
    let pruner = TopNPruner::create(schema, vec![(sort_expr, false, false)], 1, false);

    let kept: Vec<_> = pruner
        .prune(metas)
        .unwrap()
        .iter()
        .map(|(idx, _)| idx.block_id)
        .collect();
    assert_eq!(kept, vec![1]);
}

/// Builds a test block whose single column has non-null statistics
/// spanning `[min, max]` with zero null count.
fn build_block(
    column_id: ColumnId,
    block_id: usize,
    min: i64,
    max: i64,
    matched_rows: usize,
) -> (BlockMetaIndex, Arc<BlockMeta>) {
    let stats = ColumnStatistics::new(Scalar::from(min), Scalar::from(max), 0, 0, None);
    build_block_with_stats(column_id, block_id, stats, matched_rows)
}

/// Builds a test block whose single column is entirely null: min and max are
/// `Scalar::Null` and the null count equals the row count.
fn build_null_block(
    column_id: ColumnId,
    block_id: usize,
    matched_rows: usize,
) -> (BlockMetaIndex, Arc<BlockMeta>) {
    let all_null_stats =
        ColumnStatistics::new(Scalar::Null, Scalar::Null, matched_rows as u64, 0, None);
    build_block_with_stats(column_id, block_id, all_null_stats, matched_rows)
}

fn build_block_with_stats(
column_id: ColumnId,
block_id: usize,
column_stats: ColumnStatistics,
matched_rows: usize,
) -> (BlockMetaIndex, Arc<BlockMeta>) {
let mut col_stats = HashMap::new();
col_stats.insert(
column_id,
ColumnStatistics::new(Scalar::from(min), Scalar::from(max), 0, 0, None),
);
col_stats.insert(column_id, column_stats);

let column_metas: HashMap<ColumnId, ColumnMeta> = HashMap::new();

Expand Down
52 changes: 52 additions & 0 deletions tests/sqllogictests/suites/query/order.test
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,55 @@ unset force_sort_data_spill;

statement ok
unset spilling_file_format;

statement ok
set enable_compact_after_write = 0;

statement ok
drop table if exists order_null_ts;

statement ok
create table order_null_ts (
ts timestamp null,
level varchar null,
message varchar null,
extra variant null
);

statement ok
insert into order_null_ts select null, 'null_level', null, null from numbers(3);

statement ok
insert into order_null_ts select
'2024-01-01 00:00:00', 'info', 'first', null from numbers(3);

statement ok
insert into order_null_ts select
'2024-02-01 00:00:00', 'warn', 'second', null from numbers(2);

query IIII
select ts, level, message, extra from order_null_ts order by ts desc limit 1;
----
2024-02-01 00:00:00.000000 warn second NULL


query IIII
select ts, level, message, extra from order_null_ts order by ts desc limit 2;
----
2024-02-01 00:00:00.000000 warn second NULL
2024-02-01 00:00:00.000000 warn second NULL


query IIII
select ts, level, message, extra from order_null_ts order by ts desc limit 3;
----
2024-02-01 00:00:00.000000 warn second NULL
2024-02-01 00:00:00.000000 warn second NULL
2024-01-01 00:00:00.000000 info first NULL


statement ok
drop table order_null_ts;

statement ok
unset enable_compact_after_write;
Loading