Skip to content

Commit c97f394

Browse files
authored
Add a AuthRegistry and put it in AnalyzerContext. (#284)
1 parent 8af010c commit c97f394

File tree

9 files changed

+69
-15
lines changed

9 files changed

+69
-15
lines changed

src/base/spec.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,3 +282,8 @@ pub struct SimpleSemanticsQueryHandlerSpec {
282282
pub query_transform_flow: TransientFlowSpec,
283283
pub default_similarity_metric: VectorSimilarityMetric,
284284
}
285+
286+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
287+
pub struct AuthEntryReference {
288+
pub key: String,
289+
}

src/builder/analyzed_flow.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,11 @@
1-
use std::sync::Arc;
1+
use crate::prelude::*;
22

33
use super::{analyzer, plan};
44
use crate::{
5-
api_error,
6-
base::{schema, spec},
75
ops::registry::ExecutorFactoryRegistry,
86
service::error::{shared_ok, SharedError, SharedResultExt},
97
setup::{self, ObjectSetupStatusCheck},
108
};
11-
use anyhow::Result;
12-
use futures::{
13-
future::{BoxFuture, Shared},
14-
FutureExt,
15-
};
169

1710
pub struct AnalyzedFlow {
1811
pub flow_instance: spec::FlowInstanceSpec,
@@ -28,8 +21,9 @@ impl AnalyzedFlow {
2821
flow_instance: crate::base::spec::FlowInstanceSpec,
2922
existing_flow_ss: Option<&setup::FlowSetupState<setup::ExistingMode>>,
3023
registry: &ExecutorFactoryRegistry,
24+
auth_registry: Arc<AuthRegistry>,
3125
) -> Result<Self> {
32-
let ctx = analyzer::build_flow_instance_context(&flow_instance.name);
26+
let ctx = analyzer::build_flow_instance_context(&flow_instance.name, auth_registry);
3327
let (data_schema, execution_plan_fut, desired_state) =
3428
analyzer::analyze_flow(&flow_instance, &ctx, existing_flow_ss, registry)?;
3529
let setup_status_check =
@@ -79,8 +73,9 @@ impl AnalyzedTransientFlow {
7973
pub async fn from_transient_flow(
8074
transient_flow: spec::TransientFlowSpec,
8175
registry: &ExecutorFactoryRegistry,
76+
auth_registry: Arc<AuthRegistry>,
8277
) -> Result<Self> {
83-
let ctx = analyzer::build_flow_instance_context(&transient_flow.name);
78+
let ctx = analyzer::build_flow_instance_context(&transient_flow.name, auth_registry);
8479
let (output_type, data_schema, execution_plan_fut) =
8580
analyzer::analyze_transient_flow(&transient_flow, &ctx, registry)?;
8681
Ok(Self {

src/builder/analyzer.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use std::{collections::HashMap, future::Future, sync::Arc};
55
use super::plan::*;
66
use crate::execution::db_tracking_setup;
77
use crate::setup::{
8-
self, DesiredMode, FlowSetupMetadata, FlowSetupState, ResourceIdentifier, SourceSetupState,
9-
TargetSetupState, TargetSetupStateCommon,
8+
self, AuthRegistry, DesiredMode, FlowSetupMetadata, FlowSetupState, ResourceIdentifier,
9+
SourceSetupState, TargetSetupState, TargetSetupStateCommon,
1010
};
1111
use crate::utils::fingerprint::Fingerprinter;
1212
use crate::{
@@ -1027,9 +1027,13 @@ impl AnalyzerContext<'_> {
10271027
}
10281028
}
10291029

1030-
pub fn build_flow_instance_context(flow_inst_name: &str) -> Arc<FlowInstanceContext> {
1030+
pub fn build_flow_instance_context(
1031+
flow_inst_name: &str,
1032+
auth_registry: Arc<AuthRegistry>,
1033+
) -> Arc<FlowInstanceContext> {
10311034
Arc::new(FlowInstanceContext {
10321035
flow_instance_name: flow_inst_name.to_string(),
1036+
auth_registry,
10331037
})
10341038
}
10351039

src/builder/flow_builder.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,9 +347,11 @@ impl FlowBuilder {
347347
.get(name)
348348
.cloned();
349349
let root_data_scope = Arc::new(Mutex::new(DataScopeBuilder::new()));
350+
let flow_inst_context =
351+
build_flow_instance_context(name, lib_context.auth_registry.clone());
350352
let result = Self {
351353
lib_context,
352-
flow_inst_context: build_flow_instance_context(name),
354+
flow_inst_context,
353355
existing_flow_ss,
354356

355357
root_data_scope_ref: DataScopeRef(Arc::new(DataScopeRefInfo {
@@ -648,6 +650,7 @@ impl FlowBuilder {
648650
spec,
649651
self.existing_flow_ss.as_ref(),
650652
&crate::ops::executor_factory_registry(),
653+
self.lib_context.auth_registry.clone(),
651654
))
652655
})
653656
.into_py_result()?;
@@ -688,6 +691,7 @@ impl FlowBuilder {
688691
get_runtime().block_on(super::AnalyzedTransientFlow::from_transient_flow(
689692
spec,
690693
&crate::ops::executor_factory_registry(),
694+
self.lib_context.auth_registry.clone(),
691695
))
692696
})
693697
.into_py_result()?;

src/lib_context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap
6363
pub struct LibContext {
6464
pub pool: PgPool,
6565
pub flows: Mutex<BTreeMap<String, Arc<FlowContext>>>,
66+
pub auth_registry: Arc<AuthRegistry>,
6667
pub all_setup_states: RwLock<setup::AllSetupState<setup::ExistingMode>>,
6768
}
6869

@@ -103,6 +104,7 @@ pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
103104
pool,
104105
all_setup_states: RwLock::new(all_setup_states),
105106
flows: Mutex::new(BTreeMap::new()),
107+
auth_registry: Arc::new(AuthRegistry::new()),
106108
})
107109
}
108110

src/ops/interface.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use serde::Serialize;
1212

1313
pub struct FlowInstanceContext {
1414
pub flow_instance_name: String,
15+
pub auth_registry: Arc<AuthRegistry>,
1516
}
1617

1718
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]

src/prelude.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33
pub(crate) use anyhow::Result;
44
pub(crate) use async_trait::async_trait;
55
pub(crate) use chrono::{DateTime, Utc};
6-
pub(crate) use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
6+
pub(crate) use futures::{
7+
future::{BoxFuture, Shared},
8+
prelude::*,
9+
stream::BoxStream,
10+
};
711
pub(crate) use futures::{FutureExt, StreamExt};
812
pub(crate) use indexmap::{IndexMap, IndexSet};
913
pub(crate) use itertools::Itertools;
@@ -19,6 +23,7 @@ pub(crate) use crate::execution;
1923
pub(crate) use crate::lib_context::{get_lib_context, get_runtime, FlowContext, LibContext};
2024
pub(crate) use crate::ops::interface;
2125
pub(crate) use crate::service::error::ApiError;
26+
pub(crate) use crate::setup::AuthRegistry;
2227

2328
pub(crate) use crate::{api_bail, api_error};
2429

src/setup/auth_registry.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use std::collections::hash_map;
2+
3+
use crate::prelude::*;
4+
5+
pub struct AuthRegistry {
6+
entries: RwLock<HashMap<String, serde_json::Value>>,
7+
}
8+
9+
impl AuthRegistry {
10+
pub fn new() -> Self {
11+
Self {
12+
entries: RwLock::new(HashMap::new()),
13+
}
14+
}
15+
16+
pub fn add(&self, key: String, value: serde_json::Value) -> Result<()> {
17+
let mut entries = self.entries.write().unwrap();
18+
match entries.entry(key) {
19+
hash_map::Entry::Occupied(entry) => {
20+
api_bail!("Auth entry already exists: {}", entry.key());
21+
}
22+
hash_map::Entry::Vacant(entry) => {
23+
entry.insert(value);
24+
}
25+
}
26+
Ok(())
27+
}
28+
29+
pub fn get<T: DeserializeOwned>(&self, entry_ref: &spec::AuthEntryReference) -> Result<T> {
30+
let entries = self.entries.read().unwrap();
31+
match entries.get(&entry_ref.key) {
32+
Some(value) => Ok(serde_json::from_value(value.clone())?),
33+
None => api_bail!("Auth entry not found: {}", entry_ref.key),
34+
}
35+
}
36+
}

src/setup/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
mod auth_registry;
12
mod db_metadata;
23
mod driver;
34
mod states;
45

6+
pub use auth_registry::AuthRegistry;
57
pub use driver::*;
68
pub use states::*;

0 commit comments

Comments
 (0)