Skip to content

Commit ac277f2

Browse files
authored
enhance crud support (#7)
* refactor(engine): change purge/remove methods to take &self * feat(http): add GetConnectionResponse model * feat(http): add GET /connections/{name} endpoint * feat(http): add DELETE /connections/{name} endpoint * feat(http): add DELETE /connections/{name}/cache endpoint * feat(http): add DELETE table cache endpoint * test: add new executor trait methods * test: implement executor methods for new endpoints * fix(http): correct Axum path parameter syntax * test: add integration tests for CRUD endpoints * chore: apply rustfmt formatting
1 parent 491fd36 commit ac277f2

File tree

5 files changed

+471
-13
lines changed

5 files changed

+471
-13
lines changed

src/datafusion/engine.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ impl HotDataEngine {
230230
}
231231

232232
/// Purge all cached data for a connection (clears parquet files and resets sync state).
233-
pub async fn purge_connection(&mut self, name: &str) -> Result<()> {
233+
pub async fn purge_connection(&self, name: &str) -> Result<()> {
234234
// Get connection info (validates it exists and gives us the ID)
235235
let conn = self
236236
.catalog
@@ -263,7 +263,7 @@ impl HotDataEngine {
263263

264264
/// Purge cached data for a single table (clears parquet file and resets sync state).
265265
pub async fn purge_table(
266-
&mut self,
266+
&self,
267267
connection_name: &str,
268268
schema_name: &str,
269269
table_name: &str,
@@ -303,7 +303,7 @@ impl HotDataEngine {
303303
}
304304

305305
/// Remove a connection entirely (removes from catalog and deletes all data).
306-
pub async fn remove_connection(&mut self, name: &str) -> Result<()> {
306+
pub async fn remove_connection(&self, name: &str) -> Result<()> {
307307
// Get connection info (validates it exists and gives us the ID)
308308
let conn = self
309309
.catalog

src/http/app_server.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use crate::datafusion::HotDataEngine;
22
use crate::http::handlers::{
3-
create_connection_handler, health_handler, list_connections_handler, query_handler,
4-
tables_handler,
3+
create_connection_handler, delete_connection_handler, get_connection_handler, health_handler,
4+
list_connections_handler, purge_connection_cache_handler, purge_table_cache_handler,
5+
query_handler, tables_handler,
56
};
6-
use axum::routing::{get, post};
7+
use axum::routing::{delete, get, post};
78
use axum::Router;
89
use std::sync::Arc;
910

@@ -16,6 +17,9 @@ pub const PATH_QUERY: &str = "/query";
1617
pub const PATH_TABLES: &str = "/tables";
1718
pub const PATH_HEALTH: &str = "/health";
1819
pub const PATH_CONNECTIONS: &str = "/connections";
20+
pub const PATH_CONNECTION: &str = "/connections/{name}";
21+
pub const PATH_CONNECTION_CACHE: &str = "/connections/{name}/cache";
22+
pub const PATH_TABLE_CACHE: &str = "/connections/{name}/tables/{schema}/{table}/cache";
1923

2024
impl AppServer {
2125
pub fn new(engine: HotDataEngine) -> Self {
@@ -29,6 +33,15 @@ impl AppServer {
2933
PATH_CONNECTIONS,
3034
post(create_connection_handler).get(list_connections_handler),
3135
)
36+
.route(
37+
PATH_CONNECTION,
38+
get(get_connection_handler).delete(delete_connection_handler),
39+
)
40+
.route(
41+
PATH_CONNECTION_CACHE,
42+
delete(purge_connection_cache_handler),
43+
)
44+
.route(PATH_TABLE_CACHE, delete(purge_table_cache_handler))
3245
.with_state(engine.clone()),
3346
engine,
3447
}

src/http/handlers.rs

Lines changed: 93 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
use crate::datafusion::HotDataEngine;
22
use crate::http::error::ApiError;
33
use crate::http::models::{
4-
ConnectionInfo, CreateConnectionRequest, CreateConnectionResponse, ListConnectionsResponse,
5-
QueryRequest, QueryResponse, TableInfo, TablesResponse,
4+
ConnectionInfo, CreateConnectionRequest, CreateConnectionResponse, GetConnectionResponse,
5+
ListConnectionsResponse, QueryRequest, QueryResponse, TableInfo, TablesResponse,
66
};
77
use crate::http::serialization::{encode_value_at, make_array_encoder};
88
use crate::source::Source;
9-
use axum::{extract::Query as QueryParams, extract::State, http::StatusCode, Json};
9+
use axum::{
10+
extract::{Path, Query as QueryParams, State},
11+
http::StatusCode,
12+
Json,
13+
};
14+
use serde::Deserialize;
1015
use std::collections::HashMap;
1116
use std::sync::Arc;
1217
use std::time::Instant;
@@ -231,3 +236,88 @@ pub async fn list_connections_handler(
231236
connections: connection_infos,
232237
}))
233238
}
239+
240+
/// Handler for GET /connections/{name}
241+
pub async fn get_connection_handler(
242+
State(engine): State<Arc<HotDataEngine>>,
243+
Path(name): Path<String>,
244+
) -> Result<Json<GetConnectionResponse>, ApiError> {
245+
// Get connection info
246+
let conn = engine
247+
.catalog()
248+
.get_connection(&name)?
249+
.ok_or_else(|| ApiError::not_found(format!("Connection '{}' not found", name)))?;
250+
251+
// Get table counts
252+
let tables = engine.list_tables(Some(&name))?;
253+
let table_count = tables.len();
254+
let synced_table_count = tables.iter().filter(|t| t.parquet_path.is_some()).count();
255+
256+
Ok(Json(GetConnectionResponse {
257+
id: conn.id,
258+
name: conn.name,
259+
source_type: conn.source_type,
260+
table_count,
261+
synced_table_count,
262+
}))
263+
}
264+
265+
/// Handler for DELETE /connections/{name}
266+
pub async fn delete_connection_handler(
267+
State(engine): State<Arc<HotDataEngine>>,
268+
Path(name): Path<String>,
269+
) -> Result<StatusCode, ApiError> {
270+
engine.remove_connection(&name).await.map_err(|e| {
271+
if e.to_string().contains("not found") {
272+
ApiError::not_found(format!("Connection '{}' not found", name))
273+
} else {
274+
ApiError::internal_error(e.to_string())
275+
}
276+
})?;
277+
278+
Ok(StatusCode::NO_CONTENT)
279+
}
280+
281+
/// Handler for DELETE /connections/{name}/cache
282+
pub async fn purge_connection_cache_handler(
283+
State(engine): State<Arc<HotDataEngine>>,
284+
Path(name): Path<String>,
285+
) -> Result<StatusCode, ApiError> {
286+
engine.purge_connection(&name).await.map_err(|e| {
287+
if e.to_string().contains("not found") {
288+
ApiError::not_found(format!("Connection '{}' not found", name))
289+
} else {
290+
ApiError::internal_error(e.to_string())
291+
}
292+
})?;
293+
294+
Ok(StatusCode::NO_CONTENT)
295+
}
296+
297+
/// Path parameters for table cache operations
298+
#[derive(Deserialize)]
299+
pub struct TableCachePath {
300+
name: String,
301+
schema: String,
302+
table: String,
303+
}
304+
305+
/// Handler for DELETE /connections/{name}/tables/{schema}/{table}/cache
306+
pub async fn purge_table_cache_handler(
307+
State(engine): State<Arc<HotDataEngine>>,
308+
Path(params): Path<TableCachePath>,
309+
) -> Result<StatusCode, ApiError> {
310+
engine
311+
.purge_table(&params.name, &params.schema, &params.table)
312+
.await
313+
.map_err(|e| {
314+
let msg = e.to_string();
315+
if msg.contains("not found") {
316+
ApiError::not_found(msg)
317+
} else {
318+
ApiError::internal_error(msg)
319+
}
320+
})?;
321+
322+
Ok(StatusCode::NO_CONTENT)
323+
}

src/http/models.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,13 @@ pub struct ConnectionInfo {
6060
pub struct ListConnectionsResponse {
6161
pub connections: Vec<ConnectionInfo>,
6262
}
63+
64+
/// Response body for GET /connections/{name}
65+
#[derive(Debug, Serialize)]
66+
pub struct GetConnectionResponse {
67+
pub id: i32,
68+
pub name: String,
69+
pub source_type: String,
70+
pub table_count: usize,
71+
pub synced_table_count: usize,
72+
}

0 commit comments

Comments
 (0)