diff --git a/README.md b/README.md index 5b7a43317..33a479db4 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,10 @@ DataFusion's Python bindings can be used as a foundation for building new data s - Serialize and deserialize query plans in Substrait format. - Experimental support for transpiling SQL queries to DataFrame calls with Polars, Pandas, and cuDF. +For tips on tuning parallelism, see +[Maximizing CPU Usage](docs/source/user-guide/configuration.rst#maximizing-cpu-usage) +in the configuration guide. + ## Example Usage The following example demonstrates running a SQL query against a Parquet file using DataFusion, storing the results diff --git a/benchmarks/max_cpu_usage.py b/benchmarks/max_cpu_usage.py new file mode 100644 index 000000000..327c7b78c --- /dev/null +++ b/benchmarks/max_cpu_usage.py @@ -0,0 +1,107 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Benchmark script showing how to maximize CPU usage. + +This script demonstrates one example of tuning DataFusion for improved parallelism +and CPU utilization. It uses synthetic in-memory data and performs simple aggregation +operations to showcase the impact of partitioning configuration. + +IMPORTANT: This is a simplified example designed to illustrate partitioning concepts. +Actual performance in your applications may vary significantly based on many factors: + +- Type of table providers (Parquet files, CSV, databases, etc.) +- I/O operations and storage characteristics (local disk, network, cloud storage) +- Query complexity and operation types (joins, window functions, complex expressions) +- Data distribution and size characteristics +- Memory available and hardware specifications +- Network latency for distributed data sources + +It is strongly recommended that you create similar benchmarks tailored to your specific: +- Hardware configuration +- Data sources and formats +- Typical query patterns and workloads +- Performance requirements + +This will give you more accurate insights into how DataFusion configuration options +will affect your particular use case. +""" + +from __future__ import annotations + +import argparse +import multiprocessing +import time + +import pyarrow as pa +from datafusion import SessionConfig, SessionContext, col +from datafusion import functions as f + + +def main(num_rows: int, partitions: int) -> None: + """Run a simple aggregation after repartitioning. + + This function demonstrates basic partitioning concepts using synthetic data. + Real-world performance will depend on your specific data sources, query types, + and system configuration. + """ + # Create some example data (synthetic in-memory data for demonstration) + # Note: Real applications typically work with files, databases, or other + # data sources that have different I/O and distribution characteristics + array = pa.array(range(num_rows)) + batch = pa.record_batch([array], names=["a"]) + + # Configure the session to use a higher target partition count and + # enable automatic repartitioning. + config = ( + SessionConfig() + .with_target_partitions(partitions) + .with_repartition_joins(enabled=True) + .with_repartition_aggregations(enabled=True) + .with_repartition_windows(enabled=True) + ) + ctx = SessionContext(config) + + # Register the input data and repartition manually to ensure that all + # partitions are used. + df = ctx.create_dataframe([[batch]]).repartition(partitions) + + start = time.time() + df = df.aggregate([], [f.sum(col("a"))]) + df.collect() + end = time.time() + + print( + f"Processed {num_rows} rows using {partitions} partitions in {end - start:.3f}s" + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--rows", + type=int, + default=1_000_000, + help="Number of rows in the generated dataset", + ) + parser.add_argument( + "--partitions", + type=int, + default=multiprocessing.cpu_count(), + help="Target number of partitions to use", + ) + args = parser.parse_args() + main(args.rows, args.partitions) diff --git a/docs/source/user-guide/configuration.rst b/docs/source/user-guide/configuration.rst index db200a46a..5425a040d 100644 --- a/docs/source/user-guide/configuration.rst +++ b/docs/source/user-guide/configuration.rst @@ -46,6 +46,141 @@ a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.conte ctx = SessionContext(config, runtime) print(ctx) +Maximizing CPU Usage +-------------------- -You can read more about available :py:class:`~datafusion.context.SessionConfig` options in the `rust DataFusion Configuration guide `_, +DataFusion uses partitions to parallelize work. For small queries the +default configuration (number of CPU cores) is often sufficient, but to +fully utilize available hardware you can tune how many partitions are +created and when DataFusion will repartition data automatically. + +Configure a ``SessionContext`` with a higher partition count: + +.. code-block:: python + + from datafusion import SessionConfig, SessionContext + + # allow up to 16 concurrent partitions + config = SessionConfig().with_target_partitions(16) + ctx = SessionContext(config) + +Automatic repartitioning for joins, aggregations, window functions and +other operations can be enabled to increase parallelism: + +.. code-block:: python + + config = ( + SessionConfig() + .with_target_partitions(16) + .with_repartition_joins(True) + .with_repartition_aggregations(True) + .with_repartition_windows(True) + ) + +Manual repartitioning is available on DataFrames when you need precise +control: + +.. code-block:: python + + from datafusion import col + + df = ctx.read_parquet("data.parquet") + + # Evenly divide into 16 partitions + df = df.repartition(16) + + # Or partition by the hash of a column + df = df.repartition_by_hash(col("a"), num=16) + + result = df.collect() + + +Benchmark Example +^^^^^^^^^^^^^^^^^ + +The repository includes a benchmark script that demonstrates how to maximize CPU usage +with DataFusion. The :code:`benchmarks/max_cpu_usage.py` script shows a practical example +of configuring DataFusion for optimal parallelism. + +You can run the benchmark script to see the impact of different configuration settings: + +.. code-block:: bash + + # Run with default settings (uses all CPU cores) + python benchmarks/max_cpu_usage.py + + # Run with specific number of rows and partitions + python benchmarks/max_cpu_usage.py --rows 5000000 --partitions 16 + + # See all available options + python benchmarks/max_cpu_usage.py --help + +Here's an example showing the performance difference between single and multiple partitions: + +.. code-block:: bash + + # Single partition - slower processing + $ python benchmarks/max_cpu_usage.py --rows=10000000 --partitions 1 + Processed 10000000 rows using 1 partitions in 0.107s + + # Multiple partitions - faster processing + $ python benchmarks/max_cpu_usage.py --rows=10000000 --partitions 10 + Processed 10000000 rows using 10 partitions in 0.038s + +This example demonstrates nearly 3x performance improvement (0.107s vs 0.038s) when using +10 partitions instead of 1, showcasing how proper partitioning can significantly improve +CPU utilization and query performance. + +The script demonstrates several key optimization techniques: + +1. **Higher target partition count**: Uses :code:`with_target_partitions()` to set the number of concurrent partitions +2. **Automatic repartitioning**: Enables repartitioning for joins, aggregations, and window functions +3. **Manual repartitioning**: Uses :code:`repartition()` to ensure all partitions are utilized +4. **CPU-intensive operations**: Performs aggregations that can benefit from parallelization + +The benchmark creates synthetic data and measures the time taken to perform a sum aggregation +across the specified number of partitions. This helps you understand how partition configuration +affects performance on your specific hardware. + +Important Considerations +"""""""""""""""""""""""" + +The provided benchmark script demonstrates partitioning concepts using synthetic in-memory data +and simple aggregation operations. While useful for understanding basic configuration principles, +actual performance in production environments may vary significantly based on numerous factors: + +**Data Sources and I/O Characteristics:** + +- **Table providers**: Performance differs greatly between Parquet files, CSV files, databases, and cloud storage +- **Storage type**: Local SSD, network-attached storage, and cloud storage have vastly different characteristics +- **Network latency**: Remote data sources introduce additional latency considerations +- **File sizes and distribution**: Large files may benefit differently from partitioning than many small files + +**Query and Workload Characteristics:** + +- **Operation complexity**: Simple aggregations versus complex joins, window functions, or nested queries +- **Data distribution**: Skewed data may not partition evenly, affecting parallel efficiency +- **Memory usage**: Large datasets may require different memory management strategies +- **Concurrent workloads**: Multiple queries running simultaneously affect resource allocation + +**Hardware and Environment Factors:** + +- **CPU architecture**: Different processors have varying parallel processing capabilities +- **Available memory**: Limited RAM may require different optimization strategies +- **System load**: Other applications competing for resources affect DataFusion performance + +**Recommendations for Production Use:** + +To optimize DataFusion for your specific use case, it is strongly recommended to: + +1. **Create custom benchmarks** using your actual data sources, formats, and query patterns +2. **Test with representative data volumes** that match your production workloads +3. **Measure end-to-end performance** including data loading, processing, and result handling +4. **Evaluate different configuration combinations** for your specific hardware and workload +5. **Monitor resource utilization** (CPU, memory, I/O) to identify bottlenecks in your environment + +This approach will provide more accurate insights into how DataFusion configuration options +will impact your particular applications and infrastructure. + +For more information about available :py:class:`~datafusion.context.SessionConfig` options, see the `rust DataFusion Configuration guide `_, and about :code:`RuntimeEnvBuilder` options in the rust `online API documentation `_.