Skip to content

Commit c51133b

Browse files
committed
refactor catalog manager to be async
1 parent e6e4fbf commit c51133b

14 files changed

+388
-256
lines changed

src/bin/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ async fn main() -> Result<()> {
3434
tracing::info!("Configuration '{}' loaded successfully", &cli.config);
3535

3636
// Initialize engine from config
37-
let engine = HotDataEngine::from_config(&config)?;
37+
let engine = HotDataEngine::from_config(&config).await?;
3838

3939
tracing::info!("Engine initialized");
4040

@@ -55,7 +55,7 @@ async fn main() -> Result<()> {
5555
server.await?;
5656

5757
// Explicitly shutdown engine to close catalog connection
58-
if let Err(e) = engine.shutdown() {
58+
if let Err(e) = engine.shutdown().await {
5959
tracing::error!("Error during engine shutdown: {}", e);
6060
}
6161

src/catalog/manager.rs

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,22 @@
11
use anyhow::Result;
2+
use async_trait::async_trait;
23
use serde::{Deserialize, Serialize};
34
use sqlx::FromRow;
45
use std::fmt::Debug;
56
use tokio::task::block_in_place;
67

8+
/// Blocking helper for async operations.
9+
///
10+
/// Uses `block_in_place` to avoid blocking the tokio runtime when calling
11+
/// async code from a sync context. This is needed when async catalog methods
12+
/// must be called from sync trait implementations (e.g., DataFusion's CatalogProvider).
13+
pub fn block_on<F, T>(f: F) -> T
14+
where
15+
F: std::future::Future<Output = T>,
16+
{
17+
block_in_place(|| tokio::runtime::Handle::current().block_on(f))
18+
}
19+
720
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
821
pub struct ConnectionInfo {
922
pub id: i32,
@@ -24,61 +37,54 @@ pub struct TableInfo {
2437
pub arrow_schema_json: Option<String>,
2538
}
2639

27-
/// Blocking helper for async operations.
28-
///
29-
/// Uses `block_in_place` to avoid blocking the tokio runtime when calling
30-
/// async code from a sync context.
31-
pub fn block_on<F, T>(f: F) -> Result<T>
32-
where
33-
F: std::future::Future<Output = Result<T>>,
34-
{
35-
block_in_place(|| tokio::runtime::Handle::current().block_on(f))
36-
}
37-
38-
/// Synchronous interface for catalog operations.
39-
///
40-
/// This trait uses blocking methods for compatibility with `dyn` trait objects.
41-
/// Implementations should use [`block_on`] to wrap async database operations.
40+
/// Async interface for catalog operations.
41+
#[async_trait]
4242
pub trait CatalogManager: Debug + Send + Sync {
4343
/// Close the catalog connection. This is idempotent and can be called multiple times.
44-
fn close(&self) -> Result<()> {
44+
async fn close(&self) -> Result<()> {
4545
// Default implementation does nothing - sqlx pools handle cleanup automatically
4646
Ok(())
4747
}
4848

4949
/// Apply any pending schema migrations. Should be idempotent.
50-
fn run_migrations(&self) -> Result<()>;
50+
async fn run_migrations(&self) -> Result<()>;
5151

52-
fn list_connections(&self) -> Result<Vec<ConnectionInfo>>;
53-
fn add_connection(&self, name: &str, source_type: &str, config_json: &str) -> Result<i32>;
54-
fn get_connection(&self, name: &str) -> Result<Option<ConnectionInfo>>;
55-
fn add_table(
52+
async fn list_connections(&self) -> Result<Vec<ConnectionInfo>>;
53+
async fn add_connection(&self, name: &str, source_type: &str, config_json: &str)
54+
-> Result<i32>;
55+
async fn get_connection(&self, name: &str) -> Result<Option<ConnectionInfo>>;
56+
async fn add_table(
5657
&self,
5758
connection_id: i32,
5859
schema_name: &str,
5960
table_name: &str,
6061
arrow_schema_json: &str,
6162
) -> Result<i32>;
62-
fn list_tables(&self, connection_id: Option<i32>) -> Result<Vec<TableInfo>>;
63-
fn get_table(
63+
async fn list_tables(&self, connection_id: Option<i32>) -> Result<Vec<TableInfo>>;
64+
async fn get_table(
6465
&self,
6566
connection_id: i32,
6667
schema_name: &str,
6768
table_name: &str,
6869
) -> Result<Option<TableInfo>>;
69-
fn update_table_sync(&self, table_id: i32, parquet_path: &str, state_path: &str) -> Result<()>;
70+
async fn update_table_sync(
71+
&self,
72+
table_id: i32,
73+
parquet_path: &str,
74+
state_path: &str,
75+
) -> Result<()>;
7076

7177
/// Clear table cache metadata (set paths to NULL) without deleting files.
72-
fn clear_table_cache_metadata(
78+
async fn clear_table_cache_metadata(
7379
&self,
7480
connection_id: i32,
7581
schema_name: &str,
7682
table_name: &str,
7783
) -> Result<TableInfo>;
7884

7985
/// Clear cache metadata for all tables in a connection (set paths to NULL).
80-
fn clear_connection_cache_metadata(&self, name: &str) -> Result<()>;
86+
async fn clear_connection_cache_metadata(&self, name: &str) -> Result<()>;
8187

8288
/// Delete connection and all associated table rows from metadata.
83-
fn delete_connection(&self, name: &str) -> Result<()>;
89+
async fn delete_connection(&self, name: &str) -> Result<()>;
8490
}

src/catalog/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ mod sqlite_manager;
55

66
mod manager;
77

8-
pub use manager::{CatalogManager, ConnectionInfo, TableInfo};
8+
pub use manager::{block_on, CatalogManager, ConnectionInfo, TableInfo};
99
pub use postgres_manager::PostgresCatalogManager;
1010
pub use sqlite_manager::SqliteCatalogManager;

src/catalog/postgres_manager.rs

Lines changed: 46 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::catalog::backend::CatalogBackend;
2-
use crate::catalog::manager::{block_on, CatalogManager, ConnectionInfo, TableInfo};
2+
use crate::catalog::manager::{CatalogManager, ConnectionInfo, TableInfo};
33
use crate::catalog::migrations::{run_migrations, CatalogMigrations};
44
use anyhow::Result;
5+
use async_trait::async_trait;
56
use sqlx::postgres::PgPoolOptions;
67
use sqlx::{PgPool, Postgres};
78
use std::fmt::{self, Debug, Formatter};
@@ -11,11 +12,7 @@ pub struct PostgresCatalogManager {
1112
}
1213

1314
impl PostgresCatalogManager {
14-
pub fn new(connection_string: &str) -> Result<Self> {
15-
block_on(Self::new_async(connection_string))
16-
}
17-
18-
async fn new_async(connection_string: &str) -> Result<Self> {
15+
pub async fn new(connection_string: &str) -> Result<Self> {
1916
let pool = PgPoolOptions::new()
2017
.max_connections(5)
2118
.connect(connection_string)
@@ -97,79 +94,86 @@ impl CatalogMigrations for PostgresMigrationBackend {
9794
}
9895
}
9996

97+
#[async_trait]
10098
impl CatalogManager for PostgresCatalogManager {
101-
fn run_migrations(&self) -> Result<()> {
102-
block_on(run_migrations::<PostgresMigrationBackend>(
103-
self.backend.pool(),
104-
))
99+
async fn run_migrations(&self) -> Result<()> {
100+
run_migrations::<PostgresMigrationBackend>(self.backend.pool()).await
105101
}
106102

107-
fn list_connections(&self) -> Result<Vec<ConnectionInfo>> {
108-
block_on(self.backend.list_connections())
103+
async fn list_connections(&self) -> Result<Vec<ConnectionInfo>> {
104+
self.backend.list_connections().await
109105
}
110106

111-
fn add_connection(&self, name: &str, source_type: &str, config_json: &str) -> Result<i32> {
112-
block_on(self.backend.add_connection(name, source_type, config_json))
107+
async fn add_connection(
108+
&self,
109+
name: &str,
110+
source_type: &str,
111+
config_json: &str,
112+
) -> Result<i32> {
113+
self.backend
114+
.add_connection(name, source_type, config_json)
115+
.await
113116
}
114117

115-
fn get_connection(&self, name: &str) -> Result<Option<ConnectionInfo>> {
116-
block_on(self.backend.get_connection(name))
118+
async fn get_connection(&self, name: &str) -> Result<Option<ConnectionInfo>> {
119+
self.backend.get_connection(name).await
117120
}
118121

119-
fn add_table(
122+
async fn add_table(
120123
&self,
121124
connection_id: i32,
122125
schema_name: &str,
123126
table_name: &str,
124127
arrow_schema_json: &str,
125128
) -> Result<i32> {
126-
block_on(
127-
self.backend
128-
.add_table(connection_id, schema_name, table_name, arrow_schema_json),
129-
)
129+
self.backend
130+
.add_table(connection_id, schema_name, table_name, arrow_schema_json)
131+
.await
130132
}
131133

132-
fn list_tables(&self, connection_id: Option<i32>) -> Result<Vec<TableInfo>> {
133-
block_on(self.backend.list_tables(connection_id))
134+
async fn list_tables(&self, connection_id: Option<i32>) -> Result<Vec<TableInfo>> {
135+
self.backend.list_tables(connection_id).await
134136
}
135137

136-
fn get_table(
138+
async fn get_table(
137139
&self,
138140
connection_id: i32,
139141
schema_name: &str,
140142
table_name: &str,
141143
) -> Result<Option<TableInfo>> {
142-
block_on(
143-
self.backend
144-
.get_table(connection_id, schema_name, table_name),
145-
)
144+
self.backend
145+
.get_table(connection_id, schema_name, table_name)
146+
.await
146147
}
147148

148-
fn update_table_sync(&self, table_id: i32, parquet_path: &str, state_path: &str) -> Result<()> {
149-
block_on(
150-
self.backend
151-
.update_table_sync(table_id, parquet_path, state_path),
152-
)
149+
async fn update_table_sync(
150+
&self,
151+
table_id: i32,
152+
parquet_path: &str,
153+
state_path: &str,
154+
) -> Result<()> {
155+
self.backend
156+
.update_table_sync(table_id, parquet_path, state_path)
157+
.await
153158
}
154159

155-
fn clear_table_cache_metadata(
160+
async fn clear_table_cache_metadata(
156161
&self,
157162
connection_id: i32,
158163
schema_name: &str,
159164
table_name: &str,
160165
) -> Result<TableInfo> {
161-
block_on(
162-
self.backend
163-
.clear_table_cache_metadata(connection_id, schema_name, table_name),
164-
)
166+
self.backend
167+
.clear_table_cache_metadata(connection_id, schema_name, table_name)
168+
.await
165169
}
166170

167-
fn clear_connection_cache_metadata(&self, name: &str) -> Result<()> {
168-
block_on(self.backend.clear_connection_cache_metadata(name))
171+
async fn clear_connection_cache_metadata(&self, name: &str) -> Result<()> {
172+
self.backend.clear_connection_cache_metadata(name).await
169173
}
170174

171-
fn delete_connection(&self, name: &str) -> Result<()> {
172-
block_on(self.backend.delete_connection(name))
175+
async fn delete_connection(&self, name: &str) -> Result<()> {
176+
self.backend.delete_connection(name).await
173177
}
174178
}
175179

0 commit comments

Comments
 (0)