Skip to content

Commit c399325

Browse files
authored
Integrate with console subscriber and add some more traces. (#90)
1 parent 7e847c3 commit c399325

File tree

6 files changed

+36
-12
lines changed

6 files changed

+36
-12
lines changed

.cargo/config.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[build]
2+
# This is required by tokio-console: https://docs.rs/tokio-console/latest/tokio_console
3+
rustflags = ["--cfg", "tokio_unstable"]

Cargo.toml

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,21 @@ base64 = "0.22.1"
2020
chrono = "0.4.40"
2121
config = "0.14.1"
2222
const_format = "0.2.34"
23-
env_logger = "0.11.6"
2423
futures = "0.3.31"
2524
log = "0.4.26"
2625
regex = "1.11.1"
27-
serde = { version = "1.0.218", features = ["derive"] }
26+
serde = { version = "1.0.219", features = ["derive"] }
2827
serde_json = "1.0.140"
2928
sqlx = { version = "0.8.3", features = ["chrono", "postgres", "runtime-tokio"] }
30-
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread"] }
29+
tokio = { version = "1.44.0", features = [
30+
"macros",
31+
"rt-multi-thread",
32+
"full",
33+
"tracing",
34+
] }
3135
tower = "0.5.2"
3236
tower-http = { version = "0.6.2", features = ["cors", "trace"] }
33-
indexmap = { version = "2.7.1", features = ["serde"] }
37+
indexmap = { version = "2.8.0", features = ["serde"] }
3438
blake2 = "0.10.6"
3539
pgvector = { version = "0.4.0", features = ["sqlx"] }
3640
blocking = "1.6.1"
@@ -41,6 +45,9 @@ async-lock = "3.4.0"
4145
hex = "0.4.3"
4246
pythonize = "0.23.0"
4347
# TODO: Switch to a stable tag of mistralrs after a new release is tagged.
44-
mistralrs = { git = "https://github.com/EricLBuehler/mistral.rs.git" }
48+
mistralrs = { git = "https://github.com/EricLBuehler/mistral.rs.git", features = [
49+
"metal",
50+
] }
4551
schemars = "0.8.22"
4652
openssl = { version = "0.10.71", features = ["vendored"] }
53+
console-subscriber = "0.4.1"

src/builder/analyzed_flow.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::{
1010
};
1111
use anyhow::Result;
1212
use futures::{future::Shared, FutureExt};
13+
use log::trace;
1314

1415
pub struct AnalyzedFlow {
1516
pub flow_instance: spec::FlowInstanceSpec,
@@ -55,13 +56,15 @@ impl AnalyzedFlow {
5556
}
5657

5758
pub async fn get_execution_plan(&self) -> Result<Arc<plan::ExecutionPlan>> {
59+
trace!("get_execution_plan() started");
5860
let execution_plan = self
5961
.execution_plan
6062
.as_ref()
6163
.ok_or_else(|| api_error!("Flow setup is not up to date. Please run `cocoindex setup` to update the setup."))?
6264
.clone()
6365
.await
6466
.std_result()?;
67+
trace!("get_execution_plan() finished");
6568
Ok(execution_plan)
6669
}
6770
}

src/builder/analyzer.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use anyhow::{anyhow, bail, Context, Result};
2323
use futures::future::try_join3;
2424
use futures::{future::try_join_all, FutureExt};
2525
use indexmap::IndexMap;
26-
use log::warn;
26+
use log::{trace, warn};
2727

2828
#[derive(Debug)]
2929
pub(super) enum ValueTypeBuilder {
@@ -649,9 +649,12 @@ impl<'a> AnalyzerContext<'a> {
649649
let op_name = source_op.name.clone();
650650
let output = scope.add_field(source_op.name, &output_type)?;
651651
let result_fut = async move {
652+
trace!("Start building executor for source op `{}`", op_name);
653+
let executor = executor.await?;
654+
trace!("Finished building executor for source op `{}`", op_name);
652655
Ok(AnalyzedSourceOp {
653656
source_id: source_id.unwrap_or_default(),
654-
executor: executor.await?,
657+
executor,
655658
output,
656659
primary_key_type: output_type
657660
.typ
@@ -701,12 +704,15 @@ impl<'a> AnalyzerContext<'a> {
701704
.add_field(reactive_op.name.clone(), &output_type)?;
702705
let reactive_op = reactive_op.clone();
703706
async move {
707+
trace!("Start building executor for transform op `{}`", reactive_op.name);
704708
let executor = executor.await.with_context(|| {
705709
format!("Failed to build executor for transform op: {}", reactive_op.name)
706710
})?;
711+
let enable_cache = executor.enable_cache();
707712
let behavior_version = executor.behavior_version();
713+
trace!("Finished building executor for transform op `{}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}", reactive_op.name);
708714
let function_exec_info = AnalyzedFunctionExecInfo {
709-
enable_cache: executor.enable_cache(),
715+
enable_cache,
710716
behavior_version,
711717
fingerprinter: Fingerprinter::default()
712718
.with(&reactive_op.name)?
@@ -874,7 +880,7 @@ impl<'a> AnalyzerContext<'a> {
874880
};
875881

876882
let target_id: i32 = 1; // TODO: Fill it with a meaningful value automatically
877-
let ((setup_key, desired_state), executor_futs) = export_factory.clone().build(
883+
let ((setup_key, desired_state), executor_fut) = export_factory.clone().build(
878884
export_op.name.clone(),
879885
target_id,
880886
spec,
@@ -967,9 +973,14 @@ impl<'a> AnalyzerContext<'a> {
967973
.transpose()?;
968974

969975
Ok(async move {
970-
let (executor, query_target) = executor_futs
976+
trace!("Start building executor for export op `{}`", export_op.name);
977+
let (executor, query_target) = executor_fut
971978
.await
972979
.with_context(|| format!("Analyzing export op: {}", export_op.name))?;
980+
trace!(
981+
"Finished building executor for export op `{}`",
982+
export_op.name
983+
);
973984
let name = export_op.name;
974985
Ok(AnalyzedExportOp {
975986
name,

src/lib_context.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ impl LibContext {
4949
}
5050

5151
pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
52-
env_logger::init();
52+
console_subscriber::init();
53+
5354
let runtime = Runtime::new()?;
5455
let (pool, all_css) = runtime.block_on(async {
5556
let pool = PgPool::connect(&settings.database_url).await?;

src/ops/functions/extract_by_mistral.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ impl Executor {
4646
async fn new(spec: Spec) -> Result<Self> {
4747
let model = mistralrs::TextModelBuilder::new(spec.model.model_id)
4848
.with_isq(spec.model.isq_type)
49-
.with_logging()
5049
.with_paged_attn(|| mistralrs::PagedAttentionMetaBuilder::default().build())?
5150
.build()
5251
.await?;

0 commit comments

Comments
 (0)