diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index 2812fc961..b87509b04 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -15,7 +15,6 @@ use crate::{ schema::{CollectorSchema, FieldSchema}, spec::{FieldName, NamedSpec}, }, - get_lib_context, lib_context::LibContext, ops::interface::FlowInstanceContext, py::IntoPyResult, @@ -339,8 +338,7 @@ pub struct FlowBuilder { impl FlowBuilder { #[new] pub fn new(name: &str) -> PyResult { - let lib_context = get_lib_context() - .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; + let lib_context = get_lib_context().into_py_result()?; let existing_flow_ss = lib_context .combined_setup_states .read() @@ -643,13 +641,11 @@ impl FlowBuilder { }; let analyzed_flow = py .allow_threads(|| { - self.lib_context - .runtime - .block_on(super::AnalyzedFlow::from_flow_instance( - spec, - self.existing_flow_ss.as_ref(), - &crate::ops::executor_factory_registry(), - )) + get_runtime().block_on(super::AnalyzedFlow::from_flow_instance( + spec, + self.existing_flow_ss.as_ref(), + &crate::ops::executor_factory_registry(), + )) }) .into_py_result()?; let mut flow_ctxs = self.lib_context.flows.lock().unwrap(); @@ -686,12 +682,10 @@ impl FlowBuilder { }; let analyzed_flow = py .allow_threads(|| { - self.lib_context.runtime.block_on( - super::AnalyzedTransientFlow::from_transient_flow( - spec, - &crate::ops::executor_factory_registry(), - ), - ) + get_runtime().block_on(super::AnalyzedTransientFlow::from_transient_flow( + spec, + &crate::ops::executor_factory_registry(), + )) }) .into_py_result()?; Ok(py::TransientFlow(Arc::new(analyzed_flow))) diff --git a/src/execution/db_tracking_setup.rs b/src/execution/db_tracking_setup.rs index a559b203a..62554b191 100644 --- a/src/execution/db_tracking_setup.rs +++ b/src/execution/db_tracking_setup.rs @@ -1,7 +1,6 @@ -use crate::get_lib_context; +use crate::prelude::*; + use crate::setup::{CombinedState, ResourceSetupStatusCheck, SetupChangeType}; -use anyhow::Result; -use axum::async_trait; use serde::{Deserialize, Serialize}; use sqlx::PgPool; @@ -153,9 +152,7 @@ impl ResourceSetupStatusCheck for TrackingTableSetupStatusCheck { } async fn apply_change(&self) -> Result<()> { - let pool = &get_lib_context() - .ok_or(anyhow::anyhow!("CocoIndex library not initialized"))? - .pool; + let pool = &get_lib_context()?.pool; if let Some(desired) = &self.desired_state { for lagacy_name in self.legacy_table_names.iter() { let query = format!( diff --git a/src/lib.rs b/src/lib.rs index 6ce035d8f..9d916528b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,13 +11,3 @@ mod service; mod settings; mod setup; mod utils; - -use lib_context::LibContext; -use std::sync::{Arc, RwLock}; - -static LIB_CONTEXT: RwLock>> = RwLock::new(None); - -pub(crate) fn get_lib_context() -> Option> { - let lib_context_locked = LIB_CONTEXT.read().unwrap(); - lib_context_locked.as_ref().cloned() -} diff --git a/src/lib_context.rs b/src/lib_context.rs index 104ed2789..7b3a248a2 100644 --- a/src/lib_context.rs +++ b/src/lib_context.rs @@ -1,8 +1,5 @@ use crate::prelude::*; -use std::collections::BTreeMap; -use std::sync::{Arc, RwLock}; - use crate::execution::source_indexer::SourceIndexingContext; use crate::service::error::ApiError; use crate::settings; @@ -10,6 +7,7 @@ use crate::setup; use crate::{builder::AnalyzedFlow, execution::query::SimpleSemanticsQueryHandler}; use axum::http::StatusCode; use sqlx::PgPool; +use std::collections::BTreeMap; use tokio::runtime::Runtime; pub struct FlowContext { @@ -60,8 +58,9 @@ impl FlowContext { } } +static TOKIO_RUNTIME: LazyLock = LazyLock::new(|| Runtime::new().unwrap()); + pub struct LibContext { - pub runtime: Runtime, pub pool: PgPool, pub flows: Mutex>>, pub combined_setup_states: RwLock>, @@ -83,20 +82,47 @@ impl LibContext { } } +pub fn get_runtime() -> &'static Runtime { + &TOKIO_RUNTIME +} + +static LIB_INIT: OnceLock<()> = OnceLock::new(); pub fn create_lib_context(settings: settings::Settings) -> Result { - console_subscriber::init(); - env_logger::init(); + LIB_INIT.get_or_init(|| { + console_subscriber::init(); + env_logger::init(); + pyo3_async_runtimes::tokio::init_with_runtime(get_runtime()).unwrap(); + }); - let runtime = Runtime::new()?; - let (pool, all_css) = runtime.block_on(async { + let (pool, all_css) = get_runtime().block_on(async { let pool = PgPool::connect(&settings.database_url).await?; let existing_ss = setup::get_existing_setup_state(&pool).await?; anyhow::Ok((pool, existing_ss)) })?; Ok(LibContext { - runtime, pool, combined_setup_states: RwLock::new(all_css), flows: Mutex::new(BTreeMap::new()), }) } + +static LIB_CONTEXT: RwLock>> = RwLock::new(None); + +pub(crate) fn init_lib_context(settings: settings::Settings) -> Result<()> { + let mut lib_context_locked = LIB_CONTEXT.write().unwrap(); + *lib_context_locked = Some(Arc::new(create_lib_context(settings)?)); + Ok(()) +} + +pub(crate) fn get_lib_context() -> Result> { + let lib_context_locked = LIB_CONTEXT.read().unwrap(); + lib_context_locked + .as_ref() + .cloned() + .ok_or_else(|| anyhow!("CocoIndex library is not initialized or already stopped")) +} + +pub(crate) fn clear_lib_context() { + let mut lib_context_locked = LIB_CONTEXT.write().unwrap(); + *lib_context_locked = None; +} diff --git a/src/ops/storages/postgres.rs b/src/ops/storages/postgres.rs index 8fb3c9db9..1cbec79d9 100644 --- a/src/ops/storages/postgres.rs +++ b/src/ops/storages/postgres.rs @@ -1,14 +1,10 @@ -use std::borrow::Cow; -use std::collections::{BTreeMap, HashMap}; -use std::ops::Bound; -use std::sync::{Arc, Mutex}; +use crate::prelude::*; use crate::base::spec::{self, *}; use crate::ops::sdk::*; use crate::service::error::{shared_ok, SharedError, SharedResultExt}; +use crate::setup; use crate::utils::db::ValidIdentifier; -use crate::{get_lib_context, setup}; -use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; use derivative::Derivative; use futures::future::{BoxFuture, Shared}; @@ -19,6 +15,7 @@ use serde::Serialize; use sqlx::postgres::types::PgRange; use sqlx::postgres::PgRow; use sqlx::{PgPool, Row}; +use std::ops::Bound; use uuid::Uuid; #[derive(Debug, Deserialize)] @@ -1005,12 +1002,7 @@ impl Factory { shared_ok(if let Some(database_url) = database_url { PgPool::connect(&database_url).await? } else { - get_lib_context() - .ok_or_else(|| { - SharedError::new(anyhow!("Cocoindex is not initialized")) - })? - .pool - .clone() + get_lib_context().map_err(SharedError::new)?.pool.clone() }) }; let shared_fut = pool_fut.boxed().shared(); diff --git a/src/prelude.rs b/src/prelude.rs index e47a6b8ca..483c4fd64 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -6,12 +6,14 @@ pub(crate) use futures::{future::BoxFuture, prelude::*, stream::BoxStream}; pub(crate) use futures::{FutureExt, StreamExt}; pub(crate) use itertools::Itertools; pub(crate) use serde::{Deserialize, Serialize}; -pub(crate) use std::sync::{Arc, Mutex, Weak}; +pub(crate) use std::borrow::Cow; +pub(crate) use std::collections::{BTreeMap, HashMap}; +pub(crate) use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock, Weak}; pub(crate) use crate::base::{schema, spec, value}; pub(crate) use crate::builder::{self, plan}; pub(crate) use crate::execution; -pub(crate) use crate::lib_context::{FlowContext, LibContext}; +pub(crate) use crate::lib_context::{get_lib_context, get_runtime, FlowContext, LibContext}; pub(crate) use crate::ops::interface; pub(crate) use crate::service::error::ApiError; diff --git a/src/py/mod.rs b/src/py/mod.rs index fbcf9d096..e6f3c25ad 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -2,15 +2,13 @@ use crate::prelude::*; use crate::base::spec::VectorSimilarityMetric; use crate::execution::query; -use crate::get_lib_context; -use crate::lib_context::create_lib_context; +use crate::lib_context::{clear_lib_context, init_lib_context}; use crate::ops::interface::QueryResults; use crate::ops::py_factory::PyOpArgSchema; use crate::ops::{interface::ExecutorFactory, py_factory::PyFunctionFactory, register_factory}; use crate::server::{self, ServerSettings}; use crate::settings::Settings; use crate::setup; -use crate::LIB_CONTEXT; use pyo3::{exceptions::PyException, prelude::*}; use std::collections::btree_map; @@ -33,11 +31,7 @@ impl IntoPyResult for Result { #[pyfunction] fn init(py: Python<'_>, settings: Pythonized) -> PyResult<()> { py.allow_threads(|| -> anyhow::Result<()> { - let mut lib_context_locked = LIB_CONTEXT.write().unwrap(); - if lib_context_locked.is_some() { - return Ok(()); - } - *lib_context_locked = Some(Arc::new(create_lib_context(settings.into_inner())?)); + init_lib_context(settings.into_inner())?; Ok(()) }) .into_py_result() @@ -46,13 +40,11 @@ fn init(py: Python<'_>, settings: Pythonized) -> PyResult<()> { #[pyfunction] fn start_server(py: Python<'_>, settings: Pythonized) -> PyResult<()> { py.allow_threads(|| -> anyhow::Result<()> { - let lib_context = - get_lib_context().ok_or_else(|| api_error!("Cocoindex is not initialized"))?; - let server = lib_context.runtime.block_on(server::init_server( - lib_context.clone(), + let server = get_runtime().block_on(server::init_server( + get_lib_context()?, settings.into_inner(), ))?; - lib_context.runtime.spawn(server); + get_runtime().spawn(server); Ok(()) }) .into_py_result() @@ -60,10 +52,7 @@ fn start_server(py: Python<'_>, settings: Pythonized) -> PyResul #[pyfunction] fn stop(py: Python<'_>) -> PyResult<()> { - py.allow_threads(|| { - let mut runtime_context_locked = LIB_CONTEXT.write().unwrap(); - *runtime_context_locked = None; - }); + py.allow_threads(clear_lib_context); Ok(()) } @@ -137,12 +126,10 @@ impl Flow { pub fn update<'py>(&self, py: Python<'py>) -> PyResult> { let flow_ctx = self.0.clone(); pyo3_async_runtimes::tokio::future_into_py(py, async move { - let lib_context = get_lib_context() - .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; let update_info = { let mut synchronizer = execution::FlowSynchronizer::start( flow_ctx, - &lib_context.pool, + &get_lib_context().into_py_result()?.pool, &execution::FlowSynchronizerOptions { keep_refreshed: false, }, @@ -158,14 +145,11 @@ impl Flow { pub fn keep_in_sync(&self, py: Python<'_>) -> PyResult { py.allow_threads(|| { - let lib_context = get_lib_context() - .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; - let synchronizer = lib_context - .runtime + let synchronizer = get_runtime() .block_on(async { let synchronizer = execution::FlowSynchronizer::start( self.0.clone(), - &lib_context.pool, + &get_lib_context()?.pool, &execution::FlowSynchronizerOptions { keep_refreshed: false, }, @@ -186,17 +170,14 @@ impl Flow { options: Pythonized, ) -> PyResult<()> { py.allow_threads(|| { - let lib_context = get_lib_context() - .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; - lib_context - .runtime + get_runtime() .block_on(async { let exec_plan = self.0.flow.get_execution_plan().await?; execution::dumper::evaluate_and_dump( &exec_plan, &self.0.flow.data_schema, options.into_inner(), - &lib_context.pool, + &get_lib_context()?.pool, ) .await }) @@ -234,11 +215,7 @@ impl SimpleSemanticsQueryHandler { default_similarity_metric: Pythonized, ) -> PyResult { py.allow_threads(|| { - let lib_context = get_lib_context() - .ok_or_else(|| api_error!("Cocoindex is not initialized")) - .into_py_result()?; - let handler = lib_context - .runtime + let handler = get_runtime() .block_on(query::SimpleSemanticsQueryHandler::new( flow.0.flow.clone(), target_name, @@ -251,9 +228,8 @@ impl SimpleSemanticsQueryHandler { } pub fn register_query_handler(&self, name: String) -> PyResult<()> { - let lib_context = get_lib_context() - .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; - let flow_ctx = lib_context + let flow_ctx = get_lib_context() + .into_py_result()? .get_flow_context(&self.0.flow_name) .into_py_result()?; let mut query_handlers = flow_ctx.query_handlers.lock().unwrap(); @@ -284,11 +260,7 @@ impl SimpleSemanticsQueryHandler { Pythonized, )> { py.allow_threads(|| { - let lib_context = get_lib_context() - .ok_or_else(|| anyhow!("cocoindex library not initialized")) - .into_py_result()?; - let (results, info) = lib_context - .runtime + let (results, info) = get_runtime() .block_on(async move { self.0 .search( @@ -327,8 +299,7 @@ impl SetupStatusCheck { fn check_setup_status( options: Pythonized, ) -> PyResult { - let lib_context = get_lib_context() - .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; + let lib_context = get_lib_context().into_py_result()?; let flows = lib_context.flows.lock().unwrap(); let all_css = lib_context.combined_setup_states.read().unwrap(); let setup_status = @@ -339,13 +310,14 @@ fn check_setup_status( #[pyfunction] fn apply_setup_changes(py: Python<'_>, setup_status: &SetupStatusCheck) -> PyResult<()> { py.allow_threads(|| { - let lib_context = get_lib_context() - .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; - lib_context - .runtime + get_runtime() .block_on(async { - setup::apply_changes(&mut std::io::stdout(), &setup_status.0, &lib_context.pool) - .await + setup::apply_changes( + &mut std::io::stdout(), + &setup_status.0, + &get_lib_context()?.pool, + ) + .await }) .into_py_result()?; Ok(()) diff --git a/src/setup/db_metadata.rs b/src/setup/db_metadata.rs index 0bc310b70..370daa843 100644 --- a/src/setup/db_metadata.rs +++ b/src/setup/db_metadata.rs @@ -1,16 +1,10 @@ -use std::{collections::HashMap, sync::LazyLock}; +use crate::prelude::*; -use anyhow::Result; -use axum::async_trait; +use super::{ResourceSetupStatusCheck, SetupChangeType, StateChange}; +use crate::utils::db::WriteAction; use axum::http::StatusCode; -use log::warn; use sqlx::PgPool; -use crate::utils::db::WriteAction; -use crate::{get_lib_context, service::error::ApiError}; - -use super::{ResourceSetupStatusCheck, SetupChangeType, StateChange}; - const SETUP_METADATA_TABLE_NAME: &str = "cocoindex_setup_metadata"; pub const FLOW_VERSION_RESOURCE_TYPE: &str = "__FlowVersion"; @@ -340,9 +334,7 @@ impl ResourceSetupStatusCheck for MetadataTableSetup { if !self.metadata_table_missing { return Ok(()); } - let pool = &get_lib_context() - .ok_or_else(|| anyhow::anyhow!("Library not initialized"))? - .pool; + let pool = &get_lib_context()?.pool; let query_str = format!( "CREATE TABLE IF NOT EXISTS {SETUP_METADATA_TABLE_NAME} ( flow_name TEXT NOT NULL,