Skip to content

Commit fe27d4b

Browse files
authored
Rename RivetEngine (#35)
* rename HotData* -> Rivet* (HotDataEngine -> RivetEngine) * cargo fmt
1 parent 87e0a4b commit fe27d4b

File tree

11 files changed

+58
-61
lines changed

11 files changed

+58
-61
lines changed

src/bin/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use anyhow::Result;
22
use clap::Parser;
33
use rivetdb::config::AppConfig;
4-
use rivetdb::datafusion::HotDataEngine;
54
use rivetdb::http::app_server::AppServer;
5+
use rivetdb::RivetEngine;
66
use std::time::Instant;
77

88
#[derive(Parser)]
@@ -34,7 +34,7 @@ async fn main() -> Result<()> {
3434
tracing::info!("Configuration '{}' loaded successfully", &cli.config);
3535

3636
// Initialize engine from config
37-
let engine = HotDataEngine::from_config(&config).await?;
37+
let engine = RivetEngine::from_config(&config).await?;
3838

3939
tracing::info!("Engine initialized");
4040

src/datafusion/catalog_provider.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::block_on;
2-
use super::schema_provider::HotDataSchemaProvider;
2+
use super::schema_provider::RivetSchemaProvider;
33
use crate::catalog::CatalogManager;
44
use crate::datafetch::{DataFetcher, NativeFetcher};
55
use crate::source::Source;
@@ -12,7 +12,7 @@ use std::sync::{Arc, RwLock};
1212
/// A catalog provider that represents a single connection.
1313
/// Lazily creates schema providers as they are accessed.
1414
#[derive(Debug)]
15-
pub struct HotDataCatalogProvider {
15+
pub struct RivetCatalogProvider {
1616
connection_id: i32,
1717
connection_name: String,
1818
source: Arc<Source>,
@@ -22,7 +22,7 @@ pub struct HotDataCatalogProvider {
2222
fetcher: Arc<dyn DataFetcher>,
2323
}
2424

25-
impl HotDataCatalogProvider {
25+
impl RivetCatalogProvider {
2626
pub fn new(
2727
connection_id: i32,
2828
connection_name: String,
@@ -60,7 +60,7 @@ impl HotDataCatalogProvider {
6060
}
6161

6262
// Create new schema provider
63-
let schema_provider = Arc::new(HotDataSchemaProvider::new(
63+
let schema_provider = Arc::new(RivetSchemaProvider::new(
6464
self.connection_id,
6565
self.connection_name.clone(),
6666
schema_name.to_string(),
@@ -77,7 +77,7 @@ impl HotDataCatalogProvider {
7777
}
7878

7979
#[async_trait]
80-
impl CatalogProvider for HotDataCatalogProvider {
80+
impl CatalogProvider for RivetCatalogProvider {
8181
fn as_any(&self) -> &dyn std::any::Any {
8282
self
8383
}

src/datafusion/mod.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
mod catalog_provider;
2-
mod engine;
32
mod lazy_table_provider;
43
mod schema_provider;
54

@@ -17,9 +16,6 @@ where
1716
block_in_place(|| tokio::runtime::Handle::current().block_on(f))
1817
}
1918

20-
pub use catalog_provider::HotDataCatalogProvider;
21-
pub use engine::HotDataEngine;
22-
pub use engine::HotDataEngineBuilder;
23-
pub use engine::QueryResponse;
19+
pub use catalog_provider::RivetCatalogProvider;
2420
pub use lazy_table_provider::LazyTableProvider;
25-
pub use schema_provider::HotDataSchemaProvider;
21+
pub use schema_provider::RivetSchemaProvider;

src/datafusion/schema_provider.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use super::lazy_table_provider::LazyTableProvider;
1414
/// A schema provider that syncs tables on-demand from remote sources.
1515
/// Wraps MemorySchemaProvider for caching already-loaded tables.
1616
#[derive(Debug)]
17-
pub struct HotDataSchemaProvider {
17+
pub struct RivetSchemaProvider {
1818
connection_id: i32,
1919
#[allow(dead_code)]
2020
connection_name: String,
@@ -26,7 +26,7 @@ pub struct HotDataSchemaProvider {
2626
fetcher: Arc<dyn DataFetcher>,
2727
}
2828

29-
impl HotDataSchemaProvider {
29+
impl RivetSchemaProvider {
3030
#[allow(clippy::too_many_arguments)]
3131
pub fn new(
3232
connection_id: i32,
@@ -51,7 +51,7 @@ impl HotDataSchemaProvider {
5151
}
5252

5353
#[async_trait]
54-
impl SchemaProvider for HotDataSchemaProvider {
54+
impl SchemaProvider for RivetSchemaProvider {
5555
fn as_any(&self) -> &dyn std::any::Any {
5656
self
5757
}
Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
use super::block_on;
2-
use super::catalog_provider::HotDataCatalogProvider;
31
use crate::catalog::{CatalogManager, ConnectionInfo, SqliteCatalogManager, TableInfo};
42
use crate::datafetch::DataFetcher;
3+
use crate::datafusion::{block_on, RivetCatalogProvider};
54
use crate::source::Source;
65
use crate::storage::{FilesystemStorage, StorageManager};
76
use anyhow::Result;
@@ -20,13 +19,13 @@ pub struct QueryResponse {
2019
}
2120

2221
/// The main query engine that manages connections, catalogs, and query execution.
23-
pub struct HotDataEngine {
22+
pub struct RivetEngine {
2423
catalog: Arc<dyn CatalogManager>,
2524
df_ctx: SessionContext,
2625
storage: Arc<dyn StorageManager>,
2726
}
2827

29-
impl HotDataEngine {
28+
impl RivetEngine {
3029
/// Create a new engine instance and register all existing connections.
3130
pub async fn new(catalog_path: &str) -> Result<Self> {
3231
Self::new_with_paths(catalog_path, "cache", "state", false).await
@@ -123,7 +122,7 @@ impl HotDataEngine {
123122
for conn in connections {
124123
let source: Source = serde_json::from_str(&conn.config_json)?;
125124

126-
let catalog_provider = Arc::new(HotDataCatalogProvider::new(
125+
let catalog_provider = Arc::new(RivetCatalogProvider::new(
127126
conn.id,
128127
conn.name.clone(),
129128
Arc::new(source),
@@ -169,7 +168,7 @@ impl HotDataEngine {
169168
}
170169

171170
// Register with DataFusion
172-
let catalog_provider = Arc::new(HotDataCatalogProvider::new(
171+
let catalog_provider = Arc::new(RivetCatalogProvider::new(
173172
conn_id,
174173
name.to_string(),
175174
Arc::new(source),
@@ -249,7 +248,7 @@ impl HotDataEngine {
249248
// This causes DataFusion to drop any open file handles to the cached files
250249
let source: Source = serde_json::from_str(&conn.config_json)?;
251250

252-
let catalog_provider = Arc::new(HotDataCatalogProvider::new(
251+
let catalog_provider = Arc::new(RivetCatalogProvider::new(
253252
conn.id,
254253
conn.name.clone(),
255254
Arc::new(source),
@@ -291,7 +290,7 @@ impl HotDataEngine {
291290
// This causes DataFusion to drop any open file handles to the cached files
292291
let source: Source = serde_json::from_str(&conn.config_json)?;
293292

294-
let catalog_provider = Arc::new(HotDataCatalogProvider::new(
293+
let catalog_provider = Arc::new(RivetCatalogProvider::new(
295294
conn.id,
296295
conn.name.clone(),
297296
Arc::new(source),
@@ -358,39 +357,39 @@ impl HotDataEngine {
358357
}
359358
}
360359

361-
impl Drop for HotDataEngine {
360+
impl Drop for RivetEngine {
362361
fn drop(&mut self) {
363362
// Ensure catalog connection is closed when engine is dropped
364363
let _ = block_on(self.catalog.close());
365364
}
366365
}
367366

368-
/// Builder for HotDataEngine
367+
/// Builder for RivetEngine
369368
///
370369
/// # Example
371370
///
372371
/// ```no_run
373-
/// use rivetdb::datafusion::HotDataEngine;
372+
/// use rivetdb::RivetEngine;
374373
/// use std::path::PathBuf;
375374
///
376-
/// let builder = HotDataEngine::builder()
375+
/// let builder = RivetEngine::builder()
377376
/// .metadata_dir(PathBuf::from("/tmp/rivet"));
378377
///
379378
/// // let engine = builder.build().unwrap();
380379
/// ```
381-
pub struct HotDataEngineBuilder {
380+
pub struct RivetEngineBuilder {
382381
metadata_dir: Option<PathBuf>,
383382
catalog: Option<Arc<dyn CatalogManager>>,
384383
storage: Option<Arc<dyn StorageManager>>,
385384
}
386385

387-
impl Default for HotDataEngineBuilder {
386+
impl Default for RivetEngineBuilder {
388387
fn default() -> Self {
389388
Self::new()
390389
}
391390
}
392391

393-
impl HotDataEngineBuilder {
392+
impl RivetEngineBuilder {
394393
pub fn new() -> Self {
395394
Self {
396395
metadata_dir: None,
@@ -414,7 +413,7 @@ impl HotDataEngineBuilder {
414413
self
415414
}
416415

417-
pub async fn build(self) -> Result<HotDataEngine> {
416+
pub async fn build(self) -> Result<RivetEngine> {
418417
let catalog = self
419418
.catalog
420419
.ok_or_else(|| anyhow::anyhow!("Catalog manager not set"))?;
@@ -430,7 +429,7 @@ impl HotDataEngineBuilder {
430429
// Register storage with DataFusion
431430
storage.register_with_datafusion(&df_ctx)?;
432431

433-
let mut engine = HotDataEngine {
432+
let mut engine = RivetEngine {
434433
catalog,
435434
df_ctx,
436435
storage,
@@ -443,9 +442,9 @@ impl HotDataEngineBuilder {
443442
}
444443
}
445444

446-
impl HotDataEngine {
447-
pub fn builder() -> HotDataEngineBuilder {
448-
HotDataEngineBuilder::new()
445+
impl RivetEngine {
446+
pub fn builder() -> RivetEngineBuilder {
447+
RivetEngineBuilder::new()
449448
}
450449

451450
/// Create a new engine from application configuration.
@@ -582,7 +581,7 @@ impl HotDataEngine {
582581
};
583582

584583
// Use builder to construct engine
585-
HotDataEngine::builder()
584+
RivetEngine::builder()
586585
.metadata_dir(metadata_dir)
587586
.catalog(catalog)
588587
.storage(storage)
@@ -615,7 +614,7 @@ mod tests {
615614
));
616615

617616
// Build engine using builder pattern
618-
let engine = HotDataEngine::builder()
617+
let engine = RivetEngine::builder()
619618
.metadata_dir(metadata_dir.clone())
620619
.catalog(catalog)
621620
.storage(storage)
@@ -640,7 +639,7 @@ mod tests {
640639
async fn test_builder_pattern_missing_fields() {
641640
// Test that builder fails when required fields are missing
642641
let temp_dir = TempDir::new().unwrap();
643-
let result = HotDataEngine::builder()
642+
let result = RivetEngine::builder()
644643
.metadata_dir(temp_dir.path().to_path_buf())
645644
.build()
646645
.await;
@@ -687,7 +686,7 @@ mod tests {
687686
},
688687
};
689688

690-
let engine = HotDataEngine::from_config(&config).await;
689+
let engine = RivetEngine::from_config(&config).await;
691690
assert!(
692691
engine.is_ok(),
693692
"from_config should create engine successfully"

src/http/app_server.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
1-
use crate::datafusion::HotDataEngine;
21
use crate::http::handlers::{
32
create_connection_handler, delete_connection_handler, get_connection_handler, health_handler,
43
list_connections_handler, purge_connection_cache_handler, purge_table_cache_handler,
54
query_handler, tables_handler,
65
};
6+
use crate::RivetEngine;
77
use axum::routing::{delete, get, post};
88
use axum::Router;
99
use std::sync::Arc;
1010

1111
pub struct AppServer {
1212
pub router: Router,
13-
pub engine: Arc<HotDataEngine>,
13+
pub engine: Arc<RivetEngine>,
1414
}
1515

1616
pub const PATH_QUERY: &str = "/query";
@@ -22,7 +22,7 @@ pub const PATH_CONNECTION_CACHE: &str = "/connections/{name}/cache";
2222
pub const PATH_TABLE_CACHE: &str = "/connections/{name}/tables/{schema}/{table}/cache";
2323

2424
impl AppServer {
25-
pub fn new(engine: HotDataEngine) -> Self {
25+
pub fn new(engine: RivetEngine) -> Self {
2626
let engine = Arc::new(engine);
2727
AppServer {
2828
router: Router::new()

src/http/handlers.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
use crate::datafusion::HotDataEngine;
21
use crate::http::error::ApiError;
32
use crate::http::models::{
43
ConnectionInfo, CreateConnectionRequest, CreateConnectionResponse, GetConnectionResponse,
54
ListConnectionsResponse, QueryRequest, QueryResponse, TableInfo, TablesResponse,
65
};
76
use crate::http::serialization::{encode_value_at, make_array_encoder};
87
use crate::source::Source;
8+
use crate::RivetEngine;
99
use axum::{
1010
extract::{Path, Query as QueryParams, State},
1111
http::StatusCode,
@@ -19,7 +19,7 @@ use tracing::error;
1919

2020
/// Handler for POST /query
2121
pub async fn query_handler(
22-
State(engine): State<Arc<HotDataEngine>>,
22+
State(engine): State<Arc<RivetEngine>>,
2323
Json(request): Json<QueryRequest>,
2424
) -> Result<Json<QueryResponse>, ApiError> {
2525
// Validate SQL is not empty
@@ -83,7 +83,7 @@ pub async fn query_handler(
8383

8484
/// Handler for GET /tables
8585
pub async fn tables_handler(
86-
State(engine): State<Arc<HotDataEngine>>,
86+
State(engine): State<Arc<RivetEngine>>,
8787
QueryParams(params): QueryParams<HashMap<String, String>>,
8888
) -> Result<Json<TablesResponse>, ApiError> {
8989
// Get optional connection filter
@@ -145,7 +145,7 @@ pub async fn health_handler() -> (StatusCode, Json<serde_json::Value>) {
145145

146146
/// Handler for POST /connections
147147
pub async fn create_connection_handler(
148-
State(engine): State<Arc<HotDataEngine>>,
148+
State(engine): State<Arc<RivetEngine>>,
149149
Json(request): Json<CreateConnectionRequest>,
150150
) -> Result<(StatusCode, Json<CreateConnectionResponse>), ApiError> {
151151
// Validate name is not empty
@@ -221,7 +221,7 @@ pub async fn create_connection_handler(
221221

222222
/// Handler for GET /connections
223223
pub async fn list_connections_handler(
224-
State(engine): State<Arc<HotDataEngine>>,
224+
State(engine): State<Arc<RivetEngine>>,
225225
) -> Result<Json<ListConnectionsResponse>, ApiError> {
226226
let connections = engine.list_connections().await?;
227227

@@ -241,7 +241,7 @@ pub async fn list_connections_handler(
241241

242242
/// Handler for GET /connections/{name}
243243
pub async fn get_connection_handler(
244-
State(engine): State<Arc<HotDataEngine>>,
244+
State(engine): State<Arc<RivetEngine>>,
245245
Path(name): Path<String>,
246246
) -> Result<Json<GetConnectionResponse>, ApiError> {
247247
// Get connection info
@@ -267,7 +267,7 @@ pub async fn get_connection_handler(
267267

268268
/// Handler for DELETE /connections/{name}
269269
pub async fn delete_connection_handler(
270-
State(engine): State<Arc<HotDataEngine>>,
270+
State(engine): State<Arc<RivetEngine>>,
271271
Path(name): Path<String>,
272272
) -> Result<StatusCode, ApiError> {
273273
engine.remove_connection(&name).await.map_err(|e| {
@@ -283,7 +283,7 @@ pub async fn delete_connection_handler(
283283

284284
/// Handler for DELETE /connections/{name}/cache
285285
pub async fn purge_connection_cache_handler(
286-
State(engine): State<Arc<HotDataEngine>>,
286+
State(engine): State<Arc<RivetEngine>>,
287287
Path(name): Path<String>,
288288
) -> Result<StatusCode, ApiError> {
289289
engine.purge_connection(&name).await.map_err(|e| {
@@ -307,7 +307,7 @@ pub struct TableCachePath {
307307

308308
/// Handler for DELETE /connections/{name}/tables/{schema}/{table}/cache
309309
pub async fn purge_table_cache_handler(
310-
State(engine): State<Arc<HotDataEngine>>,
310+
State(engine): State<Arc<RivetEngine>>,
311311
Path(params): Path<TableCachePath>,
312312
) -> Result<StatusCode, ApiError> {
313313
engine

0 commit comments

Comments
 (0)