Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/base/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,8 @@ pub struct SimpleSemanticsQueryHandlerSpec {
pub query_transform_flow: TransientFlowSpec,
pub default_similarity_metric: VectorSimilarityMetric,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct AuthEntryReference {
pub key: String,
}
15 changes: 5 additions & 10 deletions src/builder/analyzed_flow.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
use std::sync::Arc;
use crate::prelude::*;

use super::{analyzer, plan};
use crate::{
api_error,
base::{schema, spec},
ops::registry::ExecutorFactoryRegistry,
service::error::{shared_ok, SharedError, SharedResultExt},
setup::{self, ObjectSetupStatusCheck},
};
use anyhow::Result;
use futures::{
future::{BoxFuture, Shared},
FutureExt,
};

pub struct AnalyzedFlow {
pub flow_instance: spec::FlowInstanceSpec,
Expand All @@ -28,8 +21,9 @@ impl AnalyzedFlow {
flow_instance: crate::base::spec::FlowInstanceSpec,
existing_flow_ss: Option<&setup::FlowSetupState<setup::ExistingMode>>,
registry: &ExecutorFactoryRegistry,
auth_registry: Arc<AuthRegistry>,
) -> Result<Self> {
let ctx = analyzer::build_flow_instance_context(&flow_instance.name);
let ctx = analyzer::build_flow_instance_context(&flow_instance.name, auth_registry);
let (data_schema, execution_plan_fut, desired_state) =
analyzer::analyze_flow(&flow_instance, &ctx, existing_flow_ss, registry)?;
let setup_status_check =
Expand Down Expand Up @@ -79,8 +73,9 @@ impl AnalyzedTransientFlow {
pub async fn from_transient_flow(
transient_flow: spec::TransientFlowSpec,
registry: &ExecutorFactoryRegistry,
auth_registry: Arc<AuthRegistry>,
) -> Result<Self> {
let ctx = analyzer::build_flow_instance_context(&transient_flow.name);
let ctx = analyzer::build_flow_instance_context(&transient_flow.name, auth_registry);
let (output_type, data_schema, execution_plan_fut) =
analyzer::analyze_transient_flow(&transient_flow, &ctx, registry)?;
Ok(Self {
Expand Down
10 changes: 7 additions & 3 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::{collections::HashMap, future::Future, sync::Arc};
use super::plan::*;
use crate::execution::db_tracking_setup;
use crate::setup::{
self, DesiredMode, FlowSetupMetadata, FlowSetupState, ResourceIdentifier, SourceSetupState,
TargetSetupState, TargetSetupStateCommon,
self, AuthRegistry, DesiredMode, FlowSetupMetadata, FlowSetupState, ResourceIdentifier,
SourceSetupState, TargetSetupState, TargetSetupStateCommon,
};
use crate::utils::fingerprint::Fingerprinter;
use crate::{
Expand Down Expand Up @@ -1027,9 +1027,13 @@ impl AnalyzerContext<'_> {
}
}

pub fn build_flow_instance_context(flow_inst_name: &str) -> Arc<FlowInstanceContext> {
pub fn build_flow_instance_context(
flow_inst_name: &str,
auth_registry: Arc<AuthRegistry>,
) -> Arc<FlowInstanceContext> {
Arc::new(FlowInstanceContext {
flow_instance_name: flow_inst_name.to_string(),
auth_registry,
})
}

Expand Down
6 changes: 5 additions & 1 deletion src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,11 @@ impl FlowBuilder {
.get(name)
.cloned();
let root_data_scope = Arc::new(Mutex::new(DataScopeBuilder::new()));
let flow_inst_context =
build_flow_instance_context(name, lib_context.auth_registry.clone());
let result = Self {
lib_context,
flow_inst_context: build_flow_instance_context(name),
flow_inst_context,
existing_flow_ss,

root_data_scope_ref: DataScopeRef(Arc::new(DataScopeRefInfo {
Expand Down Expand Up @@ -648,6 +650,7 @@ impl FlowBuilder {
spec,
self.existing_flow_ss.as_ref(),
&crate::ops::executor_factory_registry(),
self.lib_context.auth_registry.clone(),
))
})
.into_py_result()?;
Expand Down Expand Up @@ -688,6 +691,7 @@ impl FlowBuilder {
get_runtime().block_on(super::AnalyzedTransientFlow::from_transient_flow(
spec,
&crate::ops::executor_factory_registry(),
self.lib_context.auth_registry.clone(),
))
})
.into_py_result()?;
Expand Down
2 changes: 2 additions & 0 deletions src/lib_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap
pub struct LibContext {
pub pool: PgPool,
pub flows: Mutex<BTreeMap<String, Arc<FlowContext>>>,
pub auth_registry: Arc<AuthRegistry>,
pub all_setup_states: RwLock<setup::AllSetupState<setup::ExistingMode>>,
}

Expand Down Expand Up @@ -103,6 +104,7 @@ pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
pool,
all_setup_states: RwLock::new(all_setup_states),
flows: Mutex::new(BTreeMap::new()),
auth_registry: Arc::new(AuthRegistry::new()),
})
}

Expand Down
1 change: 1 addition & 0 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use serde::Serialize;

pub struct FlowInstanceContext {
pub flow_instance_name: String,
pub auth_registry: Arc<AuthRegistry>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
Expand Down
7 changes: 6 additions & 1 deletion src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
pub(crate) use anyhow::Result;
pub(crate) use async_trait::async_trait;
pub(crate) use chrono::{DateTime, Utc};
pub(crate) use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
pub(crate) use futures::{
future::{BoxFuture, Shared},
prelude::*,
stream::BoxStream,
};
pub(crate) use futures::{FutureExt, StreamExt};
pub(crate) use indexmap::{IndexMap, IndexSet};
pub(crate) use itertools::Itertools;
Expand All @@ -19,6 +23,7 @@ pub(crate) use crate::execution;
pub(crate) use crate::lib_context::{get_lib_context, get_runtime, FlowContext, LibContext};
pub(crate) use crate::ops::interface;
pub(crate) use crate::service::error::ApiError;
pub(crate) use crate::setup::AuthRegistry;

pub(crate) use crate::{api_bail, api_error};

Expand Down
36 changes: 36 additions & 0 deletions src/setup/auth_registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::collections::hash_map;

use crate::prelude::*;

pub struct AuthRegistry {
entries: RwLock<HashMap<String, serde_json::Value>>,
}

impl AuthRegistry {
pub fn new() -> Self {
Self {
entries: RwLock::new(HashMap::new()),
}
}

pub fn add(&self, key: String, value: serde_json::Value) -> Result<()> {
let mut entries = self.entries.write().unwrap();
match entries.entry(key) {
hash_map::Entry::Occupied(entry) => {
api_bail!("Auth entry already exists: {}", entry.key());
}
hash_map::Entry::Vacant(entry) => {
entry.insert(value);
}
}
Ok(())
}

pub fn get<T: DeserializeOwned>(&self, entry_ref: &spec::AuthEntryReference) -> Result<T> {
let entries = self.entries.read().unwrap();
match entries.get(&entry_ref.key) {
Some(value) => Ok(serde_json::from_value(value.clone())?),
None => api_bail!("Auth entry not found: {}", entry_ref.key),
}
}
}
2 changes: 2 additions & 0 deletions src/setup/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod auth_registry;
mod db_metadata;
mod driver;
mod states;

pub use auth_registry::AuthRegistry;
pub use driver::*;
pub use states::*;