diff --git a/src/datafusion/engine.rs b/src/datafusion/engine.rs
index b932a28..286b076 100644
--- a/src/datafusion/engine.rs
+++ b/src/datafusion/engine.rs
@@ -230,7 +230,7 @@ impl HotDataEngine {
     }
 
     /// Purge all cached data for a connection (clears parquet files and resets sync state).
-    pub async fn purge_connection(&mut self, name: &str) -> Result<()> {
+    pub async fn purge_connection(&self, name: &str) -> Result<()> {
         // Get connection info (validates it exists and gives us the ID)
         let conn = self
             .catalog
@@ -263,7 +263,7 @@ impl HotDataEngine {
 
     /// Purge cached data for a single table (clears parquet file and resets sync state).
     pub async fn purge_table(
-        &mut self,
+        &self,
         connection_name: &str,
         schema_name: &str,
         table_name: &str,
@@ -303,7 +303,7 @@ impl HotDataEngine {
     }
 
     /// Remove a connection entirely (removes from catalog and deletes all data).
-    pub async fn remove_connection(&mut self, name: &str) -> Result<()> {
+    pub async fn remove_connection(&self, name: &str) -> Result<()> {
         // Get connection info (validates it exists and gives us the ID)
         let conn = self
             .catalog
diff --git a/src/http/app_server.rs b/src/http/app_server.rs
index 966c365..1088761 100644
--- a/src/http/app_server.rs
+++ b/src/http/app_server.rs
@@ -1,9 +1,10 @@
 use crate::datafusion::HotDataEngine;
 use crate::http::handlers::{
-    create_connection_handler, health_handler, list_connections_handler, query_handler,
-    tables_handler,
+    create_connection_handler, delete_connection_handler, get_connection_handler, health_handler,
+    list_connections_handler, purge_connection_cache_handler, purge_table_cache_handler,
+    query_handler, tables_handler,
 };
-use axum::routing::{get, post};
+use axum::routing::{delete, get, post};
 use axum::Router;
 use std::sync::Arc;
 
@@ -16,6 +17,9 @@ pub const PATH_QUERY: &str = "/query";
 pub const PATH_TABLES: &str = "/tables";
 pub const PATH_HEALTH: &str = "/health";
 pub const PATH_CONNECTIONS: &str = "/connections";
+pub const PATH_CONNECTION: &str = "/connections/{name}";
+pub const PATH_CONNECTION_CACHE: &str = "/connections/{name}/cache";
+pub const PATH_TABLE_CACHE: &str = "/connections/{name}/tables/{schema}/{table}/cache";
 
 impl AppServer {
     pub fn new(engine: HotDataEngine) -> Self {
@@ -29,6 +33,15 @@ impl AppServer {
                 PATH_CONNECTIONS,
                 post(create_connection_handler).get(list_connections_handler),
             )
+            .route(
+                PATH_CONNECTION,
+                get(get_connection_handler).delete(delete_connection_handler),
+            )
+            .route(
+                PATH_CONNECTION_CACHE,
+                delete(purge_connection_cache_handler),
+            )
+            .route(PATH_TABLE_CACHE, delete(purge_table_cache_handler))
            .with_state(engine.clone()),
            engine,
        }
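
The `{name}` segments use axum's brace-style path captures (axum 0.8 and later), which the handlers below read with the `Path` extractor. A minimal, self-contained sketch of how such a route behaves under `tower::ServiceExt::oneshot` — the stub handler and test are hypothetical illustrations, not part of this change:

```rust
use axum::{extract::Path, http::StatusCode, routing::delete, Router};
use tower::ServiceExt; // provides `oneshot` for driving a Router directly

// Hypothetical stand-in for delete_connection_handler (defined in handlers.rs below).
async fn delete_stub(Path(name): Path<String>) -> StatusCode {
    // `name` receives the decoded `{name}` segment, e.g. "duckdb_conn".
    let _ = name;
    StatusCode::NO_CONTENT
}

#[tokio::test]
async fn delete_route_binds_path_param() {
    let app = Router::new().route("/connections/{name}", delete(delete_stub));
    let response = app
        .oneshot(
            axum::http::Request::builder()
                .method("DELETE")
                .uri("/connections/duckdb_conn")
                .body(axum::body::Body::empty())
                .unwrap(),
        )
        .await
        .unwrap();
    assert_eq!(response.status(), StatusCode::NO_CONTENT);
}
```

The integration tests at the bottom of this patch exercise the real routes with exactly this oneshot pattern.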
diff --git a/src/http/handlers.rs b/src/http/handlers.rs
index d3b268f..bd3f9ac 100644
--- a/src/http/handlers.rs
+++ b/src/http/handlers.rs
@@ -1,12 +1,17 @@
 use crate::datafusion::HotDataEngine;
 use crate::http::error::ApiError;
 use crate::http::models::{
-    ConnectionInfo, CreateConnectionRequest, CreateConnectionResponse, ListConnectionsResponse,
-    QueryRequest, QueryResponse, TableInfo, TablesResponse,
+    ConnectionInfo, CreateConnectionRequest, CreateConnectionResponse, GetConnectionResponse,
+    ListConnectionsResponse, QueryRequest, QueryResponse, TableInfo, TablesResponse,
 };
 use crate::http::serialization::{encode_value_at, make_array_encoder};
 use crate::source::Source;
-use axum::{extract::Query as QueryParams, extract::State, http::StatusCode, Json};
+use axum::{
+    extract::{Path, Query as QueryParams, State},
+    http::StatusCode,
+    Json,
+};
+use serde::Deserialize;
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Instant;
@@ -231,3 +236,88 @@ pub async fn list_connections_handler(
         connections: connection_infos,
     }))
 }
+
+/// Handler for GET /connections/{name}
+pub async fn get_connection_handler(
+    State(engine): State<Arc<HotDataEngine>>,
+    Path(name): Path<String>,
+) -> Result<Json<GetConnectionResponse>, ApiError> {
+    // Get connection info
+    let conn = engine
+        .catalog()
+        .get_connection(&name)?
+        .ok_or_else(|| ApiError::not_found(format!("Connection '{}' not found", name)))?;
+
+    // Get table counts
+    let tables = engine.list_tables(Some(&name))?;
+    let table_count = tables.len();
+    let synced_table_count = tables.iter().filter(|t| t.parquet_path.is_some()).count();
+
+    Ok(Json(GetConnectionResponse {
+        id: conn.id,
+        name: conn.name,
+        source_type: conn.source_type,
+        table_count,
+        synced_table_count,
+    }))
+}
+
+/// Handler for DELETE /connections/{name}
+pub async fn delete_connection_handler(
+    State(engine): State<Arc<HotDataEngine>>,
+    Path(name): Path<String>,
+) -> Result<StatusCode, ApiError> {
+    engine.remove_connection(&name).await.map_err(|e| {
+        if e.to_string().contains("not found") {
+            ApiError::not_found(format!("Connection '{}' not found", name))
+        } else {
+            ApiError::internal_error(e.to_string())
+        }
+    })?;
+
+    Ok(StatusCode::NO_CONTENT)
+}
+
+/// Handler for DELETE /connections/{name}/cache
+pub async fn purge_connection_cache_handler(
+    State(engine): State<Arc<HotDataEngine>>,
+    Path(name): Path<String>,
+) -> Result<StatusCode, ApiError> {
+    engine.purge_connection(&name).await.map_err(|e| {
+        if e.to_string().contains("not found") {
+            ApiError::not_found(format!("Connection '{}' not found", name))
+        } else {
+            ApiError::internal_error(e.to_string())
+        }
+    })?;
+
+    Ok(StatusCode::NO_CONTENT)
+}
+
+/// Path parameters for table cache operations
+#[derive(Deserialize)]
+pub struct TableCachePath {
+    name: String,
+    schema: String,
+    table: String,
+}
+
+/// Handler for DELETE /connections/{name}/tables/{schema}/{table}/cache
+pub async fn purge_table_cache_handler(
+    State(engine): State<Arc<HotDataEngine>>,
+    Path(params): Path<TableCachePath>,
+) -> Result<StatusCode, ApiError> {
+    engine
+        .purge_table(&params.name, &params.schema, &params.table)
+        .await
+        .map_err(|e| {
+            let msg = e.to_string();
+            if msg.contains("not found") {
+                ApiError::not_found(msg)
+            } else {
+                ApiError::internal_error(msg)
+            }
+        })?;
+
+    Ok(StatusCode::NO_CONTENT)
+}
diff --git a/src/http/models.rs b/src/http/models.rs
index bd38c79..e01debf 100644
--- a/src/http/models.rs
+++ b/src/http/models.rs
@@ -60,3 +60,13 @@ pub struct ConnectionInfo {
 pub struct ListConnectionsResponse {
     pub connections: Vec<ConnectionInfo>,
 }
+
+/// Response body for GET /connections/{name}
+#[derive(Debug, Serialize)]
+pub struct GetConnectionResponse {
+    pub id: i32,
+    pub name: String,
+    pub source_type: String,
+    pub table_count: usize,
+    pub synced_table_count: usize,
+}
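
Since `GetConnectionResponse` derives `Serialize` with no field renames, the wire shape of GET /connections/{name} follows the field names directly. A quick sketch of that round trip (the concrete values are made up for illustration):

```rust
#[test]
fn get_connection_response_wire_shape() {
    let resp = GetConnectionResponse {
        id: 1,
        name: "duckdb_conn".to_string(),
        source_type: "duckdb".to_string(),
        table_count: 3,
        synced_table_count: 1,
    };
    // Field names pass through unchanged; counts serialize as JSON numbers.
    assert_eq!(
        serde_json::to_value(&resp).unwrap(),
        serde_json::json!({
            "id": 1,
            "name": "duckdb_conn",
            "source_type": "duckdb",
            "table_count": 3,
            "synced_table_count": 1
        })
    );
}
```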
diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs
index 77c2064..f30f515 100644
--- a/tests/integration_tests.rs
+++ b/tests/integration_tests.rs
@@ -121,7 +121,7 @@ impl ConnectionResult {
 
 /// Table info from list operations.
 struct TablesResult {
-    tables: Vec<(String, String)>, // (schema, table)
+    tables: Vec<(String, String, bool)>, // (schema, table, synced)
 }
 
 impl TablesResult {
@@ -129,7 +129,13 @@ impl TablesResult {
         Self {
             tables: tables
                 .iter()
-                .map(|t| (t.schema_name.clone(), t.table_name.clone()))
+                .map(|t| {
+                    (
+                        t.schema_name.clone(),
+                        t.table_name.clone(),
+                        t.parquet_path.is_some(),
+                    )
+                })
                 .collect(),
         }
     }
@@ -142,7 +148,8 @@ impl TablesResult {
             .filter_map(|t| {
                 let schema = t["schema"].as_str()?;
                 let table = t["table"].as_str()?;
-                Some((schema.to_string(), table.to_string()))
+                let synced = t["synced"].as_bool().unwrap_or(false);
+                Some((schema.to_string(), table.to_string(), synced))
             })
             .collect()
         })
@@ -152,7 +159,9 @@ impl TablesResult {
 
     fn assert_has_table(&self, schema: &str, table: &str) {
         assert!(
-            self.tables.iter().any(|(s, t)| s == schema && t == table),
+            self.tables
+                .iter()
+                .any(|(s, t, _)| s == schema && t == table),
             "Table '{}.{}' not found. Available: {:?}",
             schema,
             table,
@@ -163,6 +172,57 @@ impl TablesResult {
     fn assert_not_empty(&self) {
        assert!(!self.tables.is_empty(), "Expected tables but found none");
    }
+
+    fn synced_count(&self) -> usize {
+        self.tables.iter().filter(|(_, _, synced)| *synced).count()
+    }
+
+    fn assert_none_synced(&self) {
+        assert_eq!(self.synced_count(), 0, "Expected no tables to be synced");
+    }
+
+    fn is_table_synced(&self, schema: &str, table: &str) -> bool {
+        self.tables
+            .iter()
+            .find(|(s, t, _)| s == schema && t == table)
+            .map(|(_, _, synced)| *synced)
+            .unwrap_or(false)
+    }
+}
+
+/// Single connection details from get operations.
+#[allow(dead_code)]
+struct ConnectionDetails {
+    id: i32,
+    name: String,
+    source_type: String,
+    table_count: usize,
+    synced_table_count: usize,
+}
+
+impl ConnectionDetails {
+    fn from_engine(
+        conn: &rivetdb::catalog::ConnectionInfo,
+        tables: &[rivetdb::catalog::TableInfo],
+    ) -> Self {
+        Self {
+            id: conn.id,
+            name: conn.name.clone(),
+            source_type: conn.source_type.clone(),
+            table_count: tables.len(),
+            synced_table_count: tables.iter().filter(|t| t.parquet_path.is_some()).count(),
+        }
+    }
+
+    fn from_api(json: &serde_json::Value) -> Self {
+        Self {
+            id: json["id"].as_i64().unwrap_or(0) as i32,
+            name: json["name"].as_str().unwrap_or("").to_string(),
+            source_type: json["source_type"].as_str().unwrap_or("").to_string(),
+            table_count: json["table_count"].as_u64().unwrap_or(0) as usize,
+            synced_table_count: json["synced_table_count"].as_u64().unwrap_or(0) as usize,
+        }
+    }
 }
 
 /// Trait for test execution - allows same test logic for engine vs API.
@@ -172,6 +232,12 @@ trait TestExecutor: Send + Sync {
     async fn list_connections(&self) -> ConnectionResult;
     async fn list_tables(&self, connection: &str) -> TablesResult;
     async fn query(&self, sql: &str) -> QueryResult;
+
+    // New methods for CRUD operations
+    async fn get_connection(&self, name: &str) -> Option<ConnectionDetails>;
+    async fn delete_connection(&self, name: &str) -> bool;
+    async fn purge_connection_cache(&self, name: &str) -> bool;
+    async fn purge_table_cache(&self, conn: &str, schema: &str, table: &str) -> bool;
 }
 
 /// Engine-based test executor.
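
Note that `TablesResult::from_api` now reads a boolean `synced` field from each entry of the /tables payload, defaulting to `false` when it is absent; the server-side change that emits that field is not visible in this diff. The payload shape the tests assume, sketched with hypothetical rows (and assuming `from_api` takes `&serde_json::Value`, as `ConnectionDetails::from_api` does):

```rust
#[test]
fn tables_payload_shape_assumed_by_from_api() {
    // Hypothetical /tables response; "sales"/"orders" mirror the fixtures used below.
    let json = serde_json::json!({
        "tables": [
            { "schema": "sales", "table": "orders", "synced": true },
            { "schema": "sales", "table": "products", "synced": false }
        ]
    });
    let result = TablesResult::from_api(&json);
    assert!(result.is_table_synced("sales", "orders"));
    assert!(!result.is_table_synced("sales", "products"));
}
```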
@@ -199,6 +265,24 @@ impl TestExecutor for EngineExecutor {
     async fn query(&self, sql: &str) -> QueryResult {
         QueryResult::from_engine(&self.engine.execute_query(sql).await.unwrap())
     }
+
+    async fn get_connection(&self, name: &str) -> Option<ConnectionDetails> {
+        let conn = self.engine.catalog().get_connection(name).ok()??;
+        let tables = self.engine.list_tables(Some(name)).ok()?;
+        Some(ConnectionDetails::from_engine(&conn, &tables))
+    }
+
+    async fn delete_connection(&self, name: &str) -> bool {
+        self.engine.remove_connection(name).await.is_ok()
+    }
+
+    async fn purge_connection_cache(&self, name: &str) -> bool {
+        self.engine.purge_connection(name).await.is_ok()
+    }
+
+    async fn purge_table_cache(&self, conn: &str, schema: &str, table: &str) -> bool {
+        self.engine.purge_table(conn, schema, table).await.is_ok()
+    }
 }
 
 /// REST API-based test executor.
@@ -309,6 +393,84 @@ impl TestExecutor for ApiExecutor {
             .unwrap();
         QueryResult::from_api(&serde_json::from_slice(&body).unwrap())
     }
+
+    async fn get_connection(&self, name: &str) -> Option<ConnectionDetails> {
+        let uri = format!("/connections/{}", name);
+        let response = self
+            .router
+            .clone()
+            .oneshot(
+                Request::builder()
+                    .method("GET")
+                    .uri(&uri)
+                    .body(Body::empty())
+                    .unwrap(),
+            )
+            .await
+            .unwrap();
+
+        if response.status() != StatusCode::OK {
+            return None;
+        }
+
+        let body = axum::body::to_bytes(response.into_body(), usize::MAX)
+            .await
+            .unwrap();
+        Some(ConnectionDetails::from_api(
+            &serde_json::from_slice(&body).unwrap(),
+        ))
+    }
+
+    async fn delete_connection(&self, name: &str) -> bool {
+        let uri = format!("/connections/{}", name);
+        let response = self
+            .router
+            .clone()
+            .oneshot(
+                Request::builder()
+                    .method("DELETE")
+                    .uri(&uri)
+                    .body(Body::empty())
+                    .unwrap(),
+            )
+            .await
+            .unwrap();
+        response.status() == StatusCode::NO_CONTENT
+    }
+
+    async fn purge_connection_cache(&self, name: &str) -> bool {
+        let uri = format!("/connections/{}/cache", name);
+        let response = self
+            .router
+            .clone()
+            .oneshot(
+                Request::builder()
+                    .method("DELETE")
+                    .uri(&uri)
+                    .body(Body::empty())
+                    .unwrap(),
+            )
+            .await
+            .unwrap();
+        response.status() == StatusCode::NO_CONTENT
+    }
+
+    async fn purge_table_cache(&self, conn: &str, schema: &str, table: &str) -> bool {
+        let uri = format!("/connections/{}/tables/{}/{}/cache", conn, schema, table);
+        let response = self
+            .router
+            .clone()
+            .oneshot(
+                Request::builder()
+                    .method("DELETE")
+                    .uri(&uri)
+                    .body(Body::empty())
+                    .unwrap(),
+            )
+            .await
+            .unwrap();
+        response.status() == StatusCode::NO_CONTENT
+    }
 }
 
 /// Test context providing both executors.
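
The three DELETE paths in `ApiExecutor` repeat the same build-request/oneshot/compare-status dance. If more destructive endpoints get added, one possible consolidation is a small helper on `ApiExecutor` — a sketch only, not something this diff does:

```rust
impl ApiExecutor {
    /// Issue a DELETE against `uri` and report whether the server answered
    /// 204 No Content. (Hypothetical refactor; the diff above keeps the three
    /// call sites inline.)
    async fn delete_returns_no_content(&self, uri: &str) -> bool {
        let response = self
            .router
            .clone()
            .oneshot(
                Request::builder()
                    .method("DELETE")
                    .uri(uri)
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();
        response.status() == StatusCode::NO_CONTENT
    }
}
```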
@@ -421,6 +583,96 @@ async fn run_multi_schema_test(executor: &dyn TestExecutor, source: &Source, con
     products.assert_row_count(1);
 }
+
+/// Run delete connection test scenario.
+async fn run_delete_connection_test(executor: &dyn TestExecutor, source: &Source, conn_name: &str) {
+    // Create connection
+    executor.connect(conn_name, source).await;
+
+    // Verify it exists
+    let conn = executor.get_connection(conn_name).await;
+    assert!(conn.is_some(), "Connection should exist after creation");
+
+    // Delete it
+    let deleted = executor.delete_connection(conn_name).await;
+    assert!(deleted, "Delete should succeed");
+
+    // Verify it's gone
+    let conn = executor.get_connection(conn_name).await;
+    assert!(conn.is_none(), "Connection should not exist after deletion");
+
+    // Verify delete of non-existent returns false
+    let deleted_again = executor.delete_connection(conn_name).await;
+    assert!(!deleted_again, "Delete of non-existent should fail");
+}
+
+/// Run purge connection cache test scenario.
+async fn run_purge_connection_cache_test(
+    executor: &dyn TestExecutor,
+    source: &Source,
+    conn_name: &str,
+) {
+    // Create connection
+    executor.connect(conn_name, source).await;
+
+    // Query to trigger sync
+    let _ = executor.query(&queries::select_orders(conn_name)).await;
+
+    // Verify tables are synced
+    let tables = executor.list_tables(conn_name).await;
+    assert!(
+        tables.synced_count() > 0,
+        "Should have synced tables after query"
+    );
+
+    // Purge cache
+    let purged = executor.purge_connection_cache(conn_name).await;
+    assert!(purged, "Purge should succeed");
+
+    // Verify tables still exist but not synced
+    let tables = executor.list_tables(conn_name).await;
+    tables.assert_not_empty();
+    tables.assert_none_synced();
+
+    // Connection should still exist
+    let conn = executor.get_connection(conn_name).await;
+    assert!(
+        conn.is_some(),
+        "Connection should still exist after cache purge"
+    );
+}
+
+/// Run purge table cache test scenario.
+async fn run_purge_table_cache_test(executor: &dyn TestExecutor, source: &Source, conn_name: &str) {
+    // Create connection
+    executor.connect(conn_name, source).await;
+
+    // Query to trigger sync of orders table
+    let _ = executor.query(&queries::select_orders(conn_name)).await;
+
+    // Verify orders table is synced
+    let tables = executor.list_tables(conn_name).await;
+    assert!(
+        tables.is_table_synced("sales", "orders"),
+        "orders should be synced"
+    );
+
+    // Purge just the orders table cache
+    let purged = executor
+        .purge_table_cache(conn_name, "sales", "orders")
+        .await;
+    assert!(purged, "Purge table should succeed");
+
+    // Verify orders is no longer synced
+    let tables = executor.list_tables(conn_name).await;
+    assert!(
+        !tables.is_table_synced("sales", "orders"),
+        "orders should not be synced after purge"
+    );
+
+    // Table should still be listed
+    tables.assert_has_table("sales", "orders");
+}
 
 // ============================================================================
 // Data Source Fixtures
 // ============================================================================
@@ -610,6 +862,48 @@ mod duckdb_tests {
         let harness = TestHarness::new();
         run_multi_schema_test(harness.api(), &source, "duckdb_conn").await;
     }
+
+    #[tokio::test]
+    async fn test_engine_delete_connection() {
+        let (_dir, source) = fixtures::duckdb_standard();
+        let harness = TestHarness::new();
+        run_delete_connection_test(harness.engine(), &source, "duckdb_conn").await;
+    }
+
+    #[tokio::test]
+    async fn test_api_delete_connection() {
+        let (_dir, source) = fixtures::duckdb_standard();
+        let harness = TestHarness::new();
+        run_delete_connection_test(harness.api(), &source, "duckdb_conn").await;
+    }
+
+    #[tokio::test]
+    async fn test_engine_purge_connection_cache() {
+        let (_dir, source) = fixtures::duckdb_standard();
+        let harness = TestHarness::new();
+        run_purge_connection_cache_test(harness.engine(), &source, "duckdb_conn").await;
+    }
+
+    #[tokio::test]
+    async fn test_api_purge_connection_cache() {
+        let (_dir, source) = fixtures::duckdb_standard();
+        let harness = TestHarness::new();
+        run_purge_connection_cache_test(harness.api(), &source, "duckdb_conn").await;
+    }
+
+    #[tokio::test]
+    async fn test_engine_purge_table_cache() {
+        let (_dir, source) = fixtures::duckdb_standard();
+        let harness = TestHarness::new();
+        run_purge_table_cache_test(harness.engine(), &source, "duckdb_conn").await;
+    }
+
+    #[tokio::test]
+    async fn test_api_purge_table_cache() {
+        let (_dir, source) = fixtures::duckdb_standard();
+        let harness = TestHarness::new();
+        run_purge_table_cache_test(harness.api(), &source, "duckdb_conn").await;
+    }
 }
 
 // ============================================================================
@@ -647,3 +941,54 @@ mod postgres_tests {
         run_multi_schema_test(harness.api(), &fixture.source, "pg_conn").await;
     }
 }
+
+// ============================================================================
+// Tests - Error Cases
+// ============================================================================
+
+mod error_tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_get_nonexistent_connection() {
+        let harness = TestHarness::new();
+
+        // Engine
+        let conn = harness.engine().get_connection("nonexistent").await;
+        assert!(conn.is_none());
+
+        // API
+        let conn = harness.api().get_connection("nonexistent").await;
+        assert!(conn.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_delete_nonexistent_connection() {
+        let harness = TestHarness::new();
+
+        let deleted = harness.api().delete_connection("nonexistent").await;
+        assert!(!deleted, "Delete of nonexistent should return false/404");
+    }
+
+    #[tokio::test]
+    async fn test_purge_nonexistent_connection_cache() {
+        let harness = TestHarness::new();
+
+        let purged = harness.api().purge_connection_cache("nonexistent").await;
+        assert!(!purged, "Purge of nonexistent should return false/404");
+    }
+
+    #[tokio::test]
+    async fn test_purge_nonexistent_table_cache() {
+        let harness = TestHarness::new();
+
+        let purged = harness
+            .api()
+            .purge_table_cache("nonexistent", "schema", "table")
+            .await;
+        assert!(
+            !purged,
+            "Purge of nonexistent table should return false/404"
+        );
+    }
+}
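
Taken together, the engine now exposes three teardown levels with distinct blast radii. A sketch of the distinction, using only methods visible in this change (`Result` is the crate alias used in engine.rs; how `engine` is constructed, and the connection/table names, are placeholders from the test fixtures):

```rust
// Granularity of the three teardown operations, narrowest to widest.
async fn demonstrate_teardown(engine: &HotDataEngine) -> Result<()> {
    // Narrowest: drop one table's parquet cache and reset its sync state;
    // the table stays registered and re-syncs on the next query that needs it.
    engine.purge_table("duckdb_conn", "sales", "orders").await?;

    // Wider: drop every cached parquet file for the connection and reset
    // sync state; the connection and its table list survive.
    engine.purge_connection("duckdb_conn").await?;

    // Widest: remove the connection from the catalog and delete all its data.
    engine.remove_connection("duckdb_conn").await?;

    Ok(())
}
```

All three now take `&self` rather than `&mut self`, which is what lets the axum handlers call them through shared state.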