Skip to content

Commit 8b49653

Browse files
authored
Merge pull request #2 from kination/improve/metadata-format
Apply metadata update
2 parents 3b3075b + 4be288c commit 8b49653

File tree

5 files changed

+515
-3
lines changed

5 files changed

+515
-3
lines changed

vine-core/src/metadata.rs

Lines changed: 246 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use serde::{Deserialize, Serialize};
2-
use std::fs::File;
3-
use std::io::{self, Write};
2+
use std::fs::{self, File};
3+
use std::io::{self};
4+
use std::path::Path;
5+
use parquet::file::reader::{FileReader, SerializedFileReader};
6+
use parquet::basic::Type as PhysicalType;
47

58
#[derive(Serialize, Deserialize, Clone)]
69
pub struct Metadata {
@@ -38,4 +41,245 @@ impl Metadata {
3841
serde_json::to_writer(file, &self)?;
3942
Ok(())
4043
}
44+
45+
// ========================================================================
46+
// Schema-on-Read Functions
47+
// ========================================================================
48+
49+
/// Infer schema from Parquet file in directory
50+
///
51+
/// This enables schema-on-read pattern where metadata is extracted
52+
/// from Parquet files instead of requiring vine_meta.json.
53+
///
54+
/// # Arguments
55+
/// * `base_path` - Root directory containing date-partitioned Parquet files
56+
///
57+
/// # Returns
58+
/// Metadata inferred from Parquet schema
59+
pub fn infer_from_parquet<P: AsRef<Path>>(base_path: P) -> Result<Self, Box<dyn std::error::Error>> {
60+
let base_path = base_path.as_ref();
61+
62+
// Find first parquet file (scan date directories)
63+
let parquet_file = Self::find_first_parquet_file(base_path)?;
64+
65+
// Read Parquet metadata
66+
let file = File::open(&parquet_file)
67+
.map_err(|e| format!("Failed to open parquet file {:?}: {}", parquet_file, e))?;
68+
let reader = SerializedFileReader::new(file)
69+
.map_err(|e| format!("Failed to read parquet file: {}", e))?;
70+
71+
let parquet_metadata = reader.metadata();
72+
let schema = parquet_metadata.file_metadata().schema();
73+
74+
// Convert Parquet schema to Vine Metadata
75+
let fields: Vec<MetadataField> = schema
76+
.get_fields()
77+
.iter()
78+
.enumerate()
79+
.map(|(i, field)| {
80+
let data_type = Self::map_parquet_type_to_vine(field.get_physical_type());
81+
MetadataField {
82+
id: (i + 1) as i32,
83+
name: field.name().to_string(),
84+
data_type,
85+
is_required: !field.is_optional(),
86+
}
87+
})
88+
.collect();
89+
90+
if fields.is_empty() {
91+
return Err("No fields found in Parquet schema".into());
92+
}
93+
94+
Ok(Metadata {
95+
table_name: "inferred".to_string(),
96+
fields,
97+
})
98+
}
99+
100+
/// Find the first (or latest) Parquet file in directory tree
101+
fn find_first_parquet_file(base_path: &Path) -> Result<std::path::PathBuf, Box<dyn std::error::Error>> {
102+
// First, check for direct .parquet files in base_path
103+
if let Ok(entries) = fs::read_dir(base_path) {
104+
for entry in entries.flatten() {
105+
let path = entry.path();
106+
if path.extension().map_or(false, |ext| ext == "parquet") {
107+
return Ok(path);
108+
}
109+
}
110+
}
111+
112+
// Then check date-partitioned directories (YYYY-MM-DD format)
113+
// Sort in reverse to get the latest first
114+
let mut date_dirs: Vec<_> = fs::read_dir(base_path)?
115+
.filter_map(|e| e.ok())
116+
.filter(|e| e.path().is_dir())
117+
.filter(|e| {
118+
// Check if directory name matches date pattern
119+
e.file_name()
120+
.to_str()
121+
.map_or(false, |name| name.len() == 10 && name.chars().nth(4) == Some('-'))
122+
})
123+
.collect();
124+
125+
// Sort by name descending (latest date first)
126+
date_dirs.sort_by(|a, b| b.file_name().cmp(&a.file_name()));
127+
128+
for dir_entry in date_dirs {
129+
let dir_path = dir_entry.path();
130+
if let Ok(files) = fs::read_dir(&dir_path) {
131+
// Get the latest parquet file in this directory
132+
let mut parquet_files: Vec<_> = files
133+
.filter_map(|e| e.ok())
134+
.filter(|e| {
135+
e.path()
136+
.extension()
137+
.map_or(false, |ext| ext == "parquet")
138+
})
139+
.collect();
140+
141+
// Sort by name descending (latest file first)
142+
parquet_files.sort_by(|a, b| b.file_name().cmp(&a.file_name()));
143+
144+
if let Some(file_entry) = parquet_files.first() {
145+
return Ok(file_entry.path());
146+
}
147+
}
148+
}
149+
150+
Err(format!("No Parquet files found in {:?}", base_path).into())
151+
}
152+
153+
/// Map Parquet physical type to Vine data type string
154+
fn map_parquet_type_to_vine(physical_type: PhysicalType) -> String {
155+
match physical_type {
156+
PhysicalType::INT32 => "integer".to_string(),
157+
PhysicalType::INT64 => "integer".to_string(), // Map to integer for now
158+
PhysicalType::BOOLEAN => "boolean".to_string(),
159+
PhysicalType::FLOAT => "double".to_string(),
160+
PhysicalType::DOUBLE => "double".to_string(),
161+
PhysicalType::BYTE_ARRAY => "string".to_string(),
162+
PhysicalType::FIXED_LEN_BYTE_ARRAY => "string".to_string(),
163+
PhysicalType::INT96 => "string".to_string(), // Timestamp, treat as string for now
164+
}
165+
}
166+
167+
/// Update metadata cache asynchronously
168+
///
169+
/// This function updates _meta/schema.json in the background without
170+
/// blocking the write path.
171+
///
172+
/// # Arguments
173+
/// * `base_path` - Root directory
174+
pub fn update_cache_async<P: AsRef<Path> + Send + 'static>(base_path: P)
175+
where
176+
P: Clone,
177+
{
178+
let path = base_path.as_ref().to_path_buf();
179+
std::thread::spawn(move || {
180+
match Self::infer_from_parquet(&path) {
181+
Ok(metadata) => {
182+
if let Err(e) = metadata.save_to_cache(&path) {
183+
eprintln!("Warning: Failed to save metadata cache: {}", e);
184+
}
185+
}
186+
Err(e) => {
187+
eprintln!("Warning: Failed to infer schema for cache: {}", e);
188+
}
189+
}
190+
});
191+
}
192+
193+
/// Load cached schema from _meta/schema.json
194+
///
195+
/// # Arguments
196+
/// * `base_path` - Root directory
197+
///
198+
/// # Returns
199+
/// Cached metadata if exists, None otherwise
200+
pub fn load_cached<P: AsRef<Path>>(base_path: P) -> Option<Self> {
201+
let cache_path = base_path.as_ref().join("_meta").join("schema.json");
202+
if cache_path.exists() {
203+
let content = fs::read_to_string(&cache_path).ok()?;
204+
serde_json::from_str(&content).ok()
205+
} else {
206+
None
207+
}
208+
}
209+
210+
/// Save metadata to cache location (_meta/schema.json)
211+
///
212+
/// # Arguments
213+
/// * `base_path` - Root directory
214+
pub fn save_to_cache<P: AsRef<Path>>(&self, base_path: P) -> io::Result<()> {
215+
let cache_dir = base_path.as_ref().join("_meta");
216+
fs::create_dir_all(&cache_dir)?;
217+
let cache_path = cache_dir.join("schema.json");
218+
self.save(cache_path.to_str().unwrap())
219+
}
220+
221+
// ========================================================================
222+
// Versioning Functions (Skeleton - Future Phase)
223+
// ========================================================================
224+
225+
/// Save metadata with version tracking
226+
///
227+
/// Creates a new version entry in _meta/versions/ directory.
228+
///
229+
/// # Arguments
230+
/// * `base_path` - Root directory
231+
/// * `operation` - Operation type (e.g., "CREATE", "ADD_COLUMN", "ROLLBACK")
232+
///
233+
/// # Returns
234+
/// New version number
235+
///
236+
/// # Implementation TODO (Phase 2)
237+
/// 1. Create _meta/versions/ directory
238+
/// 2. Determine next version number
239+
/// 3. Create version file with metadata + operation info
240+
/// 4. Update vine_meta.json as cache
241+
pub fn save_versioned<P: AsRef<Path>>(
242+
&self,
243+
_base_path: P,
244+
_operation: &str,
245+
) -> io::Result<i64> {
246+
// TODO: Implement versioning (Phase 2)
247+
// Future: Delta Lake-style transaction log
248+
Ok(0)
249+
}
250+
251+
/// Load specific version of metadata
252+
///
253+
/// # Arguments
254+
/// * `base_path` - Root directory
255+
/// * `version` - Version number to load
256+
///
257+
/// # Implementation TODO (Phase 2)
258+
/// 1. Read _meta/versions/{version:020}.json
259+
/// 2. Parse and return metadata
260+
pub fn load_version<P: AsRef<Path>>(
261+
_base_path: P,
262+
_version: i64,
263+
) -> io::Result<Self> {
264+
// TODO: Implement version loading (Phase 2)
265+
unimplemented!("Version loading not yet implemented")
266+
}
267+
268+
/// Rollback to specific version
269+
///
270+
/// # Arguments
271+
/// * `base_path` - Root directory
272+
/// * `target_version` - Version to rollback to
273+
///
274+
/// # Implementation TODO (Phase 2)
275+
/// 1. Load target version metadata
276+
/// 2. Update vine_meta.json
277+
/// 3. Create new version entry with operation="ROLLBACK"
278+
pub fn rollback<P: AsRef<Path>>(
279+
_base_path: P,
280+
_target_version: i64,
281+
) -> io::Result<()> {
282+
// TODO: Implement rollback (Phase 2)
283+
unimplemented!("Rollback not yet implemented")
284+
}
41285
}

