Skip to content

Commit d4337b7

Browse files
wip
1 parent d3b46ab commit d4337b7

File tree

8 files changed

+608
-179
lines changed

8 files changed

+608
-179
lines changed

Cargo.lock

Lines changed: 266 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ parquet = { version = "55.2.0", optional = true }
3939
arrow = { version = "55.2.0", optional = true }
4040
tokio-stream = { version = "0.1.17", optional = true }
4141
hyper-util = { version = "0.1.16", optional = true }
42+
sqllogictest = { version = "0.20", optional = true }
43+
regex = { version = "1.0", optional = true }
44+
clap = { version = "4.0", features = ["derive"], optional = true }
45+
env_logger = { version = "0.10", optional = true }
4246
pin-project = "1.1.10"
4347

4448
[features]
@@ -50,6 +54,10 @@ integration = [
5054
"arrow",
5155
"tokio-stream",
5256
"hyper-util",
57+
"sqllogictest",
58+
"regex",
59+
"clap",
60+
"env_logger",
5361
]
5462

5563
tpch = ["integration"]

src/bin/logictest.rs

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
#[cfg(feature = "integration")]
2+
use clap::Parser;
3+
#[cfg(feature = "integration")]
4+
use datafusion_distributed::test_utils::sqllogictest::DatafusionDistributedDB;
5+
#[cfg(feature = "integration")]
6+
use sqllogictest::Runner;
7+
#[cfg(feature = "integration")]
8+
use std::path::PathBuf;
9+
10+
/// Command-line arguments for the SQLLogicTest runner binary.
#[cfg(feature = "integration")]
#[derive(Parser)]
#[command(name = "logictest")]
#[command(about = "A SQLLogicTest runner for DataFusion Distributed")]
struct Args {
    /// Test files or directories to run
    #[arg(required = true)]
    files: Vec<PathBuf>,

    /// Override mode: update test files with actual output
    #[arg(long = "override")]
    override_mode: bool,

