Skip to content

Commit dc0bc20

Browse files
authored
Add initial support for fetching data from remote databases (postgres, motherduck, duckdb) (#2)
* chore: add adbc dependencies * feat(datafetch): add DataFetchError enum * feat(datafetch): add TableMetadata and ColumnMetadata types * feat(datafetch): add DataFetcher trait * feat(datafetch): add DriverManager for ADBC driver loading * feat(datafetch): add AdbcFetcher with discovery skeleton * feat(datafetch): implement get_objects result parsing * feat(datafetch): implement fetch_table with parquet writing * feat(catalog): add arrow_schema_json to TableInfo * feat: integrate DataFetcher with SchemaProvider * feat(engine): implement connect() with ADBC discovery * test: add datafetch integration tests * chore: add driver vendoring directory structure * fix: implement build.rs for driver vendoring * fix: use proper Arrow schema serialization * feat: implement LazyTableProvider for on-demand fetch * fix: quote SQL identifiers to prevent injection * fix: allow syncing empty tables * fix: improve sql_type_to_arrow with NULL handling and warnings * docs: add documentation to datafetch public APIs * chore: replace ADBC deps with sqlx * refactor(datafetch): remove ADBC, add NativeFetcher skeleton * feat(datafetch): add StreamingParquetWriter * refactor(catalog): migrate postgres_manager to sqlx * feat(datafetch): add DuckDB discovery * feat(datafetch): add DuckDB fetch_table * feat(datafetch): add PostgreSQL discovery * feat(datafetch): add PostgreSQL fetch with Arrow conversion * refactor(catalog): merge add_table and update_table_schema * test: update tests for native datafetch * refactor(datafetch): centralize streaming Parquet writer * test: add comprehensive tests for streaming parquet writer * log errors * build connection string from input * automatically handle ssl for postgres * simplify to use Source instead of raw json * fix projection push down * add support for filters/limits at table scan * add access to catalog * use correct size for ordinal position * duckdb (motherduck) should only return tables for the configured catalog * first pass at golden path 
integration test * refactor to use test harness/fixture abstraction for better reuse * remove existing integration tests in favor of new "golden path" tests * remove unused function * refactor to match enum, not string * cargo fmt * inline postgres arrow logic * refactor to stream postgres results * fix log message * refactor duckdb to write results in a streaming fashion * use parameterized query for duckdb discovery
1 parent 08e793d commit dc0bc20

31 files changed

+3273
-623
lines changed

Cargo.lock

Lines changed: 458 additions & 248 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ path = "src/bin/server.rs"
1414
[dependencies]
1515
datafusion = "50.2"
1616
duckdb = { version = "1.4", features = ["bundled"] }
17+
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "chrono", "tls-rustls"] }
1718
rustyline = "17.0"
1819
tokio = { version = "1.47", features = ["full"] }
1920
serde = { version = "1.0", features = ["derive"] }
@@ -28,10 +29,7 @@ env_logger = "0.11"
2829
clap = { version = "4.5", features = ["derive"] }
2930
arrow-csv = "56.2"
3031
arrow-json = "56.2"
31-
tokio-postgres = "0.7"
32-
postgres-native-tls = "0.5"
33-
native-tls = "0.2"
34-
deadpool-postgres = "0.14"
32+
arrow-schema = { version = "56.2", features = ["serde"] }
3533
object_store = { version = "0.12", features = ["aws"] }
3634
url = "2.5"
3735
axum = "0.8.7"
@@ -40,6 +38,9 @@ tower-http = { version = "0.6.6", features = ["trace", "cors"] }
4038
config = "0.15.19"
4139
tracing = "0.1"
4240
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
41+
thiserror = "2.0.17"
42+
uuid = { version = "1.11", features = ["v4"] }
43+
urlencoding = "2.1"
4344

4445
[dev-dependencies]
4546
testcontainers = "0.23"

src/bin/repl.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use anyhow::Result;
22
use clap::{Parser, ValueEnum};
3+
use rivetdb::catalog::CatalogManager;
4+
use rivetdb::datafusion::{HotDataEngine, QueryResponse};
5+
use rivetdb::source::Source;
36
use rustyline::completion::{Completer, Pair};
47
use rustyline::error::ReadlineError;
58
use rustyline::highlight::Highlighter;
@@ -9,8 +12,6 @@ use rustyline::{Context, Editor, Helper};
912
use std::path::PathBuf;
1013
use std::sync::Arc;
1114
use std::time::Duration;
12-
use rivetdb::catalog::CatalogManager;
13-
use rivetdb::datafusion::{HotDataEngine, QueryResponse};
1415