vine-core/src/reader_cache.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use crate::metadata::Metadata;
22
use std::path::PathBuf;
3-
use std::sync::Arc;
43

54
/// Caching 'reader metadata'/'schema information'
65
/// Prevent frequent metadata parsing and ensures consistency with 'writer'
@@ -46,4 +45,49 @@ impl ReaderCache {
4645
}
4746
Ok(())
4847
}
48+
49+
// ========================================================================
50+
// Schema-on-Read Functions
51+
// ========================================================================
52+
53+
/// Create reader cache with schema-on-read fallback
54+
///
55+
/// Tries multiple sources in order:
56+
/// 1. vine_meta.json (traditional)
57+
/// 2. _meta/schema.json (cache)
58+
/// 3. Infer from Parquet files
59+
pub fn new_with_fallback(base_path: PathBuf) -> Result<Self, Box<dyn std::error::Error>> {
60+
// Option 1: vine_meta.json (traditional, highest priority)
61+
let meta_path = base_path.join("vine_meta.json");
62+
if meta_path.exists() {
63+
return Self::new(base_path);
64+
}
65+
66+
// Option 2: _meta/schema.json (cached schema)
67+
if let Some(metadata) = Metadata::load_cached(&base_path) {
68+
// Validate metadata
69+
if metadata.fields.is_empty() {
70+
return Err("Cached metadata must have at least one field".into());
71+
}
72+
return Ok(Self {
73+
metadata,
74+
base_path,
75+
});
76+
}
77+
78+
// Option 3: Infer from Parquet files
79+
let metadata = Metadata::infer_from_parquet(&base_path)?;
80+
81+
// Optionally save to cache for future reads (async, non-blocking)
82+
let cache_path = base_path.clone();
83+
let metadata_clone = metadata.clone();
84+
std::thread::spawn(move || {
85+
let _ = metadata_clone.save_to_cache(&cache_path);
86+
});
87+
88+
Ok(Self {
89+
metadata,
90+
base_path,
91+
})
92+
}
4993
}

