Skip to content

Commit bbcc720

Browse files
committed
feat(cube): Pass tracing spans through spawned tasks in ExecutionPlan execution
1 parent b357f20 commit bbcc720

File tree

3 files changed

+56
-5
lines changed

3 files changed

+56
-5
lines changed

datafusion/physical-plan/src/cube_ext/spawn.rs

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ use std::future::Future;
1919
use crate::cube_ext::catch_unwind::{
2020
async_try_with_catch_unwind, try_with_catch_unwind, PanicError,
2121
};
22+
use datafusion_common_runtime::SpawnedTask;
2223
use futures::sink::SinkExt;
23-
use tokio::task::JoinHandle;
24+
use tokio::task::{AbortHandle, JoinHandle, JoinSet};
2425
use tracing_futures::Instrument;
2526

2627
/// Calls [tokio::spawn] and additionally enables tracing of the spawned task as part of the current
@@ -57,6 +58,56 @@ where
5758
}
5859
}
5960

61+
/// Propagates current span to the spawned task. See [spawn] for details.
62+
pub fn spawn_on_joinset<F, T>(join_set: &mut JoinSet<T>, task: F) -> AbortHandle
63+
where
64+
F: Future<Output = T>,
65+
F: Send + 'static,
66+
T: Send + 'static,
67+
{
68+
if let Some(s) = new_subtask_span() {
69+
join_set.spawn(async move {
70+
let _p = s.parent; // ensure parent stays alive.
71+
task.instrument(s.child).await
72+
})
73+
} else {
74+
join_set.spawn(task)
75+
}
76+
}
77+
78+
/// Propagates current span to the blocking operation. See [spawn] for details.
79+
pub fn spawn_blocking_on_joinset<F, T>(join_set: &mut JoinSet<T>, f: F) -> AbortHandle
80+
where
81+
F: FnOnce() -> T,
82+
F: Send + 'static,
83+
T: Send + 'static,
84+
{
85+
if let Some(s) = new_subtask_span() {
86+
join_set.spawn_blocking(move || {
87+
let _p = s.parent; // ensure parent stays alive.
88+
s.child.in_scope(f)
89+
})
90+
} else {
91+
join_set.spawn_blocking(f)
92+
}
93+
}
94+
95+
/// Propagates current span to the spawned task. See [spawn] for details.
96+
pub fn spawn_spawned_task<T>(task: T) -> SpawnedTask<T::Output>
97+
where
98+
T: Future + Send + 'static,
99+
T::Output: Send + 'static,
100+
{
101+
if let Some(s) = new_subtask_span() {
102+
SpawnedTask::spawn(async move {
103+
let _p = s.parent; // ensure parent stays alive.
104+
task.instrument(s.child).await
105+
})
106+
} else {
107+
SpawnedTask::spawn(task)
108+
}
109+
}
110+
60111
struct SpawnSpans {
61112
parent: tracing::Span,
62113
child: tracing::Span,

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ impl RepartitionExecState {
133133

134134
let r_metrics = RepartitionMetrics::new(i, num_output_partitions, &metrics);
135135

136-
let input_task = SpawnedTask::spawn(RepartitionExec::pull_from_input(
136+
let input_task = crate::cube_ext::spawn_spawned_task(RepartitionExec::pull_from_input(
137137
Arc::clone(&input),
138138
i,
139139
txs.clone(),
@@ -144,7 +144,7 @@ impl RepartitionExecState {
144144

145145
// In a separate task, wait for each input to be done
146146
// (and pass along any errors, including panic!s)
147-
let wait_for_task = SpawnedTask::spawn(RepartitionExec::wait_for_task(
147+
let wait_for_task = crate::cube_ext::spawn_spawned_task(RepartitionExec::wait_for_task(
148148
input_task,
149149
txs.into_iter()
150150
.map(|(partition, (tx, _reservation))| (partition, tx))

datafusion/physical-plan/src/stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ impl<O: Send + 'static> ReceiverStreamBuilder<O> {
7979
F: Future<Output = Result<()>>,
8080
F: Send + 'static,
8181
{
82-
self.join_set.spawn(task);
82+
crate::cube_ext::spawn_on_joinset(&mut self.join_set, task);
8383
}
8484

8585
/// Spawn a blocking task that will be aborted if this builder (or the stream
@@ -92,7 +92,7 @@ impl<O: Send + 'static> ReceiverStreamBuilder<O> {
9292
F: FnOnce() -> Result<()>,
9393
F: Send + 'static,
9494
{
95-
self.join_set.spawn_blocking(f);
95+
crate::cube_ext::spawn_blocking_on_joinset(&mut self.join_set, f);
9696
}
9797

9898
/// Create a stream of all data written to `tx`

0 commit comments

Comments
 (0)