diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index a2c007a67309e..a07c87bbeff2e 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -1201,6 +1201,10 @@ dependencies = [ "msql-srv", "nanoid", "num 0.3.1", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-otlp", + "opentelemetry_sdk", "parquet-format 2.6.1", "parse-size", "paste", @@ -1230,6 +1234,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-futures", + "tracing-opentelemetry", "url", "uuid 0.8.2", "warp", @@ -1914,9 +1919,9 @@ checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7" [[package]] name = "glob" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "h2" @@ -3236,6 +3241,88 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite 0.2.14", + "thiserror", +] + +[[package]] +name = "opentelemetry-http" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6351496aeaa49d7c267fb480678d85d1cd30c5edb20b497c48c56f62a8c14b99" +dependencies = [ + "async-trait", + "bytes 1.6.0", + "http 1.1.0", + "opentelemetry", + "reqwest 0.12.5", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd" +dependencies = [ + "async-trait", + "futures-core", + "http 1.1.0", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest 0.12.5", + "serde_json", + "thiserror", + "tokio", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34" +dependencies = [ + "hex", + "opentelemetry", + "opentelemetry_sdk", + "prost", + "serde", + "tonic", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "percent-encoding", + "rand 0.8.4", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "ordered-float" version = "1.1.1" @@ -3443,22 +3530,22 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.0.8" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" +checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.0.8" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" +checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", - "syn 1.0.107", + "syn 2.0.58", ] [[package]] @@ -3640,6 +3727,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +dependencies = [ + "bytes 1.6.0", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +dependencies = [ + "anyhow", + "itertools 0.11.0", + "proc-macro2", + "quote", + "syn 2.0.58", +] + [[package]] name = "protobuf" version = "2.24.1" @@ -4085,6 +4195,7 @@ checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37" dependencies = [ "base64 0.22.0", "bytes 1.6.0", + "futures-channel", "futures-core", "futures-util", "h2 0.4.4", @@ -4555,6 +4666,15 @@ dependencies = [ "keccak", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shared_child" version = "1.0.0" @@ -4927,6 +5047,16 @@ dependencies = [ "syn 1.0.107", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if 1.0.0", + "once_cell", +] + [[package]] name = "threadpool" version = "1.8.1" @@ -5124,9 +5254,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" dependencies = [ "futures-core", "pin-project-lite 0.2.14", @@ -5171,6 +5301,27 @@ dependencies = [ "serde", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-trait", + "base64 0.22.0", + "bytes 1.6.0", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "percent-encoding", + "pin-project", + "prost", + "tokio-stream", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.8" @@ -5229,6 +5380,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" dependencies = [ "once_cell", + "valuable", ] [[package]] @@ -5241,6 +5393,46 @@ dependencies = [ "tracing", ] +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc58af5d3f6c5811462cabb3289aec0093f7338e367e5a33d28c0433b3c7360b" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "sharded-slab", + "thread_local", + "tracing-core", +] + [[package]] name = "treeline" version = "0.1.0" @@ -5411,6 +5603,12 @@ dependencies = [ "getrandom 0.2.14", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "value-bag" version = "1.8.1" @@ -5600,6 +5798,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.26.1" diff --git a/rust/cubestore/cubestore/Cargo.toml b/rust/cubestore/cubestore/Cargo.toml index eb95829a83138..b34712a457f49 100644 --- a/rust/cubestore/cubestore/Cargo.toml +++ b/rust/cubestore/cubestore/Cargo.toml @@ -78,6 +78,14 @@ flatbuffers = "23.1.21" http-auth-basic = "0.1.2" tracing = "0.1.25" tracing-futures = { version = "0.2.5" } +tracing-opentelemetry = "0.27.0" +opentelemetry = "0.26.0" +# opentelemetry_sdk v0.27 build fails because of Nightly in our toolchain (channel = "nightly-2024-01-29") +opentelemetry_sdk = { version = "0.26.0", features = ["rt-tokio"] } +opentelemetry-otlp = { version = "0.26.0", default-features = false, features = [ + "trace", "metrics", "logs", "http-proto", "http-json", "reqwest-client", "tokio" +] } +opentelemetry-http = { version = "0.26.0", features = ["reqwest"] } lru = "0.6.5" moka = { version = "0.10.1", features = ["future"]} ctor = "0.1.20" diff --git a/rust/cubestore/cubestore/src/cluster/mod.rs b/rust/cubestore/cubestore/src/cluster/mod.rs index afe622f875401..77bc6c72b8e8e 100644 --- a/rust/cubestore/cubestore/src/cluster/mod.rs +++ b/rust/cubestore/cubestore/src/cluster/mod.rs @@ -41,7 +41,7 @@ use crate::queryplanner::query_executor::{QueryExecutor, SerializedRecordBatchSt use crate::queryplanner::serialized_plan::SerializedPlan; use crate::remotefs::RemoteFs; use crate::store::ChunkDataStore; -use crate::telemetry::tracing::TracingHelper; +use crate::telemetry::tracing::{TraceIdAndSpanId, TracingHelper}; use crate::CubeError; use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; @@ -60,6 +60,8 @@ use ingestion::job_runner::JobRunner; use itertools::Itertools; use log::{debug, error, info, warn}; use mockall::automock; +use opentelemetry::trace::{SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId}; +use opentelemetry::Context as OtelContext; use regex::Regex; use serde::{Deserialize, Serialize}; use std::collections::hash_map::DefaultHasher; @@ -76,6 +78,7 @@ use tokio::sync::{oneshot, watch, Notify, RwLock}; use tokio::time::timeout; use tokio_util::sync::CancellationToken; use tracing::{instrument, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; #[automock] #[async_trait] @@ -215,7 +218,7 @@ pub enum WorkerMessage { SerializedPlan, HashMap, HashMap>, - Option<(u64, u64)>, + Option, ), } @@ -251,6 +254,13 @@ impl Configurator for WorkerConfigurator { }; Ok(config) } + + fn teardown() { + let teardown = SELECT_WORKER_SHUTDOWN.read().unwrap(); + if teardown.is_some() { + teardown.as_ref().unwrap()(); + } + } } #[cfg(not(target_os = "windows"))] @@ -320,13 +330,25 @@ impl WorkerProcessing for WorkerProcessor { let records = SerializedRecordBatchStream::write(schema.as_ref(), records)?; Ok((schema, records, data_loaded_size)) }; + let span = trace_id_and_span_id.map(|(t, s)| { - tracing::info_span!( - "Process on selec worker", - cube_dd_trace_id = t, - cube_dd_parent_span_id = s - ) + let trace_id = TraceId::from(t); + let span_id = SpanId::from(s); + let span_context = SpanContext::new( + trace_id, + span_id, + TraceFlags::SAMPLED, + true, + Default::default(), + ); + + let context = OtelContext::new().with_remote_span_context(span_context); + let span = tracing::info_span!("Process on select worker"); + + span.set_parent(context); + span }); + if let Some(span) = span { future.instrument(span).await } else { @@ -369,12 +391,23 @@ lazy_static! { std::sync::RwLock::new(None); } +lazy_static! { + static ref SELECT_WORKER_SHUTDOWN: std::sync::RwLock>> = + std::sync::RwLock::new(None); +} + pub fn register_select_worker_setup(f: fn(&Runtime)) { let mut setup = SELECT_WORKER_SETUP.write().unwrap(); assert!(setup.is_none(), "select worker setup already registered"); *setup = Some(Box::new(f)); } +pub fn register_select_worker_teardown(f: fn()) { + let mut setup = SELECT_WORKER_SHUTDOWN.write().unwrap(); + assert!(setup.is_none(), "select worker teardown already registered"); + *setup = Some(Box::new(f)); +} + pub fn register_select_worker_configure_fn(f: fn() -> BoxFuture<'static, Config>) { let mut func = SELECT_WORKER_CONFIGURE_FN.write().unwrap(); assert!( diff --git a/rust/cubestore/cubestore/src/cluster/worker_pool.rs b/rust/cubestore/cubestore/src/cluster/worker_pool.rs index 0ff552f38006f..edc7b3f6a2326 100644 --- a/rust/cubestore/cubestore/src/cluster/worker_pool.rs +++ b/rust/cubestore/cubestore/src/cluster/worker_pool.rs @@ -376,6 +376,14 @@ pub struct WorkerProcessArgs(PhantomData); + +impl Drop for TeardownGuard { + fn drop(&mut self) { + C::teardown(); + } +} + pub fn worker_main(a: WorkerProcessArgs) -> i32 where C: Configurator, @@ -399,6 +407,7 @@ where tokio_builder.thread_stack_size(stack_size); let runtime = tokio_builder.build().unwrap(); C::setup(&runtime); + let _teardown_guard = TeardownGuard::(PhantomData); runtime.block_on(async move { let services_client = S::connect(services_sender, services_reciever, timeout); let config = match C::configure(services_client).await { @@ -503,6 +512,8 @@ mod tests { config.configure_injector().await; Ok(config) } + + fn teardown() {} } pub struct Processor; @@ -711,6 +722,8 @@ mod tests { let config = TestConfig { services_client }; Ok(config) } + + fn teardown() {} } pub struct ServProcessor; diff --git a/rust/cubestore/cubestore/src/cluster/worker_services.rs b/rust/cubestore/cubestore/src/cluster/worker_services.rs index 5f444ea57151f..e3ead5ec56a80 100644 --- a/rust/cubestore/cubestore/src/cluster/worker_services.rs +++ b/rust/cubestore/cubestore/src/cluster/worker_services.rs @@ -34,6 +34,8 @@ pub trait Configurator: Send + Sync + 'static { dyn Callable, >, ) -> Result; + + fn teardown(); } #[async_trait] diff --git a/rust/cubestore/cubestore/src/telemetry/tracing.rs b/rust/cubestore/cubestore/src/telemetry/tracing.rs index 847450a420a7c..42b9bea18753f 100644 --- a/rust/cubestore/cubestore/src/telemetry/tracing.rs +++ b/rust/cubestore/cubestore/src/telemetry/tracing.rs @@ -1,15 +1,29 @@ use crate::config::injection::DIService; use crate::CubeError; use std::sync::Arc; +use tracing::Span; + +pub type TraceIdAndSpanId = (u128, u64); pub trait TracingHelper: DIService + Send + Sync { - fn trace_and_span_id(&self) -> Option<(u64, u64)>; + fn trace_and_span_id(&self) -> Option; + fn span_from_existing_trace( + &self, + trace_id_and_span_id: Option, + ) -> Option; } pub struct TracingHelperImpl; impl TracingHelper for TracingHelperImpl { - fn trace_and_span_id(&self) -> Option<(u64, u64)> { + fn trace_and_span_id(&self) -> Option { + None + } + + fn span_from_existing_trace( + &self, + _trace_id_and_span_id: Option, + ) -> Option { None } }