Skip to content

Commit c103573

Browse files
Reuse DuckDB connection for metadata queries instead of creating new connection per call (#43)
1 parent df99e1b commit c103573

File tree

3 files changed

+31
-26
lines changed

3 files changed

+31
-26
lines changed

CLAUDE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ The codebase follows a layered architecture with clear separation of concerns:
3939
- Also provides individual lookup methods: `get_schema_by_name()`, `get_table_by_name()`, and `table_exists()`
4040
- `DuckdbMetadataProvider` implements the trait using DuckDB as the catalog backend
4141
- Executes SQL queries against standard DuckLake catalog tables (`ducklake_snapshot`, `ducklake_schema`, `ducklake_table`, `ducklake_column`, `ducklake_data_file`, `ducklake_delete_file`, `ducklake_metadata`)
42-
- Thread-safe: Opens a new read-only connection for each query
42+
- Thread-safe: Uses a single shared connection protected by Mutex for efficiency
4343
- Supports delete files: `get_table_files_for_select()` returns data files with associated delete files
4444

4545
2. **DataFusion Integration Layer** (`src/catalog.rs`, `src/schema.rs`, `src/table.rs`)

src/metadata_provider_duckdb.rs

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,35 +10,40 @@ use crate::metadata_provider::{
1010
};
1111
use duckdb::AccessMode::ReadOnly;
1212
use duckdb::{Config, Connection, params};
13+
use std::sync::{Arc, Mutex, MutexGuard};
1314

1415
/// DuckDB metadata provider
1516
///
16-
/// Opens a new connection for each query to avoid thread-safety issues.
17-
/// This is acceptable for read-only operations.
17+
/// Uses a single shared connection protected by a Mutex to avoid
18+
/// the overhead of creating a new connection for each metadata query.
19+
/// This is safe for read-only operations.
1820
#[derive(Debug, Clone)]
1921
pub struct DuckdbMetadataProvider {
22+
conn: Arc<Mutex<Connection>>,
23+
/// Path to the catalog database, retained for logging/debugging
24+
#[allow(dead_code)]
2025
catalog_path: String,
2126
}
2227

2328
impl DuckdbMetadataProvider {
2429
/// Create a new DuckDB metadata provider
2530
pub fn new(catalog_path: impl Into<String>) -> crate::Result<Self> {
2631
let catalog_path = catalog_path.into();
27-
28-
// Validate connection works
29-
let _conn = DuckdbMetadataProvider::open_connection_with_path(&catalog_path)?;
32+
let conn = Self::create_connection(&catalog_path)?;
3033

3134
Ok(Self {
35+
conn: Arc::new(Mutex::new(conn)),
3236
catalog_path,
3337
})
3438
}
3539

36-
fn open_connection(&self) -> crate::Result<Connection> {
37-
DuckdbMetadataProvider::open_connection_with_path(&self.catalog_path)
40+
/// Get a reference to the shared connection
41+
fn connection(&self) -> MutexGuard<'_, Connection> {
42+
self.conn.lock().expect("DuckDB connection mutex poisoned")
3843
}
3944

40-
/// Open a connection to the catalog database
41-
fn open_connection_with_path(catalog_path: &str) -> crate::Result<Connection> {
45+
/// Create a new read-only connection to the catalog database
46+
fn create_connection(catalog_path: &str) -> crate::Result<Connection> {
4247
let config = Config::default().access_mode(ReadOnly)?;
4348
match Connection::open_with_flags(catalog_path, config) {
4449
Ok(con) => Ok(con),
@@ -63,19 +68,19 @@ impl DuckdbMetadataProvider {
6368

6469
impl MetadataProvider for DuckdbMetadataProvider {
6570
fn get_current_snapshot(&self) -> crate::Result<i64> {
66-
let conn = self.open_connection()?;
71+
let conn = self.connection();
6772
let snapshot_id: i64 = conn.query_row(SQL_GET_LATEST_SNAPSHOT, [], |row| row.get(0))?;
6873
Ok(snapshot_id)
6974
}
7075

7176
fn get_data_path(&self) -> crate::Result<String> {
72-
let conn = self.open_connection()?;
77+
let conn = self.connection();
7378
let data_path: String = conn.query_row(SQL_GET_DATA_PATH, [], |row| row.get(0))?;
7479
Ok(data_path)
7580
}
7681

7782
fn list_snapshots(&self) -> crate::Result<Vec<SnapshotMetadata>> {
78-
let conn = self.open_connection()?;
83+
let conn = self.connection();
7984
let mut stmt = conn.prepare(SQL_LIST_SNAPSHOTS)?;
8085

8186
let snapshots = stmt
@@ -93,7 +98,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
9398
}
9499

95100
fn list_schemas(&self, snapshot_id: i64) -> crate::Result<Vec<SchemaMetadata>> {
96-
let conn = self.open_connection()?;
101+
let conn = self.connection();
97102
let mut stmt = conn.prepare(SQL_LIST_SCHEMAS)?;
98103

99104
let schemas = stmt
@@ -115,7 +120,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
115120
}
116121

117122
fn list_tables(&self, schema_id: i64, snapshot_id: i64) -> crate::Result<Vec<TableMetadata>> {
118-
let conn = self.open_connection()?;
123+
let conn = self.connection();
119124
let mut stmt = conn.prepare(SQL_LIST_TABLES)?;
120125

121126
let tables = stmt
@@ -137,7 +142,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
137142
}
138143

139144
fn get_table_structure(&self, table_id: i64) -> crate::Result<Vec<DuckLakeTableColumn>> {
140-
let conn = self.open_connection()?;
145+
let conn = self.connection();
141146
let mut stmt = conn.prepare(SQL_GET_TABLE_COLUMNS)?;
142147

143148
let columns = stmt
@@ -163,7 +168,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
163168
table_id: i64,
164169
snapshot_id: i64,
165170
) -> crate::Result<Vec<DuckLakeTableFile>> {
166-
let conn = self.open_connection()?;
171+
let conn = self.connection();
167172
let mut stmt = conn.prepare(SQL_GET_DATA_FILES)?;
168173

169174
let files = stmt
@@ -214,7 +219,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
214219
name: &str,
215220
snapshot_id: i64,
216221
) -> crate::Result<Option<SchemaMetadata>> {
217-
let conn = self.open_connection()?;
222+
let conn = self.connection();
218223
let mut stmt = conn.prepare(SQL_GET_SCHEMA_BY_NAME)?;
219224

220225
let mut rows = stmt.query(params![name, snapshot_id, snapshot_id])?;
@@ -241,7 +246,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
241246
name: &str,
242247
snapshot_id: i64,
243248
) -> crate::Result<Option<TableMetadata>> {
244-
let conn = self.open_connection()?;
249+
let conn = self.connection();
245250
let mut stmt = conn.prepare(SQL_GET_TABLE_BY_NAME)?;
246251

247252
let mut rows = stmt.query(params![&schema_id, &name, &snapshot_id, &snapshot_id])?;
@@ -263,7 +268,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
263268
}
264269

265270
fn table_exists(&self, schema_id: i64, name: &str, snapshot_id: i64) -> crate::Result<bool> {
266-
let conn = self.open_connection()?;
271+
let conn = self.connection();
267272
let exists: bool = conn.query_row(
268273
SQL_TABLE_EXISTS,
269274
params![schema_id, &name, &snapshot_id, &snapshot_id],
@@ -273,7 +278,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
273278
}
274279

275280
fn list_all_tables(&self, snapshot_id: i64) -> crate::Result<Vec<TableWithSchema>> {
276-
let conn = self.open_connection()?;
281+
let conn = self.connection();
277282
let mut stmt = conn.prepare(SQL_LIST_ALL_TABLES)?;
278283

279284
let tables = stmt
@@ -299,7 +304,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
299304
}
300305

301306
fn list_all_columns(&self, snapshot_id: i64) -> crate::Result<Vec<ColumnWithTable>> {
302-
let conn = self.open_connection()?;
307+
let conn = self.connection();
303308
let mut stmt = conn.prepare(SQL_LIST_ALL_COLUMNS)?;
304309

305310
let columns = stmt
@@ -328,7 +333,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
328333
}
329334

330335
fn list_all_files(&self, snapshot_id: i64) -> crate::Result<Vec<FileWithTable>> {
331-
let conn = self.open_connection()?;
336+
let conn = self.connection();
332337
let mut stmt = conn.prepare(SQL_LIST_ALL_FILES)?;
333338

334339
let files = stmt
@@ -396,7 +401,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
396401
start_snapshot: i64,
397402
end_snapshot: i64,
398403
) -> crate::Result<Vec<DataFileChange>> {
399-
let conn = self.open_connection()?;
404+
let conn = self.connection();
400405
let mut stmt = conn.prepare(SQL_GET_DATA_FILES_ADDED_BETWEEN_SNAPSHOTS)?;
401406

402407
let files = stmt
@@ -421,7 +426,7 @@ impl MetadataProvider for DuckdbMetadataProvider {
421426
start_snapshot: i64,
422427
end_snapshot: i64,
423428
) -> crate::Result<Vec<DeleteFileChange>> {
424-
let conn = self.open_connection()?;
429+
let conn = self.connection();
425430
let mut stmt = conn.prepare(SQL_GET_DELETE_FILES_ADDED_BETWEEN_SNAPSHOTS)?;
426431

427432
let files = stmt

tests/concurrent_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
//! ## Thread Safety Guarantees
1616
//!
1717
//! The DuckLake implementation is designed to be thread-safe:
18-
//! - **MetadataProvider**: Opens a new read-only DuckDB connection per query
18+
//! - **MetadataProvider**: Uses a single shared connection protected by Mutex
1919
//! - **Catalog/Schema**: Dynamic metadata lookup with no shared mutable state
2020
//! - **Table**: Immutable metadata cached at creation time
2121
//! - **ObjectStore**: DataFusion's object stores are Arc<dyn ObjectStore> (thread-safe)

0 commit comments

Comments
 (0)