
Conversation

Collaborator

@NGA-TRAN NGA-TRAN commented Jul 24, 2025

This PR:

  • Enables us to create 3 different types of test clusters:
    1. Two-worker cluster that understands the small TPC-H files stored in ./tpch/data/
    2. Two-worker cluster with mock data of 2 tables (and we can add as many more as we want)
    3. Custom number of workers and custom data files
  • Adds some tests:
    1. Tests for cluster setups
    2. Tests for all TPC-H queries, run on a small data set (20 rows per file)
    3. Basic tests on mock data

Best way to review this PR

  1. Start with cluster_setup.rs, a util that creates clusters for us. Main jobs of this file:
    • Help create any of the 3 cluster choices for different testing purposes in one line
    • Help execute a given query in one line
    • Its tests cover cluster setup and operation
  2. Then tpch_small.rs, which tests all 22 TPC-H queries on a small set of data
  3. Then mock_data.rs, which creates 2 tables and adds some data
  4. Then basic_Tests.rs, which shows some examples of how to test the mock data
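The intended usage pattern can be sketched as below. The type and method names here are illustrative stand-ins, not the actual cluster_setup.rs API:

```rust
// Illustrative stubs only -- the real types live in cluster_setup.rs.
struct TestCluster {
    workers: usize,
}

impl TestCluster {
    /// One-line constructor for the two-worker cluster over ./tpch/data/.
    fn tpch_small() -> Self {
        TestCluster { workers: 2 }
    }

    /// One-line constructor with a custom worker count (data files omitted here).
    fn custom(workers: usize) -> Self {
        TestCluster { workers }
    }

    /// One-line query execution returning a formatted result string.
    fn execute(&self, sql: &str) -> String {
        format!("ran `{sql}` on {} workers", self.workers)
    }
}

fn main() {
    let cluster = TestCluster::tpch_small();
    println!("{}", cluster.execute("SELECT count(*) FROM lineitem"));
    assert_eq!(TestCluster::custom(4).workers, 4);
}
```

The point of the design is that a test file needs exactly two lines: one to get a cluster, one to run a query.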

Next step

TODO: Will link a ticket for useful tests to add shortly.

NGA-TRAN and others added 30 commits June 20, 2025 10:57

Move code from internal mp-rs to open source. All datadog dependencies have been removed and new built, scripts and readme are added
Rename all datafusion-distributed to distributed-datafusion and remov…
…-version

Increase minimum Rust version to 1.82
Add distributed plan and stages to EXPLAIN output
}

// Give processes time to die
thread::sleep(Duration::from_secs(1));
Collaborator

A sub issue to improve this to poll the workers until they die would be good 👍🏽

Collaborator

Or, it can be done in this PR, similarly to is_cluster_ready

Collaborator Author

There are many needed sleeps, so I agree we should have one sub-issue to make them better
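The suggested polling can be built on `std::process::Child::try_wait`, which reports whether a child has exited without blocking. A minimal sketch (not wired into the actual test harness, and with an assumed helper name):

```rust
use std::process::{Child, Command};
use std::thread;
use std::time::{Duration, Instant};

/// Poll a child process until it exits, instead of a fixed sleep.
/// Returns true if the process exited within `timeout`.
fn wait_until_dead(child: &mut Child, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    while Instant::now() < deadline {
        match child.try_wait() {
            Ok(Some(_status)) => return true,                   // process has exited
            Ok(None) => thread::sleep(Duration::from_millis(50)), // still running, poll again
            Err(_) => return false,                              // could not query the child
        }
    }
    false // timed out while the process was still alive
}

fn main() {
    // Spawn a short-lived process and confirm we observe its exit promptly.
    let mut child = Command::new("true").spawn().expect("spawn");
    assert!(wait_until_dead(&mut child, Duration::from_secs(5)));
    println!("ok");
}
```

Compared to a flat one-second sleep, this returns as soon as the worker is gone and only pays the full delay in the failure case.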

cmd.args(["--mode", "proxy", "--port", &proxy_port.to_string()])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.env("RUST_LOG", "info,distributed_datafusion=debug")
Collaborator

These environment vars are a bit confusing. How do they get set?

Collaborator

It's used here upon startup

Collaborator

@LiaCastaneda LiaCastaneda Jul 28, 2025

But I agree we should document it somewhere

Collaborator Author

I agree we should add RUST_LOG to the documentation. We need a PR to write and update our docs

…g)> with Vec<TableConfig> for improved clarity
@NGA-TRAN NGA-TRAN marked this pull request as draft July 28, 2025 20:10

Ok(())
}

Collaborator Author

These changes are necessary to support view registration in the same way we currently register tables. They'll remain in place until we establish a proper catalog implementation.

@@ -1,1222 +1,3 @@
use std::fs;
Collaborator Author

The red (removed) lines here were moved to tpch_validation_helpers.rs to keep this file short and reserved for importing modules

@@ -0,0 +1,1222 @@
use std::fs;
Collaborator Author

The content of this file is from mod.rs. No need to review it. Also, since we now have a way to run queries without going through Python, I will soon remove this file after I simplify the TPC-H validation tests

@NGA-TRAN NGA-TRAN marked this pull request as ready for review July 30, 2025 02:32
@NGA-TRAN NGA-TRAN requested review from LiaCastaneda, fmonjalet and jayshrivastava and removed request for LiaCastaneda and jayshrivastava July 30, 2025 03:04
");
}

// TODO: Investigate why only at most 4 or 5 queries can be executed.
Collaborator Author

I will create a ticket for this

Collaborator

@fmonjalet fmonjalet left a comment

No blocker on my end since this is a net improvement of the testing situation. I left a few comments that are now on outdated diffs but are still relevant.

Eventually I think we should avoid spawning processes and listening to network ports for these tests, and rather spawn tokio tasks for the proxy and worker logic, with in memory channels. The reasons for this are mostly:

  • The ability to instrument for race reproducibility and do fault injection. This is something we had to do a lot on a previous distributed query engine.
  • The ability to debug and put breakpoints easily (maybe VSCode or RustRover are handling this well though).
  • Not having to get process and network management right (which can be hard in corner cases).
  • It's impossible to do when the code relies on global / implicit state, so the one-process approach forces a nice property: state has to be properly contained. This is a very important property if we want people to be able to embed this logic in various projects.

But getting a first framework that works well is a great step forward, and the APIs are well built so that the above changes should not impact the tests too much. Thanks for the great work!

static MOCK_DATA_INIT: Once = Once::new();

/// Global cluster instance shared across basic tests
static BASIC_CLUSTER: OnceCell<TestCluster> = OnceCell::const_new();
Collaborator

Since these are sub processes, how is this cluster cleaned up in case of panic or harder (sigsegv, sigabrt) fault?

Collaborator Author

This is a very good point. Let me dig a bit more into how we handle this
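For the panic case specifically, Rust unwinds and runs `Drop`, so one common pattern is an RAII guard that kills the child processes when it goes out of scope. Hard faults (SIGKILL, SIGSEGV) of the test process skip `Drop` and would still leak children. A sketch under that assumption, not the actual TestCluster implementation:

```rust
use std::process::{Child, Command};

/// RAII guard: kills spawned worker processes when dropped, so a panicking
/// test still tears its cluster down. Hard faults of the test process itself
/// bypass Drop, so those would need an external reaper.
struct ProcessGuard {
    children: Vec<Child>,
}

impl Drop for ProcessGuard {
    fn drop(&mut self) {
        for child in &mut self.children {
            let _ = child.kill(); // ignore "already exited" errors
            let _ = child.wait(); // reap to avoid zombie processes
        }
    }
}

fn main() {
    // Stand-in for a worker process.
    let child = Command::new("sleep").arg("30").spawn().expect("spawn");
    {
        let _guard = ProcessGuard { children: vec![child] };
        // _guard dropped at the end of this scope -> child is killed and reaped
    }
    println!("cleaned up");
}
```

Holding such a guard inside the `OnceCell` value would give the shared cluster the same cleanup behavior.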


// Configure cluster with mock data tables
let config = ClusterConfig::new()
.with_base_port(33000) // Use different port range from TPC-H tests
Collaborator

Ideally in the future we have in-process workers and no network to avoid this kind of hassle (using in-memory channels)

Comment on lines 75 to 76
let result = execute_basic_query("SELECT * FROM customers ORDER BY customer_id").await;
assert_snapshot!(result, @r"
Collaborator

This is a really nice and simple API to write tests!

minor (since it's easy to address at any point in the future): Looking at it, I am even wondering if we should not adhere to the sqllogictest format instead, so that we can reuse existing test beds.

Collaborator Author

Do you mean we use sqllogictest instead of this insta assert_snapshot? I like sqllogictest and I think it is a good idea

Collaborator

Yes exactly this. They have their own quirks so we'll also need the type of test you just wrote in some cases, but there is a huge number of existing tests that we could easily port, which is useful.
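For reference, a sqllogictest-style case for this data might look like the sketch below. The `IT` after `query` declares an integer and a text result column; the table name and row are taken from the snippet in this PR, and exact whitespace conventions vary between runners:

```
query IT
SELECT customer_id, name FROM customers WHERE customer_id = 5
----
5 Eve Brown
```

Expected results follow the `----` separator, which is what makes large existing suites easy to port.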

"5,Eve Brown,Sydney,Australia",
];

fs::write(path, customers_data.join("\n"))?;
Collaborator

nit: do we need to write this to the FS vs registering this as a MemTable directly?

pub fn allocate_port_range() -> u16 {
    let mut current_port = CURRENT_MAX_PORT.lock().unwrap();
    let base_port = *current_port;
    *current_port += MAX_CLUSTER_SIZE;
    base_port
}
Collaborator

This is fine as long as we don't create too many clusters and tests. Do we have tests that run concurrently and compete for ports?

Collaborator Author

There is a big range of available ports for us to use, and the goal is for all tests in the same file/module to share one cluster, so the range won't grow too much. But I agree this can become an issue when we add a lot more tests.

I will open a ticket for us to improve this. In general, the test infrastructure for a distributed system is far more complicated than for a single node and will need a lot of work before we feel comfortable
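One common alternative to a global port counter is to let the OS pick free ports by binding to port 0, which avoids cross-test coordination entirely. A sketch, not wired into the harness; note there is a small race window between dropping the listener and the worker re-binding the port:

```rust
use std::net::TcpListener;

/// Ask the OS for a currently free TCP port by binding to port 0 and
/// reading back the assigned address. The port is released when the
/// listener is dropped, so a worker can bind it immediately afterwards
/// (with a small inherent race against other processes).
fn free_port() -> u16 {
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
    listener.local_addr().expect("local addr").port()
}

fn main() {
    let a = free_port();
    let b = free_port();
    println!("allocated ports {a} and {b}");
    assert!(a > 0 && b > 0);
}
```

This scales with any number of concurrent test binaries, since the kernel, not the test code, arbitrates port ownership.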


8 participants