Skip to content

Commit 2db0981

Browse files
committed
Propagate span when spawning tasks
Used by CubeStore tracing infrastructure
1 parent 6ab96d5 commit 2db0981

File tree

8 files changed

+66
-14
lines changed

8 files changed

+66
-14
lines changed

rust/datafusion/src/cube_ext/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,6 @@ pub mod joinagg;
2121
pub mod sequence;
2222
pub mod stream;
2323
pub mod util;
24+
25+
mod spawn;
26+
pub use spawn::*;

rust/datafusion/src/cube_ext/spawn.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use futures::Future;
19+
use tokio::task::JoinHandle;
20+
use tracing_futures::Instrument;
21+
22+
/// Calls [tokio::spawn] and additionally enables tracing of the spawned task as part of the current
23+
/// computation. This is CubeStore approach to tracing, so all code must use this function instead
24+
/// of replace [tokio::spawn].
25+
pub fn spawn<T>(task: T) -> JoinHandle<T::Output>
26+
where
27+
T: Future + Send + 'static,
28+
T::Output: Send + 'static,
29+
{
30+
tokio::spawn(task.in_current_span())
31+
}
32+
33+
/// Propagates current span to blocking operation. See [spawn] for details.
34+
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
35+
where
36+
F: FnOnce() -> R + Send + 'static,
37+
R: Send + 'static,
38+
{
39+
let span = tracing::Span::current();
40+
if span.is_disabled() {
41+
tokio::task::spawn_blocking(f)
42+
} else {
43+
tokio::task::spawn_blocking(move || {
44+
let _ = tracing::info_span!(parent: &span, "blocking task").enter();
45+
f()
46+
})
47+
}
48+
}

