diff --git a/src/builder/analyzed_flow.rs b/src/builder/analyzed_flow.rs index 3bd037787..548962f88 100644 --- a/src/builder/analyzed_flow.rs +++ b/src/builder/analyzed_flow.rs @@ -30,7 +30,7 @@ impl AnalyzedFlow { registry, )?; let setup_status_check = - setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss)?; + setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss).await?; let execution_plan = if setup_status_check.is_up_to_date() { Some( async move { diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs index 4b8385deb..a83ba8b74 100644 --- a/src/ops/factory_bases.rs +++ b/src/ops/factory_bases.rs @@ -304,7 +304,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { /// Will not be called if it's setup by user. /// It returns an error if the target only supports setup by user. - fn check_setup_status( + async fn check_setup_status( &self, key: Self::Key, desired_state: Option, @@ -392,7 +392,7 @@ impl ExportTargetFactory for T { Ok((data_coll_output, decl_output)) } - fn check_setup_status( + async fn check_setup_status( &self, key: &serde_json::Value, desired_state: Option, @@ -410,7 +410,8 @@ impl ExportTargetFactory for T { desired_state, existing_states, auth_registry, - )?; + ) + .await?; Ok(Box::new(status_check)) } diff --git a/src/ops/interface.rs b/src/ops/interface.rs index 357592b2c..4c26e431d 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -190,7 +190,7 @@ pub trait ExportTargetFactory: Send + Sync { /// Will not be called if it's setup by user. /// It returns an error if the target only supports setup by user. - fn check_setup_status( + async fn check_setup_status( &self, key: &serde_json::Value, desired_state: Option, diff --git a/src/ops/storages/neo4j.rs b/src/ops/storages/neo4j.rs index e028574aa..70b62caa3 100644 --- a/src/ops/storages/neo4j.rs +++ b/src/ops/storages/neo4j.rs @@ -1248,7 +1248,7 @@ impl StorageFactoryBase for Factory { Ok((data_coll_output, decl_output)) } - fn check_setup_status( + async fn check_setup_status( &self, key: GraphElement, desired: Option, diff --git a/src/ops/storages/postgres.rs b/src/ops/storages/postgres.rs index 3af2e6553..d5254ed6a 100644 --- a/src/ops/storages/postgres.rs +++ b/src/ops/storages/postgres.rs @@ -967,7 +967,7 @@ impl StorageFactoryBase for Arc { Ok((data_coll_output, vec![])) } - fn check_setup_status( + async fn check_setup_status( &self, key: TableId, desired: Option, diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index 5cbf1a27b..7da0b558d 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -382,7 +382,7 @@ impl StorageFactoryBase for Arc { Ok((data_coll_output, vec![])) } - fn check_setup_status( + async fn check_setup_status( &self, _key: String, _desired: Option<()>, diff --git a/src/py/mod.rs b/src/py/mod.rs index 081569fa7..2c4579247 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -304,20 +304,32 @@ impl SetupStatusCheck { } #[pyfunction] -fn sync_setup() -> PyResult { +fn sync_setup(py: Python<'_>) -> PyResult { let lib_context = get_lib_context().into_py_result()?; let flows = lib_context.flows.lock().unwrap(); let all_setup_states = lib_context.all_setup_states.read().unwrap(); - let setup_status = setup::sync_setup(&flows, &all_setup_states).into_py_result()?; - Ok(SetupStatusCheck(setup_status)) + py.allow_threads(|| { + get_runtime() + .block_on(async { + let setup_status = setup::sync_setup(&flows, &all_setup_states).await?; + anyhow::Ok(SetupStatusCheck(setup_status)) + }) + .into_py_result() + }) } #[pyfunction] -fn drop_setup(flow_names: Vec) -> PyResult { +fn drop_setup(py: Python<'_>, flow_names: Vec) -> PyResult { let lib_context = get_lib_context().into_py_result()?; let all_setup_states = lib_context.all_setup_states.read().unwrap(); - let setup_status = setup::drop_setup(flow_names, &all_setup_states).into_py_result()?; - Ok(SetupStatusCheck(setup_status)) + py.allow_threads(|| { + get_runtime() + .block_on(async { + let setup_status = setup::drop_setup(flow_names, &all_setup_states).await?; + anyhow::Ok(SetupStatusCheck(setup_status)) + }) + .into_py_result() + }) } #[pyfunction] diff --git a/src/setup/driver.rs b/src/setup/driver.rs index 1e2c950ba..e26504265 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -206,7 +206,7 @@ fn group_resource_states<'a>( Ok(grouped) } -pub fn check_flow_setup_status( +pub async fn check_flow_setup_status( desired_state: Option<&FlowSetupState>, existing_state: Option<&FlowSetupState>, ) -> Result { @@ -286,12 +286,16 @@ pub fn check_flow_setup_status( let status_check = if never_setup_by_sys { None } else { - Some(factory.check_setup_status( - &resource_id.key, - target_state, - existing_without_setup_by_user, - get_auth_registry(), - )?) + Some( + factory + .check_setup_status( + &resource_id.key, + target_state, + existing_without_setup_by_user, + get_auth_registry(), + ) + .await?, + ) }; target_resources.push(ResourceSetupInfo { key: resource_id.clone(), @@ -310,7 +314,7 @@ pub fn check_flow_setup_status( }) } -pub fn sync_setup( +pub async fn sync_setup( flows: &BTreeMap>, all_setup_state: &AllSetupState, ) -> Result { @@ -319,7 +323,7 @@ pub fn sync_setup( let existing_state = all_setup_state.flows.get(flow_name); flow_status_checks.insert( flow_name.clone(), - check_flow_setup_status(Some(&flow_context.flow.desired_state), existing_state)?, + check_flow_setup_status(Some(&flow_context.flow.desired_state), existing_state).await?, ); } Ok(AllSetupStatusCheck { @@ -331,7 +335,7 @@ pub fn sync_setup( }) } -pub fn drop_setup( +pub async fn drop_setup( flow_names: impl IntoIterator, all_setup_state: &AllSetupState, ) -> Result { @@ -343,7 +347,7 @@ pub fn drop_setup( if let Some(existing_state) = all_setup_state.flows.get(&flow_name) { flow_status_checks.insert( flow_name, - check_flow_setup_status(None, Some(existing_state))?, + check_flow_setup_status(None, Some(existing_state)).await?, ); } }