Skip to content

Commit 4af075c

Browse files
authored
Make the auth registry prior to LibContext to be configured before main (#289)
1 parent 93db622 commit 4af075c

File tree

6 files changed

+22
-36
lines changed

6 files changed

+22
-36
lines changed

src/builder/analyzed_flow.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@ impl AnalyzedFlow {
2121
flow_instance: crate::base::spec::FlowInstanceSpec,
2222
existing_flow_ss: Option<&setup::FlowSetupState<setup::ExistingMode>>,
2323
registry: &ExecutorFactoryRegistry,
24-
auth_registry: &Arc<AuthRegistry>,
2524
) -> Result<Self> {
26-
let ctx = analyzer::build_flow_instance_context(&flow_instance.name, auth_registry);
25+
let ctx = analyzer::build_flow_instance_context(&flow_instance.name);
2726
let (data_schema, execution_plan_fut, desired_state) =
2827
analyzer::analyze_flow(&flow_instance, &ctx, existing_flow_ss, registry)?;
2928
let setup_status_check =
30-
setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss, auth_registry)?;
29+
setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss)?;
3130
let execution_plan = if setup_status_check.is_up_to_date() {
3231
Some(
3332
async move {
@@ -73,9 +72,8 @@ impl AnalyzedTransientFlow {
7372
pub async fn from_transient_flow(
7473
transient_flow: spec::TransientFlowSpec,
7574
registry: &ExecutorFactoryRegistry,
76-
auth_registry: &Arc<AuthRegistry>,
7775
) -> Result<Self> {
78-
let ctx = analyzer::build_flow_instance_context(&transient_flow.name, auth_registry);
76+
let ctx = analyzer::build_flow_instance_context(&transient_flow.name);
7977
let (output_type, data_schema, execution_plan_fut) =
8078
analyzer::analyze_transient_flow(&transient_flow, &ctx, registry)?;
8179
Ok(Self {

src/builder/analyzer.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ use std::{collections::HashMap, future::Future, sync::Arc};
44

55
use super::plan::*;
66
use crate::execution::db_tracking_setup;
7+
use crate::lib_context::get_auth_registry;
78
use crate::setup::{
8-
self, AuthRegistry, DesiredMode, FlowSetupMetadata, FlowSetupState, ResourceIdentifier,
9-
SourceSetupState, TargetSetupState, TargetSetupStateCommon,
9+
self, DesiredMode, FlowSetupMetadata, FlowSetupState, ResourceIdentifier, SourceSetupState,
10+
TargetSetupState, TargetSetupStateCommon,
1011
};
1112
use crate::utils::fingerprint::Fingerprinter;
1213
use crate::{
@@ -1027,13 +1028,10 @@ impl AnalyzerContext<'_> {
10271028
}
10281029
}
10291030

1030-
pub fn build_flow_instance_context(
1031-
flow_inst_name: &str,
1032-
auth_registry: &Arc<AuthRegistry>,
1033-
) -> Arc<FlowInstanceContext> {
1031+
pub fn build_flow_instance_context(flow_inst_name: &str) -> Arc<FlowInstanceContext> {
10341032
Arc::new(FlowInstanceContext {
10351033
flow_instance_name: flow_inst_name.to_string(),
1036-
auth_registry: auth_registry.clone(),
1034+
auth_registry: get_auth_registry().clone(),
10371035
})
10381036
}
10391037

src/builder/flow_builder.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ impl FlowBuilder {
347347
.get(name)
348348
.cloned();
349349
let root_data_scope = Arc::new(Mutex::new(DataScopeBuilder::new()));
350-
let flow_inst_context = build_flow_instance_context(name, &lib_context.auth_registry);
350+
let flow_inst_context = build_flow_instance_context(name);
351351
let result = Self {
352352
lib_context,
353353
flow_inst_context,
@@ -649,7 +649,6 @@ impl FlowBuilder {
649649
spec,
650650
self.existing_flow_ss.as_ref(),
651651
&crate::ops::executor_factory_registry(),
652-
&self.lib_context.auth_registry,
653652
))
654653
})
655654
.into_py_result()?;
@@ -690,7 +689,6 @@ impl FlowBuilder {
690689
get_runtime().block_on(super::AnalyzedTransientFlow::from_transient_flow(
691690
spec,
692691
&crate::ops::executor_factory_registry(),
693-
&self.lib_context.auth_registry,
694692
))
695693
})
696694
.into_py_result()?;

src/lib_context.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ impl FlowContext {
5959
}
6060

6161
static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());
62+
static AUTH_REGISTRY: LazyLock<Arc<AuthRegistry>> = LazyLock::new(|| Arc::new(AuthRegistry::new()));
6263

6364
pub struct LibContext {
6465
pub pool: PgPool,
6566
pub flows: Mutex<BTreeMap<String, Arc<FlowContext>>>,
66-
pub auth_registry: Arc<AuthRegistry>,
6767
pub all_setup_states: RwLock<setup::AllSetupState<setup::ExistingMode>>,
6868
}
6969

@@ -87,6 +87,10 @@ pub fn get_runtime() -> &'static Runtime {
8787
&TOKIO_RUNTIME
8888
}
8989

90+
pub fn get_auth_registry() -> &'static Arc<AuthRegistry> {
91+
&AUTH_REGISTRY
92+
}
93+
9094
static LIB_INIT: OnceLock<()> = OnceLock::new();
9195
pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
9296
LIB_INIT.get_or_init(|| {
@@ -104,7 +108,6 @@ pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
104108
pool,
105109
all_setup_states: RwLock::new(all_setup_states),
106110
flows: Mutex::new(BTreeMap::new()),
107-
auth_registry: Arc::new(AuthRegistry::new()),
108111
})
109112
}
110113

src/py/mod.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::prelude::*;
22

33
use crate::base::spec::VectorSimilarityMetric;
44
use crate::execution::query;
5-
use crate::lib_context::{clear_lib_context, init_lib_context};
5+
use crate::lib_context::{clear_lib_context, get_auth_registry, init_lib_context};
66
use crate::ops::interface::QueryResults;
77
use crate::ops::py_factory::PyOpArgSchema;
88
use crate::ops::{interface::ExecutorFactory, py_factory::PyFunctionFactory, register_factory};
@@ -282,17 +282,15 @@ fn sync_setup() -> PyResult<SetupStatusCheck> {
282282
let lib_context = get_lib_context().into_py_result()?;
283283
let flows = lib_context.flows.lock().unwrap();
284284
let all_setup_states = lib_context.all_setup_states.read().unwrap();
285-
let setup_status = setup::sync_setup(&flows, &all_setup_states, &lib_context.auth_registry)
286-
.into_py_result()?;
285+
let setup_status = setup::sync_setup(&flows, &all_setup_states).into_py_result()?;
287286
Ok(SetupStatusCheck(setup_status))
288287
}
289288

290289
#[pyfunction]
291290
fn drop_setup(flow_names: Vec<String>) -> PyResult<SetupStatusCheck> {
292291
let lib_context = get_lib_context().into_py_result()?;
293292
let all_setup_states = lib_context.all_setup_states.read().unwrap();
294-
let setup_status = setup::drop_setup(flow_names, &all_setup_states, &lib_context.auth_registry)
295-
.into_py_result()?;
293+
let setup_status = setup::drop_setup(flow_names, &all_setup_states).into_py_result()?;
296294
Ok(SetupStatusCheck(setup_status))
297295
}
298296

@@ -323,9 +321,7 @@ fn apply_setup_changes(py: Python<'_>, setup_status: &SetupStatusCheck) -> PyRes
323321

324322
#[pyfunction]
325323
fn add_auth_entry(key: String, value: Pythonized<serde_json::Value>) -> PyResult<()> {
326-
let lib_context = get_lib_context().into_py_result()?;
327-
lib_context
328-
.auth_registry
324+
get_auth_registry()
329325
.add(key, value.into_inner())
330326
.into_py_result()?;
331327
Ok(())

src/setup/driver.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::prelude::*;
1+
use crate::{lib_context::get_auth_registry, prelude::*};
22

33
use indexmap::IndexMap;
44
use serde::de::DeserializeOwned;
@@ -213,7 +213,6 @@ fn group_resource_states<'a>(
213213
pub fn check_flow_setup_status(
214214
desired_state: Option<&FlowSetupState<DesiredMode>>,
215215
existing_state: Option<&FlowSetupState<ExistingMode>>,
216-
auth_registry: &Arc<AuthRegistry>,
217216
) -> Result<FlowSetupStatusCheck> {
218217
let metadata_change = diff_state(
219218
existing_state.map(|e| &e.metadata),
@@ -294,7 +293,7 @@ pub fn check_flow_setup_status(
294293
&resource_id.key,
295294
target_state,
296295
existing_without_setup_by_user,
297-
auth_registry,
296+
get_auth_registry(),
298297
)?)
299298
};
300299
target_resources.push(ResourceSetupInfo {
@@ -316,18 +315,13 @@ pub fn check_flow_setup_status(
316315
pub fn sync_setup(
317316
flows: &BTreeMap<String, Arc<FlowContext>>,
318317
all_setup_state: &AllSetupState<ExistingMode>,
319-
auth_registry: &Arc<AuthRegistry>,
320318
) -> Result<AllSetupStatusCheck> {
321319
let mut flow_status_checks = BTreeMap::new();
322320
for (flow_name, flow_context) in flows {
323321
let existing_state = all_setup_state.flows.get(flow_name);
324322
flow_status_checks.insert(
325323
flow_name.clone(),
326-
check_flow_setup_status(
327-
Some(&flow_context.flow.desired_state),
328-
existing_state,
329-
auth_registry,
330-
)?,
324+
check_flow_setup_status(Some(&flow_context.flow.desired_state), existing_state)?,
331325
);
332326
}
333327
Ok(AllSetupStatusCheck {
@@ -342,7 +336,6 @@ pub fn sync_setup(
342336
pub fn drop_setup(
343337
flow_names: impl IntoIterator<Item = String>,
344338
all_setup_state: &AllSetupState<ExistingMode>,
345-
auth_registry: &Arc<AuthRegistry>,
346339
) -> Result<AllSetupStatusCheck> {
347340
if !all_setup_state.has_metadata_table {
348341
api_bail!("CocoIndex metadata table is missing.");
@@ -352,7 +345,7 @@ pub fn drop_setup(
352345
if let Some(existing_state) = all_setup_state.flows.get(&flow_name) {
353346
flow_status_checks.insert(
354347
flow_name,
355-
check_flow_setup_status(None, Some(existing_state), auth_registry)?,
348+
check_flow_setup_status(None, Some(existing_state))?,
356349
);
357350
}
358351
}

0 commit comments

Comments
 (0)