
Commit d8a3be3

wip
1 parent: d3b46ab

8 files changed: +634 -179 lines


Cargo.lock

Lines changed: 266 additions & 24 deletions
(Generated lockfile; diff not rendered.)

Cargo.toml

Lines changed: 8 additions & 0 deletions
@@ -39,6 +39,10 @@ parquet = { version = "55.2.0", optional = true }
 arrow = { version = "55.2.0", optional = true }
 tokio-stream = { version = "0.1.17", optional = true }
 hyper-util = { version = "0.1.16", optional = true }
+sqllogictest = { version = "0.20", optional = true }
+regex = { version = "1.0", optional = true }
+clap = { version = "4.0", features = ["derive"], optional = true }
+env_logger = { version = "0.10", optional = true }
 pin-project = "1.1.10"

 [features]
@@ -50,6 +54,10 @@ integration = [
     "arrow",
     "tokio-stream",
     "hyper-util",
+    "sqllogictest",
+    "regex",
+    "clap",
+    "env_logger",
 ]

 tpch = ["integration"]
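With these optional dependencies wired into the integration feature, the new binary added below would be invoked roughly as follows; the .slt path is illustrative, not part of this commit:

cargo run --features integration --bin logictest -- tests/sqllogictest/queries.slt --nodes 3
cargo run --features integration --bin logictest -- tests/sqllogictest/queries.slt --override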

src/bin/logictest.rs

Lines changed: 137 additions & 0 deletions
#[cfg(feature = "integration")]
use clap::Parser;
#[cfg(feature = "integration")]
use datafusion_distributed::test_utils::sqllogictest::sqllogictest::DataFusionDistributedDB;
#[cfg(feature = "integration")]
use sqllogictest::Runner;
#[cfg(feature = "integration")]
use std::path::PathBuf;

#[cfg(feature = "integration")]
#[derive(Parser)]
#[command(name = "logictest")]
#[command(about = "A SQLLogicTest runner for DataFusion Distributed")]
struct Args {
    /// Test files or directories to run
    #[arg(required = true)]
    files: Vec<PathBuf>,

    /// Override mode: update test files with actual output
    #[arg(long = "override")]
    override_mode: bool,

    /// Number of distributed nodes to start
    #[arg(long, default_value = "3")]
    nodes: usize,
}

#[cfg(feature = "integration")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Args::parse();

    println!("Starting DataFusion Distributed SQLLogicTest Runner");
    println!(
        "Mode: {}",
        if args.override_mode { "Override" } else { "Verify" }
    );
    println!("Nodes: {}", args.nodes);

    // Closure that creates a new database connection per test file.
    let nodes = args.nodes;
    let mut runner =
        Runner::new(move || async move { Ok(DataFusionDistributedDB::new(nodes).await) });

    if args.override_mode {
        // Override mode: rewrite the test files with the actual output.
        for file_path in &args.files {
            if file_path.is_file() {
                println!("Generating completion for: {}", file_path.display());
                let file_path_str = file_path.to_str().expect("Invalid file path");

                // Use the built-in update_test_file with the default validators.
                match runner
                    .update_test_file(
                        file_path_str,
                        file_path_str,
                        sqllogictest::default_validator,
                        sqllogictest::default_column_validator,
                    )
                    .await
                {
                    Ok(_) => println!("✅ {}: Generated", file_path.display()),
                    Err(e) => {
                        eprintln!("❌ {}: Failed to generate", file_path.display());
                        eprintln!("   Error: {}", e);
                    }
                }
            } else {
                eprintln!("Override mode only works with individual files, not directories");
            }
        }
    } else {
        // Verify mode: compare results against the expected output.
        for file_path in &args.files {
            if file_path.is_file() {
                println!("Running test file: {}", file_path.display());
                match runner.run_file_async(file_path).await {
                    Ok(_) => println!("✅ {}: PASSED", file_path.display()),
                    Err(e) => {
                        eprintln!("❌ {}: FAILED", file_path.display());
                        eprintln!("   Error: {}", e);
                    }
                }
            } else if file_path.is_dir() {
                println!("Running tests in directory: {}", file_path.display());
                run_directory(&mut runner, file_path).await?;
            } else {
                eprintln!(
                    "Warning: {} is neither a file nor a directory",
                    file_path.display()
                );
            }
        }
    }

    Ok(())
}

#[cfg(feature = "integration")]
async fn run_directory<D, M>(
    runner: &mut Runner<D, M>,
    dir_path: &PathBuf,
) -> Result<(), Box<dyn std::error::Error>>
where
    D: sqllogictest::AsyncDB,
    M: sqllogictest::MakeConnection<Conn = D>,
{
    // Collect all .slt files in the directory.
    let mut entries: Vec<_> = std::fs::read_dir(dir_path)?
        .filter_map(|entry| entry.ok())
        .filter(|entry| {
            entry
                .path()
                .extension()
                .and_then(|ext| ext.to_str())
                .map(|ext| ext == "slt")
                .unwrap_or(false)
        })
        .collect();

    // Sort entries so files run in a deterministic order.
    entries.sort_by_key(|entry| entry.path());

    for entry in entries {
        let file_path = entry.path();
        println!("Running test file: {}", file_path.display());
        match runner.run_file_async(&file_path).await {
            Ok(_) => println!("✅ {}: PASSED", file_path.display()),
            Err(e) => {
                eprintln!("❌ {}: FAILED", file_path.display());
                eprintln!("   Error: {}", e);
            }
        }
    }

    Ok(())
}

#[cfg(not(feature = "integration"))]
fn main() {
    eprintln!("This binary requires the 'integration' feature to be enabled.");
    eprintln!("Run with: cargo run --features integration --bin logictest");
    std::process::exit(1);
}
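For reference, a minimal sketch of the .slt input this runner consumes; the table and query are illustrative. Since the adapter in src/test_utils/sqllogictest.rs reports every result column as text, queries use the T type marker:

# illustrative .slt contents
statement ok
CREATE TABLE t (a INT)

query T
SELECT 1 + 2
----
3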

src/test_utils/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -5,4 +5,5 @@ pub mod metrics;
 pub mod mock_exec;
 pub mod parquet;
 pub mod session_context;
+pub mod sqllogictest;
 pub mod tpch;

src/test_utils/sqllogictest.rs

Lines changed: 164 additions & 0 deletions
#[cfg(feature = "integration")]
pub mod sqllogictest {
    use crate::test_utils::localhost::start_localhost_context;
    use crate::DefaultSessionBuilder;
    use async_trait::async_trait;
    use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::execution::context::SessionContext;
    use sqllogictest::{AsyncDB, DBOutput, DefaultColumnType};
    use std::sync::Arc;

    pub struct DataFusionDistributedDB {
        ctx: SessionContext,
    }

    impl DataFusionDistributedDB {
        pub async fn new(num_nodes: usize) -> Self {
            // Start a distributed context with the specified number of nodes.
            let (ctx, _guard) = start_localhost_context(num_nodes, DefaultSessionBuilder).await;

            // Use the existing parquet tables from test_utils.
            use crate::test_utils::parquet::register_parquet_tables;
            register_parquet_tables(&ctx).await.unwrap();

            // Keep the guard alive by forgetting it (for CLI purposes).
            std::mem::forget(_guard);

            Self { ctx }
        }

        async fn handle_explain_analyze(
            &mut self,
            sql: &str,
        ) -> Result<DBOutput<DefaultColumnType>, datafusion::error::DataFusionError> {
            // For now, just treat it as a regular EXPLAIN.
            // TODO: Implement proper distributed EXPLAIN ANALYZE with metrics.
            let explain_sql = sql.replace("EXPLAIN ANALYZE", "EXPLAIN");
            let df = self.ctx.sql(&explain_sql).await?;
            let batches = df.collect().await?;
            self.convert_batches_to_output(batches)
        }

        // TODO: Implement proper metrics obfuscation for EXPLAIN ANALYZE.
        #[allow(dead_code)]
        fn obfuscate_metrics(&self, plan_str: &str) -> String {
            use regex::Regex;

            let mut obfuscated = plan_str.to_string();

            // Replace timestamps with <TIMESTAMP>.
            let timestamp_regex = Regex::new(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+").unwrap();
            obfuscated = timestamp_regex
                .replace_all(&obfuscated, "<TIMESTAMP>")
                .to_string();

            // Replace durations (e.g., "123.456ms", "1.234s") with <DURATION>.
            let duration_regex = Regex::new(r"\d+\.\d+[μmn]?s").unwrap();
            obfuscated = duration_regex
                .replace_all(&obfuscated, "<DURATION>")
                .to_string();

            // Replace memory sizes (e.g., "1.2MB", "345KB") with <MEMORY>.
            let memory_regex = Regex::new(r"\d+\.\d+[KMGT]?B").unwrap();
            obfuscated = memory_regex.replace_all(&obfuscated, "<MEMORY>").to_string();

            // Replace row counts with <COUNT>.
            let count_regex = Regex::new(r"rows=\d+").unwrap();
            obfuscated = count_regex
                .replace_all(&obfuscated, "rows=<COUNT>")
                .to_string();

            obfuscated
        }

        fn convert_batches_to_output(
            &self,
            batches: Vec<RecordBatch>,
        ) -> Result<DBOutput<DefaultColumnType>, datafusion::error::DataFusionError> {
            if batches.is_empty() {
                return Ok(DBOutput::Rows {
                    types: vec![],
                    rows: vec![],
                });
            }

            // Report every column as text; sqllogictest compares rendered strings.
            let num_columns = batches[0].num_columns();
            let column_types = vec![DefaultColumnType::Text; num_columns];

            let mut rows = Vec::new();
            for batch in batches {
                for row_idx in 0..batch.num_rows() {
                    let mut row = Vec::new();
                    for col_idx in 0..batch.num_columns() {
                        let column = batch.column(col_idx);
                        let value =
                            datafusion::arrow::util::display::array_value_to_string(column, row_idx)
                                .map_err(|e| {
                                    datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
                                })?;
                        row.push(value);
                    }
                    rows.push(row);
                }
            }

            Ok(DBOutput::Rows {
                types: column_types,
                rows,
            })
        }
    }

    #[async_trait]
    impl AsyncDB for DataFusionDistributedDB {
        type Error = datafusion::error::DataFusionError;
        type ColumnType = DefaultColumnType;

        async fn run(&mut self, sql: &str) -> Result<DBOutput<Self::ColumnType>, Self::Error> {
            let sql = sql.trim();

            // DDL/DML statements complete without producing rows.
            if sql.to_uppercase().starts_with("CREATE")
                || sql.to_uppercase().starts_with("INSERT")
                || sql.to_uppercase().starts_with("DROP")
            {
                return Ok(DBOutput::StatementComplete(0));
            }

            // Handle EXPLAIN ANALYZE.
            if sql.to_uppercase().starts_with("EXPLAIN ANALYZE") {
                return self.handle_explain_analyze(sql).await;
            }

            // Handle regular EXPLAIN: render the distributed plan.
            if sql.to_uppercase().starts_with("EXPLAIN") {
                let query = sql.trim_start_matches("EXPLAIN").trim();
                let df = self.ctx.sql(query).await?;
                let physical_plan = df.create_physical_plan().await?;

                // Apply the distributed optimizer to get the distributed plan.
                use crate::DistributedPhysicalOptimizerRule;
                use datafusion::physical_optimizer::PhysicalOptimizerRule;
                use datafusion::physical_plan::displayable;

                let physical_distributed = DistributedPhysicalOptimizerRule::default()
                    .with_network_shuffle_tasks(2)
                    .with_network_coalesce_tasks(2)
                    .optimize(physical_plan, &Default::default())?;

                let physical_distributed_str = displayable(physical_distributed.as_ref())
                    .indent(true)
                    .to_string();

                // Emit the plan as a single-column batch, one row per plan line.
                let lines: Vec<String> = physical_distributed_str
                    .lines()
                    .map(|s| s.to_string())
                    .collect();
                let schema = Arc::new(Schema::new(vec![Field::new("plan", DataType::Utf8, false)]));
                let batch = RecordBatch::try_new(
                    schema,
                    vec![Arc::new(StringArray::from(lines)) as ArrayRef],
                )?;

                return self.convert_batches_to_output(vec![batch]);
            }

            // Execute the query.
            let df = self.ctx.sql(sql).await?;
            let batches = df.collect().await?;

            self.convert_batches_to_output(batches)
        }

        fn engine_name(&self) -> &str {
            "datafusion-distributed"
        }
    }
}
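A minimal sketch (test name and query are illustrative) of exercising the adapter directly through the AsyncDB trait, without going through the CLI runner:

#[cfg(feature = "integration")]
#[tokio::test]
async fn distributed_explain_returns_rows() {
    use datafusion_distributed::test_utils::sqllogictest::sqllogictest::DataFusionDistributedDB;
    use sqllogictest::{AsyncDB, DBOutput};

    // Spins up a localhost cluster with 3 nodes behind the adapter.
    let mut db = DataFusionDistributedDB::new(3).await;

    // EXPLAIN is intercepted: the plan goes through the distributed optimizer
    // and comes back as one text row per plan line.
    match db.run("EXPLAIN SELECT 1").await.unwrap() {
        DBOutput::Rows { rows, .. } => assert!(!rows.is_empty()),
        _ => panic!("expected rows"),
    }
}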
