diff --git a/src/bin/server.rs b/src/bin/server.rs index 1bae03b..99c294c 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -1,8 +1,8 @@ use anyhow::Result; use clap::Parser; use rivetdb::config::AppConfig; -use rivetdb::datafusion::HotDataEngine; use rivetdb::http::app_server::AppServer; +use rivetdb::RivetEngine; use std::time::Instant; #[derive(Parser)] @@ -34,7 +34,7 @@ async fn main() -> Result<()> { tracing::info!("Configuration '{}' loaded successfully", &cli.config); // Initialize engine from config - let engine = HotDataEngine::from_config(&config).await?; + let engine = RivetEngine::from_config(&config).await?; tracing::info!("Engine initialized"); diff --git a/src/datafusion/catalog_provider.rs b/src/datafusion/catalog_provider.rs index 2a7b327..97606aa 100644 --- a/src/datafusion/catalog_provider.rs +++ b/src/datafusion/catalog_provider.rs @@ -1,5 +1,5 @@ use super::block_on; -use super::schema_provider::HotDataSchemaProvider; +use super::schema_provider::RivetSchemaProvider; use crate::catalog::CatalogManager; use crate::datafetch::{DataFetcher, NativeFetcher}; use crate::source::Source; @@ -12,7 +12,7 @@ use std::sync::{Arc, RwLock}; /// A catalog provider that represents a single connection. /// Lazily creates schema providers as they are accessed. #[derive(Debug)] -pub struct HotDataCatalogProvider { +pub struct RivetCatalogProvider { connection_id: i32, connection_name: String, source: Arc, @@ -22,7 +22,7 @@ pub struct HotDataCatalogProvider { fetcher: Arc, } -impl HotDataCatalogProvider { +impl RivetCatalogProvider { pub fn new( connection_id: i32, connection_name: String, @@ -60,7 +60,7 @@ impl HotDataCatalogProvider { } // Create new schema provider - let schema_provider = Arc::new(HotDataSchemaProvider::new( + let schema_provider = Arc::new(RivetSchemaProvider::new( self.connection_id, self.connection_name.clone(), schema_name.to_string(), @@ -77,7 +77,7 @@ impl HotDataCatalogProvider { } #[async_trait] -impl CatalogProvider for HotDataCatalogProvider { +impl CatalogProvider for RivetCatalogProvider { fn as_any(&self) -> &dyn std::any::Any { self } diff --git a/src/datafusion/mod.rs b/src/datafusion/mod.rs index 85cecb8..54538b4 100644 --- a/src/datafusion/mod.rs +++ b/src/datafusion/mod.rs @@ -1,5 +1,4 @@ mod catalog_provider; -mod engine; mod lazy_table_provider; mod schema_provider; @@ -17,9 +16,6 @@ where block_in_place(|| tokio::runtime::Handle::current().block_on(f)) } -pub use catalog_provider::HotDataCatalogProvider; -pub use engine::HotDataEngine; -pub use engine::HotDataEngineBuilder; -pub use engine::QueryResponse; +pub use catalog_provider::RivetCatalogProvider; pub use lazy_table_provider::LazyTableProvider; -pub use schema_provider::HotDataSchemaProvider; +pub use schema_provider::RivetSchemaProvider; diff --git a/src/datafusion/schema_provider.rs b/src/datafusion/schema_provider.rs index cac86db..05dac78 100644 --- a/src/datafusion/schema_provider.rs +++ b/src/datafusion/schema_provider.rs @@ -14,7 +14,7 @@ use super::lazy_table_provider::LazyTableProvider; /// A schema provider that syncs tables on-demand from remote sources. /// Wraps MemorySchemaProvider for caching already-loaded tables. #[derive(Debug)] -pub struct HotDataSchemaProvider { +pub struct RivetSchemaProvider { connection_id: i32, #[allow(dead_code)] connection_name: String, @@ -26,7 +26,7 @@ pub struct HotDataSchemaProvider { fetcher: Arc, } -impl HotDataSchemaProvider { +impl RivetSchemaProvider { #[allow(clippy::too_many_arguments)] pub fn new( connection_id: i32, @@ -51,7 +51,7 @@ impl HotDataSchemaProvider { } #[async_trait] -impl SchemaProvider for HotDataSchemaProvider { +impl SchemaProvider for RivetSchemaProvider { fn as_any(&self) -> &dyn std::any::Any { self } diff --git a/src/datafusion/engine.rs b/src/engine.rs similarity index 95% rename from src/datafusion/engine.rs rename to src/engine.rs index cfec8c5..f6fb050 100644 --- a/src/datafusion/engine.rs +++ b/src/engine.rs @@ -1,7 +1,6 @@ -use super::block_on; -use super::catalog_provider::HotDataCatalogProvider; use crate::catalog::{CatalogManager, ConnectionInfo, SqliteCatalogManager, TableInfo}; use crate::datafetch::DataFetcher; +use crate::datafusion::{block_on, RivetCatalogProvider}; use crate::source::Source; use crate::storage::{FilesystemStorage, StorageManager}; use anyhow::Result; @@ -20,13 +19,13 @@ pub struct QueryResponse { } /// The main query engine that manages connections, catalogs, and query execution. -pub struct HotDataEngine { +pub struct RivetEngine { catalog: Arc, df_ctx: SessionContext, storage: Arc, } -impl HotDataEngine { +impl RivetEngine { /// Create a new engine instance and register all existing connections. pub async fn new(catalog_path: &str) -> Result { Self::new_with_paths(catalog_path, "cache", "state", false).await @@ -123,7 +122,7 @@ impl HotDataEngine { for conn in connections { let source: Source = serde_json::from_str(&conn.config_json)?; - let catalog_provider = Arc::new(HotDataCatalogProvider::new( + let catalog_provider = Arc::new(RivetCatalogProvider::new( conn.id, conn.name.clone(), Arc::new(source), @@ -169,7 +168,7 @@ impl HotDataEngine { } // Register with DataFusion - let catalog_provider = Arc::new(HotDataCatalogProvider::new( + let catalog_provider = Arc::new(RivetCatalogProvider::new( conn_id, name.to_string(), Arc::new(source), @@ -249,7 +248,7 @@ impl HotDataEngine { // This causes DataFusion to drop any open file handles to the cached files let source: Source = serde_json::from_str(&conn.config_json)?; - let catalog_provider = Arc::new(HotDataCatalogProvider::new( + let catalog_provider = Arc::new(RivetCatalogProvider::new( conn.id, conn.name.clone(), Arc::new(source), @@ -291,7 +290,7 @@ impl HotDataEngine { // This causes DataFusion to drop any open file handles to the cached files let source: Source = serde_json::from_str(&conn.config_json)?; - let catalog_provider = Arc::new(HotDataCatalogProvider::new( + let catalog_provider = Arc::new(RivetCatalogProvider::new( conn.id, conn.name.clone(), Arc::new(source), @@ -358,39 +357,39 @@ impl HotDataEngine { } } -impl Drop for HotDataEngine { +impl Drop for RivetEngine { fn drop(&mut self) { // Ensure catalog connection is closed when engine is dropped let _ = block_on(self.catalog.close()); } } -/// Builder for HotDataEngine +/// Builder for RivetEngine /// /// # Example /// /// ```no_run -/// use rivetdb::datafusion::HotDataEngine; +/// use rivetdb::RivetEngine; /// use std::path::PathBuf; /// -/// let builder = HotDataEngine::builder() +/// let builder = RivetEngine::builder() /// .metadata_dir(PathBuf::from("/tmp/rivet")); /// /// // let engine = builder.build().unwrap(); /// ``` -pub struct HotDataEngineBuilder { +pub struct RivetEngineBuilder { metadata_dir: Option, catalog: Option>, storage: Option>, } -impl Default for HotDataEngineBuilder { +impl Default for RivetEngineBuilder { fn default() -> Self { Self::new() } } -impl HotDataEngineBuilder { +impl RivetEngineBuilder { pub fn new() -> Self { Self { metadata_dir: None, @@ -414,7 +413,7 @@ impl HotDataEngineBuilder { self } - pub async fn build(self) -> Result { + pub async fn build(self) -> Result { let catalog = self .catalog .ok_or_else(|| anyhow::anyhow!("Catalog manager not set"))?; @@ -430,7 +429,7 @@ impl HotDataEngineBuilder { // Register storage with DataFusion storage.register_with_datafusion(&df_ctx)?; - let mut engine = HotDataEngine { + let mut engine = RivetEngine { catalog, df_ctx, storage, @@ -443,9 +442,9 @@ impl HotDataEngineBuilder { } } -impl HotDataEngine { - pub fn builder() -> HotDataEngineBuilder { - HotDataEngineBuilder::new() +impl RivetEngine { + pub fn builder() -> RivetEngineBuilder { + RivetEngineBuilder::new() } /// Create a new engine from application configuration. @@ -582,7 +581,7 @@ impl HotDataEngine { }; // Use builder to construct engine - HotDataEngine::builder() + RivetEngine::builder() .metadata_dir(metadata_dir) .catalog(catalog) .storage(storage) @@ -615,7 +614,7 @@ mod tests { )); // Build engine using builder pattern - let engine = HotDataEngine::builder() + let engine = RivetEngine::builder() .metadata_dir(metadata_dir.clone()) .catalog(catalog) .storage(storage) @@ -640,7 +639,7 @@ mod tests { async fn test_builder_pattern_missing_fields() { // Test that builder fails when required fields are missing let temp_dir = TempDir::new().unwrap(); - let result = HotDataEngine::builder() + let result = RivetEngine::builder() .metadata_dir(temp_dir.path().to_path_buf()) .build() .await; @@ -687,7 +686,7 @@ mod tests { }, }; - let engine = HotDataEngine::from_config(&config).await; + let engine = RivetEngine::from_config(&config).await; assert!( engine.is_ok(), "from_config should create engine successfully" diff --git a/src/http/app_server.rs b/src/http/app_server.rs index 1088761..6f69561 100644 --- a/src/http/app_server.rs +++ b/src/http/app_server.rs @@ -1,16 +1,16 @@ -use crate::datafusion::HotDataEngine; use crate::http::handlers::{ create_connection_handler, delete_connection_handler, get_connection_handler, health_handler, list_connections_handler, purge_connection_cache_handler, purge_table_cache_handler, query_handler, tables_handler, }; +use crate::RivetEngine; use axum::routing::{delete, get, post}; use axum::Router; use std::sync::Arc; pub struct AppServer { pub router: Router, - pub engine: Arc, + pub engine: Arc, } pub const PATH_QUERY: &str = "/query"; @@ -22,7 +22,7 @@ pub const PATH_CONNECTION_CACHE: &str = "/connections/{name}/cache"; pub const PATH_TABLE_CACHE: &str = "/connections/{name}/tables/{schema}/{table}/cache"; impl AppServer { - pub fn new(engine: HotDataEngine) -> Self { + pub fn new(engine: RivetEngine) -> Self { let engine = Arc::new(engine); AppServer { router: Router::new() diff --git a/src/http/handlers.rs b/src/http/handlers.rs index 11d18a8..5299b6f 100644 --- a/src/http/handlers.rs +++ b/src/http/handlers.rs @@ -1,4 +1,3 @@ -use crate::datafusion::HotDataEngine; use crate::http::error::ApiError; use crate::http::models::{ ConnectionInfo, CreateConnectionRequest, CreateConnectionResponse, GetConnectionResponse, @@ -6,6 +5,7 @@ use crate::http::models::{ }; use crate::http::serialization::{encode_value_at, make_array_encoder}; use crate::source::Source; +use crate::RivetEngine; use axum::{ extract::{Path, Query as QueryParams, State}, http::StatusCode, @@ -19,7 +19,7 @@ use tracing::error; /// Handler for POST /query pub async fn query_handler( - State(engine): State>, + State(engine): State>, Json(request): Json, ) -> Result, ApiError> { // Validate SQL is not empty @@ -83,7 +83,7 @@ pub async fn query_handler( /// Handler for GET /tables pub async fn tables_handler( - State(engine): State>, + State(engine): State>, QueryParams(params): QueryParams>, ) -> Result, ApiError> { // Get optional connection filter @@ -145,7 +145,7 @@ pub async fn health_handler() -> (StatusCode, Json) { /// Handler for POST /connections pub async fn create_connection_handler( - State(engine): State>, + State(engine): State>, Json(request): Json, ) -> Result<(StatusCode, Json), ApiError> { // Validate name is not empty @@ -221,7 +221,7 @@ pub async fn create_connection_handler( /// Handler for GET /connections pub async fn list_connections_handler( - State(engine): State>, + State(engine): State>, ) -> Result, ApiError> { let connections = engine.list_connections().await?; @@ -241,7 +241,7 @@ pub async fn list_connections_handler( /// Handler for GET /connections/{name} pub async fn get_connection_handler( - State(engine): State>, + State(engine): State>, Path(name): Path, ) -> Result, ApiError> { // Get connection info @@ -267,7 +267,7 @@ pub async fn get_connection_handler( /// Handler for DELETE /connections/{name} pub async fn delete_connection_handler( - State(engine): State>, + State(engine): State>, Path(name): Path, ) -> Result { engine.remove_connection(&name).await.map_err(|e| { @@ -283,7 +283,7 @@ pub async fn delete_connection_handler( /// Handler for DELETE /connections/{name}/cache pub async fn purge_connection_cache_handler( - State(engine): State>, + State(engine): State>, Path(name): Path, ) -> Result { engine.purge_connection(&name).await.map_err(|e| { @@ -307,7 +307,7 @@ pub struct TableCachePath { /// Handler for DELETE /connections/{name}/tables/{schema}/{table}/cache pub async fn purge_table_cache_handler( - State(engine): State>, + State(engine): State>, Path(params): Path, ) -> Result { engine diff --git a/src/lib.rs b/src/lib.rs index f2ec810..55e93b5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,8 +2,10 @@ pub mod catalog; pub mod config; pub mod datafetch; pub mod datafusion; +mod engine; pub mod http; pub mod source; pub mod storage; +pub use engine::{QueryResponse, RivetEngine, RivetEngineBuilder}; pub use source::Source; diff --git a/tests/engine_sync_tests.rs b/tests/engine_sync_tests.rs index be7fbfb..dcce29e 100644 --- a/tests/engine_sync_tests.rs +++ b/tests/engine_sync_tests.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use rivetdb::datafusion::HotDataEngine; +use rivetdb::RivetEngine; use tempfile::tempdir; /// Test that sync_connection handles non-existent connections correctly @@ -11,7 +11,7 @@ async fn test_sync_connection_not_found() -> Result<()> { let cache_path = dir.path().join("cache"); let state_path = dir.path().join("state"); - let engine = HotDataEngine::new_with_paths( + let engine = RivetEngine::new_with_paths( catalog_path.to_str().unwrap(), cache_path.to_str().unwrap(), state_path.to_str().unwrap(), @@ -38,7 +38,7 @@ async fn test_sync_connection_no_tables() -> Result<()> { let cache_path = dir.path().join("cache"); let state_path = dir.path().join("state"); - let engine = HotDataEngine::new_with_paths( + let engine = RivetEngine::new_with_paths( catalog_path.to_str().unwrap(), cache_path.to_str().unwrap(), state_path.to_str().unwrap(), diff --git a/tests/http_server_tests.rs b/tests/http_server_tests.rs index e6a0318..a5d332e 100644 --- a/tests/http_server_tests.rs +++ b/tests/http_server_tests.rs @@ -5,8 +5,8 @@ use axum::{ http::{Request, StatusCode}, Router, }; -use rivetdb::datafusion::HotDataEngine; use rivetdb::http::app_server::{AppServer, PATH_CONNECTIONS, PATH_QUERY, PATH_TABLES}; +use rivetdb::RivetEngine; use serde_json::json; use tempfile::TempDir; use tower::util::ServiceExt; @@ -19,7 +19,7 @@ async fn setup_test() -> Result<(Router, TempDir)> { let cache_path = metadata_dir.join("cache"); let state_path = metadata_dir.join("state"); - let engine = HotDataEngine::new_with_paths( + let engine = RivetEngine::new_with_paths( catalog_path.to_str().unwrap(), cache_path.to_str().unwrap(), state_path.to_str().unwrap(), diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index e01eabe..51fbd12 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -10,9 +10,9 @@ use axum::{ http::{Request, StatusCode}, Router, }; -use rivetdb::datafusion::HotDataEngine; use rivetdb::http::app_server::{AppServer, PATH_CONNECTIONS, PATH_QUERY, PATH_TABLES}; use rivetdb::source::Source; +use rivetdb::RivetEngine; use serde_json::json; use std::sync::Arc; use tempfile::TempDir; @@ -29,7 +29,7 @@ struct QueryResult { } impl QueryResult { - fn from_engine(response: &rivetdb::datafusion::QueryResponse) -> Self { + fn from_engine(response: &rivetdb::QueryResponse) -> Self { let batch = response.results.first(); let (columns, row_count) = match batch { @@ -242,7 +242,7 @@ trait TestExecutor: Send + Sync { /// Engine-based test executor. struct EngineExecutor { - engine: Arc, + engine: Arc, } #[async_trait::async_trait] @@ -491,7 +491,7 @@ impl TestHarness { std::fs::create_dir_all(&cache_dir).unwrap(); std::fs::create_dir_all(&state_dir).unwrap(); - let engine = HotDataEngine::new_with_paths( + let engine = RivetEngine::new_with_paths( catalog_path.to_str().unwrap(), cache_dir.to_str().unwrap(), state_dir.to_str().unwrap(),