Skip to content

Commit 39ce910

Browse files
authored
refactor(setup): make check_setup_status() async (#390)
1 parent c4f0df8 commit 39ce910

File tree

8 files changed

+42
-25
lines changed

8 files changed

+42
-25
lines changed

src/builder/analyzed_flow.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ impl AnalyzedFlow {
3030
registry,
3131
)?;
3232
let setup_status_check =
33-
setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss)?;
33+
setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss).await?;
3434
let execution_plan = if setup_status_check.is_up_to_date() {
3535
Some(
3636
async move {

src/ops/factory_bases.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
304304

305305
/// Will not be called if it's setup by user.
306306
/// It returns an error if the target only supports setup by user.
307-
fn check_setup_status(
307+
async fn check_setup_status(
308308
&self,
309309
key: Self::Key,
310310
desired_state: Option<Self::SetupState>,
@@ -392,7 +392,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
392392
Ok((data_coll_output, decl_output))
393393
}
394394

395-
fn check_setup_status(
395+
async fn check_setup_status(
396396
&self,
397397
key: &serde_json::Value,
398398
desired_state: Option<serde_json::Value>,
@@ -410,7 +410,8 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
410410
desired_state,
411411
existing_states,
412412
auth_registry,
413-
)?;
413+
)
414+
.await?;
414415
Ok(Box::new(status_check))
415416
}
416417

src/ops/interface.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ pub trait ExportTargetFactory: Send + Sync {
190190

191191
/// Will not be called if it's setup by user.
192192
/// It returns an error if the target only supports setup by user.
193-
fn check_setup_status(
193+
async fn check_setup_status(
194194
&self,
195195
key: &serde_json::Value,
196196
desired_state: Option<serde_json::Value>,

src/ops/storages/neo4j.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1248,7 +1248,7 @@ impl StorageFactoryBase for Factory {
12481248
Ok((data_coll_output, decl_output))
12491249
}
12501250

1251-
fn check_setup_status(
1251+
async fn check_setup_status(
12521252
&self,
12531253
key: GraphElement,
12541254
desired: Option<SetupState>,

src/ops/storages/postgres.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -967,7 +967,7 @@ impl StorageFactoryBase for Arc<Factory> {
967967
Ok((data_coll_output, vec![]))
968968
}
969969

970-
fn check_setup_status(
970+
async fn check_setup_status(
971971
&self,
972972
key: TableId,
973973
desired: Option<SetupState>,

src/ops/storages/qdrant.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ impl StorageFactoryBase for Arc<Factory> {
382382
Ok((data_coll_output, vec![]))
383383
}
384384

385-
fn check_setup_status(
385+
async fn check_setup_status(
386386
&self,
387387
_key: String,
388388
_desired: Option<()>,

src/py/mod.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -304,20 +304,32 @@ impl SetupStatusCheck {
304304
}
305305

306306
#[pyfunction]
307-
fn sync_setup() -> PyResult<SetupStatusCheck> {
307+
fn sync_setup(py: Python<'_>) -> PyResult<SetupStatusCheck> {
308308
let lib_context = get_lib_context().into_py_result()?;
309309
let flows = lib_context.flows.lock().unwrap();
310310
let all_setup_states = lib_context.all_setup_states.read().unwrap();
311-
let setup_status = setup::sync_setup(&flows, &all_setup_states).into_py_result()?;
312-
Ok(SetupStatusCheck(setup_status))
311+
py.allow_threads(|| {
312+
get_runtime()
313+
.block_on(async {
314+
let setup_status = setup::sync_setup(&flows, &all_setup_states).await?;
315+
anyhow::Ok(SetupStatusCheck(setup_status))
316+
})
317+
.into_py_result()
318+
})
313319
}
314320

315321
#[pyfunction]
316-
fn drop_setup(flow_names: Vec<String>) -> PyResult<SetupStatusCheck> {
322+
fn drop_setup(py: Python<'_>, flow_names: Vec<String>) -> PyResult<SetupStatusCheck> {
317323
let lib_context = get_lib_context().into_py_result()?;
318324
let all_setup_states = lib_context.all_setup_states.read().unwrap();
319-
let setup_status = setup::drop_setup(flow_names, &all_setup_states).into_py_result()?;
320-
Ok(SetupStatusCheck(setup_status))
325+
py.allow_threads(|| {
326+
get_runtime()
327+
.block_on(async {
328+
let setup_status = setup::drop_setup(flow_names, &all_setup_states).await?;
329+
anyhow::Ok(SetupStatusCheck(setup_status))
330+
})
331+
.into_py_result()
332+
})
321333
}
322334

323335
#[pyfunction]

src/setup/driver.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ fn group_resource_states<'a>(
206206
Ok(grouped)
207207
}
208208

209-
pub fn check_flow_setup_status(
209+
pub async fn check_flow_setup_status(
210210
desired_state: Option<&FlowSetupState<DesiredMode>>,
211211
existing_state: Option<&FlowSetupState<ExistingMode>>,
212212
) -> Result<FlowSetupStatusCheck> {
@@ -286,12 +286,16 @@ pub fn check_flow_setup_status(
286286
let status_check = if never_setup_by_sys {
287287
None
288288
} else {
289-
Some(factory.check_setup_status(
290-
&resource_id.key,
291-
target_state,
292-
existing_without_setup_by_user,
293-
get_auth_registry(),
294-
)?)
289+
Some(
290+
factory
291+
.check_setup_status(
292+
&resource_id.key,
293+
target_state,
294+
existing_without_setup_by_user,
295+
get_auth_registry(),
296+
)
297+
.await?,
298+
)
295299
};
296300
target_resources.push(ResourceSetupInfo {
297301
key: resource_id.clone(),
@@ -310,7 +314,7 @@ pub fn check_flow_setup_status(
310314
})
311315
}
312316

313-
pub fn sync_setup(
317+
pub async fn sync_setup(
314318
flows: &BTreeMap<String, Arc<FlowContext>>,
315319
all_setup_state: &AllSetupState<ExistingMode>,
316320
) -> Result<AllSetupStatusCheck> {
@@ -319,7 +323,7 @@ pub fn sync_setup(
319323
let existing_state = all_setup_state.flows.get(flow_name);
320324
flow_status_checks.insert(
321325
flow_name.clone(),
322-
check_flow_setup_status(Some(&flow_context.flow.desired_state), existing_state)?,
326+
check_flow_setup_status(Some(&flow_context.flow.desired_state), existing_state).await?,
323327
);
324328
}
325329
Ok(AllSetupStatusCheck {
@@ -331,7 +335,7 @@ pub fn sync_setup(
331335
})
332336
}
333337

334-
pub fn drop_setup(
338+
pub async fn drop_setup(
335339
flow_names: impl IntoIterator<Item = String>,
336340
all_setup_state: &AllSetupState<ExistingMode>,
337341
) -> Result<AllSetupStatusCheck> {
@@ -343,7 +347,7 @@ pub fn drop_setup(
343347
if let Some(existing_state) = all_setup_state.flows.get(&flow_name) {
344348
flow_status_checks.insert(
345349
flow_name,
346-
check_flow_setup_status(None, Some(existing_state))?,
350+
check_flow_setup_status(None, Some(existing_state)).await?,
347351
);
348352
}
349353
}

0 commit comments

Comments
 (0)