|
1 | 1 |
|
2 | 2 | use std::path::Path; |
| 3 | +use std::process::Command; |
3 | 4 | use datafusion::{ |
4 | 5 | execution::context::SessionContext, |
5 | 6 | prelude::ParquetReadOptions, |
@@ -170,6 +171,94 @@ pub async fn register_available_tpcds_tables( |
170 | 171 | Ok((registered_tables, missing_tables)) |
171 | 172 | } |
172 | 173 |
|
| 174 | +/// Generate TPCDS data by calling the generate.sh script |
| 175 | +/// |
| 176 | +/// # Arguments |
| 177 | +/// * `scale_factor` - Optional scale factor for data generation (e.g., "1", "10", "100") |
| 178 | +/// |
| 179 | +/// # Returns |
| 180 | +/// * `Result<()>` - Ok if generation successful, error otherwise |
| 181 | +pub fn generate_tpcds_data(scale_factor: Option<&str>) -> Result<()> { |
| 182 | + let tpcds_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/tpcds"); |
| 183 | + let generate_script = tpcds_dir.join("generate.sh"); |
| 184 | + |
| 185 | + if !generate_script.exists() { |
| 186 | + return Err(DataFusionError::Execution(format!( |
| 187 | + "TPCDS generate script not found: {}", |
| 188 | + generate_script.display() |
| 189 | + ))); |
| 190 | + } |
| 191 | + |
| 192 | + let mut cmd = Command::new("bash"); |
| 193 | + cmd.arg(&generate_script); |
| 194 | + cmd.current_dir(&tpcds_dir); |
| 195 | + |
| 196 | + // Add scale factor if provided |
| 197 | + if let Some(scale) = scale_factor { |
| 198 | + cmd.arg(scale); |
| 199 | + } |
| 200 | + |
| 201 | + let output = cmd.output().map_err(|e| { |
| 202 | + DataFusionError::Execution(format!( |
| 203 | + "Failed to execute TPCDS generate script: {}", |
| 204 | + e |
| 205 | + )) |
| 206 | + })?; |
| 207 | + |
| 208 | + if !output.status.success() { |
| 209 | + let stderr = String::from_utf8_lossy(&output.stderr); |
| 210 | + let stdout = String::from_utf8_lossy(&output.stdout); |
| 211 | + return Err(DataFusionError::Execution(format!( |
| 212 | + "TPCDS generate script failed with status: {}\nstdout: {}\nstderr: {}", |
| 213 | + output.status, stdout, stderr |
| 214 | + ))); |
| 215 | + } |
| 216 | + |
| 217 | + println!("TPCDS data generation completed successfully"); |
| 218 | + if !output.stdout.is_empty() { |
| 219 | + println!("Output: {}", String::from_utf8_lossy(&output.stdout)); |
| 220 | + } |
| 221 | + |
| 222 | + Ok(()) |
| 223 | +} |
| 224 | + |
| 225 | +/// Ensure TPCDS data is available, generating it if necessary |
| 226 | +/// |
| 227 | +/// # Arguments |
| 228 | +/// * `scale_factor` - Optional scale factor for data generation |
| 229 | +/// * `force_regenerate` - If true, regenerate data even if it already exists |
| 230 | +/// |
| 231 | +/// # Returns |
| 232 | +/// * `Result<()>` - Ok if data is available, error otherwise |
| 233 | +pub fn ensure_tpcds_data(scale_factor: Option<&str>, force_regenerate: bool) -> Result<()> { |
| 234 | + let data_dir = get_data_dir(); |
| 235 | + |
| 236 | + // Check if data already exists (basic check for a few key tables) |
| 237 | + let key_tables = ["date_dim", "item", "customer"]; |
| 238 | + let mut tables_exist = true; |
| 239 | + |
| 240 | + if !force_regenerate { |
| 241 | + for table in &key_tables { |
| 242 | + let table_path = data_dir.join(table); |
| 243 | + if !table_path.exists() { |
| 244 | + tables_exist = false; |
| 245 | + break; |
| 246 | + } |
| 247 | + } |
| 248 | + } else { |
| 249 | + tables_exist = false; |
| 250 | + } |
| 251 | + |
| 252 | + if !tables_exist { |
| 253 | + println!("Generating TPCDS data (scale factor: {:?})", scale_factor); |
| 254 | + generate_tpcds_data(scale_factor)?; |
| 255 | + } else { |
| 256 | + println!("TPCDS data already exists, skipping generation"); |
| 257 | + } |
| 258 | + |
| 259 | + Ok(()) |
| 260 | +} |
| 261 | + |
173 | 262 | #[cfg(test)] |
174 | 263 | mod tests { |
175 | 264 | use super::*; |
|
0 commit comments