Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::{
schema::{CollectorSchema, FieldSchema},
spec::{FieldName, NamedSpec},
},
get_lib_context,
lib_context::LibContext,
ops::interface::FlowInstanceContext,
py::IntoPyResult,
Expand Down Expand Up @@ -339,8 +338,7 @@ pub struct FlowBuilder {
impl FlowBuilder {
#[new]
pub fn new(name: &str) -> PyResult<Self> {
let lib_context = get_lib_context()
.ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
let lib_context = get_lib_context().into_py_result()?;
let existing_flow_ss = lib_context
.combined_setup_states
.read()
Expand Down Expand Up @@ -643,13 +641,11 @@ impl FlowBuilder {
};
let analyzed_flow = py
.allow_threads(|| {
self.lib_context
.runtime
.block_on(super::AnalyzedFlow::from_flow_instance(
spec,
self.existing_flow_ss.as_ref(),
&crate::ops::executor_factory_registry(),
))
get_runtime().block_on(super::AnalyzedFlow::from_flow_instance(
spec,
self.existing_flow_ss.as_ref(),
&crate::ops::executor_factory_registry(),
))
})
.into_py_result()?;
let mut flow_ctxs = self.lib_context.flows.lock().unwrap();
Expand Down Expand Up @@ -686,12 +682,10 @@ impl FlowBuilder {
};
let analyzed_flow = py
.allow_threads(|| {
self.lib_context.runtime.block_on(
super::AnalyzedTransientFlow::from_transient_flow(
spec,
&crate::ops::executor_factory_registry(),
),
)
get_runtime().block_on(super::AnalyzedTransientFlow::from_transient_flow(
spec,
&crate::ops::executor_factory_registry(),
))
})
.into_py_result()?;
Ok(py::TransientFlow(Arc::new(analyzed_flow)))
Expand Down
9 changes: 3 additions & 6 deletions src/execution/db_tracking_setup.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::get_lib_context;
use crate::prelude::*;

use crate::setup::{CombinedState, ResourceSetupStatusCheck, SetupChangeType};
use anyhow::Result;
use axum::async_trait;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;

Expand Down Expand Up @@ -153,9 +152,7 @@ impl ResourceSetupStatusCheck for TrackingTableSetupStatusCheck {
}

async fn apply_change(&self) -> Result<()> {
let pool = &get_lib_context()
.ok_or(anyhow::anyhow!("CocoIndex library not initialized"))?
.pool;
let pool = &get_lib_context()?.pool;
if let Some(desired) = &self.desired_state {
for lagacy_name in self.legacy_table_names.iter() {
let query = format!(
Expand Down
10 changes: 0 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,3 @@ mod service;
mod settings;
mod setup;
mod utils;

use lib_context::LibContext;
use std::sync::{Arc, RwLock};

static LIB_CONTEXT: RwLock<Option<Arc<LibContext>>> = RwLock::new(None);

pub(crate) fn get_lib_context() -> Option<Arc<LibContext>> {
let lib_context_locked = LIB_CONTEXT.read().unwrap();
lib_context_locked.as_ref().cloned()
}
44 changes: 35 additions & 9 deletions src/lib_context.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use crate::prelude::*;

use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

use crate::execution::source_indexer::SourceIndexingContext;
use crate::service::error::ApiError;
use crate::settings;
use crate::setup;
use crate::{builder::AnalyzedFlow, execution::query::SimpleSemanticsQueryHandler};
use axum::http::StatusCode;
use sqlx::PgPool;
use std::collections::BTreeMap;
use tokio::runtime::Runtime;

pub struct FlowContext {
Expand Down Expand Up @@ -60,8 +58,9 @@ impl FlowContext {
}
}

static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

pub struct LibContext {
pub runtime: Runtime,
pub pool: PgPool,
pub flows: Mutex<BTreeMap<String, Arc<FlowContext>>>,
pub combined_setup_states: RwLock<setup::AllSetupState<setup::ExistingMode>>,
Expand All @@ -83,20 +82,47 @@ impl LibContext {
}
}

pub fn get_runtime() -> &'static Runtime {
&TOKIO_RUNTIME
}

static LIB_INIT: OnceLock<()> = OnceLock::new();
pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
console_subscriber::init();
env_logger::init();
LIB_INIT.get_or_init(|| {
console_subscriber::init();
env_logger::init();
pyo3_async_runtimes::tokio::init_with_runtime(get_runtime()).unwrap();
});

let runtime = Runtime::new()?;
let (pool, all_css) = runtime.block_on(async {
let (pool, all_css) = get_runtime().block_on(async {
let pool = PgPool::connect(&settings.database_url).await?;
let existing_ss = setup::get_existing_setup_state(&pool).await?;
anyhow::Ok((pool, existing_ss))
})?;
Ok(LibContext {
runtime,
pool,
combined_setup_states: RwLock::new(all_css),
flows: Mutex::new(BTreeMap::new()),
})
}

static LIB_CONTEXT: RwLock<Option<Arc<LibContext>>> = RwLock::new(None);

pub(crate) fn init_lib_context(settings: settings::Settings) -> Result<()> {
let mut lib_context_locked = LIB_CONTEXT.write().unwrap();
*lib_context_locked = Some(Arc::new(create_lib_context(settings)?));
Ok(())
}

pub(crate) fn get_lib_context() -> Result<Arc<LibContext>> {
let lib_context_locked = LIB_CONTEXT.read().unwrap();
lib_context_locked
.as_ref()
.cloned()
.ok_or_else(|| anyhow!("CocoIndex library is not initialized or already stopped"))
}

pub(crate) fn clear_lib_context() {
let mut lib_context_locked = LIB_CONTEXT.write().unwrap();
*lib_context_locked = None;
}
16 changes: 4 additions & 12 deletions src/ops/storages/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::ops::Bound;
use std::sync::{Arc, Mutex};
use crate::prelude::*;

use crate::base::spec::{self, *};
use crate::ops::sdk::*;
use crate::service::error::{shared_ok, SharedError, SharedResultExt};
use crate::setup;
use crate::utils::db::ValidIdentifier;
use crate::{get_lib_context, setup};
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use derivative::Derivative;
use futures::future::{BoxFuture, Shared};
Expand All @@ -19,6 +15,7 @@ use serde::Serialize;
use sqlx::postgres::types::PgRange;
use sqlx::postgres::PgRow;
use sqlx::{PgPool, Row};
use std::ops::Bound;
use uuid::Uuid;

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -1005,12 +1002,7 @@ impl Factory {
shared_ok(if let Some(database_url) = database_url {
PgPool::connect(&database_url).await?
} else {
get_lib_context()
.ok_or_else(|| {
SharedError::new(anyhow!("Cocoindex is not initialized"))
})?
.pool
.clone()
get_lib_context().map_err(SharedError::new)?.pool.clone()
})
};
let shared_fut = pool_fut.boxed().shared();
Expand Down
6 changes: 4 additions & 2 deletions src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ pub(crate) use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
pub(crate) use futures::{FutureExt, StreamExt};
pub(crate) use itertools::Itertools;
pub(crate) use serde::{Deserialize, Serialize};
pub(crate) use std::sync::{Arc, Mutex, Weak};
pub(crate) use std::borrow::Cow;
pub(crate) use std::collections::{BTreeMap, HashMap};
pub(crate) use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock, Weak};

pub(crate) use crate::base::{schema, spec, value};
pub(crate) use crate::builder::{self, plan};
pub(crate) use crate::execution;
pub(crate) use crate::lib_context::{FlowContext, LibContext};
pub(crate) use crate::lib_context::{get_lib_context, get_runtime, FlowContext, LibContext};
pub(crate) use crate::ops::interface;
pub(crate) use crate::service::error::ApiError;

Expand Down
Loading