diff --git a/src/builder/analyzed_flow.rs b/src/builder/analyzed_flow.rs index 3c4a35ce4..68e8ecbb2 100644 --- a/src/builder/analyzed_flow.rs +++ b/src/builder/analyzed_flow.rs @@ -1,4 +1,4 @@ -use std::{future::Future, pin::Pin, sync::Arc}; +use std::sync::Arc; use super::{analyzer, plan}; use crate::{ @@ -9,16 +9,18 @@ use crate::{ setup::{self, ObjectSetupStatusCheck}, }; use anyhow::Result; -use futures::{future::Shared, FutureExt}; +use futures::{ + future::{BoxFuture, Shared}, + FutureExt, +}; pub struct AnalyzedFlow { pub flow_instance: spec::FlowInstanceSpec, pub data_schema: schema::DataSchema, pub desired_state: setup::FlowSetupState, /// It's None if the flow is not up to date - pub execution_plan: Option< - Shared, SharedError>> + Send>>>, - >, + pub execution_plan: + Option, SharedError>>>>, } impl AnalyzedFlow { diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index b27c00d9e..703bb6ffa 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeMap, HashSet}; use std::sync::Mutex; -use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; +use std::{collections::HashMap, future::Future, sync::Arc}; use super::plan::*; use crate::execution::db_tracking_setup; @@ -16,7 +16,7 @@ use crate::{ utils::immutable::RefList, }; use anyhow::{anyhow, bail, Context, Result}; -use futures::future::try_join3; +use futures::future::{try_join3, BoxFuture}; use futures::{future::try_join_all, FutureExt}; use indexmap::IndexMap; use log::{trace, warn}; @@ -678,7 +678,7 @@ impl AnalyzerContext<'_> { scope: &mut ExecutionScope<'_>, reactive_op: &NamedSpec, parent_scopes: RefList<'_, &'_ ExecutionScope<'_>>, - ) -> Result> + Send>>> { + ) -> Result>> { let result_fut = match &reactive_op.spec { ReactiveOpSpec::Transform(op) => { let input_field_schemas = diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs index e1b52506a..84344a80a 100644 --- a/src/ops/factory_bases.rs +++ b/src/ops/factory_bases.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use anyhow::Result; use axum::async_trait; +use futures::future::BoxFuture; use serde::de::DeserializeOwned; use serde::Serialize; @@ -208,7 +209,7 @@ impl SourceFactory for T { context: Arc, ) -> Result<( EnrichedValueType, - ExecutorFuture<'static, Box>, + BoxFuture<'static, Result>>, )> { let spec: T::Spec = serde_json::from_value(spec)?; let output_schema = self.get_output_schema(&spec, &context)?; @@ -259,7 +260,7 @@ impl SimpleFunctionFactory for T { context: Arc, ) -> Result<( EnrichedValueType, - ExecutorFuture<'static, Box>, + BoxFuture<'static, Result>>, )> { let spec: T::Spec = serde_json::from_value(spec)?; let mut args_resolver = OpArgsResolver::new(&input_schema)?; @@ -288,7 +289,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { context: Arc, ) -> Result<( (Self::Key, Self::SetupState), - ExecutorFuture<'static, (Arc, Option>)>, + BoxFuture<'static, Result<(Arc, Option>)>>, )>; fn check_setup_status( @@ -388,7 +389,7 @@ impl ExportTargetFactory for T { context: Arc, ) -> Result<( (serde_json::Value, serde_json::Value), - ExecutorFuture<'static, (Arc, Option>)>, + BoxFuture<'static, Result<(Arc, Option>)>>, )> { let spec: T::Spec = serde_json::from_value(spec)?; let ((setup_key, setup_state), executors) = StorageFactoryBase::build( diff --git a/src/ops/interface.rs b/src/ops/interface.rs index ac420a992..80be8615b 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -6,15 +6,14 @@ use crate::base::{ use crate::setup; use anyhow::Result; use async_trait::async_trait; +use futures::future::BoxFuture; use serde::Serialize; -use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc}; +use std::sync::Arc; pub struct FlowInstanceContext { pub flow_instance_name: String, } -pub type ExecutorFuture<'a, E> = Pin> + Send + 'a>>; - #[async_trait] pub trait SourceExecutor: Send + Sync { /// Get the list of keys for the source. @@ -31,7 +30,7 @@ pub trait SourceFactory { context: Arc, ) -> Result<( EnrichedValueType, - ExecutorFuture<'static, Box>, + BoxFuture<'static, Result>>, )>; } @@ -59,7 +58,7 @@ pub trait SimpleFunctionFactory { context: Arc, ) -> Result<( EnrichedValueType, - ExecutorFuture<'static, Box>, + BoxFuture<'static, Result>>, )>; } @@ -111,7 +110,7 @@ pub trait ExportTargetFactory { context: Arc, ) -> Result<( (serde_json::Value, serde_json::Value), - ExecutorFuture<'static, (Arc, Option>)>, + BoxFuture<'static, Result<(Arc, Option>)>>, )>; fn check_setup_status( diff --git a/src/ops/py_factory.rs b/src/ops/py_factory.rs index d14468aef..17d8cb703 100644 --- a/src/ops/py_factory.rs +++ b/src/ops/py_factory.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use axum::async_trait; -use futures::FutureExt; +use futures::{future::BoxFuture, FutureExt}; use pyo3::{ pyclass, pymethods, types::{IntoPyDict, PyString, PyTuple}, @@ -16,9 +16,7 @@ use crate::{ }; use anyhow::Result; -use super::sdk::{ - ExecutorFuture, FlowInstanceContext, SimpleFunctionExecutor, SimpleFunctionFactory, -}; +use super::interface::{FlowInstanceContext, SimpleFunctionExecutor, SimpleFunctionFactory}; #[pyclass(name = "OpArgSchema")] pub struct PyOpArgSchema { @@ -114,7 +112,7 @@ impl SimpleFunctionFactory for PyFunctionFactory { _context: Arc, ) -> Result<( schema::EnrichedValueType, - ExecutorFuture<'static, Box>, + BoxFuture<'static, Result>>, )> { let (result_type, executor, kw_args_names, num_positional_args) = Python::with_gil(|py| -> anyhow::Result<_> { diff --git a/src/ops/storages/postgres.rs b/src/ops/storages/postgres.rs index 7eb8e4122..8fb3c9db9 100644 --- a/src/ops/storages/postgres.rs +++ b/src/ops/storages/postgres.rs @@ -1,8 +1,6 @@ use std::borrow::Cow; use std::collections::{BTreeMap, HashMap}; -use std::future::Future; use std::ops::Bound; -use std::pin::Pin; use std::sync::{Arc, Mutex}; use crate::base::spec::{self, *}; @@ -13,7 +11,7 @@ use crate::{get_lib_context, setup}; use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; use derivative::Derivative; -use futures::future::Shared; +use futures::future::{BoxFuture, Shared}; use futures::FutureExt; use indexmap::{IndexMap, IndexSet}; use itertools::Itertools; @@ -457,12 +455,8 @@ fn distance_to_similarity(metric: VectorSimilarityMetric, distance: f64) -> f64 } pub struct Factory { - db_pools: Mutex< - HashMap< - Option, - Shared> + Send>>>, - >, - >, + db_pools: + Mutex, Shared>>>>, } impl Default for Factory { @@ -932,7 +926,7 @@ impl StorageFactoryBase for Arc { context: Arc, ) -> Result<( (TableId, SetupState), - ExecutorFuture<'static, (Arc, Option>)>, + BoxFuture<'static, Result<(Arc, Option>)>>, )> { let table_id = TableId { database_url: spec.database_url.clone(), diff --git a/src/server.rs b/src/server.rs index 3860ad01f..31cdb411e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,9 +2,9 @@ use crate::{lib_context::LibContext, service}; use anyhow::Result; use axum::{routing, Router}; -use futures::FutureExt; +use futures::{future::BoxFuture, FutureExt}; use serde::Deserialize; -use std::{future::Future, pin::Pin, sync::Arc}; +use std::sync::Arc; use tower::ServiceBuilder; use tower_http::{ cors::{AllowOrigin, CorsLayer}, @@ -21,7 +21,7 @@ pub struct ServerSettings { pub async fn init_server( lib_context: Arc, settings: ServerSettings, -) -> Result + Send>>> { +) -> Result> { let mut cors = CorsLayer::default(); if let Some(ui_cors_origin) = &settings.cors_origin { cors = cors