diff --git a/Cargo.lock b/Cargo.lock index f187a384d7..159d32343f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3716,8 +3716,11 @@ dependencies = [ "datafusion-sqllogictest", "enum-ordinalize", "indicatif", + "serde", "sqllogictest", + "tokio", "toml", + "tracing", ] [[package]] diff --git a/crates/sqllogictest/Cargo.toml b/crates/sqllogictest/Cargo.toml index ba149daeab..e335de433e 100644 --- a/crates/sqllogictest/Cargo.toml +++ b/crates/sqllogictest/Cargo.toml @@ -33,6 +33,9 @@ enum-ordinalize = { workspace = true } indicatif = { workspace = true } sqllogictest = { workspace = true } toml = { workspace = true } +serde = { workspace = true } +tracing = { workspace = true } +tokio = { workspace = true } [package.metadata.cargo-machete] # These dependencies are added to ensure minimal dependency version diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index f95cfb247d..49d7273d60 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -26,7 +26,7 @@ use indicatif::ProgressBar; use sqllogictest::runner::AsyncDB; use toml::Table as TomlTable; -use crate::engine::Engine; +use crate::engine::EngineRunner; use crate::error::Result; pub struct DataFusionEngine { @@ -34,17 +34,7 @@ pub struct DataFusionEngine { } #[async_trait::async_trait] -impl Engine for DataFusionEngine { - async fn new(config: TomlTable) -> Result { - let session_config = SessionConfig::new().with_target_partitions(4); - let ctx = SessionContext::new_with_config(session_config); - ctx.register_catalog("default", Self::create_catalog(&config).await?); - - Ok(Self { - datafusion: DataFusion::new(ctx, PathBuf::from("testdata"), ProgressBar::new(100)), - }) - } - +impl EngineRunner for DataFusionEngine { async fn run_slt_file(&mut self, path: &Path) -> Result<()> { let content = std::fs::read_to_string(path) .with_context(|| format!("Failed to read slt file {:?}", path)) @@ -61,6 +51,16 @@ impl Engine for DataFusionEngine { } impl DataFusionEngine { + pub async fn new(config: TomlTable) -> Result { + let session_config = SessionConfig::new().with_target_partitions(4); + let ctx = SessionContext::new_with_config(session_config); + ctx.register_catalog("default", Self::create_catalog(&config).await?); + + Ok(Self { + datafusion: DataFusion::new(ctx, PathBuf::from("testdata"), ProgressBar::new(100)), + }) + } + async fn create_catalog(_: &TomlTable) -> anyhow::Result> { todo!() } diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictest/src/engine/mod.rs index 61722f663f..a1d34dd9bc 100644 --- a/crates/sqllogictest/src/engine/mod.rs +++ b/crates/sqllogictest/src/engine/mod.rs @@ -21,10 +21,68 @@ use std::path::Path; use toml::Table as TomlTable; +use crate::engine::datafusion::DataFusionEngine; use crate::error::Result; +const KEY_TYPE: &str = "type"; +const TYPE_DATAFUSION: &str = "datafusion"; + #[async_trait::async_trait] -pub trait Engine: Sized { - async fn new(config: TomlTable) -> Result; +pub trait EngineRunner: Sized { async fn run_slt_file(&mut self, path: &Path) -> Result<()>; } + +pub enum Engine { + DataFusion(DataFusionEngine), +} + +impl Engine { + pub async fn new(config: TomlTable) -> Result { + let engine_type = config + .get(KEY_TYPE) + .ok_or_else(|| anyhow::anyhow!("Missing required key: {KEY_TYPE}"))? + .as_str() + .ok_or_else(|| anyhow::anyhow!("Config value for {KEY_TYPE} must be a string"))?; + + match engine_type { + TYPE_DATAFUSION => { + let engine = DataFusionEngine::new(config).await?; + Ok(Engine::DataFusion(engine)) + } + _ => Err(anyhow::anyhow!("Unsupported engine type: {engine_type}").into()), + } + } + + pub async fn run_slt_file(&mut self, path: &Path) -> Result<()> { + match self { + Engine::DataFusion(engine) => engine.run_slt_file(path).await, + } + } +} + +#[cfg(test)] +mod tests { + use toml::Table as TomlTable; + + use crate::engine::Engine; + + #[tokio::test] + async fn test_engine_new_missing_type_key() { + let config = TomlTable::new(); + let result = Engine::new(config).await; + + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_engine_invalid_type() { + let input = r#" + [engines] + random = { type = "random_engine", url = "http://localhost:8181" } + "#; + let tbl = toml::from_str(input).unwrap(); + let result = Engine::new(tbl).await; + + assert!(result.is_err()); + } +} diff --git a/crates/sqllogictest/src/lib.rs b/crates/sqllogictest/src/lib.rs index c72d50c429..7b17727d1a 100644 --- a/crates/sqllogictest/src/lib.rs +++ b/crates/sqllogictest/src/lib.rs @@ -22,3 +22,5 @@ mod engine; #[allow(dead_code)] mod error; +#[allow(dead_code)] +mod schedule; diff --git a/crates/sqllogictest/src/schedule.rs b/crates/sqllogictest/src/schedule.rs new file mode 100644 index 0000000000..d3284949bc --- /dev/null +++ b/crates/sqllogictest/src/schedule.rs @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fs::read_to_string; +use std::path::{Path, PathBuf}; + +use anyhow::{Context, anyhow}; +use serde::{Deserialize, Serialize}; +use toml::{Table as TomlTable, Value}; +use tracing::info; + +use crate::engine::Engine; + +pub struct Schedule { + /// Engine names to engine instances + engines: HashMap, + /// List of test steps to run + steps: Vec, + /// Path of the schedule file + schedule_file: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Step { + /// Engine name + engine: String, + /// Stl file path + slt: String, +} + +impl Schedule { + pub fn new(engines: HashMap, steps: Vec, schedule_file: String) -> Self { + Self { + engines, + steps, + schedule_file, + } + } + + pub async fn from_file>(path: P) -> anyhow::Result { + let path_str = path.as_ref().to_string_lossy().to_string(); + let content = read_to_string(path)?; + let toml_value = content.parse::()?; + let toml_table = toml_value + .as_table() + .ok_or_else(|| anyhow!("Schedule file must be a TOML table"))?; + + let engines = Schedule::parse_engines(toml_table).await?; + let steps = Schedule::parse_steps(toml_table)?; + + Ok(Self::new(engines, steps, path_str)) + } + + pub async fn run(mut self) -> anyhow::Result<()> { + info!("Starting test run with schedule: {}", self.schedule_file); + + for (idx, step) in self.steps.iter().enumerate() { + info!( + "Running step {}/{}, using engine {}, slt file path: {}", + idx + 1, + self.steps.len(), + &step.engine, + &step.slt + ); + + let engine = self + .engines + .get_mut(&step.engine) + .ok_or_else(|| anyhow!("Engine {} not found", step.engine))?; + + let step_sql_path = PathBuf::from(format!( + "{}/testdata/slts/{}", + env!("CARGO_MANIFEST_DIR"), + &step.slt + )); + + engine.run_slt_file(&step_sql_path).await?; + + info!( + "Completed step {}/{}, engine {}, slt file path: {}", + idx + 1, + self.steps.len(), + &step.engine, + &step.slt + ); + } + Ok(()) + } + + async fn parse_engines(table: &TomlTable) -> anyhow::Result> { + let engines_tbl = table + .get("engines") + .with_context(|| "Schedule file must have an 'engines' table")? + .as_table() + .ok_or_else(|| anyhow!("'engines' must be a table"))?; + + let mut engines = HashMap::new(); + + for (name, engine_val) in engines_tbl { + let cfg_tbl = engine_val + .as_table() + .ok_or_else(|| anyhow!("Config of engine '{name}' is not a table"))? + .clone(); + + let engine = Engine::new(cfg_tbl) + .await + .with_context(|| format!("Failed to construct engine '{name}'"))?; + + if engines.insert(name.clone(), engine).is_some() { + return Err(anyhow!("Duplicate engine '{name}'")); + } + } + + Ok(engines) + } + + fn parse_steps(table: &TomlTable) -> anyhow::Result> { + let steps_val = table + .get("steps") + .with_context(|| "Schedule file must have a 'steps' array")?; + + let steps: Vec = steps_val + .clone() + .try_into() + .with_context(|| "Failed to deserialize steps")?; + + Ok(steps) + } +} + +#[cfg(test)] +mod tests { + use toml::Table as TomlTable; + + use crate::schedule::Schedule; + + #[test] + fn test_parse_steps() { + let input = r#" + [[steps]] + engine = "datafusion" + slt = "test.slt" + + [[steps]] + engine = "spark" + slt = "test2.slt" + "#; + + let tbl: TomlTable = toml::from_str(input).unwrap(); + let steps = Schedule::parse_steps(&tbl).unwrap(); + + assert_eq!(steps.len(), 2); + assert_eq!(steps[0].engine, "datafusion"); + assert_eq!(steps[0].slt, "test.slt"); + assert_eq!(steps[1].engine, "spark"); + assert_eq!(steps[1].slt, "test2.slt"); + } + + #[test] + fn test_parse_steps_empty() { + let input = r#" + [[steps]] + "#; + + let tbl: TomlTable = toml::from_str(input).unwrap(); + let steps = Schedule::parse_steps(&tbl); + + assert!(steps.is_err()); + } + + #[tokio::test] + async fn test_parse_engines_invalid_table() { + let toml_content = r#" + engines = "not_a_table" + "#; + + let table: TomlTable = toml::from_str(toml_content).unwrap(); + let result = Schedule::parse_engines(&table).await; + + assert!(result.is_err()); + } +}