Skip to content

Commit 356616e

Browse files
cj-zhukovSergey Zhukov
andauthored
Consolidate execution monitoring examples (#18142) (#18846)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - part of ##18142. ## Rationale for this change This PR is for consolidating all the `execution_monitoring` examples (mem_pool_exec_plan, mem_pool_tracking, tracing) into a single example binary. We are agreed on the pattern and we can apply it to the remaining examples <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> Co-authored-by: Sergey Zhukov <[email protected]>
1 parent 0bd127f commit 356616e

File tree

5 files changed

+124
-9
lines changed

5 files changed

+124
-9
lines changed

datafusion-examples/README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,9 @@ cargo run --example dataframe
6969
- [`examples/custom_data_source/file_stream_provider.rs`](examples/custom_data_source/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
7070
- [`flight/sql_server.rs`](examples/flight/sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from Flight and and FlightSQL (e.g. JDBC) clients
7171
- [`examples/builtin_functions/function_factory.rs`](examples/builtin_functions/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
72-
- [`memory_pool_tracking.rs`](examples/memory_pool_tracking.rs): Demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages
73-
- [`memory_pool_execution_plan.rs`](examples/memory_pool_execution_plan.rs): Shows how to implement memory-aware ExecutionPlan with memory reservation and spilling
72+
- [`examples/execution_monitoring/memory_pool_tracking.rs`](examples/execution_monitoring/memory_pool_tracking.rs): Demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages
73+
- [`examples/execution_monitoring/memory_pool_execution_plan.rs`](examples/execution_monitoring/memory_pool_execution_plan.rs): Shows how to implement memory-aware ExecutionPlan with memory reservation and spilling
74+
- [`examples/execution_monitoring/tracing.rs`](examples/execution_monitoring/tracing.rs): Demonstrates the tracing injection feature for the DataFusion runtime
7475
- [`examples/query_planning/optimizer_rule.rs`](examples/query_planning/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates
7576
- [`examples/data_io/parquet_embedded_index.rs`](examples/data_io/parquet_embedded_index.rs): Store a custom index inside a Parquet file and use it to speed up queries
7677
- [`examples/data_io/parquet_encrypted.rs`](examples/data_io/parquet_encrypted.rs): Read and write encrypted Parquet files using DataFusion
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! # These examples of memory and performance management
19+
//!
20+
//! These examples demonstrate memory and performance management.
21+
//!
22+
//! ## Usage
23+
//! ```bash
24+
//! cargo run --example execution_monitoring -- [mem_pool_exec_plan|mem_pool_tracking|tracing]
25+
//! ```
26+
//!
27+
//! Each subcommand runs a corresponding example:
28+
//! - `mem_pool_exec_plan` — shows how to implement memory-aware ExecutionPlan with memory reservation and spilling
29+
//! - `mem_pool_tracking` — demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages
30+
//! - `tracing` — demonstrates the tracing injection feature for the DataFusion runtime
31+
32+
mod memory_pool_execution_plan;
33+
mod memory_pool_tracking;
34+
mod tracing;
35+
36+
use std::str::FromStr;
37+
38+
use datafusion::error::{DataFusionError, Result};
39+
40+
enum ExampleKind {
41+
MemoryPoolExecutionPlan,
42+
MemoryPoolTracking,
43+
Tracing,
44+
}
45+
46+
impl AsRef<str> for ExampleKind {
47+
fn as_ref(&self) -> &str {
48+
match self {
49+
Self::MemoryPoolExecutionPlan => "mem_pool_exec_plan",
50+
Self::MemoryPoolTracking => "mem_pool_tracking",
51+
Self::Tracing => "tracing",
52+
}
53+
}
54+
}
55+
56+
impl FromStr for ExampleKind {
57+
type Err = DataFusionError;
58+
59+
fn from_str(s: &str) -> Result<Self> {
60+
match s {
61+
"mem_pool_exec_plan" => Ok(Self::MemoryPoolExecutionPlan),
62+
"mem_pool_tracking" => Ok(Self::MemoryPoolTracking),
63+
"tracing" => Ok(Self::Tracing),
64+
_ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))),
65+
}
66+
}
67+
}
68+
69+
impl ExampleKind {
70+
const ALL: [Self; 3] = [
71+
Self::MemoryPoolExecutionPlan,
72+
Self::MemoryPoolTracking,
73+
Self::Tracing,
74+
];
75+
76+
const EXAMPLE_NAME: &str = "execution_monitoring";
77+
78+
fn variants() -> Vec<&'static str> {
79+
Self::ALL.iter().map(|x| x.as_ref()).collect()
80+
}
81+
}
82+
83+
#[tokio::main]
84+
async fn main() -> Result<()> {
85+
let usage = format!(
86+
"Usage: cargo run --example {} -- [{}]",
87+
ExampleKind::EXAMPLE_NAME,
88+
ExampleKind::variants().join("|")
89+
);
90+
91+
let arg = std::env::args().nth(1).ok_or_else(|| {
92+
eprintln!("{usage}");
93+
DataFusionError::Execution("Missing argument".to_string())
94+
})?;
95+
96+
match arg.parse::<ExampleKind>()? {
97+
ExampleKind::MemoryPoolExecutionPlan => {
98+
memory_pool_execution_plan::memory_pool_execution_plan().await?
99+
}
100+
ExampleKind::MemoryPoolTracking => {
101+
memory_pool_tracking::mem_pool_tracking().await?
102+
}
103+
ExampleKind::Tracing => tracing::tracing().await?,
104+
}
105+
106+
Ok(())
107+
}

datafusion-examples/examples/memory_pool_execution_plan.rs renamed to datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
//! See `main.rs` for how to run it.
19+
//!
1820
//! This example demonstrates how to implement custom ExecutionPlans that properly
1921
//! use memory tracking through TrackConsumersPool.
2022
//!
@@ -44,8 +46,8 @@ use std::any::Any;
4446
use std::fmt;
4547
use std::sync::Arc;
4648

47-
#[tokio::main]
48-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
49+
/// Shows how to implement memory-aware ExecutionPlan with memory reservation and spilling
50+
pub async fn memory_pool_execution_plan() -> Result<()> {
4951
println!("=== DataFusion ExecutionPlan Memory Tracking Example ===\n");
5052

5153
// Set up a runtime with memory tracking

datafusion-examples/examples/memory_pool_tracking.rs renamed to datafusion-examples/examples/execution_monitoring/memory_pool_tracking.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
//! See `main.rs` for how to run it.
19+
//!
1820
//! This example demonstrates how to use TrackConsumersPool for memory tracking and debugging.
1921
//!
2022
//! The TrackConsumersPool provides enhanced error messages that show the top memory consumers
@@ -24,11 +26,12 @@
2426
//!
2527
//! * [`automatic_usage_example`]: Shows how to use RuntimeEnvBuilder to automatically enable memory tracking
2628
29+
use datafusion::error::Result;
2730
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
2831
use datafusion::prelude::*;
2932

30-
#[tokio::main]
31-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
33+
/// Demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages
34+
pub async fn mem_pool_tracking() -> Result<()> {
3235
println!("=== DataFusion Memory Pool Tracking Example ===\n");
3336

3437
// Example 1: Automatic Usage with RuntimeEnvBuilder
@@ -41,7 +44,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4144
///
4245
/// This shows the recommended way to use TrackConsumersPool through RuntimeEnvBuilder,
4346
/// which automatically creates a TrackConsumersPool with sensible defaults.
44-
async fn automatic_usage_example() -> datafusion::error::Result<()> {
47+
async fn automatic_usage_example() -> Result<()> {
4548
println!("Example 1: Automatic Usage with RuntimeEnvBuilder");
4649
println!("------------------------------------------------");
4750

datafusion-examples/examples/tracing.rs renamed to datafusion-examples/examples/execution_monitoring/tracing.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
//! See `main.rs` for how to run it.
19+
//!
1820
//! This example demonstrates the tracing injection feature for the DataFusion runtime.
1921
//! Tasks spawned on new threads behave differently depending on whether a tracer is injected.
2022
//! The log output clearly distinguishes the two cases.
@@ -61,8 +63,8 @@ use std::any::Any;
6163
use std::sync::Arc;
6264
use tracing::{info, instrument, Instrument, Level, Span};
6365

64-
#[tokio::main]
65-
async fn main() -> Result<()> {
66+
/// Demonstrates the tracing injection feature for the DataFusion runtime
67+
pub async fn tracing() -> Result<()> {
6668
// Initialize tracing subscriber with thread info.
6769
tracing_subscriber::fmt()
6870
.with_thread_ids(true)

0 commit comments

Comments
 (0)