Skip to content

Commit 797ace9

Browse files
authored
add tracing instrumentation to the pyo3 layer through ContextVars (#12026)
### What As part of benchmark testing, we'd like to be able to have a single e2e trace for each benchmark test. In order to do so, we need to pass trace context across pyo3 boundary and it seems (per Gemini) that the recommended way to do this is through ``ContextVar``s which is what standard tracing libraries in python do. This PR leverages that and adds a context extraction logic as well as an API (per @abey79 suggestion) to the rerun sdk so that we can easily access this trace context var and set it on the python side. For now I've focused on the query paths and we can see both query and fetching phase under a single trace thanks to changes in the dataframe table provider and trace propagation between query execution phases. Follow up will add instrumentation to the register and index creation paths. Example usage: ``` from rerun.catalog import _rerun_trace_context from opentelemetry import context from opentelemetry.propagate import get_global_textmap trace_ctx = _rerun_trace_context() with tracer.start_as_current_span("sample_index_values") as span: # Add attributes to help identify this benchmark span.set_attribute("benchmark.name", "sample_index_values") span.set_attribute("dataset.name", self.dataset_name) span.set_attribute("sample_count", 6) wrist = self.dataset.dataframe_query_view( index="log_tick", contents="/camera/wrist/embedding /thumbnail/camera/wrist", ) sampled_times = [0, 100, 200, 500, 1000, 2000] current_ctx = context.get_current() carrier = {} get_global_textmap().inject(carrier, current_ctx) if carrier: token = trace_ctx.set(carrier) result = ( (wrist.filter_index_values(sampled_times).fill_latest_at()).df().drop("rerun_partition_id").collect() ) if carrier: trace_ctx.reset(token) span.set_attribute("result_rows", len(result)) ``
1 parent 8701ea2 commit 797ace9

File tree

13 files changed

+252
-22
lines changed

13 files changed

+252
-22
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8869,6 +8869,7 @@ dependencies = [
88698869
"re_arrow_util",
88708870
"re_dataframe",
88718871
"re_log_types",
8872+
"re_perf_telemetry",
88728873
"re_protos",
88738874
"re_redap_client",
88748875
"re_sorbet",
@@ -9209,6 +9210,7 @@ dependencies = [
92099210
"opentelemetry_sdk",
92109211
"parking_lot",
92119212
"prometheus-client",
9213+
"pyo3",
92129214
"serde",
92139215
"serde_json",
92149216
"tokio",

crates/store/re_datafusion/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ tokio-stream.workspace = true
5353
tonic.workspace = true
5454
tracing.workspace = true
5555

56+
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
57+
re_perf_telemetry.workspace = true
58+
5659
[target.'cfg(target_arch = "wasm32")'.dependencies]
5760
futures.workspace = true
5861
wasm-bindgen-futures.workspace = true

crates/store/re_datafusion/src/dataframe_query_common.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ pub struct DataframeQueryTableProvider {
4949
sort_index: Option<Index>,
5050
chunk_info_batches: Arc<Vec<RecordBatch>>,
5151
client: ConnectionClient,
52+
53+
/// passing trace headers between phases of execution pipeline helps keep
54+
/// the entire operation under a single trace.
55+
#[cfg(not(target_arch = "wasm32"))]
56+
trace_headers: Option<crate::TraceHeaders>,
5257
}
5358

5459
impl DataframeQueryTableProvider {
@@ -62,6 +67,7 @@ impl DataframeQueryTableProvider {
6267
dataset_id: EntryId,
6368
query_expression: &QueryExpression,
6469
partition_ids: &[impl AsRef<str> + Sync],
70+
#[cfg(not(target_arch = "wasm32"))] trace_headers: Option<crate::TraceHeaders>,
6571
) -> Result<Self, DataFusionError> {
6672
use futures::StreamExt as _;
6773

@@ -164,6 +170,8 @@ impl DataframeQueryTableProvider {
164170
sort_index: query_expression.filtered_index,
165171
chunk_info_batches,
166172
client,
173+
#[cfg(not(target_arch = "wasm32"))]
174+
trace_headers,
167175
})
168176
}
169177

@@ -259,6 +267,8 @@ impl TableProvider for DataframeQueryTableProvider {
259267
Arc::clone(&self.chunk_info_batches),
260268
query_expression,
261269
self.client.clone(),
270+
#[cfg(not(target_arch = "wasm32"))]
271+
self.trace_headers.clone(),
262272
)
263273
.map(Arc::new)
264274
.map(|exec| {

crates/store/re_datafusion/src/dataframe_query_provider.rs

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,27 @@ use crate::dataframe_query_common::{
4545
/// the IO stream.
4646
const CPU_THREAD_IO_CHANNEL_SIZE: usize = 32;
4747

48+
/// Helper to attach parent trace context if available.
49+
/// Returns a guard that must be kept alive for the duration of the traced scope.
50+
/// We can use this to ensure all phases of table provider's execution pipeline are
51+
/// parented by a single trace.
52+
#[cfg(not(target_arch = "wasm32"))]
53+
#[inline]
54+
fn attach_trace_context(
55+
trace_headers: &Option<crate::TraceHeaders>,
56+
) -> Option<re_perf_telemetry::external::opentelemetry::ContextGuard> {
57+
let headers = trace_headers.as_ref()?;
58+
if !headers.traceparent.is_empty() {
59+
let parent_ctx =
60+
re_perf_telemetry::external::opentelemetry::global::get_text_map_propagator(|prop| {
61+
prop.extract(headers)
62+
});
63+
Some(parent_ctx.attach())
64+
} else {
65+
None
66+
}
67+
}
68+
4869
#[derive(Debug)]
4970
pub(crate) struct PartitionStreamExec {
5071
props: PlanProperties,
@@ -61,6 +82,10 @@ pub(crate) struct PartitionStreamExec {
6182
target_partitions: usize,
6283
worker_runtime: Arc<CpuRuntime>,
6384
client: ConnectionClient,
85+
86+
/// passing trace headers between phases of execution pipeline helps keep
87+
/// the entire operation under a single trace.
88+
trace_headers: Option<crate::TraceHeaders>,
6489
}
6590

6691
type ChunksWithPartition = Vec<(Chunk, Option<String>)>;
@@ -79,6 +104,10 @@ pub struct DataframePartitionStreamInner {
79104
/// so that our worker does not shut down unexpectedly.
80105
cpu_runtime: Arc<CpuRuntime>,
81106
cpu_join_handle: Option<JoinHandle<Result<(), DataFusionError>>>,
107+
108+
/// passing trace headers between phases of execution pipeline helps keep
109+
/// the entire operation under a single trace.
110+
trace_headers: Option<crate::TraceHeaders>,
82111
}
83112

84113
/// This is a temporary fix to minimize the impact of leaking memory
@@ -96,13 +125,16 @@ pub struct DataframePartitionStream {
96125
impl Stream for DataframePartitionStream {
97126
type Item = Result<RecordBatch, DataFusionError>;
98127

99-
#[tracing::instrument(level = "info", skip_all)]
100128
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101129
let this_outer = self.get_mut();
102130
let Some(this) = this_outer.inner.as_mut() else {
103131
return Poll::Ready(None);
104132
};
105133

134+
#[cfg(not(target_arch = "wasm32"))]
135+
let _trace_guard = attach_trace_context(&this.trace_headers);
136+
let _span = tracing::info_span!("poll_next").entered();
137+
106138
// If we have any errors on the worker thread, we want to ensure we pass them up
107139
// through the stream.
108140
if this
@@ -132,11 +164,16 @@ impl Stream for DataframePartitionStream {
132164
return Poll::Ready(Some(exec_err!("No tx for chunks from CPU thread")));
133165
};
134166

135-
this.io_join_handle = Some(io_handle.spawn(chunk_stream_io_loop(
136-
this.client.clone(),
137-
this.chunk_infos.clone(),
138-
chunk_tx,
139-
)));
167+
let client = this.client.clone();
168+
let chunk_infos = this.chunk_infos.clone();
169+
let current_span = tracing::Span::current();
170+
171+
this.io_join_handle = Some(
172+
io_handle.spawn(
173+
async move { chunk_stream_io_loop(client, chunk_infos, chunk_tx).await }
174+
.instrument(current_span.clone()),
175+
),
176+
);
140177
}
141178

142179
let result = this
@@ -163,6 +200,7 @@ impl RecordBatchStream for DataframePartitionStream {
163200

164201
impl PartitionStreamExec {
165202
#[tracing::instrument(level = "info", skip_all)]
203+
#[expect(clippy::too_many_arguments)]
166204
pub fn try_new(
167205
table_schema: &SchemaRef,
168206
sort_index: Option<Index>,
@@ -171,6 +209,7 @@ impl PartitionStreamExec {
171209
chunk_info_batches: Arc<Vec<RecordBatch>>,
172210
mut query_expression: QueryExpression,
173211
client: ConnectionClient,
212+
trace_headers: Option<crate::TraceHeaders>,
174213
) -> datafusion::common::Result<Self> {
175214
let projected_schema = match projection {
176215
Some(p) => Arc::new(table_schema.project(p)?),
@@ -270,6 +309,7 @@ impl PartitionStreamExec {
270309
target_partitions: num_partitions,
271310
worker_runtime,
272311
client,
312+
trace_headers,
273313
})
274314
}
275315
}
@@ -476,12 +516,15 @@ impl ExecutionPlan for PartitionStreamExec {
476516
}
477517
}
478518

479-
#[tracing::instrument(level = "info", skip_all)]
480519
fn execute(
481520
&self,
482521
partition: usize,
483522
_context: Arc<TaskContext>,
484523
) -> datafusion::common::Result<SendableRecordBatchStream> {
524+
#[cfg(not(target_arch = "wasm32"))]
525+
let _trace_guard = attach_trace_context(&self.trace_headers);
526+
let _span = tracing::info_span!("execute").entered();
527+
485528
let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(CPU_THREAD_IO_CHANNEL_SIZE);
486529

487530
let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
@@ -526,6 +569,7 @@ impl ExecutionPlan for PartitionStreamExec {
526569
io_join_handle: None,
527570
cpu_join_handle,
528571
cpu_runtime: Arc::clone(&self.worker_runtime),
572+
trace_headers: self.trace_headers.clone(),
529573
};
530574
let stream = DataframePartitionStream {
531575
inner: Some(stream),
@@ -552,6 +596,7 @@ impl ExecutionPlan for PartitionStreamExec {
552596
target_partitions,
553597
worker_runtime: Arc::new(CpuRuntime::try_new(target_partitions)?),
554598
client: self.client.clone(),
599+
trace_headers: self.trace_headers.clone(),
555600
};
556601

557602
plan.props.partitioning = match plan.props.partitioning {

crates/store/re_datafusion/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,6 @@ pub use dataset_manifest::DatasetManifestProvider;
2323
pub use partition_table::SegmentTableProvider;
2424
pub use search_provider::SearchResultsTableProvider;
2525
pub use table_entry_provider::TableEntryTableProvider;
26+
27+
#[cfg(not(target_arch = "wasm32"))]
28+
pub(crate) type TraceHeaders = re_perf_telemetry::TraceHeaders;

crates/utils/re_perf_telemetry/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ tracy_enabled = []
4242
## E.g. an async function that yields 50 times will be counted as 51 (the first call + 50 yields).
4343
tracy = ["dep:tracing-tracy"]
4444

45+
## PyO3 integration for cross-boundary tracing
46+
pyo3 = ["dep:pyo3"]
47+
4548

4649
[dependencies]
4750

@@ -78,6 +81,7 @@ tokio.workspace = true
7881

7982
# External (optional)
8083
tracing-tracy = { workspace = true, optional = true }
84+
pyo3 = { workspace = true, optional = true }
8185

8286
[lints]
8387
workspace = true

crates/utils/re_perf_telemetry/src/lib.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,82 @@ impl From<&TraceHeaders> for opentelemetry::Context {
192192

193193
// ---
194194

195+
/// The name of the `ContextVar` used for trace context propagation
196+
pub const TRACE_CONTEXT_VAR_NAME: &str = "TRACE_CONTEXT";
197+
198+
#[cfg(feature = "pyo3")]
199+
/// Get the trace context `ContextVar` object.
200+
///
201+
/// This returns the same Python `ContextVar` instance every time, ensuring that
202+
/// values set on it can be read back later. It is up to the caller to ensure trace context
203+
/// is reset and cleared as needed.
204+
pub fn get_trace_context_var(py: pyo3::Python<'_>) -> pyo3::PyResult<pyo3::Bound<'_, pyo3::PyAny>> {
205+
use pyo3::prelude::*;
206+
207+
static CONTEXT_VAR: parking_lot::Mutex<Option<pyo3::Py<pyo3::PyAny>>> =
208+
parking_lot::Mutex::new(None);
209+
210+
let mut guard = CONTEXT_VAR.lock();
211+
212+
if let Some(var) = guard.as_ref() {
213+
return Ok(var.bind(py).clone());
214+
}
215+
216+
// Create the trace context ContextVar
217+
let module = py.import("contextvars")?;
218+
let contextvar_class = module.getattr("ContextVar")?;
219+
let trace_ctx_var = contextvar_class.call1((TRACE_CONTEXT_VAR_NAME,))?;
220+
let trace_ctx_unbound = trace_ctx_var.clone().unbind();
221+
222+
*guard = Some(trace_ctx_unbound);
223+
224+
Ok(trace_ctx_var)
225+
}
226+
227+
#[cfg(feature = "pyo3")]
228+
/// Extract trace context from Python `ContextVar` for cross-boundary propagation.
229+
pub fn extract_trace_context_from_contextvar(py: pyo3::Python<'_>) -> TraceHeaders {
230+
use pyo3::prelude::*;
231+
use pyo3::types::PyDict;
232+
233+
fn try_extract(py: pyo3::Python<'_>) -> PyResult<TraceHeaders> {
234+
let context_var = get_trace_context_var(py)?;
235+
236+
match context_var.call_method0("get") {
237+
Ok(trace_data) => {
238+
if let Ok(dict) = trace_data.downcast::<PyDict>() {
239+
let traceparent = dict
240+
.get_item(TraceHeaders::TRACEPARENT_KEY)?
241+
.and_then(|v| v.extract::<String>().ok())
242+
.unwrap_or_default();
243+
244+
let tracestate = dict
245+
.get_item(TraceHeaders::TRACESTATE_KEY)?
246+
.and_then(|v| v.extract::<String>().ok());
247+
248+
let headers = TraceHeaders {
249+
traceparent,
250+
tracestate,
251+
};
252+
253+
tracing::debug!("Trace headers: {:?}", headers);
254+
Ok(headers)
255+
} else {
256+
Ok(TraceHeaders::empty())
257+
}
258+
}
259+
Err(_) => Ok(TraceHeaders::empty()),
260+
}
261+
}
262+
263+
try_extract(py).unwrap_or_else(|err| {
264+
tracing::debug!("Failed to extract trace context: {}", err);
265+
TraceHeaders::empty()
266+
})
267+
}
268+
269+
// ---
270+
195271
// Extension to [`tracing_subscriber:EnvFilter`] that allows to
196272
// add a directive only if not already present in the base filter
197273
pub trait EnvFilterExt

rerun_py/Cargo.toml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ name = "rerun_bindings" # name of the .so library that the Python module will im
1515

1616

1717
[features]
18-
default = ["extension-module"]
18+
default = ["extension-module", "perf_telemetry"]
1919

2020
## Extra features that aren't ready to be included in release builds yet.
2121
extra = []
@@ -38,7 +38,11 @@ nasm = ["re_video/nasm"]
3838
## Enables integration with `re_perf_telemetry` (Tracy, Jaeger).
3939
##
4040
## This only works on native.
41-
perf_telemetry = ["dep:re_perf_telemetry", "re_redap_client/perf_telemetry"]
41+
perf_telemetry = [
42+
"dep:re_perf_telemetry",
43+
"re_redap_client/perf_telemetry",
44+
"re_perf_telemetry/pyo3",
45+
]
4246

4347
server = ["re_sdk/server", "dep:re_grpc_server"]
4448

@@ -109,7 +113,7 @@ thiserror.workspace = true
109113

110114
# Native dependencies:
111115
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
112-
re_perf_telemetry = { workspace = true, features = ["tracy"], optional = true }
116+
re_perf_telemetry = { workspace = true, features = ["tracy", "pyo3"], optional = true }
113117

114118

115119
[build-dependencies]

rerun_py/rerun_bindings/rerun_bindings.pyi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1851,3 +1851,6 @@ class Credentials:
18511851

18521852
def get_credentials() -> Credentials | None:
18531853
"""Returns the credentials for the current user."""
1854+
1855+
def rerun_trace_context() -> Any:
1856+
"""Get the trace context ContextVar for distributed tracing propagation."""

rerun_py/rerun_sdk/rerun/catalog/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
# Conditionally compiled function - always exists at runtime but mypy can't verify
34
from rerun_bindings import (
45
AlreadyExistsError as AlreadyExistsError,
56
DataframeQueryView as DataframeQueryView,
@@ -14,6 +15,7 @@
1415
Task as Task,
1516
Tasks as Tasks,
1617
VectorDistanceMetric as VectorDistanceMetric,
18+
rerun_trace_context as _rerun_trace_context,
1719
)
1820
from rerun_bindings.types import (
1921
IndexValuesLike as IndexValuesLike,

0 commit comments

Comments
 (0)