vine-core/src/writer_cache.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,46 @@ impl WriterCache {
6969

7070
Ok(())
7171
}
72+
73+
// ========================================================================
74+
// Schema-on-Write Optional Functions (Skeleton - TODO: Implement)
75+
// ========================================================================
76+
77+
/// Create cache optionally (returns None if metadata file not found)
78+
///
79+
/// This enables schema-on-read pattern where metadata file is optional.
80+
///
81+
/// # Implementation TODO
82+
/// 1. Try to load metadata
83+
/// 2. Return Some(cache) if successful
84+
/// 3. Return None if file not found (not an error!)
85+
pub fn new_optional(base_path: PathBuf) -> Option<Self> {
86+
// TODO: Implement optional loading
87+
// Self::new(base_path).ok()
88+
Self::new(base_path).ok()
89+
}
90+
91+
/// Create cache from explicit schema (no metadata file needed)
92+
///
93+
/// This allows writing Parquet files without vine_meta.json.
94+
///
95+
/// # Arguments
96+
/// * `base_path` - Root directory
97+
/// * `metadata` - Explicit metadata to use
98+
///
99+
/// # Implementation TODO
100+
/// 1. Build Parquet schema from metadata
101+
/// 2. Return cache without reading any files
102+
pub fn from_metadata(
103+
base_path: PathBuf,
104+
metadata: Metadata,
105+
) -> Result<Self, Box<dyn std::error::Error>> {
106+
// TODO: Implement construction from explicit metadata
107+
let schema = Self::build_schema(&metadata)?;
108+
Ok(Self {
109+
metadata,
110+
schema,
111+
base_path,
112+
})
113+
}
72114
}

0 commit comments

Comments
 (0)