Skip to content

Commit 1b2ff9a

Browse files
test: Update distributed mode to support localhost workers
1 parent 198dad2 commit 1b2ff9a

File tree

13 files changed

+1134
-350
lines changed

13 files changed

+1134
-350
lines changed

Cargo.lock

Lines changed: 181 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/distributed_otlp.rs

Lines changed: 106 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,73 @@
5252
use std::time::Duration;
5353

5454
use datafusion::{common::internal_datafusion_err, error::Result};
55-
use integration_utils::{init_session, run_traced_query};
55+
use integration_utils::{DistributedMode, SessionBuilder, run_traced_query};
5656
use opentelemetry::{KeyValue, trace::TracerProvider};
5757
use opentelemetry_otlp::WithExportConfig;
5858
use opentelemetry_sdk::{Resource, trace::Sampler};
5959
use tracing::{Instrument, Level};
6060
use tracing_subscriber::{Registry, fmt, prelude::*};
6161

62+
/// Internal helper macro for generating match arms using repetition.
63+
/// This uses Rust's macro repetition syntax ($(...)*).
64+
macro_rules! generate_query_span_match {
65+
($i:expr, $mode_name:expr, $query_name:expr, $($num:tt),*) => {
66+
match ($i, $mode_name) {
67+
$(
68+
($num, "Memory") => tracing::info_span!(
69+
concat!("tpch_query_", stringify!($num), "_memory"),
70+
query = %$query_name,
71+
query_num = $i,
72+
distributed_mode = $mode_name
73+
),
74+
($num, "Localhost") => tracing::info_span!(
75+
concat!("tpch_query_", stringify!($num), "_localhost"),
76+
query = %$query_name,
77+
query_num = $i,
78+
distributed_mode = $mode_name
79+
),
80+
)*
81+
_ => unreachable!("Invalid query number or mode"),
82+
}
83+
};
84+
}
85+
86+
/// Macro to generate span creation for all query/mode combinations.
87+
/// This is necessary because tracing span names must be compile-time constants.
88+
///
89+
/// Usage: create_query_span!(query_num, mode_name, query_name)
90+
macro_rules! create_query_span {
91+
($i:expr, $mode_name:expr, $query_name:expr) => {
92+
generate_query_span_match!(
93+
$i,
94+
$mode_name,
95+
$query_name,
96+
1,
97+
2,
98+
3,
99+
4,
100+
5,
101+
6,
102+
7,
103+
8,
104+
9,
105+
10,
106+
11,
107+
12,
108+
13,
109+
14,
110+
15,
111+
16,
112+
17,
113+
18,
114+
19,
115+
20,
116+
21,
117+
22
118+
)
119+
};
120+
}
121+
62122
#[tokio::main]
63123
async fn main() -> Result<()> {
64124
// Initialize tracing infrastructure and obtain a tracer provider.
@@ -74,26 +134,52 @@ async fn main() -> Result<()> {
74134
}
75135

76136
async fn run_distributed_otlp_example() -> Result<()> {
77-
// Loop over all 22 TPCH queries
78-
for i in 1..=22 {
79-
let query_name = format!("tpch/q{}", i);
80-
81-
// Create a new root span for each query to ensure independent traces.
82-
// This span will be the root of a new trace tree.
83-
let span = tracing::info_span!("tpch_query", query = %query_name, query_num = i);
84-
85-
// Execute the query within the new root span context.
86-
async {
87-
tracing::info!("Running TPCH query: {}", query_name);
88-
89-
// Initialize a distinct DataFusion session context for each query.
90-
let ctx = init_session(false, true, 5, true, true).await?;
91-
92-
// Run the SQL query with tracing enabled.
93-
run_traced_query(&ctx, &query_name).await
137+
// Test both distributed execution modes
138+
let modes = [
139+
(DistributedMode::Memory, "Memory"),
140+
(DistributedMode::Localhost, "Localhost"),
141+
];
142+
143+
for (mode, mode_name) in modes {
144+
tracing::info!("Starting TPCH queries with {} distributed mode", mode_name);
145+
146+
// Loop over all 22 TPCH queries
147+
for i in 1..=22 {
148+
let query_name = format!("tpch/q{}", i);
149+
150+
// Create a new root span for each query to ensure independent traces.
151+
// This span will be the root of a new trace tree.
152+
// Note: Span names must be compile-time constants, so we use a macro to generate them.
153+
let span = create_query_span!(i, mode_name, query_name);
154+
155+
// Execute the query within the new root span context.
156+
async {
157+
tracing::info!(
158+
"Running TPCH query: {} in {} mode",
159+
query_name,
160+
mode_name
161+
);
162+
163+
// Initialize a distinct DataFusion session context for each query.
164+
let ctx = SessionBuilder::new()
165+
.with_metrics()
166+
.with_preview(5)
167+
.with_compact_preview()
168+
.with_distributed_mode(mode)
169+
.build()
170+
.await?;
171+
172+
// Run the SQL query with tracing enabled.
173+
run_traced_query(&ctx, &query_name).await
174+
}
175+
.instrument(span)
176+
.await?;
94177
}
95-
.instrument(span)
96-
.await?;
178+
179+
tracing::info!(
180+
"Completed all TPCH queries with {} distributed mode",
181+
mode_name
182+
);
97183
}
98184

99185
Ok(())

examples/otlp.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
use std::time::Duration;
5454

5555
use datafusion::{common::internal_datafusion_err, error::Result};
56-
use integration_utils::{init_session, run_traced_query};
56+
use integration_utils::{SessionBuilder, run_traced_query};
5757
use opentelemetry::{KeyValue, trace::TracerProvider};
5858
use opentelemetry_otlp::WithExportConfig;
5959
use opentelemetry_sdk::{Resource, trace::Sampler};
@@ -80,7 +80,13 @@ async fn main() -> Result<()> {
8080
#[instrument(level = "info")]
8181
async fn run_otlp_example() -> Result<()> {
8282
// Initialize the DataFusion session context.
83-
let ctx = init_session(true, true, 5, true, false).await?;
83+
let ctx = SessionBuilder::new()
84+
.with_object_store_tracing()
85+
.with_metrics()
86+
.with_preview(5)
87+
.with_compact_preview()
88+
.build()
89+
.await?;
8490

8591
// Run the SQL query with tracing enabled.
8692
run_traced_query(&ctx, QUERY_NAME).await?;

integration-utils/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@ workspace = true
3434
arrow = "56.1"
3535
arrow-flight = "56.1"
3636
async-trait = "0.1"
37+
dashmap = "6.1"
3738
datafusion = { workspace = true, features = ["parquet", "nested_expressions"] }
3839
datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed", branch = "main" }
3940
datafusion-tracing = { workspace = true }
4041
futures = { workspace = true }
4142
hyper-util = "0.1"
4243
instrumented-object-store = { workspace = true }
4344
object_store = { version = "0.12.1", default-features = false }
45+
structopt = "0.3"
4446
tokio = { workspace = true, features = ["full"] }
4547
tokio-stream = "0.1"
4648
tonic = { version = "0.13", features = ["transport"] }

integration-utils/src/data.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
//
18+
// This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc.
19+
20+
//! Data and table utilities
21+
22+
use std::path::PathBuf;
23+
use std::sync::Arc;
24+
25+
use datafusion::common::internal_datafusion_err;
26+
use datafusion::datasource::file_format::parquet::ParquetFormat;
27+
use datafusion::datasource::listing::ListingOptions;
28+
use datafusion::{error::Result, prelude::*};
29+
use tracing::{info, instrument};
30+
31+
/// Returns the path to the directory containing the Parquet tables.
32+
pub fn data_dir() -> PathBuf {
33+
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("data")
34+
}
35+
36+
/// Registers all TPCH Parquet tables required for executing the queries.
37+
#[instrument(level = "info", skip(ctx))]
38+
pub(crate) async fn register_tpch_tables(ctx: &SessionContext) -> Result<()> {
39+
// Construct the path to the directory containing Parquet data.
40+
let data_dir = data_dir();
41+
42+
// Register the weather table.
43+
ctx.register_parquet(
44+
"weather",
45+
data_dir.join("weather").to_string_lossy(),
46+
ParquetReadOptions::default(),
47+
)
48+
.await?;
49+
50+
// Generate and register each table from Parquet files.
51+
// This includes all standard TPCH tables so examples/tests can rely on them.
52+
for table in [
53+
"nation", "region", "part", "supplier", "partsupp", "customer", "orders",
54+
"lineitem",
55+
] {
56+
let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
57+
58+
let parquet_path = data_dir.join(table).with_extension("parquet");
59+
if !parquet_path.exists() {
60+
return Err(internal_datafusion_err!(
61+
"Missing TPCH Parquet file: {}.\nGenerate TPCH data first by running: ./dev/generate_tpch_parquet.sh\nThis script requires 'tpchgen-cli' (install with: cargo install tpchgen-cli)",
62+
parquet_path.display()
63+
));
64+
}
65+
66+
// Generate the file path URL for the Parquet data.
67+
let table_path = format!("file://{}", parquet_path.to_string_lossy());
68+
69+
info!("Registering table '{}' from {}", table, table_path);
70+
71+
// Register the table with DataFusion's session context.
72+
ctx.register_listing_table(table, &table_path, listing_options, None, None)
73+
.await?;
74+
}
75+
76+
Ok(())
77+
}

integration-utils/src/channel_resolver.rs renamed to integration-utils/src/distributed/in_memory.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use tonic::transport::{Endpoint, Server};
3232
const DUMMY_URL: &str = "http://localhost:50051";
3333
const MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024 * 1024; // 2GB
3434

35-
/// [ChannelResolver] implementation that returns gRPC clients baked by an in-memory
35+
/// [ChannelResolver] implementation that returns gRPC clients backed by an in-memory
3636
/// tokio duplex rather than a TCP connection.
3737
#[derive(Clone)]
3838
pub(crate) struct InMemoryChannelResolver {

0 commit comments

Comments
 (0)