Skip to content

Commit 8f288ab

Browse files
lliangyu-lin, liurenjie1024, Leon Lin
authored
feat(sqllogictest): Add sqllogictest schedule definition and parsing (#1630)
## Which issue does this PR close?

- Closes #1214.

## What changes are included in this PR?

* Add schedule definition
* Parse schedule to engines involved and test steps
* Part of #1621

## Are these changes tested?

Yes, through unit tests. Some parts, such as engine parsing, require integration tests and will be tested once engine and catalog support is added as part of the SQL logic tests.

---------

Co-authored-by: liurenjie1024 <[email protected]>
Co-authored-by: Leon Lin <[email protected]>
1 parent cd53ac2 commit 8f288ab

File tree

6 files changed

+277
-14
lines changed

6 files changed

+277
-14
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/sqllogictest/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ enum-ordinalize = { workspace = true }
3333
indicatif = { workspace = true }
3434
sqllogictest = { workspace = true }
3535
toml = { workspace = true }
36+
serde = { workspace = true }
37+
tracing = { workspace = true }
38+
tokio = { workspace = true }
3639

3740
[package.metadata.cargo-machete]
3841
# These dependencies are added to ensure minimal dependency version

crates/sqllogictest/src/engine/datafusion.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,15 @@ use indicatif::ProgressBar;
2626
use sqllogictest::runner::AsyncDB;
2727
use toml::Table as TomlTable;
2828

29-
use crate::engine::Engine;
29+
use crate::engine::EngineRunner;
3030
use crate::error::Result;
3131

3232
pub struct DataFusionEngine {
3333
datafusion: DataFusion,
3434
}
3535

3636
#[async_trait::async_trait]
37-
impl Engine for DataFusionEngine {
38-
async fn new(config: TomlTable) -> Result<Self> {
39-
let session_config = SessionConfig::new().with_target_partitions(4);
40-
let ctx = SessionContext::new_with_config(session_config);
41-
ctx.register_catalog("default", Self::create_catalog(&config).await?);
42-
43-
Ok(Self {
44-
datafusion: DataFusion::new(ctx, PathBuf::from("testdata"), ProgressBar::new(100)),
45-
})
46-
}
47-
37+
impl EngineRunner for DataFusionEngine {
4838
async fn run_slt_file(&mut self, path: &Path) -> Result<()> {
4939
let content = std::fs::read_to_string(path)
5040
.with_context(|| format!("Failed to read slt file {:?}", path))
@@ -61,6 +51,16 @@ impl Engine for DataFusionEngine {
6151
}
6252

6353
impl DataFusionEngine {
54+
pub async fn new(config: TomlTable) -> Result<Self> {
55+
let session_config = SessionConfig::new().with_target_partitions(4);
56+
let ctx = SessionContext::new_with_config(session_config);
57+
ctx.register_catalog("default", Self::create_catalog(&config).await?);
58+
59+
Ok(Self {
60+
datafusion: DataFusion::new(ctx, PathBuf::from("testdata"), ProgressBar::new(100)),
61+
})
62+
}
63+
6464
async fn create_catalog(_: &TomlTable) -> anyhow::Result<Arc<dyn CatalogProvider>> {
6565
todo!()
6666
}

crates/sqllogictest/src/engine/mod.rs

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,68 @@ use std::path::Path;
2121

2222
use toml::Table as TomlTable;
2323

24+
use crate::engine::datafusion::DataFusionEngine;
2425
use crate::error::Result;
2526

27+
const KEY_TYPE: &str = "type";
28+
const TYPE_DATAFUSION: &str = "datafusion";
29+
2630
#[async_trait::async_trait]
27-
pub trait Engine: Sized {
28-
async fn new(config: TomlTable) -> Result<Self>;
31+
pub trait EngineRunner: Sized {
2932
async fn run_slt_file(&mut self, path: &Path) -> Result<()>;
3033
}
34+
35+
pub enum Engine {
36+
DataFusion(DataFusionEngine),
37+
}
38+
39+
impl Engine {
40+
pub async fn new(config: TomlTable) -> Result<Self> {
41+
let engine_type = config
42+
.get(KEY_TYPE)
43+
.ok_or_else(|| anyhow::anyhow!("Missing required key: {KEY_TYPE}"))?
44+
.as_str()
45+
.ok_or_else(|| anyhow::anyhow!("Config value for {KEY_TYPE} must be a string"))?;
46+
47+
match engine_type {
48+
TYPE_DATAFUSION => {
49+
let engine = DataFusionEngine::new(config).await?;
50+
Ok(Engine::DataFusion(engine))
51+
}
52+
_ => Err(anyhow::anyhow!("Unsupported engine type: {engine_type}").into()),
53+
}
54+
}
55+
56+
pub async fn run_slt_file(&mut self, path: &Path) -> Result<()> {
57+
match self {
58+
Engine::DataFusion(engine) => engine.run_slt_file(path).await,
59+
}
60+
}
61+
}
62+
63+
#[cfg(test)]
mod tests {
    use toml::Table as TomlTable;

    use crate::engine::Engine;

    /// An empty config has no `type` key, so construction must fail.
    #[tokio::test]
    async fn test_engine_new_missing_type_key() {
        let config = TomlTable::new();
        let result = Engine::new(config).await;

        assert!(result.is_err());
    }

    /// A `type` value that is not a string must be rejected.
    #[tokio::test]
    async fn test_engine_type_not_a_string() {
        let input = r#"
            type = 1
        "#;
        let tbl: TomlTable = toml::from_str(input).unwrap();
        let result = Engine::new(tbl).await;

        assert!(result.is_err());
    }

    /// A string `type` that names no known engine must be rejected.
    #[tokio::test]
    async fn test_engine_invalid_type() {
        // `Engine::new` receives a single engine's config table, so `type` has
        // to sit at the top level. Nesting it under `[engines]` would only
        // exercise the missing-key path again.
        let input = r#"
            type = "random_engine"
            url = "http://localhost:8181"
        "#;
        let tbl: TomlTable = toml::from_str(input).unwrap();
        // Guard the fixture itself: the key under test must actually be present.
        assert!(tbl.contains_key("type"));

        let result = Engine::new(tbl).await;

        assert!(result.is_err());
    }
}

crates/sqllogictest/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,5 @@
2222
mod engine;
2323
#[allow(dead_code)]
2424
mod error;
25+
#[allow(dead_code)]
26+
mod schedule;
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
use std::fs::read_to_string;
20+
use std::path::{Path, PathBuf};
21+
22+
use anyhow::{Context, anyhow};
23+
use serde::{Deserialize, Serialize};
24+
use toml::{Table as TomlTable, Value};
25+
use tracing::info;
26+
27+
use crate::engine::Engine;
28+
29+
pub struct Schedule {
30+
/// Engine names to engine instances
31+
engines: HashMap<String, Engine>,
32+
/// List of test steps to run
33+
steps: Vec<Step>,
34+
/// Path of the schedule file
35+
schedule_file: String,
36+
}
37+
38+
#[derive(Debug, Clone, Serialize, Deserialize)]
39+
pub struct Step {
40+
/// Engine name
41+
engine: String,
42+
/// Stl file path
43+
slt: String,
44+
}
45+
46+
impl Schedule {
47+
pub fn new(engines: HashMap<String, Engine>, steps: Vec<Step>, schedule_file: String) -> Self {
48+
Self {
49+
engines,
50+
steps,
51+
schedule_file,
52+
}
53+
}
54+
55+
pub async fn from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
56+
let path_str = path.as_ref().to_string_lossy().to_string();
57+
let content = read_to_string(path)?;
58+
let toml_value = content.parse::<Value>()?;
59+
let toml_table = toml_value
60+
.as_table()
61+
.ok_or_else(|| anyhow!("Schedule file must be a TOML table"))?;
62+
63+
let engines = Schedule::parse_engines(toml_table).await?;
64+
let steps = Schedule::parse_steps(toml_table)?;
65+
66+
Ok(Self::new(engines, steps, path_str))
67+
}
68+
69+
pub async fn run(mut self) -> anyhow::Result<()> {
70+
info!("Starting test run with schedule: {}", self.schedule_file);
71+
72+
for (idx, step) in self.steps.iter().enumerate() {
73+
info!(
74+
"Running step {}/{}, using engine {}, slt file path: {}",
75+
idx + 1,
76+
self.steps.len(),
77+
&step.engine,
78+
&step.slt
79+
);
80+
81+
let engine = self
82+
.engines
83+
.get_mut(&step.engine)
84+
.ok_or_else(|| anyhow!("Engine {} not found", step.engine))?;
85+
86+
let step_sql_path = PathBuf::from(format!(
87+
"{}/testdata/slts/{}",
88+
env!("CARGO_MANIFEST_DIR"),
89+
&step.slt
90+
));
91+
92+
engine.run_slt_file(&step_sql_path).await?;
93+
94+
info!(
95+
"Completed step {}/{}, engine {}, slt file path: {}",
96+
idx + 1,
97+
self.steps.len(),
98+
&step.engine,
99+
&step.slt
100+
);
101+
}
102+
Ok(())
103+
}
104+
105+
async fn parse_engines(table: &TomlTable) -> anyhow::Result<HashMap<String, Engine>> {
106+
let engines_tbl = table
107+
.get("engines")
108+
.with_context(|| "Schedule file must have an 'engines' table")?
109+
.as_table()
110+
.ok_or_else(|| anyhow!("'engines' must be a table"))?;
111+
112+
let mut engines = HashMap::new();
113+
114+
for (name, engine_val) in engines_tbl {
115+
let cfg_tbl = engine_val
116+
.as_table()
117+
.ok_or_else(|| anyhow!("Config of engine '{name}' is not a table"))?
118+
.clone();
119+
120+
let engine = Engine::new(cfg_tbl)
121+
.await
122+
.with_context(|| format!("Failed to construct engine '{name}'"))?;
123+
124+
if engines.insert(name.clone(), engine).is_some() {
125+
return Err(anyhow!("Duplicate engine '{name}'"));
126+
}
127+
}
128+
129+
Ok(engines)
130+
}
131+
132+
fn parse_steps(table: &TomlTable) -> anyhow::Result<Vec<Step>> {
133+
let steps_val = table
134+
.get("steps")
135+
.with_context(|| "Schedule file must have a 'steps' array")?;
136+
137+
let steps: Vec<Step> = steps_val
138+
.clone()
139+
.try_into()
140+
.with_context(|| "Failed to deserialize steps")?;
141+
142+
Ok(steps)
143+
}
144+
}
145+
146+
#[cfg(test)]
mod tests {
    use toml::Table as TomlTable;

    use crate::schedule::Schedule;

    /// Parses a test fixture into a TOML table; every fixture here is valid TOML.
    fn table_from(input: &str) -> TomlTable {
        toml::from_str(input).unwrap()
    }

    /// Two well-formed steps are parsed in file order with their fields intact.
    #[test]
    fn test_parse_steps() {
        let tbl = table_from(
            r#"
            [[steps]]
            engine = "datafusion"
            slt = "test.slt"

            [[steps]]
            engine = "spark"
            slt = "test2.slt"
        "#,
        );

        let steps = Schedule::parse_steps(&tbl).unwrap();
        let parsed: Vec<(&str, &str)> = steps
            .iter()
            .map(|step| (step.engine.as_str(), step.slt.as_str()))
            .collect();

        assert_eq!(parsed, vec![
            ("datafusion", "test.slt"),
            ("spark", "test2.slt")
        ]);
    }

    /// A step entry with no fields cannot be deserialized into a `Step`.
    #[test]
    fn test_parse_steps_empty() {
        let tbl = table_from(
            r#"
            [[steps]]
        "#,
        );

        assert!(Schedule::parse_steps(&tbl).is_err());
    }

    /// `engines` must be a table; a plain string is rejected.
    #[tokio::test]
    async fn test_parse_engines_invalid_table() {
        let table = table_from(
            r#"
            engines = "not_a_table"
        "#,
        );

        let outcome = Schedule::parse_engines(&table).await;

        assert!(outcome.is_err());
    }
}

0 commit comments

Comments
 (0)