Skip to content

Commit c59163a

Browse files
authored
Use static tokio runtime and init pyo3-async-runtimes with it. (#239)
1 parent 83b442e commit c59163a

File tree

8 files changed

+83
-118
lines changed

8 files changed

+83
-118
lines changed

src/builder/flow_builder.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use crate::{
1515
schema::{CollectorSchema, FieldSchema},
1616
spec::{FieldName, NamedSpec},
1717
},
18-
get_lib_context,
1918
lib_context::LibContext,
2019
ops::interface::FlowInstanceContext,
2120
py::IntoPyResult,
@@ -339,8 +338,7 @@ pub struct FlowBuilder {
339338
impl FlowBuilder {
340339
#[new]
341340
pub fn new(name: &str) -> PyResult<Self> {
342-
let lib_context = get_lib_context()
343-
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
341+
let lib_context = get_lib_context().into_py_result()?;
344342
let existing_flow_ss = lib_context
345343
.combined_setup_states
346344
.read()
@@ -643,13 +641,11 @@ impl FlowBuilder {
643641
};
644642
let analyzed_flow = py
645643
.allow_threads(|| {
646-
self.lib_context
647-
.runtime
648-
.block_on(super::AnalyzedFlow::from_flow_instance(
649-
spec,
650-
self.existing_flow_ss.as_ref(),
651-
&crate::ops::executor_factory_registry(),
652-
))
644+
get_runtime().block_on(super::AnalyzedFlow::from_flow_instance(
645+
spec,
646+
self.existing_flow_ss.as_ref(),
647+
&crate::ops::executor_factory_registry(),
648+
))
653649
})
654650
.into_py_result()?;
655651
let mut flow_ctxs = self.lib_context.flows.lock().unwrap();
@@ -686,12 +682,10 @@ impl FlowBuilder {
686682
};
687683
let analyzed_flow = py
688684
.allow_threads(|| {
689-
self.lib_context.runtime.block_on(
690-
super::AnalyzedTransientFlow::from_transient_flow(
691-
spec,
692-
&crate::ops::executor_factory_registry(),
693-
),
694-
)
685+
get_runtime().block_on(super::AnalyzedTransientFlow::from_transient_flow(
686+
spec,
687+
&crate::ops::executor_factory_registry(),
688+
))
695689
})
696690
.into_py_result()?;
697691
Ok(py::TransientFlow(Arc::new(analyzed_flow)))

src/execution/db_tracking_setup.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
use crate::get_lib_context;
1+
use crate::prelude::*;
2+
23
use crate::setup::{CombinedState, ResourceSetupStatusCheck, SetupChangeType};
3-
use anyhow::Result;
4-
use axum::async_trait;
54
use serde::{Deserialize, Serialize};
65
use sqlx::PgPool;
76

@@ -153,9 +152,7 @@ impl ResourceSetupStatusCheck for TrackingTableSetupStatusCheck {
153152
}
154153

155154
async fn apply_change(&self) -> Result<()> {
156-
let pool = &get_lib_context()
157-
.ok_or(anyhow::anyhow!("CocoIndex library not initialized"))?
158-
.pool;
155+
let pool = &get_lib_context()?.pool;
159156
if let Some(desired) = &self.desired_state {
160157
for lagacy_name in self.legacy_table_names.iter() {
161158
let query = format!(

src/lib.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,3 @@ mod service;
1111
mod settings;
1212
mod setup;
1313
mod utils;
14-
15-
use lib_context::LibContext;
16-
use std::sync::{Arc, RwLock};
17-
18-
static LIB_CONTEXT: RwLock<Option<Arc<LibContext>>> = RwLock::new(None);
19-
20-
pub(crate) fn get_lib_context() -> Option<Arc<LibContext>> {
21-
let lib_context_locked = LIB_CONTEXT.read().unwrap();
22-
lib_context_locked.as_ref().cloned()
23-
}

src/lib_context.rs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
use crate::prelude::*;
22

3-
use std::collections::BTreeMap;
4-
use std::sync::{Arc, RwLock};
5-
63
use crate::execution::source_indexer::SourceIndexingContext;
74
use crate::service::error::ApiError;
85
use crate::settings;
96
use crate::setup;
107
use crate::{builder::AnalyzedFlow, execution::query::SimpleSemanticsQueryHandler};
118
use axum::http::StatusCode;
129
use sqlx::PgPool;
10+
use std::collections::BTreeMap;
1311
use tokio::runtime::Runtime;
1412

1513
pub struct FlowContext {
@@ -60,8 +58,9 @@ impl FlowContext {
6058
}
6159
}
6260

61+
static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());
62+
6363
pub struct LibContext {
64-
pub runtime: Runtime,
6564
pub pool: PgPool,
6665
pub flows: Mutex<BTreeMap<String, Arc<FlowContext>>>,
6766
pub combined_setup_states: RwLock<setup::AllSetupState<setup::ExistingMode>>,
@@ -83,20 +82,47 @@ impl LibContext {
8382
}
8483
}
8584

85+
pub fn get_runtime() -> &'static Runtime {
86+
&TOKIO_RUNTIME
87+
}
88+
89+
static LIB_INIT: OnceLock<()> = OnceLock::new();
8690
pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
87-
console_subscriber::init();
88-
env_logger::init();
91+
LIB_INIT.get_or_init(|| {
92+
console_subscriber::init();
93+
env_logger::init();
94+
pyo3_async_runtimes::tokio::init_with_runtime(get_runtime()).unwrap();
95+
});
8996

90-
let runtime = Runtime::new()?;
91-
let (pool, all_css) = runtime.block_on(async {
97+
let (pool, all_css) = get_runtime().block_on(async {
9298
let pool = PgPool::connect(&settings.database_url).await?;
9399
let existing_ss = setup::get_existing_setup_state(&pool).await?;
94100
anyhow::Ok((pool, existing_ss))
95101
})?;
96102
Ok(LibContext {
97-
runtime,
98103
pool,
99104
combined_setup_states: RwLock::new(all_css),
100105
flows: Mutex::new(BTreeMap::new()),
101106
})
102107
}
108+
109+
static LIB_CONTEXT: RwLock<Option<Arc<LibContext>>> = RwLock::new(None);
110+
111+
pub(crate) fn init_lib_context(settings: settings::Settings) -> Result<()> {
112+
let mut lib_context_locked = LIB_CONTEXT.write().unwrap();
113+
*lib_context_locked = Some(Arc::new(create_lib_context(settings)?));
114+
Ok(())
115+
}
116+
117+
pub(crate) fn get_lib_context() -> Result<Arc<LibContext>> {
118+
let lib_context_locked = LIB_CONTEXT.read().unwrap();
119+
lib_context_locked
120+
.as_ref()
121+
.cloned()
122+
.ok_or_else(|| anyhow!("CocoIndex library is not initialized or already stopped"))
123+
}
124+
125+
pub(crate) fn clear_lib_context() {
126+
let mut lib_context_locked = LIB_CONTEXT.write().unwrap();
127+
*lib_context_locked = None;
128+
}

src/ops/storages/postgres.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
1-
use std::borrow::Cow;
2-
use std::collections::{BTreeMap, HashMap};
3-
use std::ops::Bound;
4-
use std::sync::{Arc, Mutex};
1+
use crate::prelude::*;
52

63
use crate::base::spec::{self, *};
74
use crate::ops::sdk::*;
85
use crate::service::error::{shared_ok, SharedError, SharedResultExt};
6+
use crate::setup;
97
use crate::utils::db::ValidIdentifier;
10-
use crate::{get_lib_context, setup};
11-
use anyhow::{anyhow, bail, Result};
128
use async_trait::async_trait;
139
use derivative::Derivative;
1410
use futures::future::{BoxFuture, Shared};
@@ -19,6 +15,7 @@ use serde::Serialize;
1915
use sqlx::postgres::types::PgRange;
2016
use sqlx::postgres::PgRow;
2117
use sqlx::{PgPool, Row};
18+
use std::ops::Bound;
2219
use uuid::Uuid;
2320

2421
#[derive(Debug, Deserialize)]
@@ -1005,12 +1002,7 @@ impl Factory {
10051002
shared_ok(if let Some(database_url) = database_url {
10061003
PgPool::connect(&database_url).await?
10071004
} else {
1008-
get_lib_context()
1009-
.ok_or_else(|| {
1010-
SharedError::new(anyhow!("Cocoindex is not initialized"))
1011-
})?
1012-
.pool
1013-
.clone()
1005+
get_lib_context().map_err(SharedError::new)?.pool.clone()
10141006
})
10151007
};
10161008
let shared_fut = pool_fut.boxed().shared();

src/prelude.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ pub(crate) use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
66
pub(crate) use futures::{FutureExt, StreamExt};
77
pub(crate) use itertools::Itertools;
88
pub(crate) use serde::{Deserialize, Serialize};
9-
pub(crate) use std::sync::{Arc, Mutex, Weak};
9+
pub(crate) use std::borrow::Cow;
10+
pub(crate) use std::collections::{BTreeMap, HashMap};
11+
pub(crate) use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock, Weak};
1012

1113
pub(crate) use crate::base::{schema, spec, value};
1214
pub(crate) use crate::builder::{self, plan};
1315
pub(crate) use crate::execution;
14-
pub(crate) use crate::lib_context::{FlowContext, LibContext};
16+
pub(crate) use crate::lib_context::{get_lib_context, get_runtime, FlowContext, LibContext};
1517
pub(crate) use crate::ops::interface;
1618
pub(crate) use crate::service::error::ApiError;
1719

0 commit comments

Comments
 (0)