
Commit 61f981b

Add benchmark script and documentation for maximizing CPU usage in DataFusion Python (#1216)
* docs: add configuration tips for maximizing CPU usage and new benchmark script
* docs: enhance benchmark example for maximizing CPU usage in DataFusion
* docs: enhance benchmark script and configuration guide for maximizing CPU usage
1 parent b76cd8f commit 61f981b

3 files changed: +247 / -1 lines changed


README.md

Lines changed: 4 additions & 0 deletions
@@ -42,6 +42,10 @@ DataFusion's Python bindings can be used as a foundation for building new data s
- Serialize and deserialize query plans in Substrait format.
- Experimental support for transpiling SQL queries to DataFrame calls with Polars, Pandas, and cuDF.

+For tips on tuning parallelism, see
+[Maximizing CPU Usage](docs/source/user-guide/configuration.rst#maximizing-cpu-usage)
+in the configuration guide.
+
## Example Usage

The following example demonstrates running a SQL query against a Parquet file using DataFusion, storing the results

benchmarks/max_cpu_usage.py

Lines changed: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Benchmark script showing how to maximize CPU usage.

This script demonstrates one example of tuning DataFusion for improved parallelism
and CPU utilization. It uses synthetic in-memory data and performs simple aggregation
operations to showcase the impact of partitioning configuration.

IMPORTANT: This is a simplified example designed to illustrate partitioning concepts.
Actual performance in your applications may vary significantly based on many factors:

- Type of table providers (Parquet files, CSV, databases, etc.)
- I/O operations and storage characteristics (local disk, network, cloud storage)
- Query complexity and operation types (joins, window functions, complex expressions)
- Data distribution and size characteristics
- Memory available and hardware specifications
- Network latency for distributed data sources

It is strongly recommended that you create similar benchmarks tailored to your specific:
- Hardware configuration
- Data sources and formats
- Typical query patterns and workloads
- Performance requirements

This will give you more accurate insights into how DataFusion configuration options
will affect your particular use case.
"""

from __future__ import annotations

import argparse
import multiprocessing
import time

import pyarrow as pa
from datafusion import SessionConfig, SessionContext, col
from datafusion import functions as f


def main(num_rows: int, partitions: int) -> None:
    """Run a simple aggregation after repartitioning.

    This function demonstrates basic partitioning concepts using synthetic data.
    Real-world performance will depend on your specific data sources, query types,
    and system configuration.
    """
    # Create some example data (synthetic in-memory data for demonstration)
    # Note: Real applications typically work with files, databases, or other
    # data sources that have different I/O and distribution characteristics
    array = pa.array(range(num_rows))
    batch = pa.record_batch([array], names=["a"])

    # Configure the session to use a higher target partition count and
    # enable automatic repartitioning.
    config = (
        SessionConfig()
        .with_target_partitions(partitions)
        .with_repartition_joins(enabled=True)
        .with_repartition_aggregations(enabled=True)
        .with_repartition_windows(enabled=True)
    )
    ctx = SessionContext(config)

    # Register the input data and repartition manually to ensure that all
    # partitions are used.
    df = ctx.create_dataframe([[batch]]).repartition(partitions)

    start = time.time()
    df = df.aggregate([], [f.sum(col("a"))])
    df.collect()
    end = time.time()

    print(
        f"Processed {num_rows} rows using {partitions} partitions in {end - start:.3f}s"
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--rows",
        type=int,
        default=1_000_000,
        help="Number of rows in the generated dataset",
    )
    parser.add_argument(
        "--partitions",
        type=int,
        default=multiprocessing.cpu_count(),
        help="Target number of partitions to use",
    )
    args = parser.parse_args()
    main(args.rows, args.partitions)
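
To compare several partition counts in one process rather than re-running the script, the same pattern can be wrapped in a loop. A minimal sketch that reuses only the APIs shown above (absolute timings will vary with your hardware and data):

    import time

    import pyarrow as pa
    from datafusion import SessionConfig, SessionContext, col
    from datafusion import functions as f

    # One synthetic batch, reused for every partition count.
    batch = pa.record_batch([pa.array(range(1_000_000))], names=["a"])

    for partitions in (1, 2, 4, 8):
        ctx = SessionContext(SessionConfig().with_target_partitions(partitions))
        df = ctx.create_dataframe([[batch]]).repartition(partitions)

        start = time.time()
        df.aggregate([], [f.sum(col("a"))]).collect()
        print(f"{partitions} partitions: {time.time() - start:.3f}s")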

docs/source/user-guide/configuration.rst

Lines changed: 136 additions & 1 deletion
@@ -46,6 +46,141 @@ a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.conte
    ctx = SessionContext(config, runtime)
    print(ctx)

+Maximizing CPU Usage
+--------------------

-You can read more about available :py:class:`~datafusion.context.SessionConfig` options in the `rust DataFusion Configuration guide <https://arrow.apache.org/datafusion/user-guide/configs.html>`_,
+DataFusion uses partitions to parallelize work. For small queries the
+default configuration (the number of CPU cores) is often sufficient, but to
+fully utilize the available hardware you can tune how many partitions are
+created and when DataFusion will repartition data automatically.
+
+Configure a ``SessionContext`` with a higher partition count:
+
+.. code-block:: python
+
+    from datafusion import SessionConfig, SessionContext
+
+    # allow up to 16 concurrent partitions
+    config = SessionConfig().with_target_partitions(16)
+    ctx = SessionContext(config)
+
+Automatic repartitioning for joins, aggregations, window functions, and
+other operations can be enabled to increase parallelism:
+
+.. code-block:: python
+
+    config = (
+        SessionConfig()
+        .with_target_partitions(16)
+        .with_repartition_joins(True)
+        .with_repartition_aggregations(True)
+        .with_repartition_windows(True)
+    )
+
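The same settings can also be expressed through DataFusion's string-keyed configuration options (documented in the Rust configuration guide linked at the end of this page). A minimal sketch, assuming the standard ``datafusion.execution.target_partitions`` and ``datafusion.optimizer.repartition_*`` keys and that ``SessionConfig.set`` chains like the ``with_*`` builder methods:

    from datafusion import SessionConfig, SessionContext

    # String-keyed equivalent of the builder-style calls above.
    config = (
        SessionConfig()
        .set("datafusion.execution.target_partitions", "16")
        .set("datafusion.optimizer.repartition_joins", "true")
        .set("datafusion.optimizer.repartition_aggregations", "true")
        .set("datafusion.optimizer.repartition_windows", "true")
    )
    ctx = SessionContext(config)
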
+Manual repartitioning is available on DataFrames when you need precise
+control:
+
+.. code-block:: python
+
+    from datafusion import col
+
+    df = ctx.read_parquet("data.parquet")
+
+    # Evenly divide into 16 partitions
+    df = df.repartition(16)
+
+    # Or partition by the hash of a column
+    df = df.repartition_by_hash(col("a"), num=16)
+
+    result = df.collect()
+
+
+Benchmark Example
+^^^^^^^^^^^^^^^^^
+
+The repository includes a benchmark script that demonstrates how to maximize CPU usage
+with DataFusion. The :code:`benchmarks/max_cpu_usage.py` script shows a practical example
+of configuring DataFusion for optimal parallelism.
+
+You can run the benchmark script to see the impact of different configuration settings:
+
+.. code-block:: bash
+
+    # Run with default settings (uses all CPU cores)
+    python benchmarks/max_cpu_usage.py
+
+    # Run with a specific number of rows and partitions
+    python benchmarks/max_cpu_usage.py --rows 5000000 --partitions 16
+
+    # See all available options
+    python benchmarks/max_cpu_usage.py --help
+
+Here is an example showing the performance difference between a single partition and multiple partitions:
+
+.. code-block:: bash
+
+    # Single partition - slower processing
+    $ python benchmarks/max_cpu_usage.py --rows=10000000 --partitions 1
+    Processed 10000000 rows using 1 partitions in 0.107s
+
+    # Multiple partitions - faster processing
+    $ python benchmarks/max_cpu_usage.py --rows=10000000 --partitions 10
+    Processed 10000000 rows using 10 partitions in 0.038s
+
+This example demonstrates a nearly 3x performance improvement (0.107s vs 0.038s) when using
+10 partitions instead of 1, showcasing how proper partitioning can significantly improve
+CPU utilization and query performance.
+
+The script demonstrates several key optimization techniques:
+
+1. **Higher target partition count**: Uses :code:`with_target_partitions()` to set the number of concurrent partitions
+2. **Automatic repartitioning**: Enables repartitioning for joins, aggregations, and window functions
+3. **Manual repartitioning**: Uses :code:`repartition()` to ensure all partitions are utilized
+4. **CPU-intensive operations**: Performs aggregations that can benefit from parallelization
+
+The benchmark creates synthetic data and measures the time taken to perform a sum aggregation
+across the specified number of partitions. This helps you understand how partition configuration
+affects performance on your specific hardware.
+
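If you want to confirm that a given configuration actually spreads the aggregation across partitions before timing it, one option is to inspect the plan. A minimal sketch, assuming ``DataFrame.explain()`` as provided by recent datafusion-python releases; in the printed physical plan, look for repartitioning or partitioned operators such as ``RepartitionExec``:

    import pyarrow as pa
    from datafusion import SessionConfig, SessionContext, col
    from datafusion import functions as f

    # Small synthetic input, mirroring the benchmark script above.
    batch = pa.record_batch([pa.array(range(1_000))], names=["a"])

    ctx = SessionContext(SessionConfig().with_target_partitions(4))
    df = ctx.create_dataframe([[batch]]).repartition(4)
    df = df.aggregate([], [f.sum(col("a"))])

    # Prints the logical and physical plans; the physical plan shows how
    # the aggregation is divided across partitions.
    df.explain()
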
145+
Important Considerations
146+
""""""""""""""""""""""""
147+
148+
The provided benchmark script demonstrates partitioning concepts using synthetic in-memory data
149+
and simple aggregation operations. While useful for understanding basic configuration principles,
150+
actual performance in production environments may vary significantly based on numerous factors:
151+
152+
**Data Sources and I/O Characteristics:**
153+
154+
- **Table providers**: Performance differs greatly between Parquet files, CSV files, databases, and cloud storage
155+
- **Storage type**: Local SSD, network-attached storage, and cloud storage have vastly different characteristics
156+
- **Network latency**: Remote data sources introduce additional latency considerations
157+
- **File sizes and distribution**: Large files may benefit differently from partitioning than many small files
158+
159+
**Query and Workload Characteristics:**
160+
161+
- **Operation complexity**: Simple aggregations versus complex joins, window functions, or nested queries
162+
- **Data distribution**: Skewed data may not partition evenly, affecting parallel efficiency
163+
- **Memory usage**: Large datasets may require different memory management strategies
164+
- **Concurrent workloads**: Multiple queries running simultaneously affect resource allocation
165+
166+
**Hardware and Environment Factors:**
167+
168+
- **CPU architecture**: Different processors have varying parallel processing capabilities
169+
- **Available memory**: Limited RAM may require different optimization strategies
170+
- **System load**: Other applications competing for resources affect DataFusion performance
171+
172+
**Recommendations for Production Use:**
173+
174+
To optimize DataFusion for your specific use case, it is strongly recommended to:
175+
176+
1. **Create custom benchmarks** using your actual data sources, formats, and query patterns
177+
2. **Test with representative data volumes** that match your production workloads
178+
3. **Measure end-to-end performance** including data loading, processing, and result handling
179+
4. **Evaluate different configuration combinations** for your specific hardware and workload
180+
5. **Monitor resource utilization** (CPU, memory, I/O) to identify bottlenecks in your environment
181+
182+
This approach will provide more accurate insights into how DataFusion configuration options
183+
will impact your particular applications and infrastructure.
184+
185+
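As a starting point for recommendation 1 above, the pattern from :code:`benchmarks/max_cpu_usage.py` can be pointed at your own data. A rough sketch, assuming a local Parquet file at a hypothetical path ``/path/to/your_data.parquet`` with a column ``a``; substitute your real files and a query representative of your workload:

    import time

    from datafusion import SessionConfig, SessionContext, col
    from datafusion import functions as f

    # Tune the partition count for your own hardware and storage.
    ctx = SessionContext(SessionConfig().with_target_partitions(16))

    # Hypothetical input path and column name; replace with your data and
    # an aggregation that matches your production queries.
    df = ctx.read_parquet("/path/to/your_data.parquet").repartition(16)

    start = time.time()
    df.aggregate([], [f.sum(col("a"))]).collect()
    print(f"elapsed: {time.time() - start:.3f}s")
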
+For more information about available :py:class:`~datafusion.context.SessionConfig` options, see the `rust DataFusion Configuration guide <https://arrow.apache.org/datafusion/user-guide/configs.html>`_,
and about :code:`RuntimeEnvBuilder` options in the rust `online API documentation <https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeEnvBuilder.html>`_.
