Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub mod snapshot;
use std::{ops::Index, sync::Arc, time::Duration};

use indicatif::{HumanBytes, HumanCount, HumanDuration, HumanFloatCount};
pub use meters::{Counter, Gauge, normalize_name};
pub use meters::{Counter, Gauge, Meter, UpDownCounter, normalize_name};
pub use operator_metrics::{
MetricsCollector, NoopMetricsCollector, OperatorCounter, OperatorMetrics,
};
Expand Down
110 changes: 104 additions & 6 deletions src/common/metrics/src/meters.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::{
borrow::Cow,
sync::atomic::{AtomicU64, Ordering},
sync::atomic::{AtomicI64, AtomicU64, Ordering},
};

use atomic_float::AtomicF64;
use opentelemetry::{KeyValue, metrics::Meter};
use opentelemetry::{InstrumentationScope, KeyValue, global};

use crate::{
ATTR_QUERY_ID, DURATION_KEY, QueryID, ROWS_IN_KEY, ROWS_OUT_KEY, UNIT_MICROSECONDS, UNIT_ROWS,
};

pub fn normalize_name(name: impl Into<Cow<'static, str>>) -> String {
let name = name.into();
Expand All @@ -21,8 +25,8 @@ pub struct Counter {
}

impl Counter {
pub fn new(
meter: &Meter,
fn new(
meter: &opentelemetry::metrics::Meter,
name: impl Into<Cow<'static, str>>,
description: Option<Cow<'static, str>>,
unit: Option<Cow<'static, str>>,
Expand Down Expand Up @@ -62,8 +66,8 @@ pub struct Gauge {
}

impl Gauge {
pub fn new(
meter: &Meter,
fn new(
meter: &opentelemetry::metrics::Meter,
name: impl Into<Cow<'static, str>>,
description: Option<Cow<'static, str>>,
) -> Self {
Expand All @@ -89,3 +93,97 @@ impl Gauge {
self.value.load(ordering)
}
}

/// A signed counter that keeps a locally readable running total alongside
/// the OpenTelemetry instrument it reports to.
pub struct UpDownCounter {
// In-process running total; starts at 0 and is what `load` returns.
value: AtomicI64,
// OpenTelemetry instrument that receives every delta passed to `add`.
otel: opentelemetry::metrics::UpDownCounter<i64>,
}

impl UpDownCounter {
    /// Creates an up-down counter on `meter`.
    ///
    /// `name` is passed through `normalize_name` before the OpenTelemetry
    /// instrument is built. The local running total starts at zero.
    fn new(meter: &opentelemetry::metrics::Meter, name: impl Into<Cow<'static, str>>) -> Self {
        let otel = meter.i64_up_down_counter(normalize_name(name)).build();
        Self {
            value: AtomicI64::new(0),
            otel,
        }
    }

    /// Applies `value` (which may be negative) to the local total and
    /// records the same delta on the OpenTelemetry instrument, tagged with
    /// `key_values`.
    pub fn add(&self, value: i64, key_values: &[KeyValue]) {
        // The local total is updated with relaxed ordering before the
        // delta is forwarded to OpenTelemetry.
        self.value.fetch_add(value, Ordering::Relaxed);
        self.otel.add(value, key_values);
    }

    /// Reads the local running total using the caller-supplied memory
    /// `ordering`.
    pub fn load(&self, ordering: Ordering) -> i64 {
        self.value.load(ordering)
    }
}

/// Crate-level wrapper around an OpenTelemetry meter.
///
/// Instruments are obtained through this type's factory methods, which
/// build them against the wrapped meter.
#[derive(Clone)]
pub struct Meter {
// Underlying OpenTelemetry meter handed to the instrument constructors.
otel: opentelemetry::metrics::Meter,
}

impl Meter {
/// Creates a meter scoped to a single query.
///
/// The instrumentation scope is named `name` and carries one attribute,
/// `ATTR_QUERY_ID`, set to `query_id`. The meter itself is obtained from
/// the OpenTelemetry global API.
pub fn query_scope(query_id: QueryID, name: impl Into<Cow<'static, str>>) -> Self {
    let scope_attributes = vec![KeyValue::new(ATTR_QUERY_ID, query_id)];
    let scope = InstrumentationScope::builder(name)
        .with_attributes(scope_attributes)
        .build();

    Self {
        otel: global::meter_with_scope(scope),
    }
}

/// Creates a meter named `name` from the OpenTelemetry global API, with no
/// additional scope attributes.
///
/// NOTE(review): the name suggests this is meant for tests only, but the
/// method is not `#[cfg(test)]`-gated — confirm whether non-test callers
/// are expected before restricting it.
pub fn test_scope(name: &'static str) -> Self {
    Self {
        otel: global::meter(name),
    }
}
Comment on lines +137 to +140
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_scope is public and not #[cfg(test)]-gated

Meter::test_scope is intended only for test usage (as the name implies and as it's only used in #[cfg(test)] blocks), but it is declared pub fn without a #[cfg(test)] guard. This means the method is compiled into production binaries and exposed as part of the public API of the common_metrics crate.

Consider gating it with #[cfg(test)] or, if it also needs to be available to integration tests in other crates, #[cfg(any(test, feature = "test-utils"))]:

Suggested change
pub fn test_scope(name: &'static str) -> Self {
let otel = global::meter(name);
Self { otel }
}
#[cfg(test)]
pub fn test_scope(name: &'static str) -> Self {
let otel = global::meter(name);
Self { otel }
}


pub fn u64_counter(&self, name: impl Into<Cow<'static, str>>) -> Counter {
Counter::new(&self.otel, name, None, None)
}

/// Creates a `Counter` called `name` on this meter, forwarding the optional
/// `description` and `unit` to `Counter::new` unchanged.
pub fn u64_counter_with_desc_and_unit(
&self,
name: impl Into<Cow<'static, str>>,
description: Option<Cow<'static, str>>,
unit: Option<Cow<'static, str>>,
) -> Counter {
Counter::new(&self.otel, name, description, unit)
}
Comment on lines +142 to +153
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two similar Counter-factory methods could be unified

u64_counter and u64_counter_with_desc_and_unit both create a Counter, differing only in whether description and unit are provided. Per the project's preference for single parametrised functions over multiple near-duplicate ones, these could be merged using Option parameters:

pub fn u64_counter(
    &self,
    name: impl Into<Cow<'static, str>>,
    description: Option<Cow<'static, str>>,
    unit: Option<Cow<'static, str>>,
) -> Counter {
    Counter::new(&self.otel, name, description, unit)
}

All call sites that currently pass no description/unit would use None, None, while sites needing metadata pass them directly — removing the need for the _with_desc_and_unit variant.

Rule Used: Prefer single parametrized functions over multiple... (source)

Learnt From
Eventual-Inc/Daft#5207

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


/// Creates a `Gauge` called `name` on this meter, with no description.
pub fn f64_gauge(&self, name: impl Into<Cow<'static, str>>) -> Gauge {
Gauge::new(&self.otel, name, None)
}

/// Creates an `UpDownCounter` called `name` on this meter.
pub fn i64_up_down_counter(&self, name: impl Into<Cow<'static, str>>) -> UpDownCounter {
UpDownCounter::new(&self.otel, name)
}

/// Creates the duration counter: named `DURATION_KEY`, unit
/// `UNIT_MICROSECONDS`, no description.
pub fn duration_us_metric(&self) -> Counter {
    let unit = Some(Cow::Borrowed(UNIT_MICROSECONDS));
    self.u64_counter_with_desc_and_unit(DURATION_KEY, None, unit)
}

/// Creates the input-rows counter: named `ROWS_IN_KEY`, unit `UNIT_ROWS`,
/// no description.
pub fn rows_in_metric(&self) -> Counter {
    let unit = Some(Cow::Borrowed(UNIT_ROWS));
    self.u64_counter_with_desc_and_unit(ROWS_IN_KEY, None, unit)
}

/// Creates the output-rows counter: named `ROWS_OUT_KEY`, unit
/// `UNIT_ROWS`, no description.
pub fn rows_out_metric(&self) -> Counter {
    let unit = Some(Cow::Borrowed(UNIT_ROWS));
    self.u64_counter_with_desc_and_unit(ROWS_OUT_KEY, None, unit)
}
}
6 changes: 4 additions & 2 deletions src/daft-distributed/src/pipeline_node/actor_udf.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::sync::Arc;

use common_error::DaftResult;
use common_metrics::ops::{NodeCategory, NodeType};
use common_metrics::{
Meter,
ops::{NodeCategory, NodeType},
};
use common_py_serde::PyObjectWrapper;
use common_runtime::JoinSet;
use daft_dsl::{
Expand All @@ -12,7 +15,6 @@ use daft_local_plan::{LocalNodeContext, LocalPhysicalPlan};
use daft_logical_plan::stats::StatsState;
use daft_schema::schema::SchemaRef;
use futures::StreamExt;
use opentelemetry::metrics::Meter;
use pyo3::{Py, PyAny, Python, types::PyAnyMethods};

use super::{NodeID, PipelineNodeConfig, PipelineNodeContext, PipelineNodeImpl, udf::UdfStats};
Expand Down
13 changes: 6 additions & 7 deletions src/daft-distributed/src/pipeline_node/explode.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use std::sync::{Arc, atomic::Ordering};

use common_metrics::{
Counter, DURATION_KEY, Gauge, ROWS_IN_KEY, ROWS_OUT_KEY, StatSnapshot, UNIT_MICROSECONDS,
UNIT_ROWS,
Counter, Gauge, Meter, StatSnapshot,
ops::{NodeCategory, NodeInfo, NodeType},
snapshot::ExplodeSnapshot,
};
use daft_dsl::expr::bound_expr::BoundExpr;
use daft_local_plan::{LocalNodeContext, LocalPhysicalPlan};
use daft_logical_plan::stats::StatsState;
use daft_schema::schema::SchemaRef;
use opentelemetry::{KeyValue, metrics::Meter};
use opentelemetry::KeyValue;

use super::{DistributedPipelineNode, PipelineNodeImpl, TaskBuilderStream};
use crate::{
Expand All @@ -33,10 +32,10 @@ impl ExplodeStats {
pub fn new(meter: &Meter, context: &PipelineNodeContext) -> Self {
let node_kv = key_values_from_context(context);
Self {
duration_us: Counter::new(meter, DURATION_KEY, None, Some(UNIT_MICROSECONDS.into())),
rows_in: Counter::new(meter, ROWS_IN_KEY, None, Some(UNIT_ROWS.into())),
rows_out: Counter::new(meter, ROWS_OUT_KEY, None, Some(UNIT_ROWS.into())),
amplification: Gauge::new(meter, "amplification", None),
duration_us: meter.duration_us_metric(),
rows_in: meter.rows_in_metric(),
rows_out: meter.rows_out_metric(),
amplification: meter.f64_gauge("amplification"),
node_kv,
}
}
Expand Down
13 changes: 6 additions & 7 deletions src/daft-distributed/src/pipeline_node/filter.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use std::sync::{Arc, atomic::Ordering};

use common_metrics::{
Counter, DURATION_KEY, Gauge, ROWS_IN_KEY, ROWS_OUT_KEY, StatSnapshot, UNIT_MICROSECONDS,
UNIT_ROWS,
Counter, Gauge, Meter, StatSnapshot,
ops::{NodeCategory, NodeInfo, NodeType},
snapshot::FilterSnapshot,
};
use daft_dsl::expr::bound_expr::BoundExpr;
use daft_local_plan::{LocalNodeContext, LocalPhysicalPlan};
use daft_logical_plan::stats::StatsState;
use daft_schema::schema::SchemaRef;
use opentelemetry::{KeyValue, metrics::Meter};
use opentelemetry::KeyValue;

use super::{DistributedPipelineNode, PipelineNodeImpl, TaskBuilderStream};
use crate::{
Expand All @@ -33,10 +32,10 @@ impl FilterStats {
pub fn new(meter: &Meter, context: &PipelineNodeContext) -> Self {
let node_kv = key_values_from_context(context);
Self {
duration_us: Counter::new(meter, DURATION_KEY, None, Some(UNIT_MICROSECONDS.into())),
rows_in: Counter::new(meter, ROWS_IN_KEY, None, Some(UNIT_ROWS.into())),
rows_out: Counter::new(meter, ROWS_OUT_KEY, None, Some(UNIT_ROWS.into())),
selectivity: Gauge::new(meter, "selectivity", None),
duration_us: meter.duration_us_metric(),
rows_in: meter.rows_in_metric(),
rows_out: meter.rows_out_metric(),
selectivity: meter.f64_gauge("selectivity"),
node_kv,
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/daft-distributed/src/pipeline_node/glob_scan_source.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::sync::Arc;

use common_io_config::IOConfig;
use common_metrics::ops::{NodeCategory, NodeType};
use common_metrics::{
Meter,
ops::{NodeCategory, NodeType},
};
use common_scan_info::Pushdowns;
use daft_local_plan::{LocalNodeContext, LocalPhysicalPlan};
use daft_logical_plan::{ClusteringSpec, stats::StatsState};
use daft_schema::schema::SchemaRef;
use futures::{StreamExt, stream};
use opentelemetry::metrics::Meter;

use super::{
DistributedPipelineNode, PipelineNodeConfig, PipelineNodeContext, scan_source::SourceStats,
Expand Down
6 changes: 4 additions & 2 deletions src/daft-distributed/src/pipeline_node/in_memory_source.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::{collections::HashMap, sync::Arc};

use common_metrics::ops::{NodeCategory, NodeType};
use common_metrics::{
Meter,
ops::{NodeCategory, NodeType},
};
use common_partitioning::PartitionRef;
use daft_local_plan::{LocalNodeContext, LocalPhysicalPlan};
use daft_logical_plan::{ClusteringSpec, InMemoryInfo, stats::StatsState};
use futures::{StreamExt, stream};
use opentelemetry::metrics::Meter;

use super::{PipelineNodeContext, PipelineNodeImpl, scan_source::SourceStats};
use crate::{
Expand Down
3 changes: 1 addition & 2 deletions src/daft-distributed/src/pipeline_node/into_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ use std::sync::Arc;

use common_error::DaftResult;
use common_metrics::{
StatSnapshot,
Meter, StatSnapshot,
ops::{NodeCategory, NodeInfo, NodeType},
};
use daft_local_plan::{LocalNodeContext, LocalPhysicalPlan};
use daft_logical_plan::{partitioning::UnknownClusteringConfig, stats::StatsState};
use daft_schema::schema::SchemaRef;
use futures::StreamExt;
use opentelemetry::metrics::Meter;

use super::{PipelineNodeImpl, TaskBuilderStream};
use crate::{
Expand Down
3 changes: 1 addition & 2 deletions src/daft-distributed/src/pipeline_node/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ use std::{cmp::Ordering, collections::VecDeque, sync::Arc};

use common_error::DaftResult;
use common_metrics::{
StatSnapshot,
Meter, StatSnapshot,
ops::{NodeCategory, NodeInfo, NodeType},
};
use daft_local_plan::{LocalNodeContext, LocalPhysicalPlan};
use daft_logical_plan::stats::StatsState;
use daft_schema::schema::SchemaRef;
use futures::StreamExt;
use opentelemetry::metrics::Meter;

use super::{DistributedPipelineNode, MaterializedOutput, PipelineNodeImpl, TaskBuilderStream};
use crate::{
Expand Down
3 changes: 1 addition & 2 deletions src/daft-distributed/src/pipeline_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use common_display::{
};
use common_error::DaftResult;
use common_metrics::{
QueryID,
Meter, QueryID,
ops::{NodeCategory, NodeType},
};
use common_partitioning::PartitionRef;
Expand All @@ -24,7 +24,6 @@ use daft_logical_plan::{partitioning::ClusteringSpecRef, stats::StatsState};
use daft_schema::schema::SchemaRef;
use futures::{Stream, StreamExt, stream::BoxStream};
use materialize::materialize_all_pipeline_outputs;
use opentelemetry::metrics::Meter;

use crate::{
plan::{PlanExecutionContext, QueryIdx, TaskIDCounter},
Expand Down
15 changes: 9 additions & 6 deletions src/daft-distributed/src/pipeline_node/scan_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use common_display::{DisplayAs, DisplayLevel};
#[cfg(feature = "python")]
use common_file_formats::FileFormatConfig;
use common_metrics::{
BYTES_READ_KEY, Counter, DURATION_KEY, ROWS_OUT_KEY, StatSnapshot, UNIT_BYTES,
UNIT_MICROSECONDS, UNIT_ROWS,
BYTES_READ_KEY, Counter, Meter, StatSnapshot, UNIT_BYTES,
ops::{NodeCategory, NodeInfo, NodeType},
snapshot::SourceSnapshot,
};
Expand All @@ -14,7 +13,7 @@ use daft_local_plan::{LocalNodeContext, LocalPhysicalPlan};
use daft_logical_plan::{ClusteringSpec, stats::StatsState};
use daft_schema::schema::SchemaRef;
use futures::{StreamExt, stream};
use opentelemetry::{KeyValue, metrics::Meter};
use opentelemetry::KeyValue;

use super::{PipelineNodeConfig, PipelineNodeContext, PipelineNodeImpl, TaskBuilderStream};
use crate::{
Expand All @@ -35,9 +34,13 @@ impl SourceStats {
pub fn new(meter: &Meter, context: &PipelineNodeContext) -> Self {
let node_kv = key_values_from_context(context);
Self {
duration_us: Counter::new(meter, DURATION_KEY, None, Some(UNIT_MICROSECONDS.into())),
rows_out: Counter::new(meter, ROWS_OUT_KEY, None, Some(UNIT_ROWS.into())),
bytes_read: Counter::new(meter, BYTES_READ_KEY, None, Some(UNIT_BYTES.into())),
duration_us: meter.duration_us_metric(),
rows_out: meter.rows_out_metric(),
bytes_read: meter.u64_counter_with_desc_and_unit(
BYTES_READ_KEY,
None,
Some(UNIT_BYTES.into()),
),
node_kv,
}
}
Expand Down
21 changes: 14 additions & 7 deletions src/daft-distributed/src/pipeline_node/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use std::sync::{Arc, atomic::Ordering};
use common_error::DaftResult;
use common_file_formats::FileFormat;
use common_metrics::{
BYTES_WRITTEN_KEY, Counter, DURATION_KEY, ROWS_IN_KEY, ROWS_WRITTEN_KEY, StatSnapshot,
UNIT_BYTES, UNIT_MICROSECONDS, UNIT_ROWS,
BYTES_WRITTEN_KEY, Counter, Meter, ROWS_WRITTEN_KEY, StatSnapshot, UNIT_BYTES, UNIT_ROWS,
ops::{NodeCategory, NodeInfo, NodeType},
snapshot::WriteSnapshot,
};
Expand All @@ -15,7 +14,7 @@ use daft_logical_plan::sink_info::CatalogType;
use daft_logical_plan::{OutputFileInfo, SinkInfo, stats::StatsState};
use daft_schema::schema::SchemaRef;
use futures::TryStreamExt;
use opentelemetry::{KeyValue, metrics::Meter};
use opentelemetry::KeyValue;

use super::{PipelineNodeImpl, TaskBuilderStream};
use crate::{
Expand Down Expand Up @@ -44,10 +43,18 @@ impl WriteStats {
pub fn new(meter: &Meter, context: &PipelineNodeContext) -> Self {
let node_kv = key_values_from_context(context);
Self {
duration_us: Counter::new(meter, DURATION_KEY, None, Some(UNIT_MICROSECONDS.into())),
rows_in: Counter::new(meter, ROWS_IN_KEY, None, Some(UNIT_ROWS.into())),
rows_written: Counter::new(meter, ROWS_WRITTEN_KEY, None, Some(UNIT_ROWS.into())),
bytes_written: Counter::new(meter, BYTES_WRITTEN_KEY, None, Some(UNIT_BYTES.into())),
duration_us: meter.duration_us_metric(),
rows_in: meter.rows_in_metric(),
rows_written: meter.u64_counter_with_desc_and_unit(
ROWS_WRITTEN_KEY,
None,
Some(UNIT_ROWS.into()),
),
bytes_written: meter.u64_counter_with_desc_and_unit(
BYTES_WRITTEN_KEY,
None,
Some(UNIT_BYTES.into()),
),
node_kv,
}
}
Expand Down
Loading
Loading