Skip to content

Commit 4f6bcf1

Browse files
author
longshan.lu
committed
feat: Refactor join execution with enhanced hash join and schema handling, including optimizations for join conditions and index adjustments
1 parent ab8c06e commit 4f6bcf1

File tree

14 files changed

+989
-592
lines changed

14 files changed

+989
-592
lines changed

qurious/src/datasource/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,2 @@
1-
#[cfg(feature = "connectorx")]
2-
pub mod connectorx;
31
pub mod file;
42
pub mod memory;

qurious/src/execution/session.rs

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::common::table_relation::TableRelation;
1010
use crate::datasource::memory::MemoryTable;
1111
use crate::error::Error;
1212
use crate::functions::{all_builtin_functions, UserDefinedFunction};
13+
use crate::internal_err;
1314
use crate::logical::plan::{
1415
CreateMemoryTable, DdlStatement, DmlOperator, DmlStatement, DropTable, Filter, LogicalPlan,
1516
};
@@ -21,7 +22,6 @@ use crate::provider::schema::SchemaProvider;
2122
use crate::provider::table::TableProvider;
2223
use crate::utils::batch::make_count_batch;
2324
use crate::{error::Result, planner::DefaultQueryPlanner};
24-
use crate::{internal_err, utils};
2525

2626
use crate::execution::providers::CatalogProviderList;
2727

@@ -481,30 +481,4 @@ order by
481481
],
482482
);
483483
}
484-
485-
#[test]
486-
#[cfg(feature = "connectorx")]
487-
fn test_postgres() {
488-
use crate::datasource::connectorx::postgres::PostgresCatalogProvider;
489-
490-
let session = ExecuteSession::new().unwrap();
491-
let catalog = PostgresCatalogProvider::try_new("postgresql://root:root@localhost:5433/qurious").unwrap();
492-
493-
session.register_catalog("qurious", Arc::new(catalog)).unwrap();
494-
495-
let data = session
496-
.sql("SELECT * FROM qurious.public.schools WHERE id = 1")
497-
.unwrap();
498-
499-
assert_batch_eq(
500-
&data,
501-
vec![
502-
"+----+--------------------+",
503-
"| id | name |",
504-
"+----+--------------------+",
505-
"| 1 | BeiJing University |",
506-
"+----+--------------------+",
507-
],
508-
);
509-
}
510484
}