    /// Number of distributed nodes to start
    #[arg(long, default_value = "3")]
    nodes: usize,
}
27+
28+
/// Entry point for the SQLLogicTest runner.
///
/// In override mode, each listed test file is rewritten in place with the
/// actual output produced by the database. In verify mode, results are
/// compared against the expected output recorded in the files.
///
/// Returns an error (non-zero exit status) when any directly listed file
/// fails, so CI does not silently pass on regressions. Files inside a
/// directory argument are handled by [`run_directory`], which reports
/// failures to stderr.
#[cfg(feature = "integration")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Args::parse();

    // The runner may open several connections; the closure creates a fresh
    // database (and localhost cluster) per connection. `nodes` is captured
    // by value so the closure stays `'static`.
    let nodes = args.nodes;
    let mut runner =
        Runner::new(move || async move { Ok(DatafusionDistributedDB::new(nodes).await) });

    // Track failures so the process exit status reflects the overall result
    // instead of always succeeding.
    let mut failures = 0usize;

    if args.override_mode {
        // Override mode: use sqllogictest's built-in override functionality.
        for file_path in &args.files {
            if file_path.is_file() {
                // A non-UTF-8 path is a reported failure, not a panic.
                let file_path_str = match file_path.to_str() {
                    Some(s) => s,
                    None => {
                        eprintln!("❌ {}: path is not valid UTF-8", file_path.display());
                        failures += 1;
                        continue;
                    }
                };

                // Rewrite the file in place using the default comparison functions.
                match runner
                    .update_test_file(
                        file_path_str,
                        file_path_str,
                        sqllogictest::default_validator,
                        sqllogictest::default_column_validator,
                    )
                    .await
                {
                    Ok(_) => println!("✅ {}: Generated", file_path.display()),
                    Err(e) => {
                        eprintln!("❌ {}: Failed to generate", file_path.display());
                        eprintln!("   Error: {}", e);
                        failures += 1;
                    }
                }
            } else {
                eprintln!("Override mode only works with individual files, not directories");
                failures += 1;
            }
        }
    } else {
        // Verify mode: compare results against expected output.
        for file_path in &args.files {
            if file_path.is_file() {
                match runner.run_file_async(file_path).await {
                    Ok(_) => println!("✅ {}: PASSED", file_path.display()),
                    Err(e) => {
                        eprintln!("❌ {}: FAILED", file_path.display());
                        eprintln!("   Error: {}", e);
                        failures += 1;
                    }
                }
            } else if file_path.is_dir() {
                println!("Running tests in directory: {}", file_path.display());
                run_directory(&mut runner, file_path).await?;
            } else {
                eprintln!(
                    "Warning: {} is neither a file nor directory",
                    file_path.display()
                );
            }
        }
    }

    if failures > 0 {
        return Err(format!("{} test file(s) failed", failures).into());
    }
    Ok(())
}
90+
91+
/// Runs every `.slt` file found directly inside `dir_path`, in sorted order.
///
/// Individual file failures are reported to stderr but do not abort the
/// remaining files; only an I/O error while listing the directory is
/// propagated to the caller.
///
/// Takes `&Path` rather than `&PathBuf` (idiomatic; `&PathBuf` arguments
/// deref-coerce, so existing callers are unaffected).
#[cfg(feature = "integration")]
async fn run_directory<D, M>(
    runner: &mut Runner<D, M>,
    dir_path: &std::path::Path,
) -> Result<(), Box<dyn std::error::Error>>
where
    D: sqllogictest::AsyncDB,
    M: sqllogictest::MakeConnection<Conn = D>,
{
    // Collect the `.slt` entries first so they can be sorted.
    let mut entries: Vec<_> = std::fs::read_dir(dir_path)?
        .filter_map(|entry| entry.ok())
        .filter(|entry| {
            entry
                .path()
                .extension()
                .and_then(|ext| ext.to_str())
                .map(|ext| ext == "slt")
                .unwrap_or(false)
        })
        .collect();

    // Sort entries so runs are deterministic across platforms
    // (read_dir order is unspecified).
    entries.sort_by_key(|entry| entry.path());

    for entry in entries {
        let file_path = entry.path();
        println!("Running test file: {}", file_path.display());
        match runner.run_file_async(&file_path).await {
            Ok(_) => println!("✅ {}: PASSED", file_path.display()),
            Err(e) => {
                eprintln!("❌ {}: FAILED", file_path.display());
                eprintln!("   Error: {}", e);
            }
        }
    }

    Ok(())
}
129+
130+
/// Fallback entry point when the `integration` feature is disabled.
///
/// The real runner depends on integration-only dependencies (clap,
/// sqllogictest, …), so without the feature we can only explain how to
/// enable it and exit with a failure status.
#[cfg(not(feature = "integration"))]
fn main() {
    eprintln!("This binary requires the 'integration' feature to be enabled.");
    // This file is src/bin/logictest.rs and clap names the command
    // "logictest", so point users at the correct binary name (the previous
    // message referenced a nonexistent `sqllogictest-runner` binary).
    eprintln!("Run with: cargo run --features integration --bin logictest");
    std::process::exit(1);
}

src/test_utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ pub mod metrics;
55
pub mod mock_exec;
66
pub mod parquet;
77
pub mod session_context;
8+
pub mod sqllogictest;
89
pub mod tpch;

