Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/actions/spelling/allow.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ bitwidth
blackbox
Blaupunkt
Blusens
bpf
bpftrace
buildname
buildroot
bytestream
Expand Down Expand Up @@ -518,11 +520,14 @@ ubuntu
Umeox
UMTS
unchunked
uprobe
uprobes
upstreaminfo
urlencoding
useragents
usergroups
userguide
ustack
Verizon
vhosts
Videocon
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ target-x86_64-unknown-linux-musl = ["target-unix"]
# Enables features that work only on systems providing `cfg(unix)`
unix = ["tikv-jemallocator", "allocation-tracing"]
allocation-tracing = ["vector-lib/allocation-tracing"]
component-probes = []

# Enables kubernetes dependencies and shared code. Kubernetes-related sources,
# transforms and sinks should depend on this feature.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
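Added a `component-probes` Cargo feature (disabled by default, Linux only) that enables bpftrace-based per-component CPU attribution. When enabled, Vector tags each thread with the numeric ID of the component it is currently running and exposes two uprobe attachment points (`vector_register_thread` and `vector_register_component`), so external bpftrace scripts can attribute CPU samples to individual Vector components (sources, transforms, sinks).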

authors: connoryy
199 changes: 199 additions & 0 deletions src/internal_telemetry/component_probes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
//! Lightweight bpftrace-based per-component CPU attribution.
//!
//! When the `component-probes` feature is enabled, this module provides a
//! [`ComponentProbesLayer`] that tags each Tokio worker thread with the ID of
//! the currently executing component. External bpftrace scripts read this tag
//! on a profile timer to produce per-component flamegraphs.

use std::{
marker::PhantomData,
sync::atomic::{AtomicU32, Ordering},
};

use tracing::{
Subscriber,
field::{Field, Visit},
span::{Attributes, Id},
};
use tracing_subscriber::{Layer, layer::Context, registry::LookupSpan};

/// Hands out the calling thread's label slot, creating it on first use.
///
/// The slot is a 4-byte [`AtomicU32`] that is heap-allocated and then
/// deliberately leaked with `Box::leak`, so neither the returned reference nor
/// the raw address handed to bpftrace is ever freed by this module. On Linux,
/// the first call made by a thread also invokes [`vector_register_thread`]
/// with that thread's TID and the slot's address, giving an attached uprobe
/// the `tid -> address` mapping. Every later call from the same thread returns
/// the same reference.
fn thread_label() -> &'static AtomicU32 {
    thread_local! {
        static LABEL: &'static AtomicU32 = {
            // Leaked on purpose: an external reader holds only the raw address,
            // so the allocation must never be reclaimed.
            let slot: &'static AtomicU32 = Box::leak(Box::new(AtomicU32::new(0)));
            #[cfg(target_os = "linux")]
            {
                let tid = nix::unistd::gettid().as_raw() as u64;
                let slot_addr = std::ptr::from_ref(slot).cast::<u8>();
                vector_register_thread(tid, slot_addr);
            }
            slot
        };
    }
    LABEL.with(|slot| *slot)
}

/// Uprobe attachment point that announces one thread's label slot.
///
/// `thread_label` calls this the first time a thread asks for its label,
/// passing the thread's TID and the address of the label. The function does no
/// work of its own: it only routes its arguments through `black_box`. The
/// `#[unsafe(no_mangle)]` and `#[inline(never)]` attributes keep it available
/// as a distinct symbol with an unmangled name that a bpftrace `uprobe` can
/// attach to and read `arg0`/`arg1` from.
#[unsafe(no_mangle)]
#[inline(never)]
// Deliberately a plain runtime function: it exists only to be probed.
#[allow(clippy::missing_const_for_fn)]
pub extern "C" fn vector_register_thread(tid: u64, label_ptr: *const u8) {
    let probe_args = (tid, label_ptr);
    std::hint::black_box(probe_args);
}

/// Uprobe attachment point that announces a `group_id -> component_name` pair.
///
/// The name is passed as a pointer plus a byte length rather than as a
/// NUL-terminated string. The function does no work of its own: it only routes
/// its arguments through `black_box`. The `#[unsafe(no_mangle)]` and
/// `#[inline(never)]` attributes keep it available as a distinct symbol with an
/// unmangled name that a bpftrace `uprobe` can attach to and read
/// `arg0`..`arg2` from.
#[unsafe(no_mangle)]
#[inline(never)]
// Deliberately a plain runtime function: it exists only to be probed.
#[allow(clippy::missing_const_for_fn)]
pub extern "C" fn vector_register_component(id: u32, name_ptr: *const u8, name_len: usize) {
    let probe_args = (id, name_ptr, name_len);
    std::hint::black_box(probe_args);
}

/// Counter that hands out the next probe group ID, starting at 1.
///
/// The value 0 is reserved to mean "idle" (no component active): `on_exit`
/// writes 0 to the thread label, and `on_new_span` skips the ID if the
/// counter ever wraps around to 0.
static NEXT_PROBE_ID: AtomicU32 = AtomicU32::new(1);

/// Probe group ID attached to a component span.
///
/// Inserted into the span's extensions by `on_new_span` and read back by
/// `on_enter` / `on_exit` to decide what to write to the thread label.
struct ProbeGroupId(u32);

/// Field visitor that captures the value of a span's `component_id` field.
///
/// `component_id` stays `None` unless a field with exactly that name is
/// visited; every other field is ignored.
#[derive(Default)]
struct ComponentIdVisitor {
    component_id: Option<String>,
}

impl Visit for ComponentIdVisitor {
    /// Captures a string-valued `component_id` verbatim.
    fn record_str(&mut self, field: &Field, value: &str) {
        if field.name() != "component_id" {
            return;
        }
        self.component_id = Some(String::from(value));
    }

    /// Captures any other `component_id` value through its `Debug` rendering.
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        if field.name() != "component_id" {
            return;
        }
        self.component_id = Some(format!("{value:?}"));
    }
}

/// Tracing layer that publishes the active component's group ID per thread.
///
/// When a span carrying a `component_id` field is created, the layer assigns
/// it a fresh probe group ID and reports the `id -> name` pair through
/// [`vector_register_component`]. Entering such a span stores the ID in the
/// current thread's [`AtomicU32`] label; exiting it stores 0 again. The layer
/// holds no state of its own and does not depend on `allocation-tracing`.
pub struct ComponentProbesLayer<S> {
    _subscriber: PhantomData<S>,
}

impl<S> ComponentProbesLayer<S> {
    /// Creates the (zero-sized) layer.
    #[must_use]
    pub const fn new() -> Self {
        let _subscriber = PhantomData;
        Self { _subscriber }
    }
}

// Implemented by hand so that `S` itself is not required to be `Default`.
impl<S> Default for ComponentProbesLayer<S> {
    fn default() -> Self {
        Self {
            _subscriber: PhantomData,
        }
    }
}

impl<S> Layer<S> for ComponentProbesLayer<S>
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    /// Assigns a probe group ID to every new span that has a `component_id`
    /// field, announces it via `vector_register_component`, and remembers it
    /// in the span's extensions.
    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
        let mut visitor = ComponentIdVisitor::default();
        attrs.record(&mut visitor);
        let Some(name) = visitor.component_id else {
            return;
        };

        let group = NEXT_PROBE_ID.fetch_add(1, Ordering::Relaxed);
        if group == 0 {
            // The counter wrapped; 0 is reserved for "idle", so leave this
            // span untagged.
            return;
        }

        // `name` stays alive until the end of this function, so the pointer
        // is valid for the whole duration of the probe call.
        vector_register_component(group, name.as_ptr(), name.len());

        if let Some(span) = ctx.span(id) {
            span.extensions_mut().insert(ProbeGroupId(group));
        }
    }

    /// Publishes the span's probe group ID to the current thread's label.
    /// Spans without a `ProbeGroupId` leave the label untouched.
    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
        let Some(span) = ctx.span(id) else {
            return;
        };
        let Some(group) = span.extensions().get::<ProbeGroupId>().map(|probe| probe.0) else {
            return;
        };
        thread_label().store(group, Ordering::Relaxed);
    }

    /// Resets the current thread's label to 0 (idle) when a tagged span exits.
    ///
    /// NOTE(review): the label is cleared unconditionally, so if tagged spans
    /// are ever nested on one thread, leaving the inner span marks the thread
    /// idle even though the outer span is still entered.
    fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
        let Some(span) = ctx.span(id) else {
            return;
        };
        let tagged = span.extensions().get::<ProbeGroupId>().is_some();
        if tagged {
            thread_label().store(0, Ordering::Relaxed);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A label can be set to a group ID and reset to idle.
    #[test]
    fn thread_label_store_and_clear() {
        const GROUP_ID: u32 = 7;
        let label = thread_label();

        label.store(GROUP_ID, Ordering::Relaxed);
        assert_eq!(label.load(Ordering::Relaxed), GROUP_ID);

        label.store(0, Ordering::Relaxed);
        assert_eq!(label.load(Ordering::Relaxed), 0);
    }

    /// Repeated calls on one thread yield the same slot.
    #[test]
    fn thread_label_is_stable() {
        let first = thread_label();
        let second = thread_label();
        assert!(std::ptr::eq(first, second), "must return the same address");
    }

    /// Different threads never share a slot. The slots are leaked, so the
    /// addresses stay distinct even after a thread has finished.
    #[test]
    fn thread_labels_are_unique() {
        let workers: Vec<_> = (0..4)
            .map(|_| std::thread::spawn(|| thread_label() as *const AtomicU32 as usize))
            .collect();

        let distinct: std::collections::BTreeSet<usize> = workers
            .into_iter()
            .map(|worker| worker.join().unwrap())
            .collect();

        assert_eq!(distinct.len(), 4, "each thread must get a distinct address");
    }

    /// The component probe point is safe to call without bpftrace attached.
    #[test]
    fn register_component_does_not_panic() {
        let name: &[u8] = b"test_component";
        vector_register_component(1, name.as_ptr(), name.len());
    }
}
3 changes: 3 additions & 0 deletions src/internal_telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@

#[cfg(feature = "allocation-tracing")]
pub mod allocations;

#[cfg(all(target_os = "linux", feature = "component-probes"))]
pub mod component_probes;
9 changes: 9 additions & 0 deletions src/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ pub fn init(color: bool, json: bool, levels: &str, internal_log_rate_limit: u64)
subscriber.with(allocation_layer)
};

#[cfg(all(target_os = "linux", feature = "component-probes"))]
let subscriber = {
let probes_layer =
crate::internal_telemetry::component_probes::ComponentProbesLayer::new()
.with_filter(LevelFilter::ERROR);

subscriber.with(probes_layer)
};

if json {
let formatter = tracing_subscriber::fmt::layer().json().flatten_event(true);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
---
title: Component-Level CPU Profiling with bpftrace
description: Use bpftrace to attribute CPU samples to individual Vector components for targeted performance analysis
authors: ["connoryy"]
domain: observability
weight: 1
tags: ["profiling", "bpftrace", "cpu", "observability", "advanced", "guides", "guide"]
---

When investigating CPU usage in a Vector pipeline, standard profiling tools show
which _functions_ are hot but not which _components_ (sources, transforms,
sinks) are responsible. The `component-probes` feature solves this by tagging
each thread with the currently active component so that bpftrace can sample it
externally.

## Prerequisites

- Linux with [bpftrace](https://github.com/bpftrace/bpftrace)
- Root or `CAP_BPF`
- Vector built with `--features component-probes`

## How It Works

When `component-probes` is enabled, Vector writes the active component's numeric
group ID to a per-thread 32-bit atomic on span enter and resets it to `0` (idle)
on exit. Two `extern "C"` functions serve as uprobe attachment points:

- `vector_register_thread(tid, label_ptr)` — maps a thread's TID to the
address of its label (fired once per thread).
- `vector_register_component(group_id, name_ptr, name_len)` — maps a group
ID to a component name (fired once per component).

## bpftrace Script

Replace `/path/to/vector` with your binary path:

```bpf
#!/usr/bin/env bpftrace

uprobe:/path/to/vector:vector_register_thread {
    @tid_to_addr[arg0] = arg1;
    @vector_pid = pid;
}

uprobe:/path/to/vector:vector_register_component {
    @names[arg0] = str(arg1, arg2);
}

profile:hz:997 {
    if (@vector_pid != 0 && pid == @vector_pid) {
        $addr = @tid_to_addr[tid];
        if ($addr != 0) {
            $group_id = *(uint32 *)$addr;
            if ($group_id != 0) {
                @stacks[@names[$group_id], ustack()] = count();
            }
        }
    }
}
```

This aggregates component-labeled stack traces directly in bpftrace. Start
bpftrace before Vector so it catches the registration uprobes during startup.

If `ustack()` is not available in your environment, replace the `@stacks`
line with a `printf` to emit raw labeled samples that can be joined with
stack traces from other tools like `perf`:

```bpf
printf("S %lld %d %s\n", nsecs, tid, @names[$group_id]);
```

## Overhead

- **Per span enter/exit**: one span extension lookup + one relaxed atomic store.
- **Per thread**: 4 bytes via `Box::leak` (never freed — bpftrace reads the
address asynchronously with no synchronization).
- **Per component**: one uprobe call at startup.
- **Sampling**: kernel-side, not charged to Vector.

When the feature is not enabled, or when Vector is built for a non-Linux
target, none of this code is compiled.
Loading