13 changes: 11 additions & 2 deletions Makefile
@@ -3,7 +3,7 @@ CARGO ?= cargo
WASM_PACK ?= wasm-pack
SQLLOGIC_PATH ?= tests/slt/**/*.slt

.PHONY: test test-wasm test-slt test-all wasm-build
.PHONY: test test-wasm test-slt test-all wasm-build check tpcc

## Run default Rust tests in the current environment (non-WASM).
test:
@@ -19,7 +19,16 @@ test-wasm:

## Run the sqllogictest harness against the configured .slt suite.
test-slt:
$(CARGO) run -p sqllogictest-test -- --path "$(SQLLOGIC_PATH)"
$(CARGO) run -p sqllogictest-test -- --path '$(SQLLOGIC_PATH)'

## Convenience target to run every suite in sequence.
test-all: test test-wasm test-slt

## Run formatting (check mode) and clippy linting together.
check:
$(CARGO) fmt --all -- --check
$(CARGO) clippy --all-targets --all-features -- -D warnings

## Execute the TPCC workload example as a standalone command.
tpcc:
$(CARGO) run -p tpcc --release
12 changes: 6 additions & 6 deletions README.md
@@ -90,13 +90,13 @@ run `cargo run -p tpcc --release` to run tpcc
- Tips: TPC-C currently only supports a single thread
```shell
<90th Percentile RT (MaxRT)>
New-Order : 0.002 (0.018)
Payment : 0.001 (0.024)
Order-Status : 0.050 (0.067)
Delivery : 0.021 (0.030)
Stock-Level : 0.003 (0.005)
New-Order : 0.002 (0.005)
Payment : 0.001 (0.003)
Order-Status : 0.057 (0.088)
Delivery : 0.001 (0.001)
Stock-Level : 0.002 (0.006)
<TpmC>
8101 Tpmc
11125 Tpmc
```
#### 👉[check more](tpcc/README.md)

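For context on the tpmC figures in the README excerpt above: tpmC is TPC-C's throughput metric and counts completed New-Order transactions per minute of run time. A minimal arithmetic sketch (the function and its inputs are illustrative, not part of the tpcc crate's API):

```rust
/// tpmC = New-Order transactions completed per minute of wall-clock time.
/// `new_order_txns` and `elapsed_secs` are hypothetical values a benchmark
/// driver would record; this is a sketch, not this repo's measurement code.
fn tpm_c(new_order_txns: u64, elapsed_secs: f64) -> f64 {
    new_order_txns as f64 * 60.0 / elapsed_secs
}

fn main() {
    // 111,250 New-Order transactions over a 10-minute run = 11,125 tpmC,
    // in the same ballpark as the figure quoted above.
    assert_eq!(tpm_c(111_250, 600.0), 11_125.0);
}
```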
7 changes: 6 additions & 1 deletion src/execution/dql/index_scan.rs
@@ -25,6 +25,7 @@ pub(crate) struct IndexScan {
index_by: IndexMetaRef,
ranges: Vec<Range>,
covered_deserializers: Option<Vec<TupleValueSerializableImpl>>,
cover_mapping: Option<Vec<usize>>,
}

impl
@@ -33,14 +34,16 @@ impl
IndexMetaRef,
Range,
Option<Vec<TupleValueSerializableImpl>>,
Option<Vec<usize>>,
)> for IndexScan
{
fn from(
(op, index_by, range, covered_deserializers): (
(op, index_by, range, covered_deserializers, cover_mapping): (
TableScanOperator,
IndexMetaRef,
Range,
Option<Vec<TupleValueSerializableImpl>>,
Option<Vec<usize>>,
),
) -> Self {
let ranges = match range {
@@ -53,6 +56,7 @@ impl
index_by,
ranges,
covered_deserializers,
cover_mapping,
}
}
}
@@ -83,6 +87,7 @@ impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for IndexScan {
self.ranges,
with_pk,
self.covered_deserializers,
self.cover_mapping,
)
);

3 changes: 2 additions & 1 deletion src/execution/mod.rs
@@ -139,9 +139,10 @@ pub fn build_read<'a, T: Transaction + 'a>(
meta,
range: Some(range),
covered_deserializers,
cover_mapping,
})) = plan.physical_option
{
IndexScan::from((op, meta, range, covered_deserializers))
IndexScan::from((op, meta, range, covered_deserializers, cover_mapping))
.execute(cache, transaction)
} else {
SeqScan::from(op).execute(cache, transaction)
1 change: 1 addition & 0 deletions src/optimizer/core/memo.rs
@@ -274,6 +274,7 @@ mod tests {
}
])),
covered_deserializers: None,
cover_mapping: None,
}))
);

212 changes: 198 additions & 14 deletions src/optimizer/rule/normalization/pushdown_predicates.rs
@@ -244,6 +244,7 @@ impl NormalizationRule for PushPredicateIntoScan {
meta,
range,
covered_deserializers,
cover_mapping,
} in &mut scan_op.index_infos
{
if range.is_some() {
@@ -265,26 +266,37 @@
}
changed = true;

let mut deserializers = Vec::with_capacity(meta.column_ids.len());
let mut cover_count = 0;
*covered_deserializers = None;
*cover_mapping = None;

// try to use this index as a covering index
let mut mapping_slots = vec![usize::MAX; scan_op.columns.len()];
let mut needs_mapping = false;
let index_column_types = match &meta.value_ty {
LogicalType::Tuple(tys) => tys,
ty => slice::from_ref(ty),
};
for (i, column_id) in meta.column_ids.iter().enumerate() {
for column in scan_op.columns.values() {
deserializers.push(
if column.id().map(|id| id == *column_id).unwrap_or(false) {
cover_count += 1;
column.datatype().serializable()
} else {
index_column_types[i].skip_serializable()
},
);
let mut deserializers = Vec::with_capacity(meta.column_ids.len());

for (idx, column_id) in meta.column_ids.iter().enumerate() {
if let Some((scan_idx, column)) =
scan_op.columns.values().enumerate().find(|(_, column)| {
column.id().map(|id| id == *column_id).unwrap_or(false)
})
{
mapping_slots[scan_idx] = idx;
needs_mapping |= scan_idx != idx;
deserializers.push(column.datatype().serializable());
} else {
deserializers.push(index_column_types[idx].skip_serializable());
}
}
if cover_count == scan_op.columns.len() {

if mapping_slots.iter().all(|slot| *slot != usize::MAX) {
*covered_deserializers = Some(deserializers);
if needs_mapping {
*cover_mapping = Some(mapping_slots);
}
}
}
return Ok(changed);
@@ -354,17 +366,24 @@ impl PushPredicateIntoScan {
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use crate::binder::test::build_t1_table;
use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRef, TableName};
use crate::errors::DatabaseError;
use crate::expression::range_detacher::Range;
use crate::expression::{BinaryOperator, ScalarExpression};
use crate::optimizer::heuristic::batch::HepBatchStrategy;
use crate::optimizer::heuristic::optimizer::HepOptimizer;
use crate::optimizer::rule::normalization::NormalizationRuleImpl;
use crate::planner::operator::filter::FilterOperator;
use crate::planner::operator::table_scan::TableScanOperator;
use crate::planner::operator::Operator;
use crate::planner::{Childrens, LogicalPlan};
use crate::storage::rocksdb::RocksTransaction;
use crate::types::index::{IndexInfo, IndexMeta, IndexType};
use crate::types::value::DataValue;
use crate::types::LogicalType;
use std::collections::Bound;
use std::collections::{BTreeMap, Bound};
use std::sync::Arc;
use ulid::Ulid;

#[test]
fn test_push_predicate_into_scan() -> Result<(), DatabaseError> {
@@ -400,6 +419,171 @@ mod tests {
Ok(())
}

#[test]
fn test_cover_mapping_matches_scan_order() -> Result<(), DatabaseError> {
let table_name: TableName = Arc::from("mock_table");
let c1_id = Ulid::new();
let c2_id = Ulid::new();
let c3_id = Ulid::new();

let mut c1 = ColumnCatalog::new(
"c1".to_string(),
false,
ColumnDesc::new(LogicalType::Integer, Some(0), false, None)?,
);
c1.set_ref_table(table_name.clone(), c1_id, false);
let c1_ref = ColumnRef::from(c1.clone());

let mut c2 = ColumnCatalog::new(
"c2".to_string(),
false,
ColumnDesc::new(LogicalType::Integer, None, false, None)?,
);
c2.set_ref_table(table_name.clone(), c2_id, false);
let c2_ref = ColumnRef::from(c2.clone());

let mut c3 = ColumnCatalog::new(
"c3".to_string(),
false,
ColumnDesc::new(LogicalType::Integer, None, false, None)?,
);
c3.set_ref_table(table_name.clone(), c3_id, false);

let mut columns = BTreeMap::new();
columns.insert(0, c1_ref.clone());
columns.insert(1, c2_ref.clone());

let index_meta_reordered = Arc::new(IndexMeta {
id: 0,
column_ids: vec![c2_id, c3_id, c1_id],
table_name: table_name.clone(),
pk_ty: LogicalType::Integer,
value_ty: LogicalType::Tuple(vec![
LogicalType::Integer,
LogicalType::Integer,
LogicalType::Integer,
]),
name: "idx_c2_c3_c1".to_string(),
ty: IndexType::Composite,
});
let index_meta_aligned = Arc::new(IndexMeta {
id: 1,
column_ids: vec![c1_id, c2_id],
table_name: table_name.clone(),
pk_ty: LogicalType::Integer,
value_ty: LogicalType::Tuple(vec![LogicalType::Integer, LogicalType::Integer]),
name: "idx_c1_c2".to_string(),
ty: IndexType::Composite,
});

let scan_plan = LogicalPlan::new(
Operator::TableScan(TableScanOperator {
table_name: table_name.clone(),
primary_keys: vec![c1_id],
columns,
limit: (None, None),
index_infos: vec![
IndexInfo {
meta: index_meta_reordered,
range: None,
covered_deserializers: None,
cover_mapping: None,
},
IndexInfo {
meta: index_meta_aligned,
range: None,
covered_deserializers: None,
cover_mapping: None,
},
],
with_pk: false,
}),
Childrens::None,
);

let c1_gt = ScalarExpression::Binary {
op: BinaryOperator::Gt,
left_expr: Box::new(ScalarExpression::column_expr(c1_ref.clone())),
right_expr: Box::new(ScalarExpression::Constant(DataValue::Int32(0))),
evaluator: None,
ty: LogicalType::Boolean,
};
let c2_gt = ScalarExpression::Binary {
op: BinaryOperator::Gt,
left_expr: Box::new(ScalarExpression::column_expr(c2_ref.clone())),
right_expr: Box::new(ScalarExpression::Constant(DataValue::Int32(0))),
evaluator: None,
ty: LogicalType::Boolean,
};
let predicate = ScalarExpression::Binary {
op: BinaryOperator::And,
left_expr: Box::new(c1_gt),
right_expr: Box::new(c2_gt),
evaluator: None,
ty: LogicalType::Boolean,
};

let filter_plan = LogicalPlan::new(
Operator::Filter(FilterOperator {
predicate,
is_optimized: false,
having: false,
}),
Childrens::Only(Box::new(scan_plan)),
);

let best_plan = HepOptimizer::new(filter_plan)
.batch(
"push_cover_mapping".to_string(),
HepBatchStrategy::once_topdown(),
vec![NormalizationRuleImpl::PushPredicateIntoScan],
)
.find_best::<RocksTransaction>(None)?;

let table_scan = best_plan.childrens.pop_only();
if let Operator::TableScan(op) = &table_scan.operator {
let index_infos = &op.index_infos;
assert_eq!(index_infos.len(), 2);

// verify the first index (reordered scan columns) still uses mapping
let reordered_index = &index_infos[0];
let deserializers = reordered_index
.covered_deserializers
.as_ref()
.expect("expected covering deserializers");
assert_eq!(deserializers.len(), 3);
assert_eq!(
deserializers[0],
c2_ref.datatype().serializable(),
"first serializer should align with c2"
);
assert_eq!(
deserializers[1],
c3.datatype().skip_serializable(),
"non-projected index column should be skipped"
);
assert_eq!(
deserializers[2],
c1_ref.datatype().serializable(),
"last serializer should align with c1"
);
let mapping = reordered_index.cover_mapping.as_ref().map(|m| m.as_slice());
assert_eq!(mapping, Some(&[2, 0][..]));

// verify the second index matches scan order exactly so mapping is omitted
let ordered_index = &index_infos[1];
assert!(ordered_index.covered_deserializers.is_some());
assert!(
ordered_index.cover_mapping.is_none(),
"mapping should be None when index/scan order already match"
);
} else {
unreachable!("expected table scan");
}

Ok(())
}

#[test]
fn test_push_predicate_through_join_in_left_join() -> Result<(), DatabaseError> {
let table_state = build_t1_table()?;
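Taken together, the rule change above makes an index eligible as a covering index even when its column order differs from the scan's projection order: `cover_mapping[scan_idx]` records which index slot holds the value for scan column `scan_idx`, and the mapping is materialized only when the two orders actually differ. A standalone sketch of the permutation step (assuming decoded values are kept in index-column order, one slot per index column; the helper below is illustrative, not the crate's executor code):

```rust
// Sketch: rebuild a tuple in scan-column order from values decoded in
// index-column order. `mapping[scan_idx]` is the index slot holding the
// value for scan column `scan_idx` (hypothetical helper, for clarity).
fn apply_cover_mapping<T: Clone>(index_values: &[T], mapping: &[usize]) -> Vec<T> {
    mapping.iter().map(|&slot| index_values[slot].clone()).collect()
}

fn main() {
    // Mirrors the test above: index stores (c2, c3, c1), scan projects (c1, c2).
    let decoded = vec!["c2_val", "c3_val", "c1_val"];
    // mapping[0] = 2 (c1 sits in index slot 2), mapping[1] = 0 (c2 in slot 0).
    let mapping = [2usize, 0];
    assert_eq!(apply_cover_mapping(&decoded, &mapping), vec!["c1_val", "c2_val"]);
}
```

When the index and scan orders already match (the second index in the test), the mapping stays `None` so the executor can skip the permutation entirely.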
1 change: 1 addition & 0 deletions src/planner/operator/table_scan.rs
@@ -63,6 +63,7 @@ impl TableScanOperator {
meta: meta.clone(),
range: None,
covered_deserializers: None,
cover_mapping: None,
})
.collect_vec();

2 changes: 2 additions & 0 deletions src/storage/memory.rs
@@ -233,6 +233,7 @@ mod wasm_tests {
}],
true,
None,
None,
)?;

let mut result = Vec::new();
@@ -365,6 +366,7 @@ mod native_tests {
}],
true,
None,
None,
)?;

let mut result = Vec::new();