Skip to content

Commit 87e0a4b

Browse files
authored
Refactor catalog manager to be fully async (#34)
* refactor catalog manager to be async * remove unnecessary multi_thread flavor * move `block_on` to datafusion module
1 parent e6e4fbf commit 87e0a4b

14 files changed

+402
-268
lines changed

src/bin/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ async fn main() -> Result<()> {
3434
tracing::info!("Configuration '{}' loaded successfully", &cli.config);
3535

3636
// Initialize engine from config
37-
let engine = HotDataEngine::from_config(&config)?;
37+
let engine = HotDataEngine::from_config(&config).await?;
3838

3939
tracing::info!("Engine initialized");
4040

@@ -55,7 +55,7 @@ async fn main() -> Result<()> {
5555
server.await?;
5656

5757
// Explicitly shutdown engine to close catalog connection
58-
if let Err(e) = engine.shutdown() {
58+
if let Err(e) = engine.shutdown().await {
5959
tracing::error!("Error during engine shutdown: {}", e);
6060
}
6161

src/catalog/manager.rs

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use anyhow::Result;
2+
use async_trait::async_trait;
23
use serde::{Deserialize, Serialize};
34
use sqlx::FromRow;
45
use std::fmt::Debug;
5-
use tokio::task::block_in_place;
66

77
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
88
pub struct ConnectionInfo {
@@ -24,61 +24,54 @@ pub struct TableInfo {
2424
pub arrow_schema_json: Option<String>,
2525
}
2626

27-
/// Blocking helper for async operations.
28-
///
29-
/// Uses `block_in_place` to avoid blocking the tokio runtime when calling
30-
/// async code from a sync context.
31-
pub fn block_on<F, T>(f: F) -> Result<T>
32-
where
33-
F: std::future::Future<Output = Result<T>>,
34-
{
35-
block_in_place(|| tokio::runtime::Handle::current().block_on(f))
36-
}
37-
38-
/// Synchronous interface for catalog operations.
39-
///
40-
/// This trait uses blocking methods for compatibility with `dyn` trait objects.
41-
/// Implementations should use [`block_on`] to wrap async database operations.
27+
/// Async interface for catalog operations.
28+
#[async_trait]
4229
pub trait CatalogManager: Debug + Send + Sync {
4330
/// Close the catalog connection. This is idempotent and can be called multiple times.
44-
fn close(&self) -> Result<()> {
31+
async fn close(&self) -> Result<()> {
4532
// Default implementation does nothing - sqlx pools handle cleanup automatically
4633
Ok(())
4734
}
4835

4936
/// Apply any pending schema migrations. Should be idempotent.
50-
fn run_migrations(&self) -> Result<()>;
37+
async fn run_migrations(&self) -> Result<()>;
5138

52-
fn list_connections(&self) -> Result<Vec<ConnectionInfo>>;
53-
fn add_connection(&self, name: &str, source_type: &str, config_json: &str) -> Result<i32>;
54-
fn get_connection(&self, name: &str) -> Result<Option<ConnectionInfo>>;
55-
fn add_table(
39+
async fn list_connections(&self) -> Result<Vec<ConnectionInfo>>;
40+
async fn add_connection(&self, name: &str, source_type: &str, config_json: &str)
41+
-> Result<i32>;
42+
async fn get_connection(&self, name: &str) -> Result<Option<ConnectionInfo>>;
43+
async fn add_table(
5644
&self,
5745
connection_id: i32,
5846
schema_name: &str,
5947
table_name: &str,
6048
arrow_schema_json: &str,
6149
) -> Result<i32>;
62-
fn list_tables(&self, connection_id: Option<i32>) -> Result<Vec<TableInfo>>;
63-
fn get_table(
50+
async fn list_tables(&self, connection_id: Option<i32>) -> Result<Vec<TableInfo>>;
51+
async fn get_table(
6452
&self,
6553
connection_id: i32,
6654
schema_name: &str,
6755
table_name: &str,
6856
) -> Result<Option<TableInfo>>;
69-
fn update_table_sync(&self, table_id: i32, parquet_path: &str, state_path: &str) -> Result<()>;
57+
async fn update_table_sync(
58+
&self,
59+
table_id: i32,
60+
parquet_path: &str,
61+
state_path: &str,
62+
) -> Result<()>;
7063

7164
/// Clear table cache metadata (set paths to NULL) without deleting files.
72-
fn clear_table_cache_metadata(
65+
async fn clear_table_cache_metadata(
7366
&self,
7467
connection_id: i32,
7568
schema_name: &str,
7669
table_name: &str,
7770
) -> Result<TableInfo>;
7871

7972
/// Clear cache metadata for all tables in a connection (set paths to NULL).
80-
fn clear_connection_cache_metadata(&self, name: &str) -> Result<()>;
73+
async fn clear_connection_cache_metadata(&self, name: &str) -> Result<()>;
8174

8275
/// Delete connection and all associated table rows from metadata.
83-
fn delete_connection(&self, name: &str) -> Result<()>;
76+
async fn delete_connection(&self, name: &str) -> Result<()>;
8477
}

src/catalog/postgres_manager.rs

Lines changed: 46 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::catalog::backend::CatalogBackend;
2-
use crate::catalog::manager::{block_on, CatalogManager, ConnectionInfo, TableInfo};
2+
use crate::catalog::manager::{CatalogManager, ConnectionInfo, TableInfo};
33
use crate::catalog::migrations::{run_migrations, CatalogMigrations};
44
use anyhow::Result;
5+
use async_trait::async_trait;
56
use sqlx::postgres::PgPoolOptions;
67
use sqlx::{PgPool, Postgres};
78
use std::fmt::{self, Debug, Formatter};
@@ -11,11 +12,7 @@ pub struct PostgresCatalogManager {
1112
}
1213

1314
impl PostgresCatalogManager {
14-
pub fn new(connection_string: &str) -> Result<Self> {
15-
block_on(Self::new_async(connection_string))
16-
}
17-
18-
async fn new_async(connection_string: &str) -> Result<Self> {
15+
pub async fn new(connection_string: &str) -> Result<Self> {
1916
let pool = PgPoolOptions::new()
2017
.max_connections(5)
2118
.connect(connection_string)
@@ -97,79 +94,86 @@ impl CatalogMigrations for PostgresMigrationBackend {
9794
}
9895
}
9996

97+
#[async_trait]
10098
impl CatalogManager for PostgresCatalogManager {
101-
fn run_migrations(&self) -> Result<()> {
102-
block_on(run_migrations::<PostgresMigrationBackend>(
103-
self.backend.pool(),
104-
))
99+
async fn run_migrations(&self) -> Result<()> {
100+
run_migrations::<PostgresMigrationBackend>(self.backend.pool()).await
105101
}
106102

107-
fn list_connections(&self) -> Result<Vec<ConnectionInfo>> {
108-
block_on(self.backend.list_connections())
103+
async fn list_connections(&self) -> Result<Vec<ConnectionInfo>> {
104+
self.backend.list_connections().await
109105
}
110106

111-
fn add_connection(&self, name: &str, source_type: &str, config_json: &str) -> Result<i32> {
112-
block_on(self.backend.add_connection(name, source_type, config_json))
107+
async fn add_connection(
108+
&self,
109+
name: &str,
110+
source_type: &str,
111+
config_json: &str,
112+
) -> Result<i32> {
113+
self.backend
114+
.add_connection(name, source_type, config_json)
115+
.await
113116
}
114117

115-
fn get_connection(&self, name: &str) -> Result<Option<ConnectionInfo>> {
116-
block_on(self.backend.get_connection(name))
118+
async fn get_connection(&self, name: &str) -> Result<Option<ConnectionInfo>> {
119+
self.backend.get_connection(name).await
117120
}
118121

119-
fn add_table(
122+
async fn add_table(
120123
&self,
121124
connection_id: i32,
122125
schema_name: &str,
123126
table_name: &str,
124127
arrow_schema_json: &str,
125128
) -> Result<i32> {
126-
block_on(
127-
self.backend
128-
.add_table(connection_id, schema_name, table_name, arrow_schema_json),
129-
)
129+
self.backend
130+
.add_table(connection_id, schema_name, table_name, arrow_schema_json)
131+
.await
130132
}
131133

132-
fn list_tables(&self, connection_id: Option<i32>) -> Result<Vec<TableInfo>> {
133-
block_on(self.backend.list_tables(connection_id))
134+
async fn list_tables(&self, connection_id: Option<i32>) -> Result<Vec<TableInfo>> {
135+
self.backend.list_tables(connection_id).await
134136
}
135137

136-
fn get_table(
138+
async fn get_table(
137139
&self,
138140
connection_id: i32,
139141
schema_name: &str,
140142
table_name: &str,
141143
) -> Result<Option<TableInfo>> {
142-
block_on(
143-
self.backend
144-
.get_table(connection_id, schema_name, table_name),
145-
)
144+
self.backend
145+
.get_table(connection_id, schema_name, table_name)
146+
.await
146147
}
147148

148-
fn update_table_sync(&self, table_id: i32, parquet_path: &str, state_path: &str) -> Result<()> {
149-
block_on(
150-
self.backend
151-
.update_table_sync(table_id, parquet_path, state_path),
152-
)
149+
async fn update_table_sync(
150+
&self,
151+
table_id: i32,
152+
parquet_path: &str,
153+
state_path: &str,
154+
) -> Result<()> {
155+
self.backend
156+
.update_table_sync(table_id, parquet_path, state_path)
157+
.await
153158
}
154159

155-
fn clear_table_cache_metadata(
160+
async fn clear_table_cache_metadata(
156161
&self,
157162
connection_id: i32,
158163
schema_name: &str,
159164
table_name: &str,
160165
) -> Result<TableInfo> {
161-
block_on(
162-
self.backend
163-
.clear_table_cache_metadata(connection_id, schema_name, table_name),
164-
)
166+
self.backend
167+
.clear_table_cache_metadata(connection_id, schema_name, table_name)
168+
.await
165169
}
166170

167-
fn clear_connection_cache_metadata(&self, name: &str) -> Result<()> {
168-
block_on(self.backend.clear_connection_cache_metadata(name))
171+
async fn clear_connection_cache_metadata(&self, name: &str) -> Result<()> {
172+
self.backend.clear_connection_cache_metadata(name).await
169173
}
170174

171-
fn delete_connection(&self, name: &str) -> Result<()> {
172-
block_on(self.backend.delete_connection(name))
175+
async fn delete_connection(&self, name: &str) -> Result<()> {
176+
self.backend.delete_connection(name).await
173177
}
174178
}
175179

0 commit comments

Comments
 (0)