Skip to content

Commit 47513f1

Browse files
authored
Add setup logic for Neo4j target. (#285)
1 parent c97f394 commit 47513f1

File tree

12 files changed

+381
-43
lines changed

12 files changed

+381
-43
lines changed

src/builder/analyzed_flow.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ impl AnalyzedFlow {
2121
flow_instance: crate::base::spec::FlowInstanceSpec,
2222
existing_flow_ss: Option<&setup::FlowSetupState<setup::ExistingMode>>,
2323
registry: &ExecutorFactoryRegistry,
24-
auth_registry: Arc<AuthRegistry>,
24+
auth_registry: &Arc<AuthRegistry>,
2525
) -> Result<Self> {
2626
let ctx = analyzer::build_flow_instance_context(&flow_instance.name, auth_registry);
2727
let (data_schema, execution_plan_fut, desired_state) =
2828
analyzer::analyze_flow(&flow_instance, &ctx, existing_flow_ss, registry)?;
2929
let setup_status_check =
30-
setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss)?;
30+
setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss, auth_registry)?;
3131
let execution_plan = if setup_status_check.is_up_to_date() {
3232
Some(
3333
async move {
@@ -73,7 +73,7 @@ impl AnalyzedTransientFlow {
7373
pub async fn from_transient_flow(
7474
transient_flow: spec::TransientFlowSpec,
7575
registry: &ExecutorFactoryRegistry,
76-
auth_registry: Arc<AuthRegistry>,
76+
auth_registry: &Arc<AuthRegistry>,
7777
) -> Result<Self> {
7878
let ctx = analyzer::build_flow_instance_context(&transient_flow.name, auth_registry);
7979
let (output_type, data_schema, execution_plan_fut) =

src/builder/analyzer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,11 +1029,11 @@ impl AnalyzerContext<'_> {
10291029

10301030
pub fn build_flow_instance_context(
10311031
flow_inst_name: &str,
1032-
auth_registry: Arc<AuthRegistry>,
1032+
auth_registry: &Arc<AuthRegistry>,
10331033
) -> Arc<FlowInstanceContext> {
10341034
Arc::new(FlowInstanceContext {
10351035
flow_instance_name: flow_inst_name.to_string(),
1036-
auth_registry,
1036+
auth_registry: auth_registry.clone(),
10371037
})
10381038
}
10391039

src/builder/flow_builder.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -347,8 +347,7 @@ impl FlowBuilder {
347347
.get(name)
348348
.cloned();
349349
let root_data_scope = Arc::new(Mutex::new(DataScopeBuilder::new()));
350-
let flow_inst_context =
351-
build_flow_instance_context(name, lib_context.auth_registry.clone());
350+
let flow_inst_context = build_flow_instance_context(name, &lib_context.auth_registry);
352351
let result = Self {
353352
lib_context,
354353
flow_inst_context,
@@ -650,7 +649,7 @@ impl FlowBuilder {
650649
spec,
651650
self.existing_flow_ss.as_ref(),
652651
&crate::ops::executor_factory_registry(),
653-
self.lib_context.auth_registry.clone(),
652+
&self.lib_context.auth_registry,
654653
))
655654
})
656655
.into_py_result()?;
@@ -691,7 +690,7 @@ impl FlowBuilder {
691690
get_runtime().block_on(super::AnalyzedTransientFlow::from_transient_flow(
692691
spec,
693692
&crate::ops::executor_factory_registry(),
694-
self.lib_context.auth_registry.clone(),
693+
&self.lib_context.auth_registry,
695694
))
696695
})
697696
.into_py_result()?;

src/ops/factory_bases.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
295295
key: Self::Key,
296296
desired_state: Option<Self::SetupState>,
297297
existing_states: setup::CombinedState<Self::SetupState>,
298+
auth_registry: &Arc<AuthRegistry>,
298299
) -> Result<impl setup::ResourceSetupStatusCheck<Self::Key, Self::SetupState> + 'static>;
299300

300301
fn check_state_compatibility(
@@ -402,6 +403,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
402403
key: &serde_json::Value,
403404
desired_state: Option<serde_json::Value>,
404405
existing_states: setup::CombinedState<serde_json::Value>,
406+
auth_registry: &Arc<AuthRegistry>,
405407
) -> Result<
406408
Box<
407409
dyn setup::ResourceSetupStatusCheck<serde_json::Value, serde_json::Value> + Send + Sync,
@@ -412,8 +414,13 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
412414
.map(|v| serde_json::from_value(v.clone()))
413415
.transpose()?;
414416
let existing_states = from_json_combined_state(existing_states)?;
415-
let status_check =
416-
StorageFactoryBase::check_setup_status(self, key, desired_state, existing_states)?;
417+
let status_check = StorageFactoryBase::check_setup_status(
418+
self,
419+
key,
420+
desired_state,
421+
existing_states,
422+
auth_registry,
423+
)?;
417424
Ok(Box::new(ResourceSetupStatusCheckWrapper::<T>::new(
418425
Box::new(status_check),
419426
)?))

src/ops/interface.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ pub trait ExportTargetFactory {
181181
key: &serde_json::Value,
182182
desired_state: Option<serde_json::Value>,
183183
existing_states: setup::CombinedState<serde_json::Value>,
184+
auth_registry: &Arc<AuthRegistry>,
184185
) -> Result<
185186
Box<
186187
dyn setup::ResourceSetupStatusCheck<serde_json::Value, serde_json::Value> + Send + Sync,

src/ops/registration.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result
1414

1515
Arc::new(storages::postgres::Factory::default()).register(registry)?;
1616

17+
let neo4j_pool = Arc::new(storages::neo4j::GraphPool::default());
18+
storages::neo4j::RelationshipFactory::new(neo4j_pool).register(registry)?;
19+
1720
Ok(())
1821
}
1922

0 commit comments

Comments
 (0)