Skip to content

Commit 731a98c

Browse files
mishamskfa-assistant
authored andcommitted
Telemetry refactor to support private events & Proto defined event data structs (#5715)
* remove unusued pbjson serde gen. add skip_serialize_none pull protogen.toml path out of implementation add glob support to protogen better messages move type attributes to protogen config remove deserialize derive from vortex events add message & enum attrs support to protogen * add new tracing config creation api not relying on io_args telemetry context avoid rare flaky failures in tracing tests update metadata crates fix json payload inconsistency, simplify phase/node span_name enusre stable sort in protogen structure tracing tests and add jsonl filtering tests fix metadata fix copybara structure module for manual impl for generated proto files add otlp testing infra hide arrow schema from public api's make a private api to deserialize all event types from arrow including private * switch to macro derive for serde stuff in proto add new method derive for protos wip NodeEvaluated wip update goldie wip * proper skipped reporting * fix evaluation detail serialization and update metadata sample * add COPY fs/sa/crates/proto-rust-macros fs/sa/crates/proto-rust-macros for metadata GitOrigin-RevId: 439a1d347344453817dd736ceaa0346931a7384f
1 parent fccc1e5 commit 731a98c

File tree

113 files changed

+7597
-24599
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

113 files changed

+7597
-24599
lines changed

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ members = [
3434
"crates/vortex-client",
3535
"crates/vortex-events",
3636
"crates/proto-rust",
37+
"crates/proto-rust-macros",
3738
"crates/dbt-env",
3839
"crates/dbt-test-utils",
3940
]
@@ -178,6 +179,7 @@ dbt-test-containers = { path = "crates/dbt-test-containers" }
178179
dbt-test-primitives = { path = "crates/dbt-test-primitives" }
179180
dbt-test-utils = { path = "crates/dbt-test-utils" }
180181
proto-rust = { path = "crates/proto-rust" }
182+
proto-rust-macros = { path = "crates/proto-rust-macros" }
181183
vortex-client = { path = "crates/vortex-client" }
182184
vortex-events = { path = "crates/vortex-events" }
183185

@@ -277,7 +279,7 @@ datafusion-physical-expr = { version = "48.0.1" }
277279
datafusion-physical-plan = { version = "48.0.1" }
278280

279281
# uuid
280-
uuid = { version = "1.16.0", features = ["v4"] }
282+
uuid = { version = "1.16.0", features = ["v4", "serde"] }
281283

282284
# networking & cache
283285
redis = { version = "0.31.0", features = [
@@ -386,6 +388,7 @@ rustyline = { version = "15.0.0", default-features = false, features = [
386388
# datatypes and algos
387389
base64 = "0.22.1"
388390
bigdecimal = "0.4.7"
391+
bitflags = "2.9.4"
389392
blake3 = "1.5.0"
390393
chrono = { version = "0.4.41", features = ["std", "clock", "serde"] }
391394
chrono-tz = { version = "0.10" }
@@ -461,7 +464,6 @@ log = { version = "0.4.27", features = ["kv_serde"] }
461464

462465
# tracing (observability)
463466
tracing = "0.1.41"
464-
tracing-log = "0.2.0" # temporary until we migrate to tracing exclusively
465467
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "json"] }
466468

467469
# OpenTelemetry (optional embedded OTLP exporter for traces, see dbt-common)

crates/dbt-common/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ itertools = { workspace = true }
2929
rand = { workspace = true }
3030
regex = { workspace = true }
3131
unicode-segmentation = { version = "1.12" }
32-
uuid = { workspace = true }
32+
uuid = { workspace = true, features = ["v7"] }
3333

3434
arrow = { workspace = true }
3535
arrow-schema = { workspace = true }
@@ -61,8 +61,8 @@ log = { workspace = true }
6161
rust-embed = { workspace = true }
6262

6363
# Tracing
64+
proto-rust = { workspace = true }
6465
tracing = { workspace = true }
65-
tracing-log = { workspace = true } # temporary until we migrate to tracing exclusively
6666
tracing-subscriber = { workspace = true }
6767

6868
# Embedded OTLP exporter for traces

crates/dbt-common/src/io_args.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use clap::ValueEnum;
22
use dbt_serde_yaml::{JsonSchema, Value};
3+
use dbt_telemetry::NodeType;
34
use pathdiff::diff_paths;
45
use serde::{Deserialize, Serialize};
56
use std::collections::HashMap;
@@ -37,8 +38,8 @@ pub struct IoArgs {
3738
pub in_dir: PathBuf,
3839
pub out_dir: PathBuf,
3940
pub log_path: Option<PathBuf>,
40-
pub otm_file_name: Option<String>,
41-
pub otm_parquet_file_name: Option<String>,
41+
pub otel_file_name: Option<String>,
42+
pub otel_parquet_file_name: Option<String>,
4243
pub export_to_otlp: bool,
4344
pub log_format: LogFormat,
4445
pub log_level: Option<LevelFilter>,
@@ -75,7 +76,7 @@ impl fmt::Debug for IoArgs {
7576
.field("in_dir", &self.in_dir)
7677
.field("out_dir", &self.out_dir)
7778
.field("log_path", &self.log_path)
78-
.field("otm_file_name", &self.otm_file_name)
79+
.field("otel_file_name", &self.otel_file_name)
7980
.field("status_reporter", &self.status_reporter.is_some())
8081
.finish()
8182
}
@@ -111,8 +112,8 @@ impl IoArgs {
111112
/// This function takes an artifact path, which may either be a workspace
112113
/// resource, or some generated temp location, and returns a path to its
113114
/// corresponding location in the workspace
114-
pub fn map_to_workspace_path(&self, path: &Path, resource_type: &str) -> PathBuf {
115-
if resource_type == "unit_test" || resource_type == "snapshot" {
115+
pub fn map_to_workspace_path(&self, path: &Path, resource_type: NodeType) -> PathBuf {
116+
if resource_type == NodeType::UnitTest || resource_type == NodeType::Snapshot {
116117
let special_component_idx = path.components().position(|c| {
117118
c.as_os_str() == DBT_GENERIC_TESTS_DIR_NAME
118119
|| c.as_os_str() == DBT_SNAPSHOTS_DIR_NAME
@@ -385,6 +386,19 @@ impl Display for ClapResourceType {
385386
}
386387
}
387388

389+
impl From<&ClapResourceType> for NodeType {
390+
fn from(value: &ClapResourceType) -> Self {
391+
match value {
392+
ClapResourceType::Model => NodeType::Model,
393+
ClapResourceType::Source => NodeType::Source,
394+
ClapResourceType::Seed => NodeType::Seed,
395+
ClapResourceType::Snapshot => NodeType::Snapshot,
396+
ClapResourceType::Test => NodeType::Test,
397+
ClapResourceType::UnitTest => NodeType::UnitTest,
398+
}
399+
}
400+
}
401+
388402
#[derive(
389403
Debug,
390404
Clone,

crates/dbt-common/src/logging/logger.rs

Lines changed: 16 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use std::fmt::Display;
1111
use std::io::{IsTerminal as _, Write};
1212
use std::path::PathBuf;
1313
use std::sync::{Arc, Mutex};
14-
use tracing_log::LogTracer;
1514

1615
const QUERY_LOG_SQL: &str = "query_log.sql";
1716
const CACHE_LOG_FILE: &str = "beta_cache.log";
@@ -97,7 +96,6 @@ enum LogTarget {
9796
Stdout,
9897
Stderr,
9998
Writer(Arc<Mutex<Box<dyn Write + Send>>>),
100-
TracingBridge(LogTracer),
10199
}
102100

103101
// Individual logger that can be customized
@@ -135,9 +133,6 @@ macro_rules! locked_writeln {
135133
writeln!(writer, $($arg)*).ok();
136134
}
137135
}
138-
LogTarget::TracingBridge(_) => {
139-
// TracingBridge doesn't use writeln, handled separately in log() method
140-
}
141136
}
142137
};
143138
}
@@ -153,7 +148,6 @@ impl Logger {
153148
LogTarget::Stdout => !std::io::stdout().is_terminal(),
154149
LogTarget::Stderr => !std::io::stderr().is_terminal(),
155150
LogTarget::Writer(_) => true, // Always remove ANSI codes for file writers
156-
LogTarget::TracingBridge(_) => true, // Always remove ANSI codes for tracing bridge
157151
};
158152
Self {
159153
target: writer,
@@ -292,44 +286,24 @@ impl log::Log for Logger {
292286

293287
fn log(&self, record: &Record) {
294288
if self.enabled(record.metadata()) && !super::term::is_term_control_only(record) {
295-
match &self.target {
296-
LogTarget::TracingBridge(tracer) => {
297-
// Check if this record is already handled by new tracing logic
298-
if record
299-
.key_values()
300-
.get(log::kv::Key::from_str("_TRACING_HANDLED_"))
301-
.is_some()
302-
{
303-
// This record is already handled by the new tracing logic, skip it
304-
return;
289+
match self.config.format {
290+
LogFormat::Text | LogFormat::Default => {
291+
let mut text = record.args().to_string();
292+
if self.remove_ansi_codes {
293+
text = remove_ansi_codes(&text);
305294
}
306-
307-
// For tracing bridge, we need to strip ANSI codes and pass through
308-
// the log level filtering logic, then delegate to the tracer
309-
let text = remove_ansi_codes(&record.args().to_string());
310-
311-
// Pass a new record with stripped ANSI codes
312-
tracer.log(&record.to_builder().args(format_args!("{text}")).build());
295+
locked_writeln!(self, "{}", text);
313296
}
314-
_ => match self.config.format {
315-
LogFormat::Text | LogFormat::Default => {
316-
let mut text = record.args().to_string();
317-
if self.remove_ansi_codes {
318-
text = remove_ansi_codes(&text);
319-
}
320-
locked_writeln!(self, "{}", text);
321-
}
322-
LogFormat::Json => {
323-
let json = Self::format_json(
324-
record,
325-
&self.invocation_id.to_string(),
326-
self.remove_ansi_codes,
327-
);
328-
locked_writeln!(self, "{}", json);
329-
}
330-
// This is handled in new tracing infra
331-
LogFormat::Otel => {}
332-
},
297+
LogFormat::Json => {
298+
let json = Self::format_json(
299+
record,
300+
&self.invocation_id.to_string(),
301+
self.remove_ansi_codes,
302+
);
303+
locked_writeln!(self, "{}", json);
304+
}
305+
// This is handled in new tracing infra
306+
LogFormat::Otel => {}
333307
}
334308
}
335309
}
@@ -347,9 +321,6 @@ impl log::Log for Logger {
347321
let _ = writer.flush();
348322
}
349323
}
350-
LogTarget::TracingBridge(ref tracer) => {
351-
tracer.flush();
352-
}
353324
}
354325
}
355326
}
@@ -454,27 +425,6 @@ impl MultiLoggerBuilder {
454425
self
455426
}
456427

457-
fn add_tracing_bridge_logger(mut self, log_config: &FsLogConfig) -> Self {
458-
let config = LoggerConfig {
459-
level_filter: log_config.log_level,
460-
format: LogFormat::Text, // Format doesn't matter for tracing bridge
461-
min_level: None,
462-
max_level: Some(LevelFilter::Debug),
463-
includes: None,
464-
excludes: None,
465-
};
466-
467-
let logger = Logger::new(
468-
"tracing_bridge",
469-
LogTarget::TracingBridge(LogTracer::new()),
470-
config,
471-
self.invocation_id,
472-
);
473-
474-
self.loggers.push(Box::new(logger));
475-
self
476-
}
477-
478428
fn build(self) -> MultiLogger {
479429
MultiLogger {
480430
loggers: self.loggers,
@@ -644,9 +594,6 @@ pub fn init_logger(log_config: FsLogConfig) -> FsResult<()> {
644594
) as Box<dyn Write + Send>));
645595
builder = builder.add_logger("cache_stats", file, cache_log_config);
646596

647-
// Add tracing bridge logger
648-
builder = builder.add_tracing_bridge_logger(&log_config);
649-
650597
// Build the logger
651598
let logger = builder.build();
652599
// Register the logger globally

crates/dbt-common/src/macros.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ macro_rules! show_completed {
369369
.signed_duration_since($start_time)
370370
.to_std()
371371
.unwrap_or_default();
372-
let resource_type = $task.resource_type();
372+
let resource_type_str = $task.resource_type();
373373
let materialization = $task.base().materialized.to_string();
374374
let schema = $task.base().schema.clone();
375375
let alias = $task.base().alias.clone();
@@ -389,7 +389,7 @@ macro_rules! show_completed {
389389

390390
if $io.should_show(ShowOptions::Completed) {
391391
let schema_alias = format_schema_alias(&schema, &alias);
392-
let resource_type_formatted = format_resource_type_fixed_width(resource_type);
392+
let resource_type_formatted = format_resource_type_fixed_width(resource_type_str.as_ref());
393393
let materialization_suffix = format_materialization_suffix(Some(&materialization), desc.as_deref());
394394
let duration = format_duration_fixed_width(duration);
395395
let output = format!(
@@ -787,24 +787,28 @@ macro_rules! show_warning {
787787
let err = $err;
788788

789789
// New tracing based logic
790-
use $crate::tracing::{ToTracingValue, log_level_to_severity};
790+
use $crate::tracing::log_level_to_severity;
791791
use $crate::tracing::emit::_tracing::Level as TracingLevel;
792792
use $crate::tracing::metrics::{increment_metric, MetricKey};
793-
use $crate::tracing::constants::TRACING_ATTR_FIELD;
794-
use $crate::macros::_dbt_telemetry::{LogEventInfo, TelemetryAttributes, RecordCodeLocation};
793+
use $crate::macros::_dbt_telemetry::LogMessage;
795794
increment_metric(MetricKey::TotalWarnings, 1);
796795

797796
let (original_severity_number, original_severity_text) = log_level_to_severity(&$crate::macros::log_adapter::log::Level::Warn);
798797

799798
$crate::emit_tracing_event!(
800799
level: TracingLevel::WARN,
801-
TelemetryAttributes::Log(LogEventInfo {
800+
LogMessage {
802801
code: Some(err.code as u16 as u32),
803-
dbt_core_code: None,
804-
original_severity_number,
802+
dbt_core_event_code: None,
803+
original_severity_number: original_severity_number as i32,
805804
original_severity_text: original_severity_text.to_string(),
806-
location: RecordCodeLocation::none(), // Will be auto injected
807-
}),
805+
// The rest will be auto injected
806+
phase: None,
807+
unique_id: None,
808+
file: None,
809+
line: None,
810+
}
811+
.into(),
808812
"{}",
809813
err.pretty().as_str()
810814
);

0 commit comments

Comments
 (0)