Skip to content

Commit a7800ae

Browse files
goffrieConvex, Inc.
authored andcommitted
Put export implementation behind a trait object (#40094)
GitOrigin-RevId: f7dea429e43e6e21daf31b6dc112c0399db9d6f2
1 parent 91cb639 commit a7800ae

File tree

9 files changed

+79
-20
lines changed

9 files changed

+79
-20
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/application/src/exports/worker.rs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use database::{
2323
SystemMetadataModel,
2424
};
2525
use exports::{
26-
export_inner,
26+
interface::ExportProvider,
2727
ExportComponents,
2828
};
2929
use futures::{
@@ -63,6 +63,7 @@ pub struct ExportWorker<RT: Runtime> {
6363
pub(super) database: Database<RT>,
6464
pub(super) storage: Arc<dyn Storage>,
6565
pub(super) file_storage: Arc<dyn Storage>,
66+
pub(super) export_provider: Arc<dyn ExportProvider<RT>>,
6667
pub(super) backoff: Backoff,
6768
pub(super) usage_tracking: UsageCounter,
6869
pub(super) instance_name: String,
@@ -75,6 +76,7 @@ impl<RT: Runtime> ExportWorker<RT> {
7576
database: Database<RT>,
7677
storage: Arc<dyn Storage>,
7778
file_storage: Arc<dyn Storage>,
79+
export_provider: Arc<dyn ExportProvider<RT>>,
7880
usage_tracking: UsageCounter,
7981
instance_name: String,
8082
) -> impl Future<Output = ()> + Send {
@@ -83,6 +85,7 @@ impl<RT: Runtime> ExportWorker<RT> {
8385
database,
8486
storage,
8587
file_storage,
88+
export_provider,
8689
backoff: Backoff::new(INITIAL_BACKOFF, MAX_BACKOFF),
8790
usage_tracking,
8891
instance_name,
@@ -179,22 +182,22 @@ impl<RT: Runtime> ExportWorker<RT> {
179182
let database_snapshot = self.database.latest_database_snapshot()?;
180183
let snapshot_ts = *database_snapshot.timestamp();
181184
tracing::info!(%snapshot_ts, "Export {id} beginning...");
185+
let components = ExportComponents {
186+
runtime: self.runtime.clone(),
187+
database: database_snapshot,
188+
storage: self.storage.clone(),
189+
file_storage: self.file_storage.clone(),
190+
instance_name: self.instance_name.clone(),
191+
};
182192
let (object_key, usage) = {
183193
let database_ = self.database.clone();
184-
let export_future = async {
185-
let database_ = self.database.clone();
186-
187-
export_inner(
188-
&ExportComponents {
189-
runtime: self.runtime.clone(),
190-
database: database_snapshot,
191-
storage: self.storage.clone(),
192-
file_storage: self.file_storage.clone(),
193-
instance_name: self.instance_name.clone(),
194-
},
195-
format,
196-
requestor,
197-
|msg| async {
194+
let export_future = self.export_provider.export(
195+
&components,
196+
format,
197+
requestor,
198+
Box::new(move |msg| {
199+
let database_ = database_.clone();
200+
async move {
198201
tracing::info!("Export {id} progress: {msg}");
199202
database_
200203
.execute_with_occ_retries(
@@ -221,12 +224,12 @@ impl<RT: Runtime> ExportWorker<RT> {
221224
)
222225
.await?;
223226
Ok(())
224-
},
225-
)
226-
.await
227-
};
228-
tokio::pin!(export_future);
227+
}
228+
.boxed()
229+
}),
230+
);
229231

232+
let database_ = self.database.clone();
230233
// In parallel, monitor the export document to check for cancellation
231234
let monitor_export = async move {
232235
loop {

crates/application/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::{
2121
},
2222
};
2323

24+
use ::exports::interface::ExportProvider;
2425
use ::log_streaming::{
2526
LogManager,
2627
LogManagerClient,
@@ -687,6 +688,7 @@ impl<RT: Runtime> Application<RT> {
687688
fetch_client: Arc<dyn FetchClient>,
688689
local_log_sink: Option<String>,
689690
lease_lost_shutdown: ShutdownSignal,
691+
export_provider: Arc<dyn ExportProvider<RT>>,
690692
) -> anyhow::Result<Self> {
691693
let module_cache =
692694
ModuleCache::new(runtime.clone(), application_storage.modules_storage.clone()).await;
@@ -811,6 +813,7 @@ impl<RT: Runtime> Application<RT> {
811813
database.clone(),
812814
application_storage.exports_storage.clone(),
813815
application_storage.files_storage.clone(),
816+
export_provider,
814817
database.usage_counter(),
815818
instance_name.clone(),
816819
);

crates/application/src/test_helpers.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use events::usage::{
5151
NoOpUsageEventLogger,
5252
UsageEventLogger,
5353
};
54+
use exports::interface::InProcessExportProvider;
5455
use file_storage::{
5556
FileStorage,
5657
TransactionalFileStorage,
@@ -279,6 +280,7 @@ impl<RT: Runtime> ApplicationTestExt<RT> for Application<RT> {
279280
fetch_client,
280281
None, // local_log_sink
281282
ShutdownSignal::panic(),
283+
Arc::new(InProcessExportProvider),
282284
)
283285
.await?;
284286

crates/exports/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ testing = [
2323

2424
[dependencies]
2525
anyhow = { workspace = true }
26+
async-trait = { workspace = true }
2627
async_zip = { workspace = true }
2728
bytes = { workspace = true }
2829
common = { workspace = true }

crates/exports/src/interface.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use common::{
2+
runtime::Runtime,
3+
types::ObjectKey,
4+
};
5+
use futures::future::BoxFuture;
6+
use model::exports::types::{
7+
ExportFormat,
8+
ExportRequestor,
9+
};
10+
use usage_tracking::FunctionUsageTracker;
11+
12+
use crate::ExportComponents;
13+
14+
#[async_trait::async_trait]
15+
pub trait ExportProvider<RT: Runtime>: Send + Sync {
16+
async fn export(
17+
&self,
18+
components: &ExportComponents<RT>,
19+
format: ExportFormat,
20+
requestor: ExportRequestor,
21+
update_progress: Box<
22+
dyn Fn(String) -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync,
23+
>,
24+
) -> anyhow::Result<(ObjectKey, FunctionUsageTracker)>;
25+
}
26+
27+
pub struct InProcessExportProvider;
28+
29+
#[async_trait::async_trait]
30+
impl<RT: Runtime> ExportProvider<RT> for InProcessExportProvider {
31+
async fn export(
32+
&self,
33+
components: &ExportComponents<RT>,
34+
format: ExportFormat,
35+
requestor: ExportRequestor,
36+
update_progress: Box<
37+
dyn Fn(String) -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync,
38+
>,
39+
) -> anyhow::Result<(ObjectKey, FunctionUsageTracker)> {
40+
crate::export_inner(components, format, requestor, &*update_progress).await
41+
}
42+
}

crates/exports/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ use self::{
7979
};
8080

8181
mod export_storage;
82+
pub mod interface;
8283
mod metrics;
8384
#[cfg(test)]
8485
mod tests;

crates/local_backend/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ testing = [
3434
"node_executor/testing",
3535
"sync/testing",
3636
"db_connection/testing",
37+
"exports/testing",
3738
]
3839

3940
[dependencies]
@@ -56,6 +57,7 @@ db_connection = { workspace = true }
5657
either = { workspace = true }
5758
errors = { workspace = true }
5859
events = { workspace = true }
60+
exports = { workspace = true }
5961
fastrace = { workspace = true }
6062
file_storage = { workspace = true }
6163
function_runner = { workspace = true }
@@ -117,6 +119,7 @@ database = { workspace = true, features = ["testing"] }
117119
db_connection = { workspace = true, features = ["testing"] }
118120
errors = { workspace = true, features = ["testing"] }
119121
events = { workspace = true, features = ["testing"] }
122+
exports = { workspace = true, features = ["testing"] }
120123
function_runner = { workspace = true, features = ["testing"] }
121124
isolate = { workspace = true, features = ["testing"] }
122125
jsonschema = { workspace = true }

crates/local_backend/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use common::{
4646
use config::LocalConfig;
4747
use database::Database;
4848
use events::usage::NoOpUsageEventLogger;
49+
use exports::interface::InProcessExportProvider;
4950
use file_storage::{
5051
FileStorage,
5152
TransactionalFileStorage,
@@ -241,6 +242,7 @@ pub async fn make_app(
241242
fetch_client,
242243
config.local_log_sink.clone(),
243244
preempt_tx.clone(),
245+
Arc::new(InProcessExportProvider),
244246
)
245247
.await?;
246248

0 commit comments

Comments
 (0)