Skip to content

Conversation

@gabotechs
Copy link
Collaborator

@gabotechs gabotechs commented Sep 8, 2025

Before, the distributed benchmarks where spawning a localhost worker in the same process as the benchmark itself, without applying any constraint to the resources used for either the localhost worker or the overall process.

This PR ships the ability to run workers as different processes with constrained resources and benchmark against them.

From the README.md:

Running TPCH benchmarks in distributed mode

Running the benchmarks in distributed mode implies:

  • running 1 or more workers in separate terminals
  • running the benchmarks in an additional terminal

The workers can be spawned by passing the --spawn <port> flag, for example, for spawning 3 workers:

cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8000
cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8001
cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8002

With the three workers running in separate terminals, the TPCH benchmarks can be run in distributed mode with:

cargo run -p datafusion-distributed-benchmarks --release -- tpch --workers 8000,8001,8002

A good way of measuring the impact of distribution is to limit the physical threads each worker can use. For example,
it's expected that running 8 workers with 2 physical threads each one (8 * 2 = 16 total) is faster than running in
single-node with just 2 threads (1 * 3 = 2 total).

cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8000 & 
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8001 & 
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8002 & 
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8003 & 
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8004 & 
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8005 & 
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8006 & 
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8007 & 
cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --workers 8000,8001,8002,8003,8004,8005,8006,8007

The run.sh script already does this for you in a more ergonomic way:

WORKERS=8 ./run.sh -m --threads -2

Copy link
Collaborator

@NGA-TRAN NGA-TRAN left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great additional setting options. I am thinking about these as the follow-up PRs:

  1. Add (unit/integration) test to verify the the number of threads
  2. 🤔 : I know DF repartitions data per number of cores. Will the number of threads here represent that number of cores and automatically applied?
  3. 🤔 : Should we try to test number of partitions > number of threads and observe functional and performance behavior?

@gabotechs
Copy link
Collaborator Author

  1. Add (unit/integration) test to verify the the number of threads

Note that this is just for benchmarks, which are some kind of tests by themselves. Users are not really able to use this benchmarking functionality in their programs.

  1. I know DF repartitions data per number of cores. Will the number of threads here represent that number of cores and automatically applied?

I don't think so, DataFusion will still see the original number of cores in the machine, and it will repartition accordingly. The benchmarks allow passing a --partitions flag (inherited from upstream DataFusion) that we can use for specifying the amount of partitions manually.

🤔 Now that you mention it, if --partitions is not provided, but --threads is provided, we should probably default the partition count to the --threads variable. I'll make that change.

  1. Should we try to test number of partitions > number of threads and observe functional and performance behavior?

We can do that now yes! we can do something like --threads 4 and --partitions 16 now.

@NGA-TRAN
Copy link
Collaborator

We can do that now yes! we can do something like --threads 4 and --partitions 16 now.

Another question, is it possible to add PARTITIONs to the run.sh script? Something like this
WORKERS=8 THREADS=2 PARTITIONS=4 ./run.sh

@gabotechs
Copy link
Collaborator Author

Another question, is it possible to add PARTITIONs to the run.sh script? Something like this
WORKERS=8 THREADS=2 PARTITIONS=4 ./run.sh

I added a passthrough of the same arguments the example supports to the script. Now we can do:

WORKERS=8 ./run.sh -m --threads 2 --partitions 4

Base automatically changed from gabrielmusat/in-memory-tpch to main September 11, 2025 14:47
# Conflicts:
#	benchmarks/src/tpch/run.rs
#	benchmarks/src/util/memory.rs
@gabotechs gabotechs merged commit c12c271 into main Sep 11, 2025
3 of 4 checks passed
@gabotechs gabotechs deleted the gabrielmusat/tpch-threads-and-workers branch September 11, 2025 14:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants