
Commit b55e181

Implement PostgreSQL Metadata Provider (#20)
Parent: 49047bb

7 files changed: +1629 −30 lines


Cargo.toml

Lines changed: 10 additions & 2 deletions
@@ -26,14 +26,22 @@ futures = "0.3"
 tracing = "0.1"
 thiserror = "2.0"
 
+# Multi-database metadata providers (optional)
+sqlx = { version = "0.8", features = ["runtime-tokio"], optional = true }
+
 [dev-dependencies]
 testcontainers = "0.23"
-testcontainers-modules = { version = "0.11", features = ["minio"] }
+testcontainers-modules = { version = "0.11", features = ["minio", "postgres", "mysql"] }
 tempfile = "3.14"
 anyhow = "1.0"
 aws-sdk-s3 = "1.75"
 aws-credential-types = "1.2"
 
 [features]
 # Allow skipping tests that use docker (in CI, macos doesn't support docker).
-skip-tests-with-docker = []
+skip-tests-with-docker = []
+
+# Metadata provider backends
+metadata-postgres = ["sqlx", "sqlx/postgres", "sqlx/chrono"]
+# Future: metadata-sqlite = ["sqlx", "sqlx/sqlite", "sqlx/chrono"]
+# Future: metadata-mysql = ["sqlx", "sqlx/mysql", "sqlx/chrono"]

examples/basic_query.rs

Lines changed: 54 additions & 28 deletions
@@ -1,7 +1,7 @@
 //! Basic DuckLake query example with snapshot isolation
 //!
 //! This example demonstrates how to:
-//! 1. Create a DuckLake catalog from a DuckDB catalog file
+//! 1. Create a DuckLake catalog from DuckDB or PostgreSQL
 //! 2. Bind the catalog to a specific snapshot for query consistency
 //! 3. Register it with DataFusion
 //! 4. Execute a simple SELECT query
@@ -16,14 +16,24 @@
 //! To query data at different points in time, create separate catalogs bound to
 //! different snapshot IDs.
 //!
-//! To run this example, you need:
-//! - A DuckDB database file with DuckLake tables
-//! - Parquet data files referenced by the catalog
+//! ## Usage
 //!
-//! Usage: cargo run --example basic_query <catalog.db> <sql>
+//! With DuckDB catalog:
+//! ```bash
+//! cargo run --example basic_query catalog.db "SELECT * FROM main.users"
+//! ```
+//!
+//! With PostgreSQL catalog (requires --features metadata-postgres):
+//! ```bash
+//! cargo run --example basic_query --features metadata-postgres \
+//!     "postgresql://user:password@localhost:5432/postgres" \
+//!     "SELECT * FROM main.users"
+//! ```
 
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::prelude::*;
+#[cfg(feature = "metadata-postgres")]
+use datafusion_ducklake::PostgresMetadataProvider;
 use datafusion_ducklake::{
     DuckLakeCatalog, DuckdbMetadataProvider, MetadataProvider, register_ducklake_functions,
 };
@@ -38,25 +48,51 @@ use url::Url
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let args: Vec<String> = env::args().collect();
     if args.len() < 3 {
-        eprintln!("Usage: cargo run --example basic_query catalog.db sql");
+        eprintln!("Usage:");
+        eprintln!("  DuckDB:     cargo run --example basic_query catalog.db \"SQL\"");
+        eprintln!(
+            "  PostgreSQL: cargo run --example basic_query --features metadata-postgres \"postgresql://...\" \"SQL\""
+        );
         exit(1);
     }
-    let catalog_path = &args[1];
+    let catalog_source = &args[1];
     let sql = &args[2];
 
-    // // Path to your DuckLake catalog database
-    // let catalog_path = "test_catalog.db";
+    // Detect provider type based on input
+    let is_postgres = catalog_source.starts_with("postgresql://");
 
-    println!("Connecting to DuckLake catalog: {}", catalog_path);
+    if is_postgres {
+        #[cfg(not(feature = "metadata-postgres"))]
+        {
+            eprintln!("Error: PostgreSQL support requires the 'metadata-postgres' feature");
+            eprintln!("Run with: cargo run --example basic_query --features metadata-postgres");
+            exit(1);
+        }
 
-    // Create the metadata provider
-    let provider = Arc::new(DuckdbMetadataProvider::new(catalog_path)?);
+        #[cfg(feature = "metadata-postgres")]
+        {
+            println!("Connecting to PostgreSQL catalog: {}", catalog_source);
+            let provider = Arc::new(PostgresMetadataProvider::new(catalog_source).await?);
+            let snapshot_id = provider.get_current_snapshot()?;
+            println!("Current snapshot ID: {}", snapshot_id);
+            run_query(provider, snapshot_id, sql).await?;
+        }
+    } else {
+        println!("Connecting to DuckDB catalog: {}", catalog_source);
+        let provider = Arc::new(DuckdbMetadataProvider::new(catalog_source)?);
+        let snapshot_id = provider.get_current_snapshot()?;
+        println!("Current snapshot ID: {}", snapshot_id);
+        run_query(provider, snapshot_id, sql).await?;
+    }
 
-    // Get the current snapshot ID
-    // This ensures query consistency - all metadata lookups will use this snapshot
-    let snapshot_id = provider.get_current_snapshot()?;
-    println!("Current snapshot ID: {}", snapshot_id);
+    Ok(())
+}
 
+async fn run_query(
+    provider: Arc<dyn MetadataProvider>,
+    snapshot_id: i64,
+    sql: &str,
+) -> Result<(), Box<dyn std::error::Error>> {
     // Create runtime and register object stores
     // For MinIO or S3, register the object store with the runtime
     let runtime = Arc::new(RuntimeEnv::default());
@@ -77,11 +113,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     // Create the DuckLake catalog bound to the snapshot
    // This ensures all queries through this catalog see consistent data
     // from this specific snapshot, even if the underlying data changes
-    let ducklake_catalog = DuckLakeCatalog::with_snapshot(provider, snapshot_id)?;
-
-    // Alternative: Use the backward-compatible constructor that automatically
-    // binds to the current snapshot:
-    // let ducklake_catalog = DuckLakeCatalog::new(DuckdbMetadataProvider::new(catalog_path)?)?;
+    let ducklake_catalog = DuckLakeCatalog::with_snapshot(provider.clone(), snapshot_id)?;
 
     println!("✓ Connected to DuckLake catalog");
 
@@ -90,9 +122,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     // Create DataFusion session context
     let ctx = SessionContext::new_with_config_rt(config, runtime.clone());
 
-    // Get the provider before moving the catalog
-    let provider = ducklake_catalog.provider();
-
     // Register the DuckLake catalog (standard DataFusion pattern)
     ctx.register_catalog("ducklake", Arc::new(ducklake_catalog));
 
@@ -121,17 +150,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         }
     }
 
-    // Example query (adjust schema and table names to match your data)
-    // Uncomment and modify this once you have actual DuckLake data:
-
+    // Execute the query
     println!("\nExecuting query...");
     let df = ctx.sql(sql).await?;
 
     // Show the query results
    df.show().await?;
 
     println!("\n✓ Example completed successfully!");
-    println!("\nTo run a query, create a DuckLake database and uncomment the query section.");
 
     Ok(())
 }

src/error.rs

Lines changed: 5 additions & 0 deletions
@@ -17,6 +17,11 @@ pub enum DuckLakeError {
     #[error("DuckDB error: {0}")]
     DuckDb(#[from] duckdb::Error),
 
+    /// sqlx database error (for PostgreSQL metadata provider)
+    #[cfg(feature = "metadata-postgres")]
+    #[error("Database error: {0}")]
+    Sqlx(#[from] sqlx::Error),
+
     /// Catalog not found
     #[error("Catalog not found: {0}")]
     CatalogNotFound(String),
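With the feature-gated `Sqlx` variant carrying `#[from] sqlx::Error`, provider code can propagate sqlx failures with `?` and get a `DuckLakeError` back. A minimal sketch of that conversion, assuming a hypothetical `ping` helper and the crate's `Result` alias:

```rust
// Minimal sketch only: `ping` is a hypothetical helper, not part of this commit.
#[cfg(feature = "metadata-postgres")]
async fn ping(pool: &sqlx::PgPool) -> crate::Result<()> {
    // `?` converts sqlx::Error into DuckLakeError::Sqlx via the #[from] impl above.
    sqlx::query("SELECT 1").execute(pool).await?;
    Ok(())
}
```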

src/lib.rs

Lines changed: 8 additions & 0 deletions
@@ -47,6 +47,10 @@ pub mod table
 pub mod table_functions;
 pub mod types;
 
+// Multi-database metadata providers (optional, feature-gated)
+#[cfg(feature = "metadata-postgres")]
+pub mod metadata_provider_postgres;
+
 // Result type for DuckLake operations
 pub type Result<T> = std::result::Result<T, DuckLakeError>;
 
@@ -58,3 +62,7 @@ pub use metadata_provider_duckdb::DuckdbMetadataProvider;
 pub use schema::DuckLakeSchema;
 pub use table::DuckLakeTable;
 pub use table_functions::register_ducklake_functions;
+
+// Re-export multi-database metadata providers (feature-gated)
+#[cfg(feature = "metadata-postgres")]
+pub use metadata_provider_postgres::PostgresMetadataProvider;

src/metadata_provider.rs

Lines changed: 9 additions & 0 deletions
@@ -324,3 +324,12 @@ pub trait MetadataProvider: Send + Sync + std::fmt::Debug {
     /// List all files across all tables for a snapshot
     fn list_all_files(&self, snapshot_id: i64) -> Result<Vec<FileWithTable>>;
 }
+
+#[cfg(feature = "metadata-postgres")]
+/// Helper function to bridge async sqlx operations to sync MetadataProvider trait
+pub(crate) fn block_on<F, T>(f: F) -> T
+where
+    F: std::future::Future<Output = T>,
+{
+    tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(f))
+}
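The `block_on` helper lets the synchronous `MetadataProvider` methods drive async sqlx queries to completion. Below is a minimal sketch of how a Postgres-backed provider might use it; the struct shape, pool field, table name, and SQL are illustrative assumptions, not copied from the actual `metadata_provider_postgres` module.

```rust
// Hypothetical sketch: struct shape, table name, and SQL are assumptions,
// not taken from metadata_provider_postgres.rs.
#[cfg(feature = "metadata-postgres")]
struct PgSnapshotReader {
    pool: sqlx::PgPool,
}

#[cfg(feature = "metadata-postgres")]
impl PgSnapshotReader {
    /// Synchronous entry point that bridges into async sqlx via block_on.
    fn current_snapshot(&self) -> crate::Result<i64> {
        block_on(async {
            let row: (i64,) = sqlx::query_as(
                "SELECT snapshot_id FROM ducklake_snapshot ORDER BY snapshot_id DESC LIMIT 1",
            )
            .fetch_one(&self.pool)
            .await?; // sqlx::Error -> DuckLakeError::Sqlx via #[from]
            Ok(row.0)
        })
    }
}
```

One caveat: `tokio::task::block_in_place` panics on a current-thread Tokio runtime, so this bridging approach assumes callers run on a multi-thread runtime (the default for `#[tokio::main]`).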
