diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0ac3c49..5481cfa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,7 +31,7 @@ jobs: strategy: matrix: os: - - ubuntu-latest + - ubuntu-latest-m - macos-latest rust: [stable] steps: diff --git a/CLAUDE.md b/CLAUDE.md index fe92816..ef50b08 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -44,10 +44,11 @@ The codebase follows a layered architecture with clear separation of concerns: 2. **DataFusion Integration Layer** (`src/catalog.rs`, `src/schema.rs`, `src/table.rs`) - Bridges DuckLake concepts to DataFusion's catalog system - - `DuckLakeCatalog`: Implements `CatalogProvider`, uses dynamic metadata lookup (queries on every call to `schema()` and `schema_names()`) + - `DuckLakeCatalog`: Implements `CatalogProvider`, uses dynamic metadata lookup with configurable snapshot resolution - `DuckLakeSchema`: Implements `SchemaProvider`, uses dynamic metadata lookup (queries on every call to `table()` and `table_names()`) - `DuckLakeTable`: Implements `TableProvider`, caches table structure and file lists at creation time - **No HashMaps**: Catalog and schema providers query metadata on-demand rather than caching + - **Snapshot Resolution**: Configurable TTL (time-to-live) for balancing freshness and performance 3. **Path Resolution** (`src/path_resolver.rs`) - Centralized utilities for parsing object store URLs and resolving hierarchical paths @@ -77,7 +78,8 @@ The catalog uses a **pure dynamic lookup** approach with no caching at the catal - **DuckLakeCatalog** (`catalog.rs`): - `schema_names()`: Queries `list_schemas()` on every call - `schema()`: Queries `get_schema_by_name()` on every call - - `new()`: O(1) - only fetches snapshot ID and data_path + - `new()`: O(1) - only fetches data_path + - **Snapshot Resolution**: Configurable via `SnapshotConfig` - **DuckLakeSchema** (`schema.rs`): - `table_names()`: Queries `list_tables()` on every call @@ -92,12 +94,12 @@ The catalog uses a **pure dynamic lookup** approach with no caching at the catal **Benefits**: - O(1) memory usage regardless of catalog size - Fast catalog startup (no upfront schema/table listing) -- Always fresh metadata (no stale cache issues) -- Simple implementation (no cache invalidation logic) +- Configurable freshness vs performance trade-off +- Simple implementation (no complex cache invalidation logic) **Trade-offs**: - Small query overhead per metadata lookup (acceptable for read-only DuckDB connections) -- Future optimization: Add optional caching layer via wrapper implementation +- Snapshot resolution adds one SQL query per catalog operation (configurable via TTL) ### Data Flow diff --git a/examples/basic_query.rs b/examples/basic_query.rs index e70cf60..59b01e1 100644 --- a/examples/basic_query.rs +++ b/examples/basic_query.rs @@ -2,8 +2,9 @@ //! //! This example demonstrates how to: //! 1. Create a DuckLake catalog from a DuckDB catalog file -//! 2. Register it with DataFusion -//! 3. Execute a simple SELECT query +//! 2. Configure snapshot resolution with TTL (time-to-live) +//! 3. Register it with DataFusion +//! 4. Execute a simple SELECT query //! //! To run this example, you need: //! - A DuckDB database file with DuckLake tables @@ -14,6 +15,8 @@ use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::prelude::*; use datafusion_ducklake::{DuckLakeCatalog, DuckdbMetadataProvider}; +// Uncomment when using custom snapshot config: +// use datafusion_ducklake::SnapshotConfig; use object_store::ObjectStore; use object_store::aws::AmazonS3Builder; use std::env; @@ -56,9 +59,20 @@ async fn main() -> Result<(), Box> { ); runtime.register_object_store(&Url::parse("s3://ducklake-data/")?, s3); - // Create the DuckLake catalog + // Configure snapshot resolution behavior + // + // Option 1: Default configuration (TTL=0) - Always fresh, queries snapshot on every access let ducklake_catalog = DuckLakeCatalog::new(provider)?; + // Option 2: Custom TTL - Balance freshness and performance + // Caches snapshot for 5 seconds, then refreshes + // let config = SnapshotConfig { ttl_seconds: Some(5) }; + // let ducklake_catalog = DuckLakeCatalog::new_with_config(provider, config)?; + + // Option 3: Cache forever - Maximum performance, snapshot frozen at catalog creation + // let config = SnapshotConfig { ttl_seconds: None }; + // let ducklake_catalog = DuckLakeCatalog::new_with_config(provider, config)?; + println!("✓ Connected to DuckLake catalog"); let config = SessionConfig::new().with_default_catalog_and_schema("ducklake", "main"); diff --git a/src/catalog.rs b/src/catalog.rs index 3d3fbad..7e16c31 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -1,7 +1,8 @@ //! DuckLake catalog provider implementation use std::any::Any; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; use crate::Result; use crate::metadata_provider::MetadataProvider; @@ -10,10 +11,52 @@ use crate::schema::DuckLakeSchema; use datafusion::catalog::{CatalogProvider, SchemaProvider}; use datafusion::datasource::object_store::ObjectStoreUrl; -/// DuckLake catalog provider +/// Trait for providing current time (allows mocking in tests) +trait Clock: Send + Sync + std::fmt::Debug { + fn now(&self) -> Instant; +} + +/// Standard clock using std::time::Instant +#[derive(Debug)] +struct SystemClock; + +impl Clock for SystemClock { + fn now(&self) -> Instant { + Instant::now() + } +} + +/// Configuration for snapshot resolution behavior +#[derive(Debug, Clone)] +pub struct SnapshotConfig { + /// Time-to-live for cached snapshot ID + /// - Some(Duration::ZERO): Always query for latest snapshot (maximum freshness) + /// - Some(duration) where duration > 0: Cache snapshot for specified duration + /// - None: Cache forever (snapshot frozen at catalog creation) + pub ttl: Option, +} + +impl Default for SnapshotConfig { + fn default() -> Self { + Self { + // Default to Duration::ZERO for maximum freshness + ttl: Some(Duration::ZERO), + } + } +} + +/// Cached snapshot with timestamp +#[derive(Debug, Clone)] +struct SnapshotCache { + snapshot_id: i64, + cached_at: Instant, +} + +/// DuckLake catalog provider with configurable snapshot resolution /// /// Connects to a DuckLake catalog database and provides access to schemas and tables. /// Uses dynamic metadata lookup - schemas are queried on-demand from the catalog database. +/// Supports configurable snapshot resolution with TTL for balancing freshness and performance. #[derive(Debug)] pub struct DuckLakeCatalog { /// Metadata provider for querying catalog @@ -22,12 +65,36 @@ pub struct DuckLakeCatalog { object_store_url: Arc, /// Catalog base path component for resolving relative schema paths (e.g., /prefix/) catalog_path: String, + /// Configuration for snapshot resolution + config: SnapshotConfig, + /// Cached snapshot with timestamp + cached_snapshot: RwLock>, + /// Clock provider for time operations (allows mocking in tests) + clock: Arc, } impl DuckLakeCatalog { - /// Create a new DuckLake catalog with a metadata provider + /// Create a new DuckLake catalog with default configuration (TTL = 0 for maximum freshness) + /// + /// During catalog creation, only fetches data_path from the metadata provider. pub fn new(provider: impl MetadataProvider + 'static) -> Result { + Self::new_with_config(provider, SnapshotConfig::default()) + } + + /// Create a new DuckLake catalog with custom snapshot configuration + pub fn new_with_config( + provider: impl MetadataProvider + 'static, + config: SnapshotConfig, + ) -> Result { let provider = Arc::new(provider) as Arc; + Self::from_arc_provider(provider, config) + } + + /// Internal constructor that accepts Arc + fn from_arc_provider( + provider: Arc, + config: SnapshotConfig, + ) -> Result { let data_path = provider.get_data_path()?; let (object_store_url, catalog_path) = parse_object_store_url(&data_path)?; @@ -35,13 +102,97 @@ impl DuckLakeCatalog { provider, object_store_url: Arc::new(object_store_url), catalog_path, + config, + cached_snapshot: RwLock::new(None), + clock: Arc::new(SystemClock), }) } - fn get_current_snapshot_id(&self) -> Result { - self.provider + /// Internal constructor with injectable clock (for testing) + #[cfg(test)] + fn from_arc_provider_with_clock( + provider: Arc, + config: SnapshotConfig, + clock: Arc, + ) -> Result { + let data_path = provider.get_data_path()?; + let (object_store_url, catalog_path) = parse_object_store_url(&data_path)?; + + Ok(Self { + provider, + object_store_url: Arc::new(object_store_url), + catalog_path, + config, + cached_snapshot: RwLock::new(None), + clock, + }) + } + + /// Helper function for double-checked locking pattern to get or refresh snapshot + /// Returns cached snapshot if valid according to predicate, otherwise queries fresh snapshot + fn get_or_refresh_snapshot(&self, is_valid: F) -> Result + where + F: Fn(&SnapshotCache, Instant) -> bool, + { + // Check if cache is valid (read lock) + { + let cache = self + .cached_snapshot + .read() + .expect("Snapshot cache lock poisoned"); + if let Some(cached) = cache.as_ref() + && is_valid(cached, self.clock.now()) + { + return Ok(cached.snapshot_id); + } + } + + // Cache invalid or empty, refresh (write lock) + let mut cache = self + .cached_snapshot + .write() + .expect("Snapshot cache lock poisoned"); + + // Re-calculate now for precise timing (time may have elapsed acquiring write lock) + let now = self.clock.now(); + + // Double-check (another thread might have refreshed) + if let Some(cached) = cache.as_ref() + && is_valid(cached, now) + { + return Ok(cached.snapshot_id); + } + + // Query fresh snapshot + let snapshot_id = self + .provider .get_current_snapshot() - .inspect_err(|e| tracing::error!(error = %e, "Failed to get current snapshot")) + .inspect_err(|e| tracing::error!(error = %e, "Failed to get current snapshot"))?; + *cache = Some(SnapshotCache { + snapshot_id, + cached_at: now, + }); + + Ok(snapshot_id) + } + + fn get_current_snapshot_id(&self) -> Result { + match self.config.ttl { + // TTL = Duration::ZERO: Always query for fresh snapshot + Some(ttl) if ttl.is_zero() => self + .provider + .get_current_snapshot() + .inspect_err(|e| tracing::error!(error = %e, "Failed to get current snapshot")), + + // TTL > 0: Use cache if not expired + Some(ttl) => self.get_or_refresh_snapshot(|cached, now| { + let age = now.duration_since(cached.cached_at); + age < ttl + }), + + // TTL = None: Cache forever + None => self.get_or_refresh_snapshot(|_cached, _now| true), + } } } @@ -97,3 +248,231 @@ impl CatalogProvider for DuckLakeCatalog { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata_provider::{ + DuckLakeTableColumn, DuckLakeTableFile, SchemaMetadata, TableMetadata, + }; + use std::cell::RefCell; + use std::sync::Mutex; + use std::sync::atomic::{AtomicI64, Ordering}; + use std::thread; + + /// Mock clock for deterministic time testing + #[derive(Debug)] + struct MockClock { + current_time: Mutex>, + } + + impl MockClock { + fn new() -> Self { + Self { + current_time: Mutex::new(RefCell::new(Instant::now())), + } + } + + fn advance(&self, duration: Duration) { + let guard = self.current_time.lock().unwrap(); + let mut time = guard.borrow_mut(); + *time += duration; + } + } + + impl Clock for MockClock { + fn now(&self) -> Instant { + let guard = self.current_time.lock().unwrap(); + *guard.borrow() + } + } + + /// Mock metadata provider for testing snapshot resolution + #[derive(Debug)] + struct MockMetadataProvider { + snapshot_counter: AtomicI64, + } + + impl MockMetadataProvider { + fn new(initial_snapshot: i64) -> Self { + Self { + snapshot_counter: AtomicI64::new(initial_snapshot), + } + } + + fn increment_snapshot(&self) { + self.snapshot_counter.fetch_add(1, Ordering::SeqCst); + } + } + + impl crate::metadata_provider::MetadataProvider for MockMetadataProvider { + fn get_current_snapshot(&self) -> crate::Result { + Ok(self.snapshot_counter.load(Ordering::SeqCst)) + } + + fn get_data_path(&self) -> crate::Result { + Ok("file:///tmp/test".to_string()) + } + + fn list_schemas(&self, _snapshot_id: i64) -> crate::Result> { + Ok(vec![]) + } + + fn list_tables( + &self, + _schema_id: i64, + _snapshot_id: i64, + ) -> crate::Result> { + Ok(vec![]) + } + + fn get_table_structure(&self, _table_id: i64) -> crate::Result> { + Ok(vec![]) + } + + fn get_table_files_for_select( + &self, + _table_id: i64, + _snapshot_id: i64, + ) -> crate::Result> { + Ok(vec![]) + } + + fn get_schema_by_name( + &self, + _name: &str, + _snapshot_id: i64, + ) -> crate::Result> { + Ok(None) + } + + fn get_table_by_name( + &self, + _schema_id: i64, + _name: &str, + _snapshot_id: i64, + ) -> crate::Result> { + Ok(None) + } + + fn table_exists( + &self, + _schema_id: i64, + _name: &str, + _snapshot_id: i64, + ) -> crate::Result { + Ok(false) + } + } + + #[test] + fn test_snapshot_config_default() { + let config = SnapshotConfig::default(); + assert_eq!(config.ttl, Some(Duration::ZERO)); + } + + #[test] + fn test_ttl_zero_always_queries_fresh() { + let provider = Arc::new(MockMetadataProvider::new(100)); + let provider_trait: Arc = provider.clone(); + let config = SnapshotConfig { + ttl: Some(Duration::ZERO), + }; + let catalog = DuckLakeCatalog::from_arc_provider(provider_trait, config).unwrap(); + + // First query should return snapshot 100 + let snapshot1 = catalog.get_current_snapshot_id().unwrap(); + assert_eq!(snapshot1, 100); + + // Increment snapshot externally + provider.increment_snapshot(); + + // Second query should see new snapshot (no caching) + let snapshot2 = catalog.get_current_snapshot_id().unwrap(); + assert_eq!(snapshot2, 101); + } + + #[test] + fn test_ttl_none_caches_forever() { + let provider = Arc::new(MockMetadataProvider::new(100)); + let provider_trait: Arc = provider.clone(); + let config = SnapshotConfig { + ttl: None, + }; + let catalog = DuckLakeCatalog::from_arc_provider(provider_trait, config).unwrap(); + + // First query caches snapshot 100 + let snapshot1 = catalog.get_current_snapshot_id().unwrap(); + assert_eq!(snapshot1, 100); + + // Increment snapshot externally + provider.increment_snapshot(); + + // Second query should still return cached value + let snapshot2 = catalog.get_current_snapshot_id().unwrap(); + assert_eq!(snapshot2, 100); + } + + #[test] + fn test_ttl_with_duration() { + let provider = Arc::new(MockMetadataProvider::new(100)); + let provider_trait: Arc = provider.clone(); + let mock_clock = Arc::new(MockClock::new()); + let config = SnapshotConfig { + ttl: Some(Duration::from_secs(1)), + }; + let catalog = DuckLakeCatalog::from_arc_provider_with_clock( + provider_trait, + config, + mock_clock.clone(), + ) + .unwrap(); + + // First query caches snapshot 100 + let snapshot1 = catalog.get_current_snapshot_id().unwrap(); + assert_eq!(snapshot1, 100); + + // Increment snapshot externally + provider.increment_snapshot(); + + // Immediate second query uses cache + let snapshot2 = catalog.get_current_snapshot_id().unwrap(); + assert_eq!(snapshot2, 100); + + // Advance time past TTL + mock_clock.advance(Duration::from_secs(2)); + + // Query after expiration fetches fresh snapshot + let snapshot3 = catalog.get_current_snapshot_id().unwrap(); + assert_eq!(snapshot3, 101); + } + + #[test] + fn test_concurrent_snapshot_access() { + let provider = MockMetadataProvider::new(100); + let config = SnapshotConfig { + ttl: Some(Duration::from_secs(1)), + }; + let catalog = Arc::new(DuckLakeCatalog::new_with_config(provider, config).unwrap()); + + let mut handles = vec![]; + + // Spawn multiple threads accessing snapshot concurrently + for _ in 0..10 { + let catalog_clone = Arc::clone(&catalog); + let handle = thread::spawn(move || { + for _ in 0..10 { + let _ = catalog_clone.get_current_snapshot_id(); + } + }); + handles.push(handle); + } + + // Wait for all threads to complete + for handle in handles { + handle.join().unwrap(); + } + + // Test passes if no panics occurred (validates RwLock safety) + } +} diff --git a/src/lib.rs b/src/lib.rs index 8ca11ee..1e77814 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,8 +24,10 @@ //! // Create a DuckDB metadata provider //! let provider = DuckdbMetadataProvider::new("path/to/catalog.ducklake")?; //! -//! // Register a DuckLake catalog with the provider +//! // Create catalog (default: always queries latest snapshot) //! let catalog = DuckLakeCatalog::new(provider)?; +//! +//! // Register the catalog with DataFusion //! ctx.register_catalog("ducklake", std::sync::Arc::new(catalog)); //! //! // Query tables from the catalog @@ -49,7 +51,7 @@ pub mod types; pub type Result = std::result::Result; // Re-export main types for convenience -pub use catalog::DuckLakeCatalog; +pub use catalog::{DuckLakeCatalog, SnapshotConfig}; pub use error::DuckLakeError; pub use metadata_provider::MetadataProvider; pub use metadata_provider_duckdb::DuckdbMetadataProvider;