diff --git a/python/cocoindex/__init__.py b/python/cocoindex/__init__.py
index 2b252299a..dd34bb610 100644
--- a/python/cocoindex/__init__.py
+++ b/python/cocoindex/__init__.py
@@ -2,7 +2,7 @@
 Cocoindex is a framework for building and running indexing pipelines.
 """
 
-from . import functions, query, sources, storages, cli, utils
+from . import functions, sources, storages, cli, utils
 
 from .auth_registry import AuthEntryReference, add_auth_entry, ref_auth_entry
 from .flow import FlowBuilder, DataScope, DataSlice, Flow, transform_flow
diff --git a/python/cocoindex/lib.py b/python/cocoindex/lib.py
index 555ded1a6..2bd42d442 100644
--- a/python/cocoindex/lib.py
+++ b/python/cocoindex/lib.py
@@ -6,7 +6,7 @@ from typing import Callable, Any
 
 from . import _engine  # type: ignore
-from . import flow, query, setting
+from . import flow, setting
 from .convert import dump_engine_object
 
@@ -24,7 +24,6 @@ def init(settings: setting.Settings | None = None) -> None:
 
 def start_server(settings: setting.ServerSettings) -> None:
     """Start the cocoindex server."""
     flow.ensure_all_flows_built()
-    query.ensure_all_handlers_built()
     _engine.start_server(settings.__dict__)
diff --git a/python/cocoindex/query.py b/python/cocoindex/query.py
deleted file mode 100644
index 18dbc0bfd..000000000
--- a/python/cocoindex/query.py
+++ /dev/null
@@ -1,115 +0,0 @@
-from typing import Callable, Any
-from dataclasses import dataclass
-from threading import Lock
-
-from . import flow as fl
-from . import index
-from . import _engine  # type: ignore
-
-_handlers_lock = Lock()
-_handlers: dict[str, _engine.SimpleSemanticsQueryHandler] = {}
-
-
-@dataclass
-class SimpleSemanticsQueryInfo:
-    """
-    Additional information about the query.
-    """
-
-    similarity_metric: index.VectorSimilarityMetric
-    query_vector: list[float]
-    vector_field_name: str
-
-
-@dataclass
-class QueryResult:
-    """
-    A single result from the query.
-    """
-
-    data: dict[str, Any]
-    score: float
-
-
-class SimpleSemanticsQueryHandler:
-    """
-    A query handler that uses simple semantics to query the index.
-    """
-
-    _lazy_query_handler: Callable[[], _engine.SimpleSemanticsQueryHandler]
-
-    def __init__(
-        self,
-        name: str,
-        flow: fl.Flow,
-        target_name: str,
-        query_transform_flow: Callable[..., fl.DataSlice[Any]],
-        default_similarity_metric: index.VectorSimilarityMetric = index.VectorSimilarityMetric.COSINE_SIMILARITY,
-    ) -> None:
-        engine_handler = None
-        lock = Lock()
-
-        def _lazy_handler() -> _engine.SimpleSemanticsQueryHandler:
-            nonlocal engine_handler, lock
-            if engine_handler is None:
-                with lock:
-                    if engine_handler is None:
-                        engine_handler = _engine.SimpleSemanticsQueryHandler(
-                            flow.internal_flow(),
-                            target_name,
-                            fl.TransformFlow(
-                                query_transform_flow, [str]
-                            ).internal_flow(),
-                            default_similarity_metric.value,
-                        )
-                        engine_handler.register_query_handler(name)
-            return engine_handler
-
-        self._lazy_query_handler = _lazy_handler
-
-        with _handlers_lock:
-            _handlers[name] = self
-
-    def internal_handler(self) -> _engine.SimpleSemanticsQueryHandler:
-        """
-        Get the internal query handler.
-        """
-        return self._lazy_query_handler()
-
-    def search(
-        self,
-        query: str,
-        limit: int,
-        vector_field_name: str | None = None,
-        similarity_metric: index.VectorSimilarityMetric | None = None,
-    ) -> tuple[list[QueryResult], SimpleSemanticsQueryInfo]:
-        """
-        Search the index with the given query, limit, vector field name, and similarity metric.
- """ - internal_results, internal_info = self.internal_handler().search( - query, - limit, - vector_field_name, - similarity_metric.value if similarity_metric is not None else None, - ) - results = [ - QueryResult(data=result["data"], score=result["score"]) - for result in internal_results - ] - info = SimpleSemanticsQueryInfo( - similarity_metric=index.VectorSimilarityMetric( - internal_info["similarity_metric"] - ), - query_vector=internal_info["query_vector"], - vector_field_name=internal_info["vector_field_name"], - ) - return results, info - - -def ensure_all_handlers_built() -> None: - """ - Ensure all handlers are built. - """ - with _handlers_lock: - for handler in _handlers.values(): - handler.internal_handler() diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index 848e6e82f..a3b23a3ae 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -1066,8 +1066,8 @@ impl AnalyzerContext<'_> { let export_target_factory = export_op_group.target_factory.clone(); Ok(async move { trace!("Start building executor for export op `{op_name}`"); - let executors = data_coll_output - .executors + let export_context = data_coll_output + .export_context .await .with_context(|| format!("Analyzing export op: {op_name}"))?; trace!("Finished building executor for export op `{op_name}`"); @@ -1076,8 +1076,7 @@ impl AnalyzerContext<'_> { target_id, input: data_fields_info.local_collector_ref, export_target_factory, - export_context: executors.export_context, - query_target: executors.query_target, + export_context, primary_key_def: data_fields_info.primary_key_def, primary_key_type: data_fields_info.primary_key_type, value_fields: data_fields_info.value_fields_idx, diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 430f0e52c..ed03d744d 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -103,7 +103,6 @@ pub struct AnalyzedExportOp { pub input: AnalyzedLocalCollectorReference, pub export_target_factory: Arc, pub export_context: Arc, - pub query_target: Option>, pub primary_key_def: AnalyzedPrimaryKeyDef, pub primary_key_type: schema::ValueType, /// idx for value fields - excluding the primary key field. 
diff --git a/src/execution/mod.rs b/src/execution/mod.rs
index 162945b56..33bb4536c 100644
--- a/src/execution/mod.rs
+++ b/src/execution/mod.rs
@@ -3,7 +3,6 @@ pub(crate) mod dumper;
 pub(crate) mod evaluator;
 pub(crate) mod indexing_status;
 pub(crate) mod memoization;
-pub(crate) mod query;
 pub(crate) mod row_indexer;
 pub(crate) mod source_indexer;
 pub(crate) mod stats;
diff --git a/src/execution/query.rs b/src/execution/query.rs
deleted file mode 100644
index 9052096cc..000000000
--- a/src/execution/query.rs
+++ /dev/null
@@ -1,124 +0,0 @@
-use std::{sync::Arc, vec};
-
-use anyhow::{Result, bail};
-use serde::Serialize;
-
-use super::evaluator::evaluate_transient_flow;
-use crate::{
-    api_error,
-    base::{spec::VectorSimilarityMetric, value},
-    builder::{AnalyzedFlow, AnalyzedTransientFlow},
-    ops::interface::{QueryResults, QueryTarget, VectorMatchQuery},
-};
-
-pub struct SimpleSemanticsQueryHandler {
-    pub flow_name: String,
-    pub query_target: Arc<dyn QueryTarget>,
-    pub query_transform_flow: Arc<AnalyzedTransientFlow>,
-    pub default_similarity_metric: VectorSimilarityMetric,
-    pub default_vector_field_name: Option<String>,
-}
-
-#[derive(Debug, Serialize)]
-pub struct SimpleSemanticsQueryInfo {
-    pub similarity_metric: VectorSimilarityMetric,
-    pub query_vector: Vec<f32>,
-    pub vector_field_name: String,
-}
-
-impl SimpleSemanticsQueryHandler {
-    pub async fn new(
-        flow: Arc<AnalyzedFlow>,
-        target_name: &str,
-        query_transform_flow: Arc<AnalyzedTransientFlow>,
-        default_similarity_metric: VectorSimilarityMetric,
-    ) -> Result<Self> {
-        let export_op_idx = flow
-            .flow_instance
-            .export_ops
-            .iter()
-            .position(|export_op| export_op.name == target_name)
-            .unwrap();
-        let export_op = &flow.flow_instance.export_ops[export_op_idx];
-        let vector_indexes = &export_op.spec.index_options.vector_indexes;
-        let execution_plan = flow.get_execution_plan().await?;
-        let analyzed_export_op = &execution_plan.export_ops[export_op_idx];
-        Ok(Self {
-            flow_name: flow.flow_instance.name.clone(),
-            query_target: if let Some(query_target) = &analyzed_export_op.query_target {
-                query_target.clone()
-            } else {
-                bail!(
-                    "Query target is not supported by export op: {}",
-                    target_name
-                );
-            },
-            query_transform_flow,
-            default_similarity_metric,
-            default_vector_field_name: if vector_indexes.len() == 1 {
-                Some(vector_indexes[0].field_name.clone())
-            } else {
-                None
-            },
-        })
-    }
-
-    pub async fn search(
-        &self,
-        query: String,
-        limit: u32,
-        vector_field_name: Option<String>,
-        similarity_metric: Option<VectorSimilarityMetric>,
-    ) -> Result<(QueryResults, SimpleSemanticsQueryInfo)> {
-        let query_results = evaluate_transient_flow(
-            &self.query_transform_flow,
-            &vec![value::BasicValue::Str(Arc::from(query)).into()],
-        )
-        .await?;
-        let vector = match query_results {
-            value::Value::Basic(value::BasicValue::Vector(v)) => v
-                .iter()
-                .map(|f| {
-                    Ok(match f {
-                        value::BasicValue::Int64(i) => *i as f32,
-                        value::BasicValue::Float32(f) => *f,
-                        value::BasicValue::Float64(f) => *f as f32,
-                        value::BasicValue::Bytes(_)
-                        | value::BasicValue::Str(_)
-                        | value::BasicValue::Bool(_)
-                        | value::BasicValue::Range(_)
-                        | value::BasicValue::Uuid(_)
-                        | value::BasicValue::Date(_)
-                        | value::BasicValue::Time(_)
-                        | value::BasicValue::LocalDateTime(_)
-                        | value::BasicValue::OffsetDateTime(_)
-                        | value::BasicValue::TimeDelta(_)
-                        | value::BasicValue::Json(_)
-                        | value::BasicValue::Vector(_) => {
-                            bail!("Query results is not a vector of number")
-                        }
-                    })
-                })
-                .collect::<Result<Vec<f32>>>()?,
-            _ => bail!("Query results is not a vector"),
-        };
-
-        let vector_field_name = vector_field_name
-            .or(self.default_vector_field_name.clone())
api_error!("vector field name must be provided"))?; - - let similarity_metric = similarity_metric.unwrap_or(self.default_similarity_metric); - let info = SimpleSemanticsQueryInfo { - similarity_metric, - query_vector: vector.clone(), - vector_field_name: vector_field_name.clone(), - }; - let query = VectorMatchQuery { - vector_field_name, - vector, - similarity_metric, - limit, - }; - Ok((self.query_target.search(query).await?, info)) - } -} diff --git a/src/lib_context.rs b/src/lib_context.rs index a80f2e0bc..25c7b9992 100644 --- a/src/lib_context.rs +++ b/src/lib_context.rs @@ -1,20 +1,19 @@ use crate::prelude::*; +use crate::builder::AnalyzedFlow; use crate::execution::source_indexer::SourceIndexingContext; use crate::service::error::ApiError; use crate::settings; use crate::setup; -use crate::{builder::AnalyzedFlow, execution::query::SimpleSemanticsQueryHandler}; use axum::http::StatusCode; -use sqlx::postgres::PgConnectOptions; use sqlx::PgPool; +use sqlx::postgres::PgConnectOptions; use std::collections::BTreeMap; use tokio::runtime::Runtime; pub struct FlowContext { pub flow: Arc, pub source_indexing_contexts: Vec>>, - pub query_handlers: Mutex>>, } impl FlowContext { @@ -26,7 +25,6 @@ impl FlowContext { Self { flow, source_indexing_contexts, - query_handlers: Mutex::new(BTreeMap::new()), } } @@ -43,20 +41,6 @@ impl FlowContext { }) .await } - - pub fn get_query_handler(&self, name: &str) -> Result> { - let query_handlers = self.query_handlers.lock().unwrap(); - let query_handler = query_handlers - .get(name) - .ok_or_else(|| { - ApiError::new( - &format!("Query handler not found: {name}"), - StatusCode::NOT_FOUND, - ) - })? - .clone(); - Ok(query_handler) - } } static TOKIO_RUNTIME: LazyLock = LazyLock::new(|| Runtime::new().unwrap()); diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs index f334770cd..c4f523ae4 100644 --- a/src/ops/factory_bases.rs +++ b/src/ops/factory_bases.rs @@ -265,13 +265,8 @@ impl SimpleFunctionFactory for T { } } -pub struct TypedExportTargetExecutors { - pub export_context: Arc, - pub query_target: Option>, -} - pub struct TypedExportDataCollectionBuildOutput { - pub executors: BoxFuture<'static, Result>>, + pub export_context: BoxFuture<'static, Result>>, pub setup_key: F::Key, pub desired_setup_state: F::SetupState, } @@ -400,12 +395,8 @@ impl ExportTargetFactory for T { .into_iter() .map(|d| { Ok(interface::ExportDataCollectionBuildOutput { - executors: async move { - let executors = d.executors.await?; - Ok(interface::ExportTargetExecutors { - export_context: executors.export_context, - query_target: executors.query_target, - }) + export_context: async move { + Ok(d.export_context.await? 
+                        Ok(d.export_context.await? as Arc<dyn Any + Send + Sync>)
                     }
                     .boxed(),
                     setup_key: serde_json::to_value(d.setup_key)?,
diff --git a/src/ops/interface.rs b/src/ops/interface.rs
index e935064f9..8d7592a52 100644
--- a/src/ops/interface.rs
+++ b/src/ops/interface.rs
@@ -1,10 +1,6 @@
 use std::time::SystemTime;
 
-use crate::base::{
-    schema::*,
-    spec::{IndexOptions, VectorSimilarityMetric},
-    value::*,
-};
+use crate::base::{schema::*, spec::IndexOptions, value::*};
 use crate::prelude::*;
 use crate::setup;
 use chrono::TimeZone;
@@ -241,12 +237,8 @@ pub enum SetupStateCompatibility {
     NotCompatible,
 }
 
-pub struct ExportTargetExecutors {
-    pub export_context: Arc<dyn Any + Send + Sync>,
-    pub query_target: Option<Arc<dyn QueryTarget>>,
-}
 pub struct ExportDataCollectionBuildOutput {
-    pub executors: BoxFuture<'static, Result<ExportTargetExecutors>>,
+    pub export_context: BoxFuture<'static, Result<Arc<dyn Any + Send + Sync>>>,
     pub setup_key: serde_json::Value,
     pub desired_setup_state: serde_json::Value,
 }
@@ -318,58 +310,3 @@ pub enum ExecutorFactory {
     SimpleFunction(Arc<dyn SimpleFunctionFactory + Send + Sync>),
     ExportTarget(Arc<dyn ExportTargetFactory + Send + Sync>),
 }
-
-#[derive(Debug)]
-pub struct VectorMatchQuery {
-    pub vector_field_name: String,
-    pub vector: Vec<f32>,
-    pub similarity_metric: VectorSimilarityMetric,
-    pub limit: u32,
-}
-
-#[derive(Debug, Clone, Serialize)]
-pub struct QueryResult<Row = Vec<Value>> {
-    pub data: Row,
-    pub score: f64,
-}
-
-#[derive(Debug, Clone, Serialize)]
-pub struct QueryResults<Row = Vec<Value>> {
-    pub fields: Vec<FieldSchema>,
-    pub results: Vec<QueryResult<Row>>,
-}
-
-impl TryFrom<QueryResults<Vec<Value>>> for QueryResults<serde_json::Value> {
-    type Error = anyhow::Error;
-
-    fn try_from(values: QueryResults<Vec<Value>>) -> Result<Self> {
-        let results = values
-            .results
-            .into_iter()
-            .map(|r| {
-                let data = serde_json::to_value(TypedFieldsValue {
-                    schema: &values.fields,
-                    values_iter: r.data.iter(),
-                })?;
-                Ok(QueryResult {
-                    data,
-                    score: r.score,
-                })
-            })
-            .collect::<Result<Vec<_>>>()?;
-        Ok(QueryResults {
-            fields: values.fields,
-            results,
-        })
-    }
-}
-#[derive(Debug, Clone, Serialize)]
-pub struct QueryResponse {
-    pub results: QueryResults<serde_json::Value>,
-    pub info: serde_json::Value,
-}
-
-#[async_trait]
-pub trait QueryTarget: Send + Sync {
-    async fn search(&self, query: VectorMatchQuery) -> Result<QueryResults>;
-}
diff --git a/src/ops/storages/kuzu.rs b/src/ops/storages/kuzu.rs
index 4ba99c1c2..7fcda26dc 100644
--- a/src/ops/storages/kuzu.rs
+++ b/src/ops/storages/kuzu.rs
@@ -827,14 +827,8 @@ impl StorageFactoryBase for Factory {
             ),
             analyzed_data_coll: analyzed,
         };
-        let executors = Box::pin(async move {
-            Ok(TypedExportTargetExecutors {
-                export_context: Arc::new(export_context),
-                query_target: None,
-            })
-        });
         Ok(TypedExportDataCollectionBuildOutput {
-            executors,
+            export_context: async move { Ok(Arc::new(export_context)) }.boxed(),
             setup_key,
             desired_setup_state,
         })
diff --git a/src/ops/storages/neo4j.rs b/src/ops/storages/neo4j.rs
index cf5f3cc69..0d08a554a 100644
--- a/src/ops/storages/neo4j.rs
+++ b/src/ops/storages/neo4j.rs
@@ -967,18 +967,17 @@ impl StorageFactoryBase for Factory {
                 .auth_registry
                 .get::<ConnectionSpec>(&data_coll.spec.connection)?;
             let factory = self.clone();
-            let executors = async move {
-                let graph = factory.graph_pool.get_graph(&conn_spec).await?;
-                let executor = Arc::new(ExportContext::new(graph, data_coll.spec, analyzed)?);
-                Ok(TypedExportTargetExecutors {
-                    export_context: executor,
-                    query_target: None,
-                })
+            let export_context = async move {
+                Ok(Arc::new(ExportContext::new(
+                    factory.graph_pool.get_graph(&conn_spec).await?,
+                    data_coll.spec,
+                    analyzed,
+                )?))
             }
             .boxed();
             Ok(TypedExportDataCollectionBuildOutput {
-                executors,
+                export_context,
                 setup_key,
                 desired_setup_state,
             })
diff --git a/src/ops/storages/postgres.rs b/src/ops/storages/postgres.rs
index 299d0d932..18d468f96 100644
--- a/src/ops/storages/postgres.rs
+++ b/src/ops/storages/postgres.rs
@@ -6,18 +6,13 @@ use super::shared::table_columns::{
 use crate::base::spec::{self, *};
 use crate::ops::sdk::*;
 use crate::settings::DatabaseConnectionSpec;
-use crate::utils::db::ValidIdentifier;
 use async_trait::async_trait;
-use bytes::Bytes;
-use futures::FutureExt;
 use indexmap::{IndexMap, IndexSet};
 use itertools::Itertools;
 use serde::Serialize;
-use sqlx::postgres::PgRow;
+use sqlx::PgPool;
 use sqlx::postgres::types::PgRange;
-use sqlx::{PgPool, Row};
 use std::ops::Bound;
-use uuid::Uuid;
 
 #[derive(Debug, Deserialize)]
 pub struct Spec {
@@ -173,115 +168,11 @@ fn bind_value_field<'arg>(
     Ok(())
 }
 
-fn from_pg_value(row: &PgRow, field_idx: usize, typ: &ValueType) -> Result<Value> {
-    let value = match typ {
-        ValueType::Basic(basic_type) => {
-            let basic_value = match basic_type {
-                BasicValueType::Bytes => row
-                    .try_get::<Option<Vec<u8>>, _>(field_idx)?
-                    .map(|v| BasicValue::Bytes(Bytes::from(v))),
-                BasicValueType::Str => row
-                    .try_get::<Option<String>, _>(field_idx)?
-                    .map(|v| BasicValue::Str(Arc::from(v))),
-                BasicValueType::Bool => row
-                    .try_get::<Option<bool>, _>(field_idx)?
-                    .map(BasicValue::Bool),
-                BasicValueType::Int64 => row
-                    .try_get::<Option<i64>, _>(field_idx)?
-                    .map(BasicValue::Int64),
-                BasicValueType::Float32 => row
-                    .try_get::<Option<f32>, _>(field_idx)?
-                    .map(BasicValue::Float32),
-                BasicValueType::Float64 => row
-                    .try_get::<Option<f64>, _>(field_idx)?
-                    .map(BasicValue::Float64),
-                BasicValueType::Range => row
-                    .try_get::<Option<PgRange<i64>>, _>(field_idx)?
-                    .map(|v| match (v.start, v.end) {
-                        (Bound::Included(start), Bound::Excluded(end)) => {
-                            Ok(BasicValue::Range(RangeValue {
-                                start: start as usize,
-                                end: end as usize,
-                            }))
-                        }
-                        _ => anyhow::bail!("invalid range value"),
-                    })
-                    .transpose()?,
-                BasicValueType::Uuid => row
-                    .try_get::<Option<Uuid>, _>(field_idx)?
-                    .map(BasicValue::Uuid),
-                BasicValueType::Date => row
-                    .try_get::<Option<chrono::NaiveDate>, _>(field_idx)?
-                    .map(BasicValue::Date),
-                BasicValueType::Time => row
-                    .try_get::<Option<chrono::NaiveTime>, _>(field_idx)?
-                    .map(BasicValue::Time),
-                BasicValueType::LocalDateTime => row
-                    .try_get::<Option<chrono::NaiveDateTime>, _>(field_idx)?
-                    .map(BasicValue::LocalDateTime),
-                BasicValueType::OffsetDateTime => row
-                    .try_get::<Option<chrono::DateTime<chrono::FixedOffset>>, _>(field_idx)?
-                    .map(BasicValue::OffsetDateTime),
-                BasicValueType::TimeDelta => row
-                    .try_get::<Option<sqlx::postgres::types::PgInterval>, _>(field_idx)?
-                    .map(|pg_interval| {
-                        let duration = chrono::Duration::microseconds(pg_interval.microseconds)
-                            + chrono::Duration::days(pg_interval.days as i64)
-                            + chrono::Duration::days((pg_interval.months as i64) * 30);
-                        BasicValue::TimeDelta(duration)
-                    }),
-                BasicValueType::Json => row
-                    .try_get::<Option<serde_json::Value>, _>(field_idx)?
-                    .map(|v| BasicValue::Json(Arc::from(v))),
-                BasicValueType::Vector(vs) => {
-                    if convertible_to_pgvector(vs) {
-                        row.try_get::<Option<pgvector::Vector>, _>(field_idx)?
-                            .map(|v| -> Result<_> {
-                                Ok(BasicValue::Vector(Arc::from(
-                                    v.as_slice()
-                                        .iter()
-                                        .map(|e| {
-                                            Ok(match *vs.element_type {
-                                                BasicValueType::Float32 => BasicValue::Float32(*e),
-                                                BasicValueType::Float64 => {
-                                                    BasicValue::Float64(*e as f64)
-                                                }
-                                                BasicValueType::Int64 => {
-                                                    BasicValue::Int64(*e as i64)
-                                                }
-                                                _ => anyhow::bail!("invalid vector element type"),
-                                            })
-                                        })
-                                        .collect::<Result<Vec<_>>>()?,
-                                )))
-                            })
-                            .transpose()?
-                    } else {
-                        row.try_get::<Option<serde_json::Value>, _>(field_idx)?
-                            .map(|v| BasicValue::from_json(v, basic_type))
-                            .transpose()?
-                    }
-                }
-            };
-            basic_value.map(Value::Basic)
-        }
-        _ => row
-            .try_get::<Option<serde_json::Value>, _>(field_idx)?
-            .map(|v| Value::from_json(v, typ))
-            .transpose()?,
-    };
-    let final_value = if let Some(v) = value { v } else { Value::Null };
-    Ok(final_value)
-}
-
 pub struct ExportContext {
     db_ref: Option<AuthEntryReference<DatabaseConnectionSpec>>,
     db_pool: PgPool,
-    table_name: ValidIdentifier,
     key_fields_schema: Vec<FieldSchema>,
     value_fields_schema: Vec<FieldSchema>,
-    all_fields: Vec<FieldSchema>,
-    all_fields_comma_separated: String,
     upsert_sql_prefix: String,
     upsert_sql_suffix: String,
     delete_sql_prefix: String,
@@ -311,23 +202,11 @@ impl ExportContext {
             .collect::<Vec<_>>()
             .join(", ");
 
-        let all_fields = key_fields_schema
-            .iter()
-            .chain(value_fields_schema.iter())
-            .cloned()
-            .collect::<Vec<_>>();
-        let table_name = ValidIdentifier::try_from(table_name)?;
         Ok(Self {
             db_ref,
             db_pool,
             key_fields_schema,
             value_fields_schema,
-            all_fields_comma_separated: all_fields
-                .iter()
-                .map(|f| f.name.as_str())
-                .collect::<Vec<_>>()
-                .join(", "),
-            all_fields,
             upsert_sql_prefix: format!(
                 "INSERT INTO {table_name} ({key_fields}, {value_fields}) VALUES "
             ),
@@ -335,7 +214,6 @@
                 " ON CONFLICT ({key_fields}) DO UPDATE SET {set_value_fields};"
             ),
             delete_sql_prefix: format!("DELETE FROM {table_name} WHERE "),
-            table_name,
         })
     }
 }
@@ -414,69 +292,6 @@ impl ExportContext {
     }
 }
 
-static SCORE_FIELD_NAME: &str = "__score";
-
-struct PostgresQueryTarget {
-    db_pool: PgPool,
-    context: Arc<ExportContext>,
-}
-
-#[async_trait]
-impl QueryTarget for PostgresQueryTarget {
-    async fn search(&self, query: VectorMatchQuery) -> Result<QueryResults> {
-        let query_str = format!(
-            "SELECT {} {} $1 AS {SCORE_FIELD_NAME}, {} FROM {} ORDER BY {SCORE_FIELD_NAME} LIMIT $2",
-            ValidIdentifier::try_from(query.vector_field_name)?,
-            to_distance_operator(query.similarity_metric),
-            self.context.all_fields_comma_separated,
-            self.context.table_name,
-        );
-        let results = sqlx::query(&query_str)
-            .bind(pgvector::Vector::from(query.vector))
-            .bind(query.limit as i64)
-            .fetch_all(&self.db_pool)
-            .await?
-            .into_iter()
-            .map(|r| -> Result<QueryResult> {
-                let score: f64 = distance_to_similarity(query.similarity_metric, r.try_get(0)?);
-                let data = self
-                    .context
-                    .key_fields_schema
-                    .iter()
-                    .chain(self.context.value_fields_schema.iter())
-                    .enumerate()
-                    .map(|(idx, schema)| from_pg_value(&r, idx + 1, &schema.value_type.typ))
-                    .collect::<Result<Vec<_>>>()?;
-                let result = QueryResult { data, score };
-                Ok(result)
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        Ok(QueryResults {
-            fields: self.context.all_fields.clone(),
-            results,
-        })
-    }
-}
-
-fn to_distance_operator(metric: VectorSimilarityMetric) -> &'static str {
-    match metric {
-        VectorSimilarityMetric::CosineSimilarity => "<=>",
-        VectorSimilarityMetric::L2Distance => "<->",
-        VectorSimilarityMetric::InnerProduct => "<#>",
-    }
-}
-
-fn distance_to_similarity(metric: VectorSimilarityMetric, distance: f64) -> f64 {
-    match metric {
-        // cosine distance => cosine similarity
-        VectorSimilarityMetric::CosineSimilarity => 1.0 - distance,
-        VectorSimilarityMetric::L2Distance => distance,
-        // negative inner product => inner product
-        VectorSimilarityMetric::InnerProduct => -distance,
-    }
-}
-
 #[derive(Default)]
 pub struct Factory {}
 
@@ -833,7 +648,7 @@ impl StorageFactoryBase for Factory {
                 let table_name = table_id.table_name.clone();
                 let db_ref = d.spec.database;
                 let auth_registry = context.auth_registry.clone();
-                let executors = async move {
+                let export_context = Box::pin(async move {
                     let db_pool = get_db_pool(db_ref.as_ref(), &auth_registry).await?;
                     let export_context = Arc::new(ExportContext::new(
                         db_ref,
@@ -842,19 +657,12 @@
                         d.key_fields_schema,
                         d.value_fields_schema,
                     )?);
-                    let query_target = Arc::new(PostgresQueryTarget {
-                        db_pool,
-                        context: export_context.clone(),
-                    });
-                    Ok(TypedExportTargetExecutors {
-                        export_context: export_context.clone(),
-                        query_target: Some(query_target as Arc<dyn QueryTarget>),
-                    })
-                };
+                    Ok(export_context)
+                });
                 Ok(TypedExportDataCollectionBuildOutput {
                     setup_key: table_id,
                     desired_setup_state: setup_state,
-                    executors: executors.boxed(),
+                    export_context,
                 })
             })
             .collect::<Result<Vec<_>>>()?;
diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs
index 706544750..c8155db1f 100644
--- a/src/ops/storages/qdrant.rs
+++ b/src/ops/storages/qdrant.rs
@@ -5,7 +5,6 @@ use std::fmt::Display;
 
 use crate::ops::registry::ExecutorFactoryRegistry;
 use crate::setup;
-use futures::FutureExt;
 use qdrant_client::Qdrant;
 use qdrant_client::qdrant::{
     CreateCollectionBuilder, DeletePointsBuilder, Distance, NamedVectors, PointId, PointStruct,
@@ -370,14 +369,8 @@ impl StorageFactoryBase for Factory {
             collection_name: d.spec.collection_name.clone(),
             fields_info,
         });
-        let executors = async move {
-            Ok(TypedExportTargetExecutors {
-                export_context,
-                query_target: None,
-            })
-        };
         Ok(TypedExportDataCollectionBuildOutput {
-            executors: executors.boxed(),
+            export_context: Box::pin(async move { Ok(export_context) }),
            setup_key: CollectionKey {
                connection: d.spec.connection,
                collection_name: d.spec.collection_name,
diff --git a/src/py/mod.rs b/src/py/mod.rs
index 95ca5c2c2..5620f6679 100644
--- a/src/py/mod.rs
+++ b/src/py/mod.rs
@@ -2,11 +2,8 @@ use crate::execution::evaluator::evaluate_transient_flow;
 use crate::prelude::*;
 
 use crate::base::schema::{FieldSchema, ValueType};
-use crate::base::spec::VectorSimilarityMetric;
 use crate::base::spec::{NamedSpec, OutputMode, ReactiveOpSpec, SpecFormatter};
-use crate::execution::query;
 use crate::lib_context::{clear_lib_context, get_auth_registry, init_lib_context};
-use crate::ops::interface::{QueryResult, QueryResults};
 use crate::ops::py_factory::PyOpArgSchema;
 use crate::ops::{interface::ExecutorFactory, py_factory::PyFunctionFactory, register_factory};
 use crate::server::{self, ServerSettings};
@@ -15,7 +12,6 @@ use crate::setup;
 use pyo3::IntoPyObjectExt;
 use pyo3::{exceptions::PyException, prelude::*};
 use pyo3_async_runtimes::tokio::future_into_py;
-use std::collections::btree_map;
 use std::fmt::Write;
 use std::sync::Arc;
 
@@ -374,82 +370,6 @@ impl TransientFlow {
     }
 }
 
-#[pyclass]
-pub struct SimpleSemanticsQueryHandler(pub Arc<query::SimpleSemanticsQueryHandler>);
-
-#[pymethods]
-impl SimpleSemanticsQueryHandler {
-    #[new]
-    pub fn new(
-        py: Python<'_>,
-        flow: &Flow,
-        target_name: &str,
-        query_transform_flow: &TransientFlow,
-        default_similarity_metric: Pythonized<VectorSimilarityMetric>,
-    ) -> PyResult<Self> {
-        py.allow_threads(|| {
-            let handler = get_runtime()
-                .block_on(query::SimpleSemanticsQueryHandler::new(
-                    flow.0.flow.clone(),
-                    target_name,
-                    query_transform_flow.0.clone(),
-                    default_similarity_metric.0,
-                ))
-                .into_py_result()?;
-            Ok(Self(Arc::new(handler)))
-        })
-    }
-
-    pub fn register_query_handler(&self, name: String) -> PyResult<()> {
-        let flow_ctx = get_lib_context()
-            .into_py_result()?
-            .get_flow_context(&self.0.flow_name)
-            .into_py_result()?;
-        let mut query_handlers = flow_ctx.query_handlers.lock().unwrap();
-        match query_handlers.entry(name) {
-            btree_map::Entry::Occupied(entry) => {
-                return Err(PyException::new_err(format!(
-                    "query handler name already exists: {}",
-                    entry.key()
-                )));
-            }
-            btree_map::Entry::Vacant(entry) => {
-                entry.insert(self.0.clone());
-            }
-        }
-        Ok(())
-    }
-
-    #[pyo3(signature = (query, limit, vector_field_name = None, similarity_metric = None))]
-    pub fn search(
-        &self,
-        py: Python<'_>,
-        query: String,
-        limit: u32,
-        vector_field_name: Option<String>,
-        similarity_metric: Option<Pythonized<VectorSimilarityMetric>>,
-    ) -> PyResult<(
-        Pythonized<Vec<QueryResult<serde_json::Value>>>,
-        Pythonized<query::SimpleSemanticsQueryInfo>,
-    )> {
-        py.allow_threads(|| {
-            let (results, info) = get_runtime().block_on(async move {
-                self.0
-                    .search(
-                        query,
-                        limit,
-                        vector_field_name,
-                        similarity_metric.map(|m| m.0),
-                    )
-                    .await
-            })?;
-            let results = QueryResults::<serde_json::Value>::try_from(results)?;
-            anyhow::Ok((Pythonized(results.results), Pythonized(info)))
-        })
-        .into_py_result()
-    }
-}
-
 #[pyclass]
 pub struct SetupStatus(setup::AllSetupStatus);
 
@@ -564,7 +484,6 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<Flow>()?;
     m.add_class::<FlowLiveUpdater>()?;
     m.add_class::<TransientFlow>()?;
-    m.add_class::<SimpleSemanticsQueryHandler>()?;
     m.add_class::<IndexUpdateInfo>()?;
     m.add_class::<SetupStatus>()?;
     m.add_class::<ServerSettings>()?;
diff --git a/src/server.rs b/src/server.rs
index 1b52b8186..f4696b543 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -69,10 +69,6 @@ pub async fn init_server(
             "/flows/{flowInstName}/update",
             routing::post(service::flows::update),
         )
-        .route(
-            "/flows/{flowInstName}/search",
-            routing::get(service::search::search),
-        )
         .layer(
             ServiceBuilder::new()
                 .layer(TraceLayer::new_for_http())
diff --git a/src/service/mod.rs b/src/service/mod.rs
index 3e118d867..1780e8555 100644
--- a/src/service/mod.rs
+++ b/src/service/mod.rs
@@ -1,3 +1,2 @@
 pub(crate) mod error;
 pub(crate) mod flows;
-pub(crate) mod search;
diff --git a/src/service/search.rs b/src/service/search.rs
deleted file mode 100644
index d42d1f2d9..000000000
--- a/src/service/search.rs
+++ /dev/null
@@ -1,58 +0,0 @@
-use crate::prelude::*;
-
-use axum::extract::Path;
-use axum::http::StatusCode;
-
-use crate::lib_context::LibContext;
-use crate::ops::interface::QueryResponse;
-use axum::{extract::State, Json};
-use axum_extra::extract::Query;
-
-#[derive(Debug, Deserialize)]
-pub struct SearchParams {
-    handler: Option<String>,
-    field: Option<String>,
-    query: String,
-    limit: u32,
-    metric: Option<VectorSimilarityMetric>,
-}
-
-pub async fn search(
-    Path(flow_name): Path<String>,
-    Query(query): Query<SearchParams>,
-    State(lib_context): State<Arc<LibContext>>,
-) -> Result<Json<QueryResponse>, ApiError> {
-    let flow_ctx = lib_context.get_flow_context(&flow_name)?;
-    let query_handler = match &query.handler {
-        Some(handler) => flow_ctx.get_query_handler(handler)?,
-        None => {
-            let query_handlers = flow_ctx.query_handlers.lock().unwrap();
-            if query_handlers.is_empty() {
-                return Err(ApiError::new(
-                    &format!("No query handler found for flow: {flow_name}"),
-                    StatusCode::NOT_FOUND,
-                ));
-            } else if query_handlers.len() == 1 {
-                query_handlers.values().next().unwrap().clone()
-            } else {
-                return Err(ApiError::new(
-                    "Found multiple query handlers for flow {}",
-                    StatusCode::BAD_REQUEST,
-                ));
-            }
-        }
-    };
-    let (results, info) = query_handler
-        .search(query.query, query.limit, query.field, query.metric)
-        .await?;
-    let response = QueryResponse {
-        results: results.try_into()?,
-        info: serde_json::to_value(info).map_err(|e| {
-            ApiError::new(
-                &format!("Failed to serialize query info: {e}"),
-                StatusCode::INTERNAL_SERVER_ERROR,
-            )
-        })?,
-    };
-    Ok(Json(response))
-}