Skip to content
Merged
6 changes: 3 additions & 3 deletions src/datafusion/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl HotDataEngine {
}

/// Purge all cached data for a connection (clears parquet files and resets sync state).
pub async fn purge_connection(&mut self, name: &str) -> Result<()> {
pub async fn purge_connection(&self, name: &str) -> Result<()> {
// Get connection info (validates it exists and gives us the ID)
let conn = self
.catalog
Expand Down Expand Up @@ -263,7 +263,7 @@ impl HotDataEngine {

/// Purge cached data for a single table (clears parquet file and resets sync state).
pub async fn purge_table(
&mut self,
&self,
connection_name: &str,
schema_name: &str,
table_name: &str,
Expand Down Expand Up @@ -303,7 +303,7 @@ impl HotDataEngine {
}

/// Remove a connection entirely (removes from catalog and deletes all data).
pub async fn remove_connection(&mut self, name: &str) -> Result<()> {
pub async fn remove_connection(&self, name: &str) -> Result<()> {
// Get connection info (validates it exists and gives us the ID)
let conn = self
.catalog
Expand Down
19 changes: 16 additions & 3 deletions src/http/app_server.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::datafusion::HotDataEngine;
use crate::http::handlers::{
create_connection_handler, health_handler, list_connections_handler, query_handler,
tables_handler,
create_connection_handler, delete_connection_handler, get_connection_handler, health_handler,
list_connections_handler, purge_connection_cache_handler, purge_table_cache_handler,
query_handler, tables_handler,
};
use axum::routing::{get, post};
use axum::routing::{delete, get, post};
use axum::Router;
use std::sync::Arc;

Expand All @@ -16,6 +17,9 @@ pub const PATH_QUERY: &str = "/query";
pub const PATH_TABLES: &str = "/tables";
pub const PATH_HEALTH: &str = "/health";
pub const PATH_CONNECTIONS: &str = "/connections";
pub const PATH_CONNECTION: &str = "/connections/{name}";
pub const PATH_CONNECTION_CACHE: &str = "/connections/{name}/cache";
pub const PATH_TABLE_CACHE: &str = "/connections/{name}/tables/{schema}/{table}/cache";

impl AppServer {
pub fn new(engine: HotDataEngine) -> Self {
Expand All @@ -29,6 +33,15 @@ impl AppServer {
PATH_CONNECTIONS,
post(create_connection_handler).get(list_connections_handler),
)
.route(
PATH_CONNECTION,
get(get_connection_handler).delete(delete_connection_handler),
)
.route(
PATH_CONNECTION_CACHE,
delete(purge_connection_cache_handler),
)
.route(PATH_TABLE_CACHE, delete(purge_table_cache_handler))
.with_state(engine.clone()),
engine,
}
Expand Down
96 changes: 93 additions & 3 deletions src/http/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use crate::datafusion::HotDataEngine;
use crate::http::error::ApiError;
use crate::http::models::{
ConnectionInfo, CreateConnectionRequest, CreateConnectionResponse, ListConnectionsResponse,
QueryRequest, QueryResponse, TableInfo, TablesResponse,
ConnectionInfo, CreateConnectionRequest, CreateConnectionResponse, GetConnectionResponse,
ListConnectionsResponse, QueryRequest, QueryResponse, TableInfo, TablesResponse,
};
use crate::http::serialization::{encode_value_at, make_array_encoder};
use crate::source::Source;
use axum::{extract::Query as QueryParams, extract::State, http::StatusCode, Json};
use axum::{
extract::{Path, Query as QueryParams, State},
http::StatusCode,
Json,
};
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
Expand Down Expand Up @@ -231,3 +236,88 @@ pub async fn list_connections_handler(
connections: connection_infos,
}))
}

/// Handler for GET /connections/{name}
pub async fn get_connection_handler(
State(engine): State<Arc<HotDataEngine>>,
Path(name): Path<String>,
) -> Result<Json<GetConnectionResponse>, ApiError> {
// Get connection info
let conn = engine
.catalog()
.get_connection(&name)?
.ok_or_else(|| ApiError::not_found(format!("Connection '{}' not found", name)))?;

// Get table counts
let tables = engine.list_tables(Some(&name))?;
let table_count = tables.len();
let synced_table_count = tables.iter().filter(|t| t.parquet_path.is_some()).count();

Ok(Json(GetConnectionResponse {
id: conn.id,
name: conn.name,
source_type: conn.source_type,
table_count,
synced_table_count,
}))
}

/// Handler for DELETE /connections/{name}
pub async fn delete_connection_handler(
State(engine): State<Arc<HotDataEngine>>,
Path(name): Path<String>,
) -> Result<StatusCode, ApiError> {
engine.remove_connection(&name).await.map_err(|e| {
if e.to_string().contains("not found") {
ApiError::not_found(format!("Connection '{}' not found", name))
} else {
ApiError::internal_error(e.to_string())
}
})?;

Ok(StatusCode::NO_CONTENT)
}

/// Handler for DELETE /connections/{name}/cache
pub async fn purge_connection_cache_handler(
State(engine): State<Arc<HotDataEngine>>,
Path(name): Path<String>,
) -> Result<StatusCode, ApiError> {
engine.purge_connection(&name).await.map_err(|e| {
if e.to_string().contains("not found") {
ApiError::not_found(format!("Connection '{}' not found", name))
} else {
ApiError::internal_error(e.to_string())
}
})?;

Ok(StatusCode::NO_CONTENT)
}

/// Path parameters for table cache operations
#[derive(Deserialize)]
pub struct TableCachePath {
name: String,
schema: String,
table: String,
}

/// Handler for DELETE /connections/{name}/tables/{schema}/{table}/cache
pub async fn purge_table_cache_handler(
State(engine): State<Arc<HotDataEngine>>,
Path(params): Path<TableCachePath>,
) -> Result<StatusCode, ApiError> {
engine
.purge_table(&params.name, &params.schema, &params.table)
.await
.map_err(|e| {
let msg = e.to_string();
if msg.contains("not found") {
ApiError::not_found(msg)
} else {
ApiError::internal_error(msg)
}
})?;

Ok(StatusCode::NO_CONTENT)
}
10 changes: 10 additions & 0 deletions src/http/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,13 @@ pub struct ConnectionInfo {
pub struct ListConnectionsResponse {
pub connections: Vec<ConnectionInfo>,
}

/// Response body for GET /connections/{name}
#[derive(Debug, Serialize)]
pub struct GetConnectionResponse {
pub id: i32,
pub name: String,
pub source_type: String,
pub table_count: usize,
pub synced_table_count: usize,
}
Loading
Loading