Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/cocoindex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Cocoindex is a framework for building and running indexing pipelines.
"""

from . import functions, query, sources, storages, cli, utils
from . import functions, sources, storages, cli, utils

from .auth_registry import AuthEntryReference, add_auth_entry, ref_auth_entry
from .flow import FlowBuilder, DataScope, DataSlice, Flow, transform_flow
Expand Down
3 changes: 1 addition & 2 deletions python/cocoindex/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Callable, Any

from . import _engine # type: ignore
from . import flow, query, setting
from . import flow, setting
from .convert import dump_engine_object


Expand All @@ -24,7 +24,6 @@ def init(settings: setting.Settings | None = None) -> None:
def start_server(settings: setting.ServerSettings) -> None:
"""Start the cocoindex server."""
flow.ensure_all_flows_built()
query.ensure_all_handlers_built()
_engine.start_server(settings.__dict__)


Expand Down
115 changes: 0 additions & 115 deletions python/cocoindex/query.py

This file was deleted.

7 changes: 3 additions & 4 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,8 +1066,8 @@ impl AnalyzerContext<'_> {
let export_target_factory = export_op_group.target_factory.clone();
Ok(async move {
trace!("Start building executor for export op `{op_name}`");
let executors = data_coll_output
.executors
let export_context = data_coll_output
.export_context
.await
.with_context(|| format!("Analyzing export op: {op_name}"))?;
trace!("Finished building executor for export op `{op_name}`");
Expand All @@ -1076,8 +1076,7 @@ impl AnalyzerContext<'_> {
target_id,
input: data_fields_info.local_collector_ref,
export_target_factory,
export_context: executors.export_context,
query_target: executors.query_target,
export_context,
primary_key_def: data_fields_info.primary_key_def,
primary_key_type: data_fields_info.primary_key_type,
value_fields: data_fields_info.value_fields_idx,
Expand Down
1 change: 0 additions & 1 deletion src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ pub struct AnalyzedExportOp {
pub input: AnalyzedLocalCollectorReference,
pub export_target_factory: Arc<dyn ExportTargetFactory + Send + Sync>,
pub export_context: Arc<dyn Any + Send + Sync>,
pub query_target: Option<Arc<dyn QueryTarget>>,
pub primary_key_def: AnalyzedPrimaryKeyDef,
pub primary_key_type: schema::ValueType,
/// idx for value fields - excluding the primary key field.
Expand Down
1 change: 0 additions & 1 deletion src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ pub(crate) mod dumper;
pub(crate) mod evaluator;
pub(crate) mod indexing_status;
pub(crate) mod memoization;
pub(crate) mod query;
pub(crate) mod row_indexer;
pub(crate) mod source_indexer;
pub(crate) mod stats;
Expand Down
124 changes: 0 additions & 124 deletions src/execution/query.rs

This file was deleted.

20 changes: 2 additions & 18 deletions src/lib_context.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use crate::prelude::*;

use crate::builder::AnalyzedFlow;
use crate::execution::source_indexer::SourceIndexingContext;
use crate::service::error::ApiError;
use crate::settings;
use crate::setup;
use crate::{builder::AnalyzedFlow, execution::query::SimpleSemanticsQueryHandler};
use axum::http::StatusCode;
use sqlx::postgres::PgConnectOptions;
use sqlx::PgPool;
use sqlx::postgres::PgConnectOptions;
use std::collections::BTreeMap;
use tokio::runtime::Runtime;

pub struct FlowContext {
pub flow: Arc<AnalyzedFlow>,
pub source_indexing_contexts: Vec<tokio::sync::OnceCell<Arc<SourceIndexingContext>>>,
pub query_handlers: Mutex<BTreeMap<String, Arc<SimpleSemanticsQueryHandler>>>,
}

impl FlowContext {
Expand All @@ -26,7 +25,6 @@ impl FlowContext {
Self {
flow,
source_indexing_contexts,
query_handlers: Mutex::new(BTreeMap::new()),
}
}

Expand All @@ -43,20 +41,6 @@ impl FlowContext {
})
.await
}

pub fn get_query_handler(&self, name: &str) -> Result<Arc<SimpleSemanticsQueryHandler>> {
let query_handlers = self.query_handlers.lock().unwrap();
let query_handler = query_handlers
.get(name)
.ok_or_else(|| {
ApiError::new(
&format!("Query handler not found: {name}"),
StatusCode::NOT_FOUND,
)
})?
.clone();
Ok(query_handler)
}
}

static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());
Expand Down
15 changes: 3 additions & 12 deletions src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,8 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
}
}

pub struct TypedExportTargetExecutors<F: StorageFactoryBase + ?Sized> {
pub export_context: Arc<F::ExportContext>,
pub query_target: Option<Arc<dyn QueryTarget>>,
}

pub struct TypedExportDataCollectionBuildOutput<F: StorageFactoryBase + ?Sized> {
pub executors: BoxFuture<'static, Result<TypedExportTargetExecutors<F>>>,
pub export_context: BoxFuture<'static, Result<Arc<F::ExportContext>>>,
pub setup_key: F::Key,
pub desired_setup_state: F::SetupState,
}
Expand Down Expand Up @@ -400,12 +395,8 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
.into_iter()
.map(|d| {
Ok(interface::ExportDataCollectionBuildOutput {
executors: async move {
let executors = d.executors.await?;
Ok(interface::ExportTargetExecutors {
export_context: executors.export_context,
query_target: executors.query_target,
})
export_context: async move {
Ok(d.export_context.await? as Arc<dyn Any + Send + Sync>)
}
.boxed(),
setup_key: serde_json::to_value(d.setup_key)?,
Expand Down
Loading