Skip to content

Commit ab9f286

Browse files
authored
Minor refactor: switch to BoxFuture whenever applicable. (#223)
1 parent 19da5c0 commit ab9f286

File tree

7 files changed

+30
-36
lines changed

7 files changed

+30
-36
lines changed

src/builder/analyzed_flow.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{future::Future, pin::Pin, sync::Arc};
1+
use std::sync::Arc;
22

33
use super::{analyzer, plan};
44
use crate::{
@@ -9,16 +9,18 @@ use crate::{
99
setup::{self, ObjectSetupStatusCheck},
1010
};
1111
use anyhow::Result;
12-
use futures::{future::Shared, FutureExt};
12+
use futures::{
13+
future::{BoxFuture, Shared},
14+
FutureExt,
15+
};
1316

1417
pub struct AnalyzedFlow {
1518
pub flow_instance: spec::FlowInstanceSpec,
1619
pub data_schema: schema::DataSchema,
1720
pub desired_state: setup::FlowSetupState<setup::DesiredMode>,
1821
/// It's None if the flow is not up to date
19-
pub execution_plan: Option<
20-
Shared<Pin<Box<dyn Future<Output = Result<Arc<plan::ExecutionPlan>, SharedError>> + Send>>>,
21-
>,
22+
pub execution_plan:
23+
Option<Shared<BoxFuture<'static, Result<Arc<plan::ExecutionPlan>, SharedError>>>>,
2224
}
2325

2426
impl AnalyzedFlow {

src/builder/analyzer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::collections::{BTreeMap, HashSet};
22
use std::sync::Mutex;
3-
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
3+
use std::{collections::HashMap, future::Future, sync::Arc};
44

55
use super::plan::*;
66
use crate::execution::db_tracking_setup;
@@ -16,7 +16,7 @@ use crate::{
1616
utils::immutable::RefList,
1717
};
1818
use anyhow::{anyhow, bail, Context, Result};
19-
use futures::future::try_join3;
19+
use futures::future::{try_join3, BoxFuture};
2020
use futures::{future::try_join_all, FutureExt};
2121
use indexmap::IndexMap;
2222
use log::{trace, warn};
@@ -678,7 +678,7 @@ impl AnalyzerContext<'_> {
678678
scope: &mut ExecutionScope<'_>,
679679
reactive_op: &NamedSpec<ReactiveOpSpec>,
680680
parent_scopes: RefList<'_, &'_ ExecutionScope<'_>>,
681-
) -> Result<Pin<Box<dyn Future<Output = Result<AnalyzedReactiveOp>> + Send>>> {
681+
) -> Result<BoxFuture<'static, Result<AnalyzedReactiveOp>>> {
682682
let result_fut = match &reactive_op.spec {
683683
ReactiveOpSpec::Transform(op) => {
684684
let input_field_schemas =

src/ops/factory_bases.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::sync::Arc;
55

66
use anyhow::Result;
77
use axum::async_trait;
8+
use futures::future::BoxFuture;
89
use serde::de::DeserializeOwned;
910
use serde::Serialize;
1011

@@ -208,7 +209,7 @@ impl<T: SourceFactoryBase> SourceFactory for T {
208209
context: Arc<FlowInstanceContext>,
209210
) -> Result<(
210211
EnrichedValueType,
211-
ExecutorFuture<'static, Box<dyn SourceExecutor>>,
212+
BoxFuture<'static, Result<Box<dyn SourceExecutor>>>,
212213
)> {
213214
let spec: T::Spec = serde_json::from_value(spec)?;
214215
let output_schema = self.get_output_schema(&spec, &context)?;
@@ -259,7 +260,7 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
259260
context: Arc<FlowInstanceContext>,
260261
) -> Result<(
261262
EnrichedValueType,
262-
ExecutorFuture<'static, Box<dyn SimpleFunctionExecutor>>,
263+
BoxFuture<'static, Result<Box<dyn SimpleFunctionExecutor>>>,
263264
)> {
264265
let spec: T::Spec = serde_json::from_value(spec)?;
265266
let mut args_resolver = OpArgsResolver::new(&input_schema)?;
@@ -288,7 +289,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
288289
context: Arc<FlowInstanceContext>,
289290
) -> Result<(
290291
(Self::Key, Self::SetupState),
291-
ExecutorFuture<'static, (Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>,
292+
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
292293
)>;
293294

294295
fn check_setup_status(
@@ -388,7 +389,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
388389
context: Arc<FlowInstanceContext>,
389390
) -> Result<(
390391
(serde_json::Value, serde_json::Value),
391-
ExecutorFuture<'static, (Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>,
392+
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
392393
)> {
393394
let spec: T::Spec = serde_json::from_value(spec)?;
394395
let ((setup_key, setup_state), executors) = StorageFactoryBase::build(

src/ops/interface.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,14 @@ use crate::base::{
66
use crate::setup;
77
use anyhow::Result;
88
use async_trait::async_trait;
9+
use futures::future::BoxFuture;
910
use serde::Serialize;
10-
use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc};
11+
use std::sync::Arc;
1112

1213
pub struct FlowInstanceContext {
1314
pub flow_instance_name: String,
1415
}
1516

16-
pub type ExecutorFuture<'a, E> = Pin<Box<dyn Future<Output = Result<E>> + Send + 'a>>;
17-
1817
#[async_trait]
1918
pub trait SourceExecutor: Send + Sync {
2019
/// Get the list of keys for the source.
@@ -31,7 +30,7 @@ pub trait SourceFactory {
3130
context: Arc<FlowInstanceContext>,
3231
) -> Result<(
3332
EnrichedValueType,
34-
ExecutorFuture<'static, Box<dyn SourceExecutor>>,
33+
BoxFuture<'static, Result<Box<dyn SourceExecutor>>>,
3534
)>;
3635
}
3736

@@ -59,7 +58,7 @@ pub trait SimpleFunctionFactory {
5958
context: Arc<FlowInstanceContext>,
6059
) -> Result<(
6160
EnrichedValueType,
62-
ExecutorFuture<'static, Box<dyn SimpleFunctionExecutor>>,
61+
BoxFuture<'static, Result<Box<dyn SimpleFunctionExecutor>>>,
6362
)>;
6463
}
6564

@@ -111,7 +110,7 @@ pub trait ExportTargetFactory {
111110
context: Arc<FlowInstanceContext>,
112111
) -> Result<(
113112
(serde_json::Value, serde_json::Value),
114-
ExecutorFuture<'static, (Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>,
113+
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
115114
)>;
116115

117116
fn check_setup_status(

src/ops/py_factory.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::sync::Arc;
22

33
use axum::async_trait;
4-
use futures::FutureExt;
4+
use futures::{future::BoxFuture, FutureExt};
55
use pyo3::{
66
pyclass, pymethods,
77
types::{IntoPyDict, PyString, PyTuple},
@@ -16,9 +16,7 @@ use crate::{
1616
};
1717
use anyhow::Result;
1818

19-
use super::sdk::{
20-
ExecutorFuture, FlowInstanceContext, SimpleFunctionExecutor, SimpleFunctionFactory,
21-
};
19+
use super::interface::{FlowInstanceContext, SimpleFunctionExecutor, SimpleFunctionFactory};
2220

2321
#[pyclass(name = "OpArgSchema")]
2422
pub struct PyOpArgSchema {
@@ -114,7 +112,7 @@ impl SimpleFunctionFactory for PyFunctionFactory {
114112
_context: Arc<FlowInstanceContext>,
115113
) -> Result<(
116114
schema::EnrichedValueType,
117-
ExecutorFuture<'static, Box<dyn SimpleFunctionExecutor>>,
115+
BoxFuture<'static, Result<Box<dyn SimpleFunctionExecutor>>>,
118116
)> {
119117
let (result_type, executor, kw_args_names, num_positional_args) =
120118
Python::with_gil(|py| -> anyhow::Result<_> {

src/ops/storages/postgres.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
use std::borrow::Cow;
22
use std::collections::{BTreeMap, HashMap};
3-
use std::future::Future;
43
use std::ops::Bound;
5-
use std::pin::Pin;
64
use std::sync::{Arc, Mutex};
75

86
use crate::base::spec::{self, *};
@@ -13,7 +11,7 @@ use crate::{get_lib_context, setup};
1311
use anyhow::{anyhow, bail, Result};
1412
use async_trait::async_trait;
1513
use derivative::Derivative;
16-
use futures::future::Shared;
14+
use futures::future::{BoxFuture, Shared};
1715
use futures::FutureExt;
1816
use indexmap::{IndexMap, IndexSet};
1917
use itertools::Itertools;
@@ -457,12 +455,8 @@ fn distance_to_similarity(metric: VectorSimilarityMetric, distance: f64) -> f64
457455
}
458456

459457
pub struct Factory {
460-
db_pools: Mutex<
461-
HashMap<
462-
Option<String>,
463-
Shared<Pin<Box<dyn Future<Output = Result<PgPool, SharedError>> + Send>>>,
464-
>,
465-
>,
458+
db_pools:
459+
Mutex<HashMap<Option<String>, Shared<BoxFuture<'static, Result<PgPool, SharedError>>>>>,
466460
}
467461

468462
impl Default for Factory {
@@ -932,7 +926,7 @@ impl StorageFactoryBase for Arc<Factory> {
932926
context: Arc<FlowInstanceContext>,
933927
) -> Result<(
934928
(TableId, SetupState),
935-
ExecutorFuture<'static, (Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>,
929+
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
936930
)> {
937931
let table_id = TableId {
938932
database_url: spec.database_url.clone(),

src/server.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ use crate::{lib_context::LibContext, service};
22

33
use anyhow::Result;
44
use axum::{routing, Router};
5-
use futures::FutureExt;
5+
use futures::{future::BoxFuture, FutureExt};
66
use serde::Deserialize;
7-
use std::{future::Future, pin::Pin, sync::Arc};
7+
use std::sync::Arc;
88
use tower::ServiceBuilder;
99
use tower_http::{
1010
cors::{AllowOrigin, CorsLayer},
@@ -21,7 +21,7 @@ pub struct ServerSettings {
2121
pub async fn init_server(
2222
lib_context: Arc<LibContext>,
2323
settings: ServerSettings,
24-
) -> Result<Pin<Box<dyn Future<Output = ()> + Send>>> {
24+
) -> Result<BoxFuture<'static, ()>> {
2525
let mut cors = CorsLayer::default();
2626
if let Some(ui_cors_origin) = &settings.cors_origin {
2727
cors = cors

0 commit comments

Comments
 (0)