1516
#[derive(Parser)]
1617
#[command(name = "hotdata", about = "HotData Query Engine", version)]
@@ -405,7 +406,7 @@ async fn handle_command(state: &mut ReplState, line: &str) -> Result<()> {
405406
print_help();
406407
}
407408
_ if line.starts_with("connect ") => {
408-
handle_connect(state, line)?;
409+
handle_connect(state, line).await?;
409410
}
410411
"list-connections" => {
411412
handle_list_connections(state)?;
@@ -462,7 +463,7 @@ fn print_duration_sec(d: Duration) {
462463
println!("execution time {}", time);
463464
}
464465

465-
fn handle_connect(state: &mut ReplState, line: &str) -> Result<()> {
466+
async fn handle_connect(state: &mut ReplState, line: &str) -> Result<()> {
466467
// Parse: connect <type> <name> <key=value> ...
467468
let parts: Vec<&str> = line.split_whitespace().collect();
468469

@@ -534,8 +535,11 @@ fn handle_connect(state: &mut ReplState, line: &str) -> Result<()> {
534535

535536
let config_value = serde_json::Value::Object(config);
536537

538+
// Deserialize to Source enum
539+
let source: Source = serde_json::from_value(config_value)?;
540+
537541
// Connect through the engine
538-
state.engine.connect(source_type, name, config_value)?;
542+
state.engine.connect(name, source).await?;
539543

540544
Ok(())
541545
}

src/catalog/duckdb_manager.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,15 @@ impl DuckdbCatalogManager {
123123
conn.execute("INSERT INTO schema_migrations (version) VALUES (1)", [])?;
124124
}
125125

126+
// Migration 2: Add arrow_schema_json column to tables
127+
if current_version < 2 {
128+
conn.execute(
129+
"ALTER TABLE tables ADD COLUMN IF NOT EXISTS arrow_schema_json TEXT",
130+
[],
131+
)?;
132+
conn.execute("INSERT INTO schema_migrations (version) VALUES (2)", [])?;
133+
}
134+
126135
Ok(())
127136
}
128137

@@ -135,6 +144,7 @@ impl DuckdbCatalogManager {
135144
parquet_path: row.get(4)?,
136145
state_path: row.get(5)?,
137146
last_sync: row.get(6)?,
147+
arrow_schema_json: row.get(7)?,
138148
})
139149
}
140150
}
@@ -221,18 +231,25 @@ impl CatalogManager for DuckdbCatalogManager {
221231
}
222232
}
223233

224-
fn add_table(&self, connection_id: i32, schema_name: &str, table_name: &str) -> Result<i32> {
234+
fn add_table(
235+
&self,
236+
connection_id: i32,
237+
schema_name: &str,
238+
table_name: &str,
239+
arrow_schema_json: &str,
240+
) -> Result<i32> {
225241
if self.readonly {
226242
anyhow::bail!("Cannot add table in readonly mode");
227243
}
228244

229245
let conn_guard = self.get_connection_guard()?;
230246
let conn = conn_guard.as_ref().unwrap(); // Safe: get_connection_guard verified it's Some
231247

232-
// add table to catalog. if we already know about the table (we're reimporting connection), ignore it
248+
// Insert or update table with schema
233249
conn.execute(
234-
"INSERT INTO tables (connection_id, schema_name, table_name) VALUES (?, ?, ?) ON CONFLICT DO NOTHING",
235-
params![connection_id, schema_name, table_name],
250+
"INSERT INTO tables (connection_id, schema_name, table_name, arrow_schema_json) VALUES (?, ?, ?, ?)
251+
ON CONFLICT (connection_id, schema_name, table_name) DO UPDATE SET arrow_schema_json = excluded.arrow_schema_json",
252+
params![connection_id, schema_name, table_name, arrow_schema_json],
236253
)?;
237254

238255
let id: i32 = conn.query_row(
@@ -253,7 +270,7 @@ impl CatalogManager for DuckdbCatalogManager {
253270
if let Some(conn_id) = connection_id {
254271
let mut stmt = conn.prepare(
255272
"SELECT id, connection_id, schema_name, table_name, parquet_path, state_path, \
256-
CAST(last_sync AS VARCHAR) as last_sync \
273+
CAST(last_sync AS VARCHAR) as last_sync, arrow_schema_json \
257274
FROM tables WHERE connection_id = ? ORDER BY schema_name, table_name",
258275
)?;
259276
let rows = stmt.query_map(params![conn_id], DuckdbCatalogManager::table_mapper)?;
@@ -264,7 +281,7 @@ impl CatalogManager for DuckdbCatalogManager {
264281
} else {
265282
let mut stmt = conn.prepare(
266283
"SELECT id, connection_id, schema_name, table_name, parquet_path, state_path, \
267-
CAST(last_sync AS VARCHAR) as last_sync \
284+
CAST(last_sync AS VARCHAR) as last_sync, arrow_schema_json \
268285
FROM tables ORDER BY schema_name, table_name",
269286
)?;
270287
let rows = stmt.query_map([], DuckdbCatalogManager::table_mapper)?;
@@ -288,7 +305,7 @@ impl CatalogManager for DuckdbCatalogManager {
288305

289306
let mut stmt = conn.prepare(
290307
"SELECT id, connection_id, schema_name, table_name, parquet_path, state_path, \
291-
CAST(last_sync AS VARCHAR) as last_sync \
308+
CAST(last_sync AS VARCHAR) as last_sync, arrow_schema_json \
292309
FROM tables WHERE connection_id = ? AND schema_name = ? AND table_name = ?",
293310
)?;
294311

src/catalog/manager.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub struct TableInfo {
1919
pub parquet_path: Option<String>,
2020
pub state_path: Option<String>,
2121
pub last_sync: Option<String>,
22+
pub arrow_schema_json: Option<String>,
2223
}
2324

2425
pub trait CatalogManager: Debug + Send + Sync {
@@ -29,7 +30,13 @@ pub trait CatalogManager: Debug + Send + Sync {
2930
fn list_connections(&self) -> Result<Vec<ConnectionInfo>>;
3031
fn add_connection(&self, name: &str, source_type: &str, config_json: &str) -> Result<i32>;
3132
fn get_connection(&self, name: &str) -> Result<Option<ConnectionInfo>>;
32-
fn add_table(&self, connection_id: i32, schema_name: &str, table_name: &str) -> Result<i32>;
33+
fn add_table(
34+
&self,
35+
connection_id: i32,
36+
schema_name: &str,
37+
table_name: &str,
38+
arrow_schema_json: &str,
39+
) -> Result<i32>;
3340
fn list_tables(&self, connection_id: Option<i32>) -> Result<Vec<TableInfo>>;
3441
fn get_table(
3542
&self,

0 commit comments

Comments
 (0)