qurious/src/logical/builder.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ mod tests {
197197
"SELECT * FROM users a JOIN repos b ON a.id = b.owner_id",
198198
vec![
199199
"Projection: (a.email, a.id, b.id, a.name, b.name, b.owner_id)",
200-
" Inner Join: Filter: users.id = repos.owner_id",
200+
" Inner Join: Filter: a.id = b.owner_id",
201201
" SubqueryAlias: a",
202202
" TableScan: users",
203203
" SubqueryAlias: b",
@@ -212,7 +212,7 @@ mod tests {
212212
"SELECT * FROM users a LEFT JOIN repos b ON a.id = b.owner_id",
213213
vec![
214214
"Projection: (a.email, a.id, b.id, a.name, b.name, b.owner_id)",
215-
" Left Join: Filter: users.id = repos.owner_id",
215+
" Left Join: Filter: a.id = b.owner_id",
216216
" SubqueryAlias: a",
217217
" TableScan: users",
218218
" SubqueryAlias: b",
@@ -227,7 +227,7 @@ mod tests {
227227
"SELECT * FROM users a RIGHT JOIN repos b ON a.id = b.owner_id",
228228
vec![
229229
"Projection: (a.email, a.id, b.id, a.name, b.name, b.owner_id)",
230-
" Right Join: Filter: users.id = repos.owner_id",
230+
" Right Join: Filter: a.id = b.owner_id",
231231
" SubqueryAlias: a",
232232
" TableScan: users",
233233
" SubqueryAlias: b",
@@ -242,7 +242,7 @@ mod tests {
242242
"SELECT * FROM users a FULL JOIN repos b ON a.id = b.owner_id",
243243
vec![
244244
"Projection: (a.email, a.id, b.id, a.name, b.name, b.owner_id)",
245-
" Full Join: Filter: users.id = repos.owner_id",
245+
" Full Join: Filter: a.id = b.owner_id",
246246
" SubqueryAlias: a",
247247
" TableScan: users",
248248
" SubqueryAlias: b",
@@ -257,8 +257,8 @@ mod tests {
257257
"SELECT * FROM users a JOIN repos b ON a.id = b.owner_id WHERE a.name = 'test'",
258258
vec![
259259
"Projection: (a.email, a.id, b.id, a.name, b.name, b.owner_id)",
260-
" Filter: users.name = Utf8('test')",
261-
" Inner Join: Filter: users.id = repos.owner_id",
260+
" Filter: a.name = Utf8('test')",
261+
" Inner Join: Filter: a.id = b.owner_id",
262262
" SubqueryAlias: a",
263263
" TableScan: users",
264264
" SubqueryAlias: b",
@@ -273,7 +273,7 @@ mod tests {
273273
"SELECT * FROM users a JOIN repos b ON a.id = b.owner_id AND a.name = b.name",
274274
vec![
275275
"Projection: (a.email, a.id, b.id, a.name, b.name, b.owner_id)",
276-
" Inner Join: Filter: users.id = repos.owner_id AND users.name = repos.name",
276+
" Inner Join: Filter: a.id = b.owner_id AND a.name = b.name",
277277
" SubqueryAlias: a",
278278
" TableScan: users",
279279
" SubqueryAlias: b",

qurious/src/optimizer/pushdown_filter_inner_join.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -257,21 +257,21 @@ mod tests {
257257
#[test]
258258
fn test_not_valid_join_pair() {
259259
assert_after_optimizer(
260-
"SELECT * FROM users,repos WHERE (users.id = repos.user_id AND users.id = 10) OR (users.name = repos.name AND repos.id = 20)",
260+
"SELECT * FROM users,repos WHERE (users.id = repos.owner_id AND users.id = 10) OR (users.name = repos.name AND repos.id = 20)",
261261
vec![
262262
"Projection: (users.email, repos.id, users.id, repos.name, users.name, repos.owner_id)",
263-
" Filter: users.id = repos.user_id AND users.id = Int64(10) OR users.name = repos.name AND repos.id = Int64(20)",
263+
" Filter: users.id = repos.owner_id AND users.id = Int64(10) OR users.name = repos.name AND repos.id = Int64(20)",
264264
" CrossJoin",
265265
" TableScan: users",
266266
" TableScan: repos",
267267
],
268268
);
269269

270270
assert_after_optimizer(
271-
"SELECT * FROM users,repos WHERE (users.id = repos.user_id AND users.id = 10) OR (users.id = repos.id OR users.id = 20)",
271+
"SELECT * FROM users,repos WHERE (users.id = repos.owner_id AND users.id = 10) OR (users.id = repos.id OR users.id = 20)",
272272
vec![
273273
"Projection: (users.email, repos.id, users.id, repos.name, users.name, repos.owner_id)",
274-
" Filter: users.id = repos.user_id AND users.id = Int64(10) OR users.id = repos.id OR users.id = Int64(20)",
274+
" Filter: users.id = repos.owner_id AND users.id = Int64(10) OR users.id = repos.id OR users.id = Int64(20)",
275275
" CrossJoin",
276276
" TableScan: users",
277277
" TableScan: repos",
@@ -282,11 +282,11 @@ mod tests {
282282
#[test]
283283
fn should_not_pushdown_filter_for_inner_join() {
284284
assert_after_optimizer(
285-
"SELECT * FROM users INNER JOIN repos ON users.id = repos.user_id WHERE users.id = 10",
285+
"SELECT * FROM users INNER JOIN repos ON users.id = repos.owner_id WHERE users.id = 10",
286286
vec![
287287
"Projection: (users.email, repos.id, users.id, repos.name, users.name, repos.owner_id)",
288288
" Filter: users.id = Int64(10)",
289-
" Inner Join: Filter: users.id = repos.user_id",
289+
" Inner Join: Filter: users.id = repos.owner_id",
290290
" TableScan: users",
291291
" TableScan: repos",
292292
],

qurious/src/physical/expr/column.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use arrow::{array::ArrayRef, record_batch::RecordBatch};
55
use super::PhysicalExpr;
66
use crate::error::{Error, Result};
77

8-
#[derive(Debug)]
8+
#[derive(Debug, Hash, PartialEq, Eq)]
99
pub struct Column {
1010
name: String,
1111
index: usize,

qurious/src/physical/plan/aggregate/hash.rs

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,27 @@
1+
use crate::arrow_err;
12
use crate::error::{Error, Result};
23
use crate::physical::expr::Accumulator;
34
use crate::physical::{
45
expr::{AggregateExpr, PhysicalExpr},
56
plan::PhysicalPlan,
67
};
7-
use crate::{arrow_err, hash_array};
8+
use crate::utils::array::create_hashes;
89
use arrow::compute::TakeOptions;
910
use arrow::row::{RowConverter, SortField};
10-
use arrow::{
11-
array::{Array, Int32Array, Int64Array, StringArray, UInt8Array},
12-
datatypes::*,
13-
};
1411
use arrow::{
1512
array::{ArrayRef, UInt64Array},
1613
compute,
1714
datatypes::SchemaRef,
1815
record_batch::RecordBatch,
1916
};
20-
use std::{
21-
collections::HashMap,
22-
fmt::Display,
23-
hash::{DefaultHasher, Hash, Hasher},
24-
sync::Arc,
25-
};
17+
use std::{collections::HashMap, fmt::Display, hash::DefaultHasher, sync::Arc};
2618

2719
struct GroupAccumulator<'a> {
2820
/// accumulators each group has a vector of accumulators
2921
/// Key: group index Value: (group_values, accumulators, indices)
3022
accumulators: HashMap<usize, (Vec<ArrayRef>, Vec<Box<dyn Accumulator>>)>,
3123
/// Key: row num Value: hash
32-
hashes_buffer: HashMap<usize, DefaultHasher>,
24+
hashes_buffer: Vec<DefaultHasher>,
3325
/// Key: hash Value: group index
3426
map: HashMap<u64, usize>,
3527
///
@@ -43,51 +35,36 @@ impl<'a> GroupAccumulator<'a> {
4335
{
4436
Ok(Self {
4537
accumulators: HashMap::new(),
46-
hashes_buffer: HashMap::new(),
38+
hashes_buffer: Vec::new(),
4739
map: HashMap::new(),
4840
accumlator_factory: f,
4941
})
5042
}
5143

52-
fn update(&mut self, group_by_values: &[ArrayRef], input_values: &[ArrayRef]) -> Result<()> {
44+
fn update(&mut self, rows: usize, group_by_values: &[ArrayRef], input_values: &[ArrayRef]) -> Result<()> {
5345
self.hashes_buffer.clear();
46+
self.hashes_buffer.resize(rows, DefaultHasher::new());
5447

55-
for col in group_by_values {
56-
match col.data_type() {
57-
DataType::UInt8 => hash_array!(UInt8Array, col, &mut self.hashes_buffer),
58-
DataType::Int32 => hash_array!(Int32Array, col, &mut self.hashes_buffer),
59-
DataType::Int64 => hash_array!(Int64Array, col, &mut self.hashes_buffer),
60-
DataType::Utf8 => hash_array!(StringArray, col, &mut self.hashes_buffer),
61-
_ => {
62-
return Err(Error::InternalError(format!(
63-
"[group_indices] unsupported data type {:?}",
64-
col.data_type()
65-
)))
66-
}
67-
}
68-
}
69-
48+
let hashes = create_hashes(&group_by_values, &mut self.hashes_buffer)?;
7049
let mut accs_indices = HashMap::new();
71-
for (row, target_hash_buffer) in &self.hashes_buffer {
72-
let target_hash = target_hash_buffer.finish();
73-
50+
for (row, target_hash) in hashes.into_iter().enumerate() {
7451
match self.map.get_mut(&target_hash) {
7552
Some(group_index) => {
76-
accs_indices.entry(*group_index).or_insert(vec![]).push(*row as u64);
53+
accs_indices.entry(*group_index).or_insert(vec![]).push(row as u64);
7754
}
7855
None => {
79-
self.map.insert(target_hash, *row);
56+
self.map.insert(target_hash, row);
8057

81-
accs_indices.insert(*row, vec![*row as u64]);
58+
accs_indices.insert(row, vec![row as u64]);
8259

8360
let accs = (self.accumlator_factory)()?;
84-
let indices = UInt64Array::from_iter(vec![*row as u64]);
61+
let indices = UInt64Array::from_iter(vec![row as u64]);
8562
let group_values = group_by_values
8663
.iter()
8764
.map(|values| compute::take(&values, &indices, None).map_err(|e| arrow_err!(e)))
8865
.collect::<Result<Vec<_>>>()?;
8966

90-
self.accumulators.insert(*row, (group_values, accs));
67+
self.accumulators.insert(row, (group_values, accs));
9168
}
9269
}
9370
}
@@ -181,7 +158,7 @@ impl PhysicalPlan for HashAggregate {
181158
.map(|e| e.expression().evaluate(&batch))
182159
.collect::<Result<Vec<ArrayRef>>>()?;
183160

184-
group_accumulator.update(&group_by_values, &input_values)?;
161+
group_accumulator.update(batch.num_rows(), &group_by_values, &input_values)?;
185162
}
186163

187164
RecordBatch::try_new(self.schema.clone(), group_accumulator.output(&self.schema)?)

0 commit comments

Comments (0)