Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added python/cocoindex/query.py
Empty file.
8 changes: 8 additions & 0 deletions src/lib_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::prelude::*;
use crate::builder::AnalyzedFlow;
use crate::execution::source_indexer::SourceIndexingContext;
use crate::service::error::ApiError;
use crate::service::query_handler::{QueryHandler, QueryHandlerInfo};
use crate::settings;
use crate::setup::ObjectSetupChange;
use axum::http::StatusCode;
Expand Down Expand Up @@ -97,9 +98,15 @@ impl FlowExecutionContext {
}
}

pub struct QueryHandlerContext {
pub info: Arc<QueryHandlerInfo>,
pub handler: Arc<dyn QueryHandler>,
}

pub struct FlowContext {
pub flow: Arc<AnalyzedFlow>,
execution_ctx: Arc<tokio::sync::RwLock<FlowExecutionContext>>,
pub query_handlers: RwLock<HashMap<String, QueryHandlerContext>>,
}

impl FlowContext {
Expand All @@ -117,6 +124,7 @@ impl FlowContext {
Ok(Self {
flow,
execution_ctx,
query_handlers: RwLock::new(HashMap::new()),
})
}

Expand Down
61 changes: 60 additions & 1 deletion src/py/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ use crate::prelude::*;

use crate::base::schema::{FieldSchema, ValueType};
use crate::base::spec::{NamedSpec, OutputMode, ReactiveOpSpec, SpecFormatter};
use crate::lib_context::{clear_lib_context, get_auth_registry, init_lib_context};
use crate::lib_context::{
QueryHandlerContext, clear_lib_context, get_auth_registry, init_lib_context,
};
use crate::ops::py_factory::{PyExportTargetFactory, PyOpArgSchema};
use crate::ops::{interface::ExecutorFactory, py_factory::PyFunctionFactory, register_factory};
use crate::server::{self, ServerSettings};
use crate::service::query_handler::QueryHandlerInfo;
use crate::settings::Settings;
use crate::setup::{self};
use pyo3::IntoPyObjectExt;
Expand Down Expand Up @@ -430,6 +433,62 @@ impl Flow {
};
SetupChangeBundle(Arc::new(bundle))
}

pub fn add_query_handler(&self, name: String, handler: Py<PyAny>) -> PyResult<()> {
struct PyQueryHandler {
handler: Py<PyAny>,
}

#[async_trait]
impl crate::service::query_handler::QueryHandler for PyQueryHandler {
async fn query(
&self,
input: crate::service::query_handler::QueryInput,
flow_ctx: &interface::FlowInstanceContext,
) -> Result<crate::service::query_handler::QueryOutput> {
// Call the Python async function on the flow's event loop
let result_fut = Python::with_gil(|py| -> Result<_> {
let handler = self.handler.clone_ref(py);
// Build args: pass a dict with the query input
let args = pyo3::types::PyTuple::new(py, [input.query])?;
let result_coro = handler.call(py, args, None).to_result_with_py_trace(py)?;

let py_exec_ctx = flow_ctx
.py_exec_ctx
.as_ref()
.ok_or_else(|| anyhow!("Python execution context is missing"))?;
let task_locals = pyo3_async_runtimes::TaskLocals::new(
py_exec_ctx.event_loop.bind(py).clone(),
);
Ok(pyo3_async_runtimes::into_future_with_locals(
&task_locals,
result_coro.into_bound(py),
)?)
})?;

let py_obj = result_fut.await;
// Convert Python result to Rust type with proper traceback handling
let output = Python::with_gil(|py| -> Result<_> {
let output_any = py_obj.to_result_with_py_trace(py)?;
let output: crate::py::Pythonized<crate::service::query_handler::QueryOutput> =
output_any.extract(py)?;
Ok(output.into_inner())
})?;

Ok(output)
}
}

let mut handlers = self.0.query_handlers.write().unwrap();
handlers.insert(
name,
QueryHandlerContext {
info: Arc::new(QueryHandlerInfo {}),
handler: Arc::new(PyQueryHandler { handler }),
},
);
Ok(())
}
}

#[pyclass]
Expand Down
8 changes: 8 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ pub async fn init_server(
"/flows/{flowInstName}/data",
routing::get(service::flows::evaluate_data),
)
.route(
"/flows/{flowInstName}/queryHandlers",
routing::get(service::flows::get_query_handlers),
)
.route(
"/flows/{flowInstName}/queryHandlers/{queryHandlerName}",
routing::get(service::flows::query),
)
.route(
"/flows/{flowInstName}/rowStatus",
routing::get(service::flows::get_row_indexing_status),
Expand Down
40 changes: 40 additions & 0 deletions src/service/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::prelude::*;

use crate::execution::{evaluator, indexing_status, memoization, row_indexer, stats};
use crate::lib_context::LibContext;
use crate::service::query_handler::{QueryHandlerInfo, QueryInput, QueryOutput};
use crate::{base::schema::FlowSchema, ops::interface::SourceExecutorReadOptions};
use axum::{
Json,
Expand Down Expand Up @@ -255,3 +256,42 @@ pub async fn get_row_indexing_status(
.await?;
Ok(Json(indexing_status))
}

pub async fn get_query_handlers(
Path(flow_name): Path<String>,
State(lib_context): State<Arc<LibContext>>,
) -> Result<Json<HashMap<String, Arc<QueryHandlerInfo>>>, ApiError> {
let flow_ctx = lib_context.get_flow_context(&flow_name)?;
let query_handlers = flow_ctx.query_handlers.read().unwrap();
Ok(Json(
query_handlers
.iter()
.map(|(name, handler)| (name.clone(), handler.info.clone()))
.collect(),
))
}

pub async fn query(
Path((flow_name, query_handler_name)): Path<(String, String)>,
Query(query): Query<QueryInput>,
State(lib_context): State<Arc<LibContext>>,
) -> Result<Json<QueryOutput>, ApiError> {
let flow_ctx = lib_context.get_flow_context(&flow_name)?;
let query_handler = {
let query_handlers = flow_ctx.query_handlers.read().unwrap();
query_handlers
.get(&query_handler_name)
.ok_or_else(|| {
ApiError::new(
&format!("query handler not found: {query_handler_name}"),
StatusCode::BAD_REQUEST,
)
})?
.handler
.clone()
};
let query_output = query_handler
.query(query.into(), &flow_ctx.flow.flow_instance_ctx)
.await?;
Ok(Json(query_output))
}
1 change: 1 addition & 0 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub(crate) mod error;
pub(crate) mod flows;
pub(crate) mod query_handler;
29 changes: 29 additions & 0 deletions src/service/query_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use crate::prelude::*;

#[derive(Serialize)]
pub struct QueryHandlerInfo {}

#[derive(Serialize, Deserialize)]
pub struct QueryInput {
pub query: String,
}

#[derive(Serialize, Deserialize, Default)]
pub struct QueryInfo {
pub embedding: Option<serde_json::Value>,
}

#[derive(Serialize, Deserialize)]
pub struct QueryOutput {
pub results: Vec<IndexMap<String, serde_json::Value>>,
pub query_info: QueryInfo,
}

#[async_trait]
pub trait QueryHandler: Send + Sync {
async fn query(
&self,
input: QueryInput,
flow_ctx: &interface::FlowInstanceContext,
) -> Result<QueryOutput>;
}
Loading