Skip to content

chore: Abstract metrics meters for execution#6384

Open
srilman wants to merge 1 commit intomainfrom
slade/custom-meter-type
Open

chore: Abstract metrics meters for execution#6384
srilman wants to merge 1 commit intomainfrom
slade/custom-meter-type

Conversation

@srilman
Copy link
Contributor

@srilman srilman commented Mar 11, 2026

Changes Made

Wrap the OTEL meter type to reduce duplicated code.

@srilman srilman requested a review from a team as a code owner March 11, 2026 23:42
@github-actions github-actions bot added the chore label Mar 11, 2026
@greptile-apps
Copy link
Contributor

greptile-apps bot commented Mar 11, 2026

Greptile Summary

This PR introduces a Meter wrapper struct in common_metrics that encapsulates the raw opentelemetry::metrics::Meter and exposes domain-specific convenience methods (duration_us_metric, rows_in_metric, rows_out_metric, etc.), eliminating the repeated counter-construction boilerplate that previously existed across 20+ pipeline node and execution files. A new UpDownCounter type is also added for tracking signed counters (e.g., currently active tasks). The Counter::new and Gauge::new constructors are made private, with creation now channelled exclusively through Meter.

  • New Meter abstraction (src/common/metrics/src/meters.rs): wraps OTEL meter with query_scope / test_scope constructors and helper factories; Counter and Gauge constructors made private.
  • New UpDownCounter: AtomicI64-backed signed counter used for task.active tracking in RuntimeNodeManager.
  • Distributed pipeline nodes (src/daft-distributed/): all nodes now receive a &Meter instead of constructing raw OTEL instruments.
  • Local execution pipeline (src/daft-local-execution/): DefaultRuntimeStats, intermediate operators, sinks, and sources all migrated to Meter.
  • test_scope visibility: Meter::test_scope is public without a #[cfg(test)] guard, leaking a test-only helper into production binaries.
  • Duplicate factory methods: u64_counter and u64_counter_with_desc_and_unit could be unified into a single method with optional parameters, per the project's preference for parametrised functions over near-duplicate variants.

