Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/builder/analyzed_flow.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::Future, pin::Pin, sync::Arc};
use std::sync::Arc;

use super::{analyzer, plan};
use crate::{
Expand All @@ -9,16 +9,18 @@ use crate::{
setup::{self, ObjectSetupStatusCheck},
};
use anyhow::Result;
use futures::{future::Shared, FutureExt};
use futures::{
future::{BoxFuture, Shared},
FutureExt,
};

pub struct AnalyzedFlow {
pub flow_instance: spec::FlowInstanceSpec,
pub data_schema: schema::DataSchema,
pub desired_state: setup::FlowSetupState<setup::DesiredMode>,
/// It's None if the flow is not up to date
pub execution_plan: Option<
Shared<Pin<Box<dyn Future<Output = Result<Arc<plan::ExecutionPlan>, SharedError>> + Send>>>,
>,
pub execution_plan:
Option<Shared<BoxFuture<'static, Result<Arc<plan::ExecutionPlan>, SharedError>>>>,
}

impl AnalyzedFlow {
Expand Down
6 changes: 3 additions & 3 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::{BTreeMap, HashSet};
use std::sync::Mutex;
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
use std::{collections::HashMap, future::Future, sync::Arc};

use super::plan::*;
use crate::execution::db_tracking_setup;
Expand All @@ -16,7 +16,7 @@ use crate::{
utils::immutable::RefList,
};
use anyhow::{anyhow, bail, Context, Result};
use futures::future::try_join3;
use futures::future::{try_join3, BoxFuture};
use futures::{future::try_join_all, FutureExt};
use indexmap::IndexMap;
use log::{trace, warn};
Expand Down Expand Up @@ -678,7 +678,7 @@ impl AnalyzerContext<'_> {
scope: &mut ExecutionScope<'_>,
reactive_op: &NamedSpec<ReactiveOpSpec>,
parent_scopes: RefList<'_, &'_ ExecutionScope<'_>>,
) -> Result<Pin<Box<dyn Future<Output = Result<AnalyzedReactiveOp>> + Send>>> {
) -> Result<BoxFuture<'static, Result<AnalyzedReactiveOp>>> {
let result_fut = match &reactive_op.spec {
ReactiveOpSpec::Transform(op) => {
let input_field_schemas =
Expand Down
9 changes: 5 additions & 4 deletions src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;

use anyhow::Result;
use axum::async_trait;
use futures::future::BoxFuture;
use serde::de::DeserializeOwned;
use serde::Serialize;

Expand Down Expand Up @@ -208,7 +209,7 @@ impl<T: SourceFactoryBase> SourceFactory for T {
context: Arc<FlowInstanceContext>,
) -> Result<(
EnrichedValueType,
ExecutorFuture<'static, Box<dyn SourceExecutor>>,
BoxFuture<'static, Result<Box<dyn SourceExecutor>>>,
)> {
let spec: T::Spec = serde_json::from_value(spec)?;
let output_schema = self.get_output_schema(&spec, &context)?;
Expand Down Expand Up @@ -259,7 +260,7 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
context: Arc<FlowInstanceContext>,
) -> Result<(
EnrichedValueType,
ExecutorFuture<'static, Box<dyn SimpleFunctionExecutor>>,
BoxFuture<'static, Result<Box<dyn SimpleFunctionExecutor>>>,
)> {
let spec: T::Spec = serde_json::from_value(spec)?;
let mut args_resolver = OpArgsResolver::new(&input_schema)?;
Expand Down Expand Up @@ -288,7 +289,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
context: Arc<FlowInstanceContext>,
) -> Result<(
(Self::Key, Self::SetupState),
ExecutorFuture<'static, (Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>,
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
)>;

fn check_setup_status(
Expand Down Expand Up @@ -388,7 +389,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
context: Arc<FlowInstanceContext>,
) -> Result<(
(serde_json::Value, serde_json::Value),
ExecutorFuture<'static, (Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>,
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
)> {
let spec: T::Spec = serde_json::from_value(spec)?;
let ((setup_key, setup_state), executors) = StorageFactoryBase::build(
Expand Down
11 changes: 5 additions & 6 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ use crate::base::{
use crate::setup;
use anyhow::Result;
use async_trait::async_trait;
use futures::future::BoxFuture;
use serde::Serialize;
use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc};
use std::sync::Arc;

pub struct FlowInstanceContext {
pub flow_instance_name: String,
}

pub type ExecutorFuture<'a, E> = Pin<Box<dyn Future<Output = Result<E>> + Send + 'a>>;

#[async_trait]
pub trait SourceExecutor: Send + Sync {
/// Get the list of keys for the source.
Expand All @@ -31,7 +30,7 @@ pub trait SourceFactory {
context: Arc<FlowInstanceContext>,
) -> Result<(
EnrichedValueType,
ExecutorFuture<'static, Box<dyn SourceExecutor>>,
BoxFuture<'static, Result<Box<dyn SourceExecutor>>>,
)>;
}

Expand Down Expand Up @@ -59,7 +58,7 @@ pub trait SimpleFunctionFactory {
context: Arc<FlowInstanceContext>,
) -> Result<(
EnrichedValueType,
ExecutorFuture<'static, Box<dyn SimpleFunctionExecutor>>,
BoxFuture<'static, Result<Box<dyn SimpleFunctionExecutor>>>,
)>;
}

Expand Down Expand Up @@ -111,7 +110,7 @@ pub trait ExportTargetFactory {
context: Arc<FlowInstanceContext>,
) -> Result<(
(serde_json::Value, serde_json::Value),
ExecutorFuture<'static, (Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>,
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
)>;

fn check_setup_status(
Expand Down
8 changes: 3 additions & 5 deletions src/ops/py_factory.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use axum::async_trait;
use futures::FutureExt;
use futures::{future::BoxFuture, FutureExt};
use pyo3::{
pyclass, pymethods,
types::{IntoPyDict, PyString, PyTuple},
Expand All @@ -16,9 +16,7 @@ use crate::{
};
use anyhow::Result;

use super::sdk::{
ExecutorFuture, FlowInstanceContext, SimpleFunctionExecutor, SimpleFunctionFactory,
};
use super::interface::{FlowInstanceContext, SimpleFunctionExecutor, SimpleFunctionFactory};

#[pyclass(name = "OpArgSchema")]
pub struct PyOpArgSchema {
Expand Down Expand Up @@ -114,7 +112,7 @@ impl SimpleFunctionFactory for PyFunctionFactory {
_context: Arc<FlowInstanceContext>,
) -> Result<(
schema::EnrichedValueType,
ExecutorFuture<'static, Box<dyn SimpleFunctionExecutor>>,
BoxFuture<'static, Result<Box<dyn SimpleFunctionExecutor>>>,
)> {
let (result_type, executor, kw_args_names, num_positional_args) =
Python::with_gil(|py| -> anyhow::Result<_> {
Expand Down
14 changes: 4 additions & 10 deletions src/ops/storages/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::future::Future;
use std::ops::Bound;
use std::pin::Pin;
use std::sync::{Arc, Mutex};

use crate::base::spec::{self, *};
Expand All @@ -13,7 +11,7 @@ use crate::{get_lib_context, setup};
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use derivative::Derivative;
use futures::future::Shared;
use futures::future::{BoxFuture, Shared};
use futures::FutureExt;
use indexmap::{IndexMap, IndexSet};
use itertools::Itertools;
Expand Down Expand Up @@ -457,12 +455,8 @@ fn distance_to_similarity(metric: VectorSimilarityMetric, distance: f64) -> f64
}

pub struct Factory {
db_pools: Mutex<
HashMap<
Option<String>,
Shared<Pin<Box<dyn Future<Output = Result<PgPool, SharedError>> + Send>>>,
>,
>,
db_pools:
Mutex<HashMap<Option<String>, Shared<BoxFuture<'static, Result<PgPool, SharedError>>>>>,
}

impl Default for Factory {
Expand Down Expand Up @@ -932,7 +926,7 @@ impl StorageFactoryBase for Arc<Factory> {
context: Arc<FlowInstanceContext>,
) -> Result<(
(TableId, SetupState),
ExecutorFuture<'static, (Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>,
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
)> {
let table_id = TableId {
database_url: spec.database_url.clone(),
Expand Down
6 changes: 3 additions & 3 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use crate::{lib_context::LibContext, service};

use anyhow::Result;
use axum::{routing, Router};
use futures::FutureExt;
use futures::{future::BoxFuture, FutureExt};
use serde::Deserialize;
use std::{future::Future, pin::Pin, sync::Arc};
use std::sync::Arc;
use tower::ServiceBuilder;
use tower_http::{
cors::{AllowOrigin, CorsLayer},
Expand All @@ -21,7 +21,7 @@ pub struct ServerSettings {
pub async fn init_server(
lib_context: Arc<LibContext>,
settings: ServerSettings,
) -> Result<Pin<Box<dyn Future<Output = ()> + Send>>> {
) -> Result<BoxFuture<'static, ()>> {
let mut cors = CorsLayer::default();
if let Some(ui_cors_origin) = &settings.cors_origin {
cors = cors
Expand Down