src/test_utils/sqllogictest.rs

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
use crate::DefaultSessionBuilder;
2+
use crate::test_utils::localhost::start_localhost_context;
3+
use crate::test_utils::parquet::register_parquet_tables;
4+
use async_trait::async_trait;
5+
use datafusion::arrow::array::RecordBatch;
6+
use datafusion::arrow::util::display::array_value_to_string;
7+
use datafusion::common::runtime::JoinSet;
8+
use datafusion::error::DataFusionError;
9+
use datafusion::execution::context::SessionContext;
10+
use sqllogictest::{AsyncDB, DBOutput, DefaultColumnType};
11+
use std::sync::Arc;
12+
13+
pub struct DatafusionDistributedDB {
14+
ctx: SessionContext,
15+
_guard: JoinSet<()>,
16+
}
17+
18+
impl DatafusionDistributedDB {
19+
pub async fn new(num_nodes: usize) -> Self {
20+
let (ctx, _guard) = start_localhost_context(num_nodes, DefaultSessionBuilder).await;
21+
register_parquet_tables(&ctx).await.unwrap();
22+
Self { ctx, _guard }
23+
}
24+
25+
fn convert_batches_to_output(
26+
&self,
27+
batches: Vec<RecordBatch>,
28+
) -> Result<DBOutput<DefaultColumnType>, datafusion::error::DataFusionError> {
29+
if batches.is_empty() {
30+
return Ok(DBOutput::Rows {
31+
types: vec![],
32+
rows: vec![],
33+
});
34+
}
35+
36+
let num_columns = batches[0].num_columns();
37+
let column_types = vec![DefaultColumnType::Text; num_columns]; // Everything as text
38+
39+
let mut rows = Vec::new();
40+
for batch in batches {
41+
for row_idx in 0..batch.num_rows() {
42+
let mut row = Vec::new();
43+
for col_idx in 0..batch.num_columns() {
44+
let column = batch.column(col_idx);
45+
let value = array_value_to_string(column, row_idx)
46+
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
47+
row.push(value);
48+
}
49+
rows.push(row);
50+
}
51+
}
52+
53+
Ok(DBOutput::Rows {
54+
types: column_types,
55+
rows,
56+
})
57+
}
58+
59+
async fn handle_explain_analyze(
60+
&mut self,
61+
_sql: &str,
62+
) -> Result<DBOutput<DefaultColumnType>, datafusion::error::DataFusionError> {
63+
unimplemented!();
64+
}
65+
66+
async fn handle_explain(
67+
&mut self,
68+
sql: &str,
69+
) -> Result<DBOutput<DefaultColumnType>, datafusion::error::DataFusionError> {
70+
let query = sql.trim_start_matches("EXPLAIN").trim();
71+
let df = self.ctx.sql(query).await?;
72+
let physical_plan = df.create_physical_plan().await?;
73+
74+
// Apply distributed optimizer to get the distributed plan
75+
use crate::DistributedPhysicalOptimizerRule;
76+
use datafusion::physical_optimizer::PhysicalOptimizerRule;
77+
use datafusion::physical_plan::displayable;
78+
79+
let physical_distributed = DistributedPhysicalOptimizerRule::default()
80+
.with_network_shuffle_tasks(2)
81+
.with_network_coalesce_tasks(2)
82+
.optimize(physical_plan, &Default::default())?;
83+
84+
let physical_distributed_str = displayable(physical_distributed.as_ref())
85+
.indent(true)
86+
.to_string();
87+
88+
// Create a RecordBatch with the plan string
89+
use datafusion::arrow::array::{ArrayRef, StringArray};
90+
use datafusion::arrow::datatypes::{DataType, Field, Schema};
91+
92+
let lines: Vec<String> = physical_distributed_str
93+
.lines()
94+
.map(|s| s.to_string())
95+
.collect();
96+
let schema = Arc::new(Schema::new(vec![Field::new("plan", DataType::Utf8, false)]));
97+
let batch =
98+
RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(lines)) as ArrayRef])?;
99+
100+
self.convert_batches_to_output(vec![batch])
101+
}
102+
}
103+
104+
#[async_trait]
105+
impl AsyncDB for DatafusionDistributedDB {
106+
type Error = datafusion::error::DataFusionError;
107+
type ColumnType = DefaultColumnType;
108+
109+
async fn run(&mut self, sql: &str) -> Result<DBOutput<Self::ColumnType>, Self::Error> {
110+
let sql = sql.trim();
111+
112+
// Handle different types of SQL statements
113+
if sql.to_uppercase().starts_with("CREATE")
114+
|| sql.to_uppercase().starts_with("INSERT")
115+
|| sql.to_uppercase().starts_with("DROP")
116+
{
117+
// For DDL/DML statements, just return an empty result
118+
return Ok(DBOutput::StatementComplete(0));
119+
}
120+
121+
// Handle EXPLAIN ANALYZE
122+
if sql.to_uppercase().starts_with("EXPLAIN ANALYZE") {
123+
return self.handle_explain_analyze(sql).await;
124+
}
125+
126+
// Handle regular EXPLAIN - use distributed optimizer
127+
if sql.to_uppercase().starts_with("EXPLAIN") {
128+
return self.handle_explain(sql).await;
129+
}
130+
131+
let df = self.ctx.sql(sql).await?;
132+
let batches = df.collect().await?;
133+
134+
self.convert_batches_to_output(batches)
135+
}
136+
137+
fn engine_name(&self) -> &str {
138+
"datafusion-distributed"
139+
}
140+
}

0 commit comments

Comments
 (0)