Confidence Score: 4/5

  • This PR is safe to merge; it is a pure refactor with no behavioural changes and all existing tests still apply.
  • The refactor is mechanical and well-scoped: no logic changes, no API breakage to callers outside the metrics crate, and the new Meter wrapper correctly delegates to the underlying OTEL types. The only concerns are minor style issues (public test helper without #[cfg(test)], two near-duplicate factory methods) that do not affect correctness or safety.
  • src/common/metrics/src/meters.rs — public test_scope method and two near-duplicate counter factory methods.

Important Files Changed

Filename Overview
src/common/metrics/src/meters.rs Core of the PR: introduces the Meter wrapper with query_scope/test_scope constructors and domain-specific helpers (duration_us_metric, rows_in_metric, rows_out_metric). Also adds UpDownCounter. Two minor issues: test_scope is public without a #[cfg(test)] guard, and u64_counter/u64_counter_with_desc_and_unit could be a single parametrised method.
src/common/metrics/src/lib.rs Re-exports Meter and UpDownCounter from the meters module alongside the existing Counter, Gauge, and normalize_name; no issues.
src/daft-distributed/src/statistics/stats.rs Migrates RuntimeNodeManager and BaseCounters to use the new Meter wrapper. Removes duplicated counter-construction code. Logic is correct and consistent with the rest of the refactor.
src/daft-distributed/src/statistics/mod.rs Uses Meter::query_scope to create a single meter for the whole pipeline; no issues.
src/daft-distributed/src/pipeline_node/filter.rs Migrated FilterStats to use Meter convenience methods; logic unchanged and correct.
src/daft-distributed/src/pipeline_node/mod.rs Updated PipelineNodeImpl::runtime_stats to accept &Meter instead of the raw OTEL meter; clean, no issues.
src/daft-local-execution/src/runtime_stats/values.rs Migrates DefaultRuntimeStats to use Meter convenience methods; straightforward and correct.
src/daft-local-execution/src/runtime_stats/mod.rs Tests updated to use Meter::test_scope("test_stats") instead of constructing OTEL meters directly; functional behavior unchanged.
src/daft-local-execution/src/intermediate_ops/intermediate_op.rs Updated make_runtime_stats to use Meter; no issues.
src/daft-local-execution/src/pipeline.rs Pipeline builder context now passes a Meter to node construction; clean migration.

Class Diagram

%%{init: {'theme': 'neutral'}}%%
classDiagram
    class Meter {
        -otel: opentelemetry::metrics::Meter
        +query_scope(query_id: QueryID, name) Meter
        +test_scope(name: &str) Meter
        +u64_counter(name) Counter
        +u64_counter_with_desc_and_unit(name, desc, unit) Counter
        +f64_gauge(name) Gauge
        +i64_up_down_counter(name) UpDownCounter
        +duration_us_metric() Counter
        +rows_in_metric() Counter
        +rows_out_metric() Counter
    }

    class Counter {
        -value: AtomicU64
        -otel: opentelemetry::metrics::Counter~u64~
        +add(value, key_values) u64
        +load(ordering) u64
    }

    class Gauge {
        -value: AtomicF64
        -otel: opentelemetry::metrics::Gauge~f64~
        +update(value, key_values)
        +load(ordering) f64
    }

    class UpDownCounter {
        -value: AtomicI64
        -otel: opentelemetry::metrics::UpDownCounter~i64~
        +add(value, key_values)
        +load(ordering) i64
    }

    class DefaultRuntimeStats {
        -duration_us: Counter
        -rows_in: Counter
        -rows_out: Counter
        -node_kv: Vec~KeyValue~
        +new(meter, node_info) DefaultRuntimeStats
        +add_rows_in(rows)
        +add_rows_out(rows)
        +add_cpu_us(cpu_us)
    }

    class RuntimeNodeManager {
        -active_tasks: UpDownCounter
        -completed_tasks: Counter
        -failed_tasks: Counter
        -cancelled_tasks: Counter
        +new(meter, runtime_stats, node_info) RuntimeNodeManager
        +handle_task_event(event)
        +export_snapshot() StatSnapshot
    }

    Meter --> Counter : creates
    Meter --> Gauge : creates
    Meter --> UpDownCounter : creates
    DefaultRuntimeStats --> Counter : uses
    RuntimeNodeManager --> Counter : uses
    RuntimeNodeManager --> UpDownCounter : uses
Loading

Last reviewed commit: 776bcf3

Comment on lines +137 to +140
pub fn test_scope(name: &'static str) -> Self {
let otel = global::meter(name);
Self { otel }
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_scope is public and not #[cfg(test)]-gated

Meter::test_scope is intended only for test usage (as the name implies and as it's only used in #[cfg(test)] blocks), but it is declared pub fn without a #[cfg(test)] guard. This means the method is compiled into production binaries and exposed as part of the public API of the common_metrics crate.

Consider gating it with #[cfg(test)] or, if it also needs to be available to integration tests in other crates, #[cfg(any(test, feature = "test-utils"))]:

Suggested change
pub fn test_scope(name: &'static str) -> Self {
let otel = global::meter(name);
Self { otel }
}
#[cfg(test)]
pub fn test_scope(name: &'static str) -> Self {
let otel = global::meter(name);
Self { otel }
}

Comment on lines +142 to +153
pub fn u64_counter(&self, name: impl Into<Cow<'static, str>>) -> Counter {
Counter::new(&self.otel, name, None, None)
}

pub fn u64_counter_with_desc_and_unit(
&self,
name: impl Into<Cow<'static, str>>,
description: Option<Cow<'static, str>>,
unit: Option<Cow<'static, str>>,
) -> Counter {
Counter::new(&self.otel, name, description, unit)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two similar Counter-factory methods could be unified

u64_counter and u64_counter_with_desc_and_unit both create a Counter, differing only in whether description and unit are provided. Per the project's preference for single parametrised functions over multiple near-duplicate ones, these could be merged using Option parameters:

pub fn u64_counter(
    &self,
    name: impl Into<Cow<'static, str>>,
    description: Option<Cow<'static, str>>,
    unit: Option<Cow<'static, str>>,
) -> Counter {
    Counter::new(&self.otel, name, description, unit)
}

All call sites that currently pass no description/unit would use None, None, while sites needing metadata pass them directly — removing the need for the _with_desc_and_unit variant.

Rule Used: Prefer single parametrized functions over multiple... (source)

Learnt From
Eventual-Inc/Daft#5207

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

@codecov
Copy link

codecov bot commented Mar 12, 2026

Codecov Report

❌ Patch coverage is 56.62651% with 72 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.78%. Comparing base (24be16f) to head (776bcf3).
⚠️ Report is 14 commits behind head on main.

Files with missing lines Patch % Lines
src/common/metrics/src/meters.rs 67.69% 21 Missing ⚠️
src/daft-distributed/src/statistics/stats.rs 0.00% 19 Missing ⚠️
src/daft-distributed/src/pipeline_node/sink.rs 0.00% 12 Missing ⚠️
.../daft-distributed/src/pipeline_node/scan_source.rs 0.00% 7 Missing ⚠️
src/daft-distributed/src/pipeline_node/explode.rs 0.00% 4 Missing ⚠️
src/daft-distributed/src/pipeline_node/filter.rs 0.00% 4 Missing ⚠️
src/daft-distributed/src/pipeline_node/udf.rs 0.00% 4 Missing ⚠️
src/daft-distributed/src/statistics/mod.rs 0.00% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #6384      +/-   ##
==========================================
- Coverage   74.85%   74.78%   -0.08%     
==========================================
  Files        1021     1022       +1     
  Lines      135842   136431     +589     
==========================================
+ Hits       101683   102025     +342     
- Misses      34159    34406     +247     
Files with missing lines Coverage Δ
src/common/metrics/src/lib.rs 52.63% <ø> (ø)
...rc/daft-distributed/src/pipeline_node/actor_udf.rs 32.14% <ø> (ø)
...-distributed/src/pipeline_node/glob_scan_source.rs 60.00% <ø> (ø)
...-distributed/src/pipeline_node/in_memory_source.rs 59.74% <ø> (ø)
...daft-distributed/src/pipeline_node/into_batches.rs 27.39% <ø> (ø)
src/daft-distributed/src/pipeline_node/limit.rs 17.20% <ø> (ø)
src/daft-distributed/src/pipeline_node/mod.rs 41.08% <ø> (ø)
...ft-local-execution/src/intermediate_ops/explode.rs 96.84% <100.00%> (ø)
...aft-local-execution/src/intermediate_ops/filter.rs 94.21% <100.00%> (ø)
...-execution/src/intermediate_ops/intermediate_op.rs 88.77% <ø> (ø)
... and 18 more

... and 46 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@srilman srilman requested review from cckellogg and samstokes March 17, 2026 18:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant