Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[build]
# This is required by tokio-console: https://docs.rs/tokio-console/latest/tokio_console
rustflags = ["--cfg", "tokio_unstable"]
17 changes: 12 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,21 @@ base64 = "0.22.1"
chrono = "0.4.40"
config = "0.14.1"
const_format = "0.2.34"
env_logger = "0.11.6"
futures = "0.3.31"
log = "0.4.26"
regex = "1.11.1"
serde = { version = "1.0.218", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
sqlx = { version = "0.8.3", features = ["chrono", "postgres", "runtime-tokio"] }
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.44.0", features = [
"macros",
"rt-multi-thread",
"full",
"tracing",
] }
tower = "0.5.2"
tower-http = { version = "0.6.2", features = ["cors", "trace"] }
indexmap = { version = "2.7.1", features = ["serde"] }
indexmap = { version = "2.8.0", features = ["serde"] }
blake2 = "0.10.6"
pgvector = { version = "0.4.0", features = ["sqlx"] }
blocking = "1.6.1"
Expand All @@ -41,6 +45,9 @@ async-lock = "3.4.0"
hex = "0.4.3"
pythonize = "0.23.0"
# TODO: Switch to a stable tag of mistralrs after a new release is tagged.
mistralrs = { git = "https://github.com/EricLBuehler/mistral.rs.git" }
mistralrs = { git = "https://github.com/EricLBuehler/mistral.rs.git", features = [
"metal",
] }
schemars = "0.8.22"
openssl = { version = "0.10.71", features = ["vendored"] }
console-subscriber = "0.4.1"
3 changes: 3 additions & 0 deletions src/builder/analyzed_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
};
use anyhow::Result;
use futures::{future::Shared, FutureExt};
use log::trace;

pub struct AnalyzedFlow {
pub flow_instance: spec::FlowInstanceSpec,
Expand Down Expand Up @@ -55,13 +56,15 @@ impl AnalyzedFlow {
}

pub async fn get_execution_plan(&self) -> Result<Arc<plan::ExecutionPlan>> {
trace!("get_execution_plan() started");
let execution_plan = self
.execution_plan
.as_ref()
.ok_or_else(|| api_error!("Flow setup is not up to date. Please run `cocoindex setup` to update the setup."))?
.clone()
.await
.std_result()?;
trace!("get_execution_plan() finished");
Ok(execution_plan)
}
}
Expand Down
21 changes: 16 additions & 5 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use anyhow::{anyhow, bail, Context, Result};
use futures::future::try_join3;
use futures::{future::try_join_all, FutureExt};
use indexmap::IndexMap;
use log::warn;
use log::{trace, warn};

#[derive(Debug)]
pub(super) enum ValueTypeBuilder {
Expand Down Expand Up @@ -649,9 +649,12 @@ impl<'a> AnalyzerContext<'a> {
let op_name = source_op.name.clone();
let output = scope.add_field(source_op.name, &output_type)?;
let result_fut = async move {
trace!("Start building executor for source op `{}`", op_name);
let executor = executor.await?;
trace!("Finished building executor for source op `{}`", op_name);
Ok(AnalyzedSourceOp {
source_id: source_id.unwrap_or_default(),
executor: executor.await?,
executor,
output,
primary_key_type: output_type
.typ
Expand Down Expand Up @@ -701,12 +704,15 @@ impl<'a> AnalyzerContext<'a> {
.add_field(reactive_op.name.clone(), &output_type)?;
let reactive_op = reactive_op.clone();
async move {
trace!("Start building executor for transform op `{}`", reactive_op.name);
let executor = executor.await.with_context(|| {
format!("Failed to build executor for transform op: {}", reactive_op.name)
})?;
let enable_cache = executor.enable_cache();
let behavior_version = executor.behavior_version();
trace!("Finished building executor for transform op `{}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}", reactive_op.name);
let function_exec_info = AnalyzedFunctionExecInfo {
enable_cache: executor.enable_cache(),
enable_cache,
behavior_version,
fingerprinter: Fingerprinter::default()
.with(&reactive_op.name)?
Expand Down Expand Up @@ -874,7 +880,7 @@ impl<'a> AnalyzerContext<'a> {
};

let target_id: i32 = 1; // TODO: Fill it with a meaningful value automatically
let ((setup_key, desired_state), executor_futs) = export_factory.clone().build(
let ((setup_key, desired_state), executor_fut) = export_factory.clone().build(
export_op.name.clone(),
target_id,
spec,
Expand Down Expand Up @@ -967,9 +973,14 @@ impl<'a> AnalyzerContext<'a> {
.transpose()?;

Ok(async move {
let (executor, query_target) = executor_futs
trace!("Start building executor for export op `{}`", export_op.name);
let (executor, query_target) = executor_fut
.await
.with_context(|| format!("Analyzing export op: {}", export_op.name))?;
trace!(
"Finished building executor for export op `{}`",
export_op.name
);
let name = export_op.name;
Ok(AnalyzedExportOp {
name,
Expand Down
3 changes: 2 additions & 1 deletion src/lib_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ impl LibContext {
}

pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
env_logger::init();
console_subscriber::init();

let runtime = Runtime::new()?;
let (pool, all_css) = runtime.block_on(async {
let pool = PgPool::connect(&settings.database_url).await?;
Expand Down
1 change: 0 additions & 1 deletion src/ops/functions/extract_by_mistral.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ impl Executor {
async fn new(spec: Spec) -> Result<Self> {
let model = mistralrs::TextModelBuilder::new(spec.model.model_id)
.with_isq(spec.model.isq_type)
.with_logging()
.with_paged_attn(|| mistralrs::PagedAttentionMetaBuilder::default().build())?
.build()
.await?;
Expand Down
Loading