rust/datafusion/src/datasource/memory.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::physical_plan::common;
3434
use crate::physical_plan::memory::MemoryExec;
3535
use crate::physical_plan::ExecutionPlan;
3636
use crate::{
37+
cube_ext,
3738
datasource::datasource::Statistics,
3839
physical_plan::{repartition::RepartitionExec, Partitioning},
3940
};
@@ -121,7 +122,7 @@ impl MemTable {
121122
let tasks = (0..partition_count)
122123
.map(|part_i| {
123124
let exec = exec.clone();
124-
tokio::spawn(async move {
125+
cube_ext::spawn(async move {
125126
let stream = exec.execute(part_i).await?;
126127
common::collect(stream).await
127128
})

rust/datafusion/src/physical_plan/hash_aggregate.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ use super::{
7777
SendableRecordBatchStream,
7878
};
7979

80+
use crate::cube_ext;
8081
use crate::logical_plan::{DFSchema, DFSchemaRef};
8182
use crate::physical_plan::sorted_aggregate::SortedAggState;
8283
use crate::scalar::ScalarValue;
@@ -85,7 +86,6 @@ use itertools::Itertools;
8586
use smallvec::smallvec;
8687
use smallvec::SmallVec;
8788
use std::convert::TryFrom;
88-
use tracing_futures::{Instrument, WithSubscriber};
8989

9090
/// Hash aggregate modes
9191
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -765,7 +765,7 @@ impl GroupedHashAggregateStream {
765765
};
766766
tx.send(result)
767767
};
768-
tokio::spawn(task.in_current_span().with_current_subscriber());
768+
cube_ext::spawn(task);
769769

770770
GroupedHashAggregateStream {
771771
schema,
@@ -945,7 +945,7 @@ impl HashAggregateStream {
945945
compute_hash_aggregate(mode, schema_clone, aggr_expr, input).await;
946946
tx.send(result)
947947
};
948-
tokio::spawn(task.in_current_span().with_current_subscriber());
948+
cube_ext::spawn(task);
949949

950950
HashAggregateStream {
951951
schema,

rust/datafusion/src/physical_plan/merge.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ use crate::physical_plan::Partitioning;
4040
use crate::physical_plan::{ExecutionPlan, OptimizerHints};
4141

4242
use super::SendableRecordBatchStream;
43+
use crate::cube_ext;
4344
use crate::logical_plan::DFSchemaRef;
4445
use pin_project_lite::pin_project;
4546
use std::option::Option::None;
46-
use tracing_futures::{Instrument, WithSubscriber};
4747

4848
/// Merge execution plan executes partitions in parallel and combines them into a single
4949
/// partition. No guarantees are made about the order of the resulting partition.
@@ -145,7 +145,7 @@ impl ExecutionPlan for MergeExec {
145145
sender.send(item).await.ok();
146146
}
147147
};
148-
tokio::spawn(task.in_current_span().with_current_subscriber());
148+
cube_ext::spawn(task);
149149
}
150150

151151
Ok(Box::pin(MergeStream {

rust/datafusion/src/physical_plan/parquet.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use super::{
3232
};
3333
use crate::{
3434
catalog::catalog::MemoryCatalogList,
35+
cube_ext,
3536
physical_plan::{common, ExecutionPlan, Partitioning},
3637
};
3738
use crate::{
@@ -59,10 +60,7 @@ use parquet::file::{
5960

6061
use fmt::Debug;
6162
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
62-
use tokio::{
63-
sync::mpsc::{channel, Receiver, Sender},
64-
task,
65-
};
63+
use tokio::sync::mpsc::{channel, Receiver, Sender};
6664
use tokio_stream::wrappers::ReceiverStream;
6765

6866
use crate::datasource::datasource::{ColumnStatistics, Statistics};
@@ -877,7 +875,7 @@ impl ExecutionPlan for ParquetExec {
877875
let batch_size = self.batch_size;
878876
let limit = self.limit;
879877

880-
task::spawn_blocking(move || {
878+
cube_ext::spawn_blocking(move || {
881879
if let Err(e) = read_files(
882880
&filenames,
883881
&projection,

rust/datafusion/src/physical_plan/repartition.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
3333
use super::{hash_join::create_hashes, RecordBatchStream, SendableRecordBatchStream};
3434
use async_trait::async_trait;
3535

36+
use crate::cube_ext;
3637
use crate::logical_plan::DFSchemaRef;
3738
use futures::stream::Stream;
3839
use futures::StreamExt;
@@ -142,7 +143,7 @@ impl ExecutionPlan for RepartitionExec {
142143
.map(|(partition, (tx, _rx))| (*partition, tx.clone()))
143144
.collect();
144145
let partitioning = self.partitioning.clone();
145-
let _: JoinHandle<Result<()>> = tokio::spawn(async move {
146+
let _: JoinHandle<Result<()>> = cube_ext::spawn(async move {
146147
let mut stream = input.execute(i).await?;
147148
let mut counter = 0;
148149
let hashes_buf = &mut vec![];
@@ -435,7 +436,7 @@ mod tests {
435436
#[tokio::test]
436437
async fn many_to_many_round_robin_within_tokio_task() -> Result<()> {
437438
let join_handle: JoinHandle<Result<Vec<Vec<RecordBatch>>>> =
438-
tokio::spawn(async move {
439+
cube_ext::spawn(async move {
439440
// define input partitions
440441
let schema = test_schema();
441442
let partition = create_vec_batches(&schema, 50);

rust/datafusion/src/physical_plan/sort.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use arrow::record_batch::RecordBatch;
3838
use arrow::{array::ArrayRef, error::ArrowError};
3939

4040
use super::{RecordBatchStream, SendableRecordBatchStream};
41+
use crate::cube_ext;
4142
use crate::error::{DataFusionError, Result};
4243
use crate::logical_plan::DFSchemaRef;
4344
use crate::physical_plan::expressions::PhysicalSortExpr;
@@ -249,7 +250,7 @@ impl SortStream {
249250
let (tx, rx) = futures::channel::oneshot::channel();
250251

251252
let schema = input.schema();
252-
tokio::spawn(async move {
253+
cube_ext::spawn(async move {
253254
let schema = input.schema();
254255
let sorted_batch = common::collect(input)
255256
.await

0 commit comments

Comments
 (0)