Skip to content

Commit 3ede8e6

Browse files
Shefeek JinnahShefeek Jinnah
authored andcommitted
Implement postgresql metadata provider
1 parent 0b06715 commit 3ede8e6

File tree

7 files changed

+1797
-2
lines changed

7 files changed

+1797
-2
lines changed

Cargo.toml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,22 @@ futures = "0.3"
2626
tracing = "0.1"
2727
thiserror = "2.0"
2828

29+
# Multi-database metadata providers (optional)
30+
sqlx = { version = "0.8", features = ["runtime-tokio"], optional = true }
31+
2932
[dev-dependencies]
3033
testcontainers = "0.23"
31-
testcontainers-modules = { version = "0.11", features = ["minio"] }
34+
testcontainers-modules = { version = "0.11", features = ["minio", "postgres", "mysql"] }
3235
tempfile = "3.14"
3336
anyhow = "1.0"
3437
aws-sdk-s3 = "1.75"
3538
aws-credential-types = "1.2"
3639

3740
[features]
3841
# Allow skipping tests that use docker (in CI, macos doesn't support docker).
39-
skip-tests-with-docker = []
42+
skip-tests-with-docker = []
43+
44+
# Metadata provider backends
45+
metadata-postgres = ["sqlx", "sqlx/postgres", "sqlx/chrono"]
46+
# Future: metadata-sqlite = ["sqlx", "sqlx/sqlite", "sqlx/chrono"]
47+
# Future: metadata-mysql = ["sqlx", "sqlx/mysql", "sqlx/chrono"]

examples/postgres_catalog.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
//! PostgreSQL catalog example
2+
//!
3+
//! ## Usage
4+
//!
5+
//! Default (connects to localhost:5432):
6+
//! ```bash
7+
//! cargo run --example postgres_catalog --features metadata-postgres
8+
//! ```
9+
//!
10+
//! Custom PostgreSQL connection:
11+
//! ```bash
12+
//! POSTGRES_URL="postgresql://user:pass@host:5432/dbname" \
13+
//! cargo run --example postgres_catalog --features metadata-postgres
14+
//! ```
15+
16+
use datafusion::prelude::*;
17+
use datafusion_ducklake::{DuckLakeCatalog, PostgresMetadataProvider};
18+
use std::sync::Arc;
19+
20+
#[cfg(feature = "metadata-postgres")]
21+
#[tokio::main]
22+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
23+
use std::env;
24+
25+
println!("==> PostgreSQL catalog example\n");
26+
27+
let conn_str = env::var("POSTGRES_URL").unwrap_or_else(|_| {
28+
"postgresql://postgres:postgres@localhost:5432/ducklake_catalog".to_string()
29+
});
30+
println!("Connecting to: {}", conn_str);
31+
32+
let provider = PostgresMetadataProvider::new(&conn_str).await?;
33+
println!("✓ Connected\n");
34+
35+
let catalog = DuckLakeCatalog::new(provider)?;
36+
let ctx = SessionContext::new();
37+
ctx.register_catalog("ducklake", Arc::new(catalog));
38+
39+
println!("==> Schemas:");
40+
ctx.sql("SELECT * FROM ducklake.information_schema.schemata")
41+
.await?
42+
.show()
43+
.await?;
44+
45+
println!("\n==> Tables:");
46+
ctx.sql("SELECT * FROM ducklake.information_schema.tables")
47+
.await?
48+
.show()
49+
.await?;
50+
51+
println!("\n==> Snapshots:");
52+
ctx.sql("SELECT * FROM ducklake.information_schema.snapshots")
53+
.await?
54+
.show()
55+
.await?;
56+
57+
println!("\n✓ Example completed successfully");
58+
println!("\nNext steps:");
59+
println!(" 1. Populate the catalog with schemas, tables, and data files");
60+
println!(" 2. Query your DuckLake tables using SQL");
61+
println!(" 3. Enjoy PostgreSQL metadata catalog support!");
62+
63+
Ok(())
64+
}
65+
66+
#[cfg(not(feature = "metadata-postgres"))]
67+
fn main() {
68+
eprintln!("Error: This example requires the 'metadata-postgres' feature");
69+
eprintln!("Run with: cargo run --example postgres_catalog --features metadata-postgres");
70+
std::process::exit(1);
71+
}

src/error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ pub enum DuckLakeError {
1717
#[error("DuckDB error: {0}")]
1818
DuckDb(#[from] duckdb::Error),
1919

20+
/// sqlx database error (for PostgreSQL metadata provider)
21+
#[cfg(feature = "metadata-postgres")]
22+
#[error("Database error: {0}")]
23+
Sqlx(#[from] sqlx::Error),
24+
2025
/// Catalog not found
2126
#[error("Catalog not found: {0}")]
2227
CatalogNotFound(String),

src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ pub mod table;
4747
pub mod table_functions;
4848
pub mod types;
4949

50+
// Multi-database metadata providers (optional, feature-gated)
51+
#[cfg(feature = "metadata-postgres")]
52+
pub mod metadata_provider_postgres;
53+
5054
// Result type for DuckLake operations
5155
pub type Result<T> = std::result::Result<T, DuckLakeError>;
5256

@@ -58,3 +62,7 @@ pub use metadata_provider_duckdb::DuckdbMetadataProvider;
5862
pub use schema::DuckLakeSchema;
5963
pub use table::DuckLakeTable;
6064
pub use table_functions::register_ducklake_functions;
65+
66+
// Re-export multi-database metadata providers (feature-gated)
67+
#[cfg(feature = "metadata-postgres")]
68+
pub use metadata_provider_postgres::PostgresMetadataProvider;

src/metadata_provider.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,3 +324,12 @@ pub trait MetadataProvider: Send + Sync + std::fmt::Debug {
324324
/// List all files across all tables for a snapshot
325325
fn list_all_files(&self, snapshot_id: i64) -> Result<Vec<FileWithTable>>;
326326
}
327+
328+
#[cfg(feature = "metadata-postgres")]
329+
/// Helper function to bridge async sqlx operations to sync MetadataProvider trait
330+
pub(crate) fn block_on<F, T>(f: F) -> T
331+
where
332+
F: std::future::Future<Output = T>,
333+
{
334+
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(f))
335+
}

0 commit comments

Comments
 (0)