Skip to content

Commit 50f4295

Browse files
committed
feat(sqllogictest): use serde derived structs for schedule parsing
Replace manual TOML parsing with serde-derived structs to improve maintainability and separate parsing from engine instantiation. Changes: - Add ScheduleConfig, EngineConfig, and EngineType structs with serde derives - Use #[serde(flatten)] for forward-compatibility with future config fields - Refactor Schedule::from_file() to use toml::from_str() directly - Add instantiate_engines() to separate parsing from engine creation - Remove manual parse_engines() and parse_steps() functions - Update tests to verify deserialization behavior Closes #1952
1 parent a329a3b commit 50f4295

File tree

3 files changed

+168
-103
lines changed

3 files changed

+168
-103
lines changed

crates/sqllogictest/src/engine/datafusion.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, Unbound
2727
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation};
2828
use iceberg_datafusion::IcebergCatalogProvider;
2929
use indicatif::ProgressBar;
30-
use toml::Table as TomlTable;
3130

32-
use crate::engine::{EngineRunner, run_slt_with_runner};
31+
use crate::engine::{EngineConfig, EngineRunner, run_slt_with_runner};
3332
use crate::error::Result;
3433

3534
pub struct DataFusionEngine {
@@ -59,20 +58,22 @@ impl EngineRunner for DataFusionEngine {
5958
}
6059

6160
impl DataFusionEngine {
62-
pub async fn new(config: TomlTable) -> Result<Self> {
61+
pub async fn new(_name: &str, config: &EngineConfig) -> Result<Self> {
6362
let session_config = SessionConfig::new()
6463
.with_target_partitions(4)
6564
.with_information_schema(true);
6665
let ctx = SessionContext::new_with_config(session_config);
67-
ctx.register_catalog("default", Self::create_catalog(&config).await?);
66+
ctx.register_catalog("default", Self::create_catalog(&config.extra).await?);
6867

6968
Ok(Self {
7069
test_data_path: PathBuf::from("testdata"),
7170
session_context: ctx,
7271
})
7372
}
7473

75-
async fn create_catalog(_: &TomlTable) -> anyhow::Result<Arc<dyn CatalogProvider>> {
74+
async fn create_catalog(
75+
_extra: &HashMap<String, toml::Value>,
76+
) -> anyhow::Result<Arc<dyn CatalogProvider>> {
7677
// TODO: support dynamic catalog configuration
7778
// See: https://github.com/apache/iceberg-rust/issues/1780
7879
let catalog = MemoryCatalogBuilder::default()

crates/sqllogictest/src/engine/mod.rs

Lines changed: 54 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,44 @@
1717

1818
mod datafusion;
1919

20+
use std::collections::HashMap;
2021
use std::path::Path;
2122

2223
use anyhow::anyhow;
24+
use serde::Deserialize;
2325
use sqllogictest::{AsyncDB, MakeConnection, Runner, parse_file};
24-
use toml::Table as TomlTable;
2526

2627
use crate::engine::datafusion::DataFusionEngine;
2728
use crate::error::{Error, Result};
2829

29-
const TYPE_DATAFUSION: &str = "datafusion";
30+
/// Supported engine types for sqllogictest
31+
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
32+
#[serde(rename_all = "lowercase")]
33+
pub enum EngineType {
34+
Datafusion,
35+
}
36+
37+
/// Configuration for a single engine instance
38+
#[derive(Debug, Clone, Deserialize)]
39+
pub struct EngineConfig {
40+
/// The type of engine
41+
#[serde(rename = "type")]
42+
pub engine_type: EngineType,
43+
44+
/// Additional configuration fields for extensibility
45+
/// This allows forward-compatibility with future fields like catalog_type, catalog_properties
46+
#[serde(flatten)]
47+
pub extra: HashMap<String, toml::Value>,
48+
}
3049

3150
#[async_trait::async_trait]
3251
pub trait EngineRunner: Send {
3352
async fn run_slt_file(&mut self, path: &Path) -> Result<()>;
3453
}
3554

36-
pub async fn load_engine_runner(
37-
engine_type: &str,
38-
cfg: TomlTable,
39-
) -> Result<Box<dyn EngineRunner>> {
40-
match engine_type {
41-
TYPE_DATAFUSION => Ok(Box::new(DataFusionEngine::new(cfg).await?)),
42-
_ => Err(anyhow::anyhow!("Unsupported engine type: {engine_type}").into()),
55+
pub async fn load_engine_runner(name: &str, config: EngineConfig) -> Result<Box<dyn EngineRunner>> {
56+
match config.engine_type {
57+
EngineType::Datafusion => Ok(Box::new(DataFusionEngine::new(name, &config).await?)),
4358
}
4459
}
4560

@@ -65,29 +80,45 @@ where
6580

6681
#[cfg(test)]
6782
mod tests {
68-
use crate::engine::{TYPE_DATAFUSION, load_engine_runner};
83+
use std::collections::HashMap;
6984

70-
#[tokio::test]
71-
async fn test_engine_invalid_type() {
85+
use crate::engine::{EngineConfig, EngineType, load_engine_runner};
86+
87+
#[test]
88+
fn test_deserialize_engine_config() {
7289
let input = r#"
73-
[engines]
74-
random = { type = "random_engine", url = "http://localhost:8181" }
90+
type = "datafusion"
7591
"#;
76-
let tbl = toml::from_str(input).unwrap();
77-
let result = load_engine_runner("random_engine", tbl).await;
7892

79-
assert!(result.is_err());
93+
let config: EngineConfig = toml::from_str(input).unwrap();
94+
assert_eq!(config.engine_type, EngineType::Datafusion);
95+
assert!(config.extra.is_empty());
8096
}
8197

82-
#[tokio::test]
83-
async fn test_load_datafusion() {
98+
#[test]
99+
fn test_deserialize_engine_config_with_extras() {
84100
let input = r#"
85-
[engines]
86-
df = { type = "datafusion" }
101+
type = "datafusion"
102+
catalog_type = "rest"
103+
104+
[catalog_properties]
105+
uri = "http://localhost:8181"
87106
"#;
88-
let tbl = toml::from_str(input).unwrap();
89-
let result = load_engine_runner(TYPE_DATAFUSION, tbl).await;
90107

108+
let config: EngineConfig = toml::from_str(input).unwrap();
109+
assert_eq!(config.engine_type, EngineType::Datafusion);
110+
assert!(config.extra.contains_key("catalog_type"));
111+
assert!(config.extra.contains_key("catalog_properties"));
112+
}
113+
114+
#[tokio::test]
115+
async fn test_load_datafusion() {
116+
let config = EngineConfig {
117+
engine_type: EngineType::Datafusion,
118+
extra: HashMap::new(),
119+
};
120+
121+
let result = load_engine_runner("df", config).await;
91122
assert!(result.is_ok());
92123
}
93124
}

crates/sqllogictest/src/schedule.rs

Lines changed: 108 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,18 @@ use std::path::{Path, PathBuf};
2121

2222
use anyhow::{Context, anyhow};
2323
use serde::{Deserialize, Serialize};
24-
use toml::{Table as TomlTable, Value};
2524
use tracing::info;
2625

27-
use crate::engine::{EngineRunner, load_engine_runner};
26+
use crate::engine::{EngineConfig, EngineRunner, load_engine_runner};
27+
28+
/// Raw configuration parsed from the schedule TOML file
29+
#[derive(Debug, Clone, Deserialize)]
30+
pub struct ScheduleConfig {
31+
/// Engine name to engine configuration
32+
pub engines: HashMap<String, EngineConfig>,
33+
/// List of test steps to run
34+
pub steps: Vec<Step>,
35+
}
2836

2937
pub struct Schedule {
3038
/// Engine names to engine instances
@@ -59,15 +67,27 @@ impl Schedule {
5967
pub async fn from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
6068
let path_str = path.as_ref().to_string_lossy().to_string();
6169
let content = read_to_string(path)?;
62-
let toml_value = content.parse::<Value>()?;
63-
let toml_table = toml_value
64-
.as_table()
65-
.ok_or_else(|| anyhow!("Schedule file must be a TOML table"))?;
6670

67-
let engines = Schedule::parse_engines(toml_table).await?;
68-
let steps = Schedule::parse_steps(toml_table)?;
71+
let config: ScheduleConfig = toml::from_str(&content)
72+
.with_context(|| format!("Failed to parse schedule file: {path_str}"))?;
73+
74+
let engines = Self::instantiate_engines(config.engines).await?;
75+
76+
Ok(Self::new(engines, config.steps, path_str))
77+
}
78+
79+
/// Instantiate engine runners from their configurations
80+
async fn instantiate_engines(
81+
configs: HashMap<String, EngineConfig>,
82+
) -> anyhow::Result<HashMap<String, Box<dyn EngineRunner>>> {
83+
let mut engines = HashMap::new();
84+
85+
for (name, config) in configs {
86+
let engine = load_engine_runner(&name, config).await?;
87+
engines.insert(name, engine);
88+
}
6989

70-
Ok(Self::new(engines, steps, path_str))
90+
Ok(engines)
7191
}
7292

7393
pub async fn run(mut self) -> anyhow::Result<()> {
@@ -105,103 +125,116 @@ impl Schedule {
105125
}
106126
Ok(())
107127
}
128+
}
108129

109-
async fn parse_engines(
110-
table: &TomlTable,
111-
) -> anyhow::Result<HashMap<String, Box<dyn EngineRunner>>> {
112-
let engines_tbl = table
113-
.get("engines")
114-
.with_context(|| "Schedule file must have an 'engines' table")?
115-
.as_table()
116-
.ok_or_else(|| anyhow!("'engines' must be a table"))?;
117-
118-
let mut engines = HashMap::new();
119-
120-
for (name, engine_val) in engines_tbl {
121-
let cfg_tbl = engine_val
122-
.as_table()
123-
.ok_or_else(|| anyhow!("Config of engine '{name}' is not a table"))?
124-
.clone();
125-
126-
let engine_type = cfg_tbl
127-
.get("type")
128-
.ok_or_else(|| anyhow::anyhow!("Engine {name} doesn't have a 'type' field"))?
129-
.as_str()
130-
.ok_or_else(|| anyhow::anyhow!("Engine {name} type must be a string"))?;
131-
132-
let engine = load_engine_runner(engine_type, cfg_tbl.clone()).await?;
133-
134-
if engines.insert(name.clone(), engine).is_some() {
135-
return Err(anyhow!("Duplicate engine '{name}'"));
136-
}
137-
}
130+
#[cfg(test)]
131+
mod tests {
132+
use crate::engine::EngineType;
133+
use crate::schedule::ScheduleConfig;
138134

139-
Ok(engines)
140-
}
135+
#[test]
136+
fn test_deserialize_schedule_config() {
137+
let input = r#"
138+
[engines]
139+
df = { type = "datafusion" }
141140
142-
fn parse_steps(table: &TomlTable) -> anyhow::Result<Vec<Step>> {
143-
let steps_val = table
144-
.get("steps")
145-
.with_context(|| "Schedule file must have a 'steps' array")?;
141+
[[steps]]
142+
engine = "df"
143+
slt = "test.slt"
144+
"#;
146145

147-
let steps: Vec<Step> = steps_val
148-
.clone()
149-
.try_into()
150-
.with_context(|| "Failed to deserialize steps")?;
146+
let config: ScheduleConfig = toml::from_str(input).unwrap();
151147

152-
Ok(steps)
148+
assert_eq!(config.engines.len(), 1);
149+
assert!(config.engines.contains_key("df"));
150+
assert_eq!(config.engines["df"].engine_type, EngineType::Datafusion);
151+
assert_eq!(config.steps.len(), 1);
152+
assert_eq!(config.steps[0].engine, "df");
153+
assert_eq!(config.steps[0].slt, "test.slt");
153154
}
154-
}
155-
156-
#[cfg(test)]
157-
mod tests {
158-
use toml::Table as TomlTable;
159-
160-
use crate::schedule::Schedule;
161155

162156
#[test]
163-
fn test_parse_steps() {
157+
fn test_deserialize_multiple_steps() {
164158
let input = r#"
159+
[engines]
160+
datafusion = { type = "datafusion" }
161+
165162
[[steps]]
166163
engine = "datafusion"
167164
slt = "test.slt"
168165
169166
[[steps]]
170-
engine = "spark"
167+
engine = "datafusion"
171168
slt = "test2.slt"
172169
"#;
173170

174-
let tbl: TomlTable = toml::from_str(input).unwrap();
175-
let steps = Schedule::parse_steps(&tbl).unwrap();
171+
let config: ScheduleConfig = toml::from_str(input).unwrap();
176172

177-
assert_eq!(steps.len(), 2);
178-
assert_eq!(steps[0].engine, "datafusion");
179-
assert_eq!(steps[0].slt, "test.slt");
180-
assert_eq!(steps[1].engine, "spark");
181-
assert_eq!(steps[1].slt, "test2.slt");
173+
assert_eq!(config.steps.len(), 2);
174+
assert_eq!(config.steps[0].engine, "datafusion");
175+
assert_eq!(config.steps[0].slt, "test.slt");
176+
assert_eq!(config.steps[1].engine, "datafusion");
177+
assert_eq!(config.steps[1].slt, "test2.slt");
182178
}
183179

184180
#[test]
185-
fn test_parse_steps_empty() {
181+
fn test_deserialize_with_extra_fields() {
182+
// Test forward-compatibility with extra fields (for PR #1943)
186183
let input = r#"
184+
[engines]
185+
df = { type = "datafusion", catalog_type = "rest", some_future_field = "value" }
186+
187187
[[steps]]
188+
engine = "df"
189+
slt = "test.slt"
188190
"#;
189191

190-
let tbl: TomlTable = toml::from_str(input).unwrap();
191-
let steps = Schedule::parse_steps(&tbl);
192+
let config: ScheduleConfig = toml::from_str(input).unwrap();
192193

193-
assert!(steps.is_err());
194+
assert!(config.engines["df"].extra.contains_key("catalog_type"));
195+
assert!(config.engines["df"].extra.contains_key("some_future_field"));
194196
}
195197

196-
#[tokio::test]
197-
async fn test_parse_engines_invalid_table() {
198-
let toml_content = r#"
199-
engines = "not_a_table"
198+
#[test]
199+
fn test_deserialize_missing_engine_type() {
200+
let input = r#"
201+
[engines]
202+
df = { }
203+
204+
[[steps]]
205+
engine = "df"
206+
slt = "test.slt"
200207
"#;
201208

202-
let table: TomlTable = toml::from_str(toml_content).unwrap();
203-
let result = Schedule::parse_engines(&table).await;
209+
let result: Result<ScheduleConfig, _> = toml::from_str(input);
210+
assert!(result.is_err());
211+
}
212+
213+
#[test]
214+
fn test_deserialize_invalid_engine_type() {
215+
let input = r#"
216+
[engines]
217+
df = { type = "unknown_engine" }
218+
219+
[[steps]]
220+
engine = "df"
221+
slt = "test.slt"
222+
"#;
223+
224+
let result: Result<ScheduleConfig, _> = toml::from_str(input);
225+
assert!(result.is_err());
226+
}
227+
228+
#[test]
229+
fn test_deserialize_missing_step_fields() {
230+
let input = r#"
231+
[engines]
232+
df = { type = "datafusion" }
233+
234+
[[steps]]
235+
"#;
204236

237+
let result: Result<ScheduleConfig, _> = toml::from_str(input);
205238
assert!(result.is_err());
206239
}
207240
}

0 commit comments

Comments
 (0)