
Commit 04eef98

extract data fetch orchestration from table provider (#38)

1 parent c51f00b · commit 04eef98

6 files changed: +119 −86 lines

src/datafetch/mod.rs

Lines changed: 2 additions & 0 deletions

```diff
@@ -1,9 +1,11 @@
 mod error;
 mod fetcher;
 pub mod native;
+mod orchestrator;
 mod types;
 
 pub use error::DataFetchError;
 pub use fetcher::DataFetcher;
 pub use native::{NativeFetcher, StreamingParquetWriter};
+pub use orchestrator::FetchOrchestrator;
 pub use types::{deserialize_arrow_schema, ColumnMetadata, TableMetadata};
```

src/datafetch/orchestrator.rs

Lines changed: 84 additions & 0 deletions

```diff
@@ -0,0 +1,84 @@
+use anyhow::Result;
+use std::sync::Arc;
+
+use super::native::StreamingParquetWriter;
+use super::DataFetcher;
+use crate::catalog::CatalogManager;
+use crate::source::Source;
+use crate::storage::StorageManager;
+
+/// Orchestrates the full table fetch workflow: fetch from source → write to storage → update catalog.
+#[derive(Debug)]
+pub struct FetchOrchestrator {
+    fetcher: Arc<dyn DataFetcher>,
+    storage: Arc<dyn StorageManager>,
+    catalog: Arc<dyn CatalogManager>,
+}
+
+impl FetchOrchestrator {
+    pub fn new(
+        fetcher: Arc<dyn DataFetcher>,
+        storage: Arc<dyn StorageManager>,
+        catalog: Arc<dyn CatalogManager>,
+    ) -> Self {
+        Self {
+            fetcher,
+            storage,
+            catalog,
+        }
+    }
+
+    /// Fetch table data from source, write to cache storage, and update catalog metadata.
+    ///
+    /// Returns the URL of the cached parquet file.
+    pub async fn cache_table(
+        &self,
+        source: &Source,
+        connection_id: i32,
+        schema_name: &str,
+        table_name: &str,
+    ) -> Result<String> {
+        // Prepare cache write location
+        let write_path = self
+            .storage
+            .prepare_cache_write(connection_id, schema_name, table_name);
+
+        // Create writer
+        let mut writer = StreamingParquetWriter::new(write_path.clone());
+
+        // Fetch the table data into writer
+        self.fetcher
+            .fetch_table(
+                source,
+                None, // catalog
+                schema_name,
+                table_name,
+                &mut writer,
+            )
+            .await
+            .map_err(|e| anyhow::anyhow!("Failed to fetch table: {}", e))?;
+
+        // Close writer
+        writer
+            .close()
+            .map_err(|e| anyhow::anyhow!("Failed to close writer: {}", e))?;
+
+        // Finalize cache write (uploads to S3 if needed, returns URL)
+        let parquet_url = self
+            .storage
+            .finalize_cache_write(&write_path, connection_id, schema_name, table_name)
+            .await
+            .map_err(|e| anyhow::anyhow!("Failed to finalize cache write: {}", e))?;
+
+        // Update catalog with new path
+        if let Ok(Some(info)) = self
+            .catalog
+            .get_table(connection_id, schema_name, table_name)
+            .await
+        {
+            let _ = self.catalog.update_table_sync(info.id, &parquet_url).await;
+        }
+
+        Ok(parquet_url)
+    }
+}
```
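
For orientation, here is a minimal sketch of how the extracted orchestrator is driven end to end, using only the `FetchOrchestrator::new` and `cache_table` signatures from the file above. The `storage` and `catalog` trait objects, the `connection_id`, and the schema/table names are placeholders for whatever the caller already holds; only the orchestrator API itself is taken from this commit:

```rust
use std::sync::Arc;

use crate::catalog::CatalogManager;
use crate::datafetch::{DataFetcher, FetchOrchestrator, NativeFetcher};
use crate::source::Source;
use crate::storage::StorageManager;

// Hypothetical caller: the trait objects and identifiers are assumptions;
// the FetchOrchestrator API matches the new file above.
async fn cache_one_table(
    storage: Arc<dyn StorageManager>,
    catalog: Arc<dyn CatalogManager>,
    source: &Source,
) -> anyhow::Result<String> {
    let fetcher: Arc<dyn DataFetcher> = Arc::new(NativeFetcher::new());
    let orchestrator = FetchOrchestrator::new(fetcher, storage, catalog);

    // fetch from source → write parquet to cache → update catalog;
    // returns the URL of the cached parquet file
    orchestrator
        .cache_table(source, /* connection_id */ 1, "public", "users")
        .await
}
```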

src/datafusion/catalog_provider.rs

Lines changed: 5 additions & 9 deletions

```diff
@@ -1,9 +1,8 @@
 use super::block_on;
 use super::schema_provider::RivetSchemaProvider;
 use crate::catalog::CatalogManager;
-use crate::datafetch::{DataFetcher, NativeFetcher};
+use crate::datafetch::FetchOrchestrator;
 use crate::source::Source;
-use crate::storage::StorageManager;
 use async_trait::async_trait;
 use datafusion::catalog::{CatalogProvider, SchemaProvider};
 use std::collections::HashMap;
@@ -17,9 +16,8 @@ pub struct RivetCatalogProvider {
     connection_name: String,
     source: Arc<Source>,
     catalog: Arc<dyn CatalogManager>,
+    orchestrator: Arc<FetchOrchestrator>,
     schemas: Arc<RwLock<HashMap<String, Arc<dyn SchemaProvider>>>>,
-    storage: Arc<dyn StorageManager>,
-    fetcher: Arc<dyn DataFetcher>,
 }
 
 impl RivetCatalogProvider {
@@ -28,16 +26,15 @@ impl RivetCatalogProvider {
         connection_name: String,
         source: Arc<Source>,
         catalog: Arc<dyn CatalogManager>,
-        storage: Arc<dyn StorageManager>,
+        orchestrator: Arc<FetchOrchestrator>,
     ) -> Self {
         Self {
             connection_id,
             connection_name,
             source,
             catalog,
+            orchestrator,
             schemas: Arc::new(RwLock::new(HashMap::new())),
-            storage,
-            fetcher: Arc::new(NativeFetcher::new()),
         }
     }
 
@@ -66,8 +63,7 @@ impl RivetCatalogProvider {
             schema_name.to_string(),
             self.source.clone(),
             self.catalog.clone(),
-            self.storage.clone(),
-            self.fetcher.clone(),
+            self.orchestrator.clone(),
         )) as Arc<dyn SchemaProvider>;
 
         schemas.insert(schema_name.to_string(), schema_provider.clone());
```

src/datafusion/lazy_table_provider.rs

Lines changed: 7 additions & 59 deletions

```diff
@@ -1,4 +1,3 @@
-use anyhow::Result;
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::catalog::Session;
@@ -10,9 +9,8 @@ use std::any::Any;
 use std::sync::Arc;
 
 use crate::catalog::CatalogManager;
-use crate::datafetch::DataFetcher;
+use crate::datafetch::FetchOrchestrator;
 use crate::source::Source;
-use crate::storage::StorageManager;
 
 /// A lazy table provider that defers data fetching until scan() is called.
 ///
@@ -22,33 +20,29 @@ use crate::storage::StorageManager;
 #[derive(Debug)]
 pub struct LazyTableProvider {
     schema: SchemaRef,
-    fetcher: Arc<dyn DataFetcher>,
     source: Arc<Source>,
     catalog: Arc<dyn CatalogManager>,
-    storage: Arc<dyn StorageManager>,
+    orchestrator: Arc<FetchOrchestrator>,
     connection_id: i32,
     schema_name: String,
     table_name: String,
 }
 
 impl LazyTableProvider {
-    #[allow(clippy::too_many_arguments)]
     pub fn new(
         schema: SchemaRef,
-        fetcher: Arc<dyn DataFetcher>,
         source: Arc<Source>,
         catalog: Arc<dyn CatalogManager>,
-        storage: Arc<dyn StorageManager>,
+        orchestrator: Arc<FetchOrchestrator>,
         connection_id: i32,
         schema_name: String,
         table_name: String,
     ) -> Self {
         Self {
             schema,
-            fetcher,
             source,
             catalog,
-            storage,
+            orchestrator,
             connection_id,
             schema_name,
             table_name,
@@ -96,61 +90,15 @@ impl LazyTableProvider {
 
     /// Fetch the table data and update catalog
     async fn fetch_and_cache(&self) -> Result<String, DataFusionError> {
-        use crate::datafetch::native::StreamingParquetWriter;
-
-        // Prepare cache write location
-        let write_path = self.storage.prepare_cache_write(
-            self.connection_id,
-            &self.schema_name,
-            &self.table_name,
-        );
-
-        // Create writer
-        let mut writer = StreamingParquetWriter::new(write_path.clone());
-
-        // Fetch the table data into writer using the Source directly
-        self.fetcher
-            .fetch_table(
+        self.orchestrator
+            .cache_table(
                 &self.source,
-                None, // catalog
-                &self.schema_name,
-                &self.table_name,
-                &mut writer,
-            )
-            .await
-            .map_err(|e| {
-                DataFusionError::External(format!("Failed to fetch table: {}", e).into())
-            })?;
-
-        // Close writer
-        writer.close().map_err(|e| {
-            DataFusionError::External(format!("Failed to close writer: {}", e).into())
-        })?;
-
-        // Finalize cache write (uploads to S3 if needed, returns URL)
-        let parquet_url = self
-            .storage
-            .finalize_cache_write(
-                &write_path,
                 self.connection_id,
                 &self.schema_name,
                 &self.table_name,
             )
             .await
-            .map_err(|e| {
-                DataFusionError::External(format!("Failed to finalize cache write: {}", e).into())
-            })?;
-
-        // Update catalog with new path
-        if let Ok(Some(info)) = self
-            .catalog
-            .get_table(self.connection_id, &self.schema_name, &self.table_name)
-            .await
-        {
-            let _ = self.catalog.update_table_sync(info.id, &parquet_url).await;
-        }
-
-        Ok(parquet_url)
+            .map_err(|e| DataFusionError::External(format!("Failed to cache table: {}", e).into()))
     }
 }
 
```
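
One detail worth noting in this simplification: `cache_table` returns `anyhow::Result`, while `fetch_and_cache` must return `Result<_, DataFusionError>`, so the provider's only remaining job is the error translation in that final `map_err`. A small standalone sketch of the same wrapping pattern (the helper name is illustrative, not part of the diff):

```rust
use datafusion::error::DataFusionError;

// Illustrative helper mirroring the map_err closure above: wrap any
// displayable error as an opaque "external" DataFusion error.
fn to_df_error(context: &str, e: impl std::fmt::Display) -> DataFusionError {
    DataFusionError::External(format!("{}: {}", context, e).into())
}
```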

src/datafusion/schema_provider.rs

Lines changed: 6 additions & 13 deletions

```diff
@@ -4,12 +4,10 @@ use datafusion::datasource::TableProvider;
 use std::sync::Arc;
 
 use super::block_on;
+use super::lazy_table_provider::LazyTableProvider;
 use crate::catalog::CatalogManager;
-use crate::datafetch::{deserialize_arrow_schema, DataFetcher};
+use crate::datafetch::{deserialize_arrow_schema, FetchOrchestrator};
 use crate::source::Source;
-use crate::storage::StorageManager;
-
-use super::lazy_table_provider::LazyTableProvider;
 
 /// A schema provider that syncs tables on-demand from remote sources.
 /// Wraps MemorySchemaProvider for caching already-loaded tables.
@@ -21,31 +19,27 @@ pub struct RivetSchemaProvider {
     schema_name: String,
     source: Arc<Source>,
     catalog: Arc<dyn CatalogManager>,
+    orchestrator: Arc<FetchOrchestrator>,
     inner: Arc<MemorySchemaProvider>,
-    storage: Arc<dyn StorageManager>,
-    fetcher: Arc<dyn DataFetcher>,
 }
 
 impl RivetSchemaProvider {
-    #[allow(clippy::too_many_arguments)]
     pub fn new(
         connection_id: i32,
         connection_name: String,
         schema_name: String,
         source: Arc<Source>,
         catalog: Arc<dyn CatalogManager>,
-        storage: Arc<dyn StorageManager>,
-        fetcher: Arc<dyn DataFetcher>,
+        orchestrator: Arc<FetchOrchestrator>,
     ) -> Self {
         Self {
            connection_id,
            connection_name,
            schema_name,
            source,
            catalog,
+           orchestrator,
            inner: Arc::new(MemorySchemaProvider::new()),
-           storage,
-           fetcher,
         }
     }
 }
@@ -109,10 +103,9 @@ impl SchemaProvider for RivetSchemaProvider {
         // Create LazyTableProvider
         let provider = Arc::new(LazyTableProvider::new(
             schema,
-            self.fetcher.clone(),
             self.source.clone(),
             self.catalog.clone(),
-            self.storage.clone(),
+            self.orchestrator.clone(),
             self.connection_id,
             self.schema_name.clone(),
             name.to_string(),
```

src/engine.rs

Lines changed: 15 additions & 5 deletions

```diff
@@ -1,5 +1,5 @@
 use crate::catalog::{CatalogManager, ConnectionInfo, SqliteCatalogManager, TableInfo};
-use crate::datafetch::DataFetcher;
+use crate::datafetch::{DataFetcher, FetchOrchestrator, NativeFetcher};
 use crate::datafusion::{block_on, RivetCatalogProvider};
 use crate::source::Source;
 use crate::storage::{FilesystemStorage, StorageManager};
@@ -23,6 +23,7 @@ pub struct RivetEngine {
     catalog: Arc<dyn CatalogManager>,
     df_ctx: SessionContext,
     storage: Arc<dyn StorageManager>,
+    orchestrator: Arc<FetchOrchestrator>,
 }
 
 impl RivetEngine {
@@ -200,7 +201,7 @@ impl RivetEngine {
             conn.name.clone(),
             Arc::new(source),
             self.catalog.clone(),
-            self.storage.clone(),
+            self.orchestrator.clone(),
         )) as Arc<dyn CatalogProvider>;
 
         self.df_ctx.register_catalog(&conn.name, catalog_provider);
@@ -246,7 +247,7 @@ impl RivetEngine {
             name.to_string(),
             Arc::new(source),
             self.catalog.clone(),
-            self.storage.clone(),
+            self.orchestrator.clone(),
         )) as Arc<dyn CatalogProvider>;
 
         self.df_ctx.register_catalog(name, catalog_provider);
@@ -326,7 +327,7 @@ impl RivetEngine {
             conn.name.clone(),
             Arc::new(source),
             self.catalog.clone(),
-            self.storage.clone(),
+            self.orchestrator.clone(),
         )) as Arc<dyn CatalogProvider>;
 
         // register_catalog replaces existing catalog with same name
@@ -368,7 +369,7 @@ impl RivetEngine {
             conn.name.clone(),
             Arc::new(source),
             self.catalog.clone(),
-            self.storage.clone(),
+            self.orchestrator.clone(),
         )) as Arc<dyn CatalogProvider>;
 
         // register_catalog replaces existing catalog with same name
@@ -574,10 +575,19 @@ impl RivetEngineBuilder {
         let df_ctx = SessionContext::new();
         storage.register_with_datafusion(&df_ctx)?;
 
+        // Step 6: Create fetch orchestrator
+        let fetcher = Arc::new(NativeFetcher::new());
+        let orchestrator = Arc::new(FetchOrchestrator::new(
+            fetcher,
+            storage.clone(),
+            catalog.clone(),
+        ));
+
         let mut engine = RivetEngine {
             catalog,
             df_ctx,
             storage,
+            orchestrator,
         };
 
         // Register all existing connections as DataFusion catalogs
```
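
Since the builder now creates a single orchestrator and every catalog, schema, and table provider receives `self.orchestrator.clone()`, all providers share one instance behind an `Arc`. A self-contained sketch (with a stand-in struct, not the real one) of what those clones cost:

```rust
use std::sync::Arc;

// Stand-in for the real FetchOrchestrator; only the sharing model is shown.
#[derive(Debug)]
struct FetchOrchestrator;

fn main() {
    let orchestrator = Arc::new(FetchOrchestrator);

    // Each provider in the diff stores a clone of the same Arc: this bumps
    // a reference count rather than duplicating the fetcher/storage/catalog.
    let for_catalog_provider = Arc::clone(&orchestrator);
    let for_schema_provider = Arc::clone(&orchestrator);

    assert_eq!(Arc::strong_count(&orchestrator), 3);
}
```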
