Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ DataFusion's Python bindings can be used as a foundation for building new data s
- Serialize and deserialize query plans in Substrait format.
- Experimental support for transpiling SQL queries to DataFrame calls with Polars, Pandas, and cuDF.

For tips on tuning parallelism, see
[Maximizing CPU Usage](docs/source/user-guide/configuration.rst#maximizing-cpu-usage)
in the configuration guide.

## Example Usage

The following example demonstrates running a SQL query against a Parquet file using DataFusion, storing the results
Expand Down
107 changes: 107 additions & 0 deletions benchmarks/max_cpu_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Benchmark script showing how to maximize CPU usage.

This script demonstrates one example of tuning DataFusion for improved parallelism
and CPU utilization. It uses synthetic in-memory data and performs simple aggregation
operations to showcase the impact of partitioning configuration.

IMPORTANT: This is a simplified example designed to illustrate partitioning concepts.
Actual performance in your applications may vary significantly based on many factors:

- Type of table providers (Parquet files, CSV, databases, etc.)
- I/O operations and storage characteristics (local disk, network, cloud storage)
- Query complexity and operation types (joins, window functions, complex expressions)
- Data distribution and size characteristics
- Memory available and hardware specifications
- Network latency for distributed data sources

It is strongly recommended that you create similar benchmarks tailored to your specific:
- Hardware configuration
- Data sources and formats
- Typical query patterns and workloads
- Performance requirements

This will give you more accurate insights into how DataFusion configuration options
will affect your particular use case.
"""

from __future__ import annotations

import argparse
import multiprocessing
import time

import pyarrow as pa
from datafusion import SessionConfig, SessionContext, col
from datafusion import functions as f


def main(num_rows: int, partitions: int) -> None:
"""Run a simple aggregation after repartitioning.

This function demonstrates basic partitioning concepts using synthetic data.
Real-world performance will depend on your specific data sources, query types,
and system configuration.
"""
# Create some example data (synthetic in-memory data for demonstration)
# Note: Real applications typically work with files, databases, or other
# data sources that have different I/O and distribution characteristics
array = pa.array(range(num_rows))
batch = pa.record_batch([array], names=["a"])

# Configure the session to use a higher target partition count and
# enable automatic repartitioning.
config = (
SessionConfig()
.with_target_partitions(partitions)
.with_repartition_joins(enabled=True)
.with_repartition_aggregations(enabled=True)
.with_repartition_windows(enabled=True)
)
ctx = SessionContext(config)

# Register the input data and repartition manually to ensure that all
# partitions are used.
df = ctx.create_dataframe([[batch]]).repartition(partitions)

start = time.time()
df = df.aggregate([], [f.sum(col("a"))])
df.collect()
end = time.time()

print(
f"Processed {num_rows} rows using {partitions} partitions in {end - start:.3f}s"
)


if __name__ == "__main__":
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--rows",
type=int,
default=1_000_000,
help="Number of rows in the generated dataset",
)
parser.add_argument(
"--partitions",
type=int,
default=multiprocessing.cpu_count(),
help="Target number of partitions to use",
)
args = parser.parse_args()
main(args.rows, args.partitions)
137 changes: 136 additions & 1 deletion docs/source/user-guide/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,141 @@ a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.conte
ctx = SessionContext(config, runtime)
print(ctx)

Maximizing CPU Usage
--------------------

You can read more about available :py:class:`~datafusion.context.SessionConfig` options in the `rust DataFusion Configuration guide <https://arrow.apache.org/datafusion/user-guide/configs.html>`_,
DataFusion uses partitions to parallelize work. For small queries the
default configuration (number of CPU cores) is often sufficient, but to
fully utilize available hardware you can tune how many partitions are
created and when DataFusion will repartition data automatically.

Configure a ``SessionContext`` with a higher partition count:

.. code-block:: python

from datafusion import SessionConfig, SessionContext

# allow up to 16 concurrent partitions
config = SessionConfig().with_target_partitions(16)
ctx = SessionContext(config)

Automatic repartitioning for joins, aggregations, window functions and
other operations can be enabled to increase parallelism:

.. code-block:: python

config = (
SessionConfig()
.with_target_partitions(16)
.with_repartition_joins(True)
.with_repartition_aggregations(True)
.with_repartition_windows(True)
)

Manual repartitioning is available on DataFrames when you need precise
control:

.. code-block:: python

from datafusion import col

df = ctx.read_parquet("data.parquet")

# Evenly divide into 16 partitions
df = df.repartition(16)

# Or partition by the hash of a column
df = df.repartition_by_hash(col("a"), num=16)

result = df.collect()


Benchmark Example
^^^^^^^^^^^^^^^^^

The repository includes a benchmark script that demonstrates how to maximize CPU usage
with DataFusion. The :code:`benchmarks/max_cpu_usage.py` script shows a practical example
of configuring DataFusion for optimal parallelism.

You can run the benchmark script to see the impact of different configuration settings:

.. code-block:: bash

# Run with default settings (uses all CPU cores)
python benchmarks/max_cpu_usage.py

# Run with specific number of rows and partitions
python benchmarks/max_cpu_usage.py --rows 5000000 --partitions 16

# See all available options
python benchmarks/max_cpu_usage.py --help

Here's an example showing the performance difference between single and multiple partitions:

.. code-block:: bash

# Single partition - slower processing
$ python benchmarks/max_cpu_usage.py --rows=10000000 --partitions 1
Processed 10000000 rows using 1 partitions in 0.107s

# Multiple partitions - faster processing
$ python benchmarks/max_cpu_usage.py --rows=10000000 --partitions 10
Processed 10000000 rows using 10 partitions in 0.038s

This example demonstrates nearly 3x performance improvement (0.107s vs 0.038s) when using
10 partitions instead of 1, showcasing how proper partitioning can significantly improve
CPU utilization and query performance.

The script demonstrates several key optimization techniques:

1. **Higher target partition count**: Uses :code:`with_target_partitions()` to set the number of concurrent partitions
2. **Automatic repartitioning**: Enables repartitioning for joins, aggregations, and window functions
3. **Manual repartitioning**: Uses :code:`repartition()` to ensure all partitions are utilized
4. **CPU-intensive operations**: Performs aggregations that can benefit from parallelization

The benchmark creates synthetic data and measures the time taken to perform a sum aggregation
across the specified number of partitions. This helps you understand how partition configuration
affects performance on your specific hardware.

Important Considerations
""""""""""""""""""""""""

The provided benchmark script demonstrates partitioning concepts using synthetic in-memory data
and simple aggregation operations. While useful for understanding basic configuration principles,
actual performance in production environments may vary significantly based on numerous factors:

**Data Sources and I/O Characteristics:**

- **Table providers**: Performance differs greatly between Parquet files, CSV files, databases, and cloud storage
- **Storage type**: Local SSD, network-attached storage, and cloud storage have vastly different characteristics
- **Network latency**: Remote data sources introduce additional latency considerations
- **File sizes and distribution**: Large files may benefit differently from partitioning than many small files

**Query and Workload Characteristics:**

- **Operation complexity**: Simple aggregations versus complex joins, window functions, or nested queries
- **Data distribution**: Skewed data may not partition evenly, affecting parallel efficiency
- **Memory usage**: Large datasets may require different memory management strategies
- **Concurrent workloads**: Multiple queries running simultaneously affect resource allocation

**Hardware and Environment Factors:**

- **CPU architecture**: Different processors have varying parallel processing capabilities
- **Available memory**: Limited RAM may require different optimization strategies
- **System load**: Other applications competing for resources affect DataFusion performance

**Recommendations for Production Use:**

To optimize DataFusion for your specific use case, it is strongly recommended to:

1. **Create custom benchmarks** using your actual data sources, formats, and query patterns
2. **Test with representative data volumes** that match your production workloads
3. **Measure end-to-end performance** including data loading, processing, and result handling
4. **Evaluate different configuration combinations** for your specific hardware and workload
5. **Monitor resource utilization** (CPU, memory, I/O) to identify bottlenecks in your environment

This approach will provide more accurate insights into how DataFusion configuration options
will impact your particular applications and infrastructure.

For more information about available :py:class:`~datafusion.context.SessionConfig` options, see the `rust DataFusion Configuration guide <https://arrow.apache.org/datafusion/user-guide/configs.html>`_,
and about :code:`RuntimeEnvBuilder` options in the rust `online API documentation <https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeEnvBuilder.html>`_.
Loading