From d39c5095237c665bc86f40c6c05ca9e860162773 Mon Sep 17 00:00:00 2001 From: LJ Date: Tue, 11 Mar 2025 09:50:31 -0700 Subject: [PATCH] Integrate with console subscriber and add some more traces. --- .cargo/config.toml | 3 +++ Cargo.toml | 17 ++++++++++++----- src/builder/analyzed_flow.rs | 3 +++ src/builder/analyzer.rs | 21 ++++++++++++++++----- src/lib_context.rs | 3 ++- src/ops/functions/extract_by_mistral.rs | 1 - 6 files changed, 36 insertions(+), 12 deletions(-) create mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 000000000..fdd312159 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,3 @@ +[build] +# This is required by tokio-console: https://docs.rs/tokio-console/latest/tokio_console +rustflags = ["--cfg", "tokio_unstable"] diff --git a/Cargo.toml b/Cargo.toml index e1be453bd..8aec94148 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,17 +20,21 @@ base64 = "0.22.1" chrono = "0.4.40" config = "0.14.1" const_format = "0.2.34" -env_logger = "0.11.6" futures = "0.3.31" log = "0.4.26" regex = "1.11.1" -serde = { version = "1.0.218", features = ["derive"] } +serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" sqlx = { version = "0.8.3", features = ["chrono", "postgres", "runtime-tokio"] } -tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread"] } +tokio = { version = "1.44.0", features = [ + "macros", + "rt-multi-thread", + "full", + "tracing", +] } tower = "0.5.2" tower-http = { version = "0.6.2", features = ["cors", "trace"] } -indexmap = { version = "2.7.1", features = ["serde"] } +indexmap = { version = "2.8.0", features = ["serde"] } blake2 = "0.10.6" pgvector = { version = "0.4.0", features = ["sqlx"] } blocking = "1.6.1" @@ -41,6 +45,9 @@ async-lock = "3.4.0" hex = "0.4.3" pythonize = "0.23.0" # TODO: Switch to a stable tag of mistralrs after a new release is tagged. -mistralrs = { git = "https://github.com/EricLBuehler/mistral.rs.git" } +mistralrs = { git = "https://github.com/EricLBuehler/mistral.rs.git", features = [ + "metal", +] } schemars = "0.8.22" openssl = { version = "0.10.71", features = ["vendored"] } +console-subscriber = "0.4.1" diff --git a/src/builder/analyzed_flow.rs b/src/builder/analyzed_flow.rs index 08bfdabef..c06ebbe57 100644 --- a/src/builder/analyzed_flow.rs +++ b/src/builder/analyzed_flow.rs @@ -10,6 +10,7 @@ use crate::{ }; use anyhow::Result; use futures::{future::Shared, FutureExt}; +use log::trace; pub struct AnalyzedFlow { pub flow_instance: spec::FlowInstanceSpec, @@ -55,6 +56,7 @@ impl AnalyzedFlow { } pub async fn get_execution_plan(&self) -> Result> { + trace!("get_execution_plan() started"); let execution_plan = self .execution_plan .as_ref() @@ -62,6 +64,7 @@ impl AnalyzedFlow { .clone() .await .std_result()?; + trace!("get_execution_plan() finished"); Ok(execution_plan) } } diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index dc576f3b9..8e4508daf 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -23,7 +23,7 @@ use anyhow::{anyhow, bail, Context, Result}; use futures::future::try_join3; use futures::{future::try_join_all, FutureExt}; use indexmap::IndexMap; -use log::warn; +use log::{trace, warn}; #[derive(Debug)] pub(super) enum ValueTypeBuilder { @@ -649,9 +649,12 @@ impl<'a> AnalyzerContext<'a> { let op_name = source_op.name.clone(); let output = scope.add_field(source_op.name, &output_type)?; let result_fut = async move { + trace!("Start building executor for source op `{}`", op_name); + let executor = executor.await?; + trace!("Finished building executor for source op `{}`", op_name); Ok(AnalyzedSourceOp { source_id: source_id.unwrap_or_default(), - executor: executor.await?, + executor, output, primary_key_type: output_type .typ @@ -701,12 +704,15 @@ impl<'a> AnalyzerContext<'a> { .add_field(reactive_op.name.clone(), &output_type)?; let reactive_op = reactive_op.clone(); async move { + trace!("Start building executor for transform op `{}`", reactive_op.name); let executor = executor.await.with_context(|| { format!("Failed to build executor for transform op: {}", reactive_op.name) })?; + let enable_cache = executor.enable_cache(); let behavior_version = executor.behavior_version(); + trace!("Finished building executor for transform op `{}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}", reactive_op.name); let function_exec_info = AnalyzedFunctionExecInfo { - enable_cache: executor.enable_cache(), + enable_cache, behavior_version, fingerprinter: Fingerprinter::default() .with(&reactive_op.name)? @@ -874,7 +880,7 @@ impl<'a> AnalyzerContext<'a> { }; let target_id: i32 = 1; // TODO: Fill it with a meaningful value automatically - let ((setup_key, desired_state), executor_futs) = export_factory.clone().build( + let ((setup_key, desired_state), executor_fut) = export_factory.clone().build( export_op.name.clone(), target_id, spec, @@ -967,9 +973,14 @@ impl<'a> AnalyzerContext<'a> { .transpose()?; Ok(async move { - let (executor, query_target) = executor_futs + trace!("Start building executor for export op `{}`", export_op.name); + let (executor, query_target) = executor_fut .await .with_context(|| format!("Analyzing export op: {}", export_op.name))?; + trace!( + "Finished building executor for export op `{}`", + export_op.name + ); let name = export_op.name; Ok(AnalyzedExportOp { name, diff --git a/src/lib_context.rs b/src/lib_context.rs index d2ff31341..b18ca829c 100644 --- a/src/lib_context.rs +++ b/src/lib_context.rs @@ -49,7 +49,8 @@ impl LibContext { } pub fn create_lib_context(settings: settings::Settings) -> Result { - env_logger::init(); + console_subscriber::init(); + let runtime = Runtime::new()?; let (pool, all_css) = runtime.block_on(async { let pool = PgPool::connect(&settings.database_url).await?; diff --git a/src/ops/functions/extract_by_mistral.rs b/src/ops/functions/extract_by_mistral.rs index df100610d..52201a46e 100644 --- a/src/ops/functions/extract_by_mistral.rs +++ b/src/ops/functions/extract_by_mistral.rs @@ -46,7 +46,6 @@ impl Executor { async fn new(spec: Spec) -> Result { let model = mistralrs::TextModelBuilder::new(spec.model.model_id) .with_isq(spec.model.isq_type) - .with_logging() .with_paged_attn(|| mistralrs::PagedAttentionMetaBuilder::default().build())? .build() .await?;