Skip to content

Commit d27f014

Browse files
authored
decouple connection registration from discovery (#39)
* feat(http): decouple connection registration from discovery * cargo fmt
1 parent 04eef98 commit d27f014

File tree

5 files changed

+473
-50
lines changed

5 files changed

+473
-50
lines changed

src/engine.rs

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,50 @@ impl RivetEngine {
210210
Ok(())
211211
}
212212

213-
/// Connect to a new external data source and register it as a catalog.
214-
pub async fn connect(&self, name: &str, source: Source) -> Result<()> {
213+
/// Register a connection without discovering tables.
214+
///
215+
/// This persists the connection config to the catalog and registers it with DataFusion,
216+
/// but does not attempt to connect to the remote database or discover tables.
217+
/// Use `discover_connection()` to discover tables after registration.
218+
pub async fn register_connection(&self, name: &str, source: Source) -> Result<i32> {
219+
let source_type = source.source_type();
220+
221+
// Store config as JSON (includes "type" from serde tag)
222+
let config_json = serde_json::to_string(&source)?;
223+
let conn_id = self
224+
.catalog
225+
.add_connection(name, source_type, &config_json)
226+
.await?;
227+
228+
// Register with DataFusion (empty catalog - no tables yet)
229+
let catalog_provider = Arc::new(RivetCatalogProvider::new(
230+
conn_id,
231+
name.to_string(),
232+
Arc::new(source),
233+
self.catalog.clone(),
234+
self.orchestrator.clone(),
235+
)) as Arc<dyn CatalogProvider>;
236+
237+
self.df_ctx.register_catalog(name, catalog_provider);
238+
239+
info!("Connection '{}' registered (discovery pending)", name);
240+
241+
Ok(conn_id)
242+
}
243+
244+
/// Discover tables for an existing connection.
245+
///
246+
/// Connects to the remote database, discovers available tables, and stores
247+
/// their metadata in the catalog. Returns the number of tables discovered.
248+
pub async fn discover_connection(&self, name: &str) -> Result<usize> {
249+
// Get connection info
250+
let conn = self
251+
.catalog
252+
.get_connection(name)
253+
.await?
254+
.ok_or_else(|| anyhow::anyhow!("Connection '{}' not found", name))?;
255+
256+
let source: Source = serde_json::from_str(&conn.config_json)?;
215257
let source_type = source.source_type();
216258

217259
// Discover tables
@@ -224,40 +266,32 @@ impl RivetEngine {
224266

225267
info!("Discovered {} tables", tables.len());
226268

227-
// Store config as JSON (includes "type" from serde tag)
228-
let config_json = serde_json::to_string(&source)?;
229-
let conn_id = self
230-
.catalog
231-
.add_connection(name, source_type, &config_json)
232-
.await?;
233-
234-
// Add discovered tables to catalog with schema in one call
269+
// Add discovered tables to catalog with schema
235270
for table in &tables {
236271
let schema = table.to_arrow_schema();
237272
let schema_json = serde_json::to_string(schema.as_ref())
238273
.map_err(|e| anyhow::anyhow!("Failed to serialize schema: {}", e))?;
239274
self.catalog
240-
.add_table(conn_id, &table.schema_name, &table.table_name, &schema_json)
275+
.add_table(conn.id, &table.schema_name, &table.table_name, &schema_json)
241276
.await?;
242277
}
243278

244-
// Register with DataFusion
245-
let catalog_provider = Arc::new(RivetCatalogProvider::new(
246-
conn_id,
247-
name.to_string(),
248-
Arc::new(source),
249-
self.catalog.clone(),
250-
self.orchestrator.clone(),
251-
)) as Arc<dyn CatalogProvider>;
252-
253-
self.df_ctx.register_catalog(name, catalog_provider);
254-
255279
info!(
256-
"Connection '{}' registered with {} tables",
280+
"Connection '{}' discovery complete: {} tables",
257281
name,
258282
tables.len()
259283
);
260284

285+
Ok(tables.len())
286+
}
287+
288+
/// Connect to a new external data source and register it as a catalog.
289+
///
290+
/// This is a convenience method that combines `register_connection()` and
291+
/// `discover_connection()`. For more control, use those methods separately.
292+
pub async fn connect(&self, name: &str, source: Source) -> Result<()> {
293+
self.register_connection(name, source).await?;
294+
self.discover_connection(name).await?;
261295
Ok(())
262296
}
263297

src/http/app_server.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::http::handlers::{
2-
create_connection_handler, delete_connection_handler, get_connection_handler, health_handler,
3-
list_connections_handler, purge_connection_cache_handler, purge_table_cache_handler,
4-
query_handler, tables_handler,
2+
create_connection_handler, delete_connection_handler, discover_connection_handler,
3+
get_connection_handler, health_handler, list_connections_handler,
4+
purge_connection_cache_handler, purge_table_cache_handler, query_handler, tables_handler,
55
};
66
use crate::RivetEngine;
77
use axum::routing::{delete, get, post};
@@ -18,6 +18,7 @@ pub const PATH_TABLES: &str = "/tables";
1818
pub const PATH_HEALTH: &str = "/health";
1919
pub const PATH_CONNECTIONS: &str = "/connections";
2020
pub const PATH_CONNECTION: &str = "/connections/{name}";
21+
pub const PATH_CONNECTION_DISCOVER: &str = "/connections/{name}/discover";
2122
pub const PATH_CONNECTION_CACHE: &str = "/connections/{name}/cache";
2223
pub const PATH_TABLE_CACHE: &str = "/connections/{name}/tables/{schema}/{table}/cache";
2324

@@ -37,6 +38,7 @@ impl AppServer {
3738
PATH_CONNECTION,
3839
get(get_connection_handler).delete(delete_connection_handler),
3940
)
41+
.route(PATH_CONNECTION_DISCOVER, post(discover_connection_handler))
4042
.route(
4143
PATH_CONNECTION_CACHE,
4244
delete(purge_connection_cache_handler),

src/http/handlers.rs

Lines changed: 68 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::http::error::ApiError;
22
use crate::http::models::{
3-
ConnectionInfo, CreateConnectionRequest, CreateConnectionResponse, GetConnectionResponse,
4-
ListConnectionsResponse, QueryRequest, QueryResponse, TableInfo, TablesResponse,
3+
ConnectionInfo, CreateConnectionRequest, CreateConnectionResponse, DiscoverConnectionResponse,
4+
DiscoveryStatus, GetConnectionResponse, ListConnectionsResponse, QueryRequest, QueryResponse,
5+
TableInfo, TablesResponse,
56
};
67
use crate::http::serialization::{encode_value_at, make_array_encoder};
78
use crate::source::Source;
@@ -186,39 +187,83 @@ pub async fn create_connection_handler(
186187

187188
let source_type = source.source_type().to_string();
188189

189-
// Attempt to connect (discovers tables and registers catalog)
190-
engine.connect(&request.name, source).await.map_err(|e| {
191-
error!("Failed to connect to database: {}", e);
192-
// Extract root cause message only - don't expose full stack trace to clients
193-
let root_cause = e.root_cause().to_string();
194-
let msg = root_cause.lines().next().unwrap_or("Unknown error");
195-
196-
if msg.contains("Failed to connect") || msg.contains("connection refused") {
197-
ApiError::bad_gateway(format!("Failed to connect to database: {}", msg))
198-
} else if msg.contains("Unsupported source type") || msg.contains("Invalid configuration") {
199-
ApiError::bad_request(msg.to_string())
200-
} else {
201-
ApiError::bad_gateway(format!("Failed to connect to database: {}", msg))
202-
}
203-
})?;
204-
205-
// Count discovered tables
206-
let tables_discovered = engine
207-
.list_tables(Some(&request.name))
190+
// Step 1: Register the connection
191+
engine
192+
.register_connection(&request.name, source)
208193
.await
209-
.map(|t| t.len())
210-
.unwrap_or(0);
194+
.map_err(|e| {
195+
error!("Failed to register connection: {}", e);
196+
ApiError::internal_error(format!("Failed to register connection: {}", e))
197+
})?;
198+
199+
// Step 2: Attempt discovery - catch errors and return partial success
200+
let (tables_discovered, discovery_status, discovery_error) =
201+
match engine.discover_connection(&request.name).await {
202+
Ok(count) => (count, DiscoveryStatus::Success, None),
203+
Err(e) => {
204+
let root_cause = e.root_cause().to_string();
205+
let msg = root_cause
206+
.lines()
207+
.next()
208+
.unwrap_or("Unknown error")
209+
.to_string();
210+
error!(
211+
"Discovery failed for connection '{}': {}",
212+
request.name, msg
213+
);
214+
(0, DiscoveryStatus::Failed, Some(msg))
215+
}
216+
};
211217

212218
Ok((
213219
StatusCode::CREATED,
214220
Json(CreateConnectionResponse {
215221
name: request.name,
216222
source_type,
217223
tables_discovered,
224+
discovery_status,
225+
discovery_error,
218226
}),
219227
))
220228
}
221229

230+
/// Handler for POST /connections/{name}/discover
231+
pub async fn discover_connection_handler(
232+
State(engine): State<Arc<RivetEngine>>,
233+
Path(name): Path<String>,
234+
) -> Result<Json<DiscoverConnectionResponse>, ApiError> {
235+
// Validate connection exists
236+
if engine.catalog().get_connection(&name).await?.is_none() {
237+
return Err(ApiError::not_found(format!(
238+
"Connection '{}' not found",
239+
name
240+
)));
241+
}
242+
243+
// Attempt discovery
244+
let (tables_discovered, discovery_status, discovery_error) =
245+
match engine.discover_connection(&name).await {
246+
Ok(count) => (count, DiscoveryStatus::Success, None),
247+
Err(e) => {
248+
let root_cause = e.root_cause().to_string();
249+
let msg = root_cause
250+
.lines()
251+
.next()
252+
.unwrap_or("Unknown error")
253+
.to_string();
254+
error!("Discovery failed for connection '{}': {}", name, msg);
255+
(0, DiscoveryStatus::Failed, Some(msg))
256+
}
257+
};
258+
259+
Ok(Json(DiscoverConnectionResponse {
260+
name,
261+
tables_discovered,
262+
discovery_status,
263+
discovery_error,
264+
}))
265+
}
266+
222267
/// Handler for GET /connections
223268
pub async fn list_connections_handler(
224269
State(engine): State<Arc<RivetEngine>>,

src/http/models.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,37 @@ pub struct CreateConnectionRequest {
3939
pub config: serde_json::Value,
4040
}
4141

42+
/// Discovery status for connection creation
43+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
44+
#[serde(rename_all = "snake_case")]
45+
pub enum DiscoveryStatus {
46+
/// Discovery succeeded
47+
Success,
48+
/// Discovery was skipped (e.g., skip_discovery=true)
49+
Skipped,
50+
/// Discovery failed (connection still registered)
51+
Failed,
52+
}
53+
4254
/// Response body for POST /connections
4355
#[derive(Debug, Serialize)]
4456
pub struct CreateConnectionResponse {
4557
pub name: String,
4658
pub source_type: String,
4759
pub tables_discovered: usize,
60+
pub discovery_status: DiscoveryStatus,
61+
#[serde(skip_serializing_if = "Option::is_none")]
62+
pub discovery_error: Option<String>,
63+
}
64+
65+
/// Response body for POST /connections/{name}/discover
66+
#[derive(Debug, Serialize)]
67+
pub struct DiscoverConnectionResponse {
68+
pub name: String,
69+
pub tables_discovered: usize,
70+
pub discovery_status: DiscoveryStatus,
71+
#[serde(skip_serializing_if = "Option::is_none")]
72+
pub discovery_error: Option<String>,
4873
}
4974

5075
/// Single connection metadata for API responses

0 commit comments

Comments
 (0)