Explain Analyze + Refactor #42
Conversation
NGA-TRAN left a comment
This is very nice, Rob.
Thanks for the long description. It makes it easier to follow and review.
- I like that you made it clearer that a stage can be sent to different workers through different tasks. I told myself we needed something like this to display/annotate durations for explain analyze. I bet in the future Gabriel will be able to make the display even easier to read. He has been doing this in retriever.
- Thanks for refactoring the explain. It makes a lot more sense now.
- I have not looked into the details of explain analyze because I am not familiar with the context. But I have no concerns, as you have structured the code very well. I will get more familiar when I use it and add more tests for it and other features.
- I think the PR will break the TPC-H validation tests because you have removed the view creation for q15. If it takes too much time to make the tests pass, it is fine to merge this and fix the tests later.
- I also think all of us should work on adding more unit tests and integration tests.
```python
rich_table.add_row(*row_data)
console = Console()
console.print(rich_table, markup=False)
```
👍 Showing the table format is a lot better.
```sh
# Define views required for TPC-H queries (e.g., q15)
export DFRAY_VIEWS="create view revenue0 (supplier_no, total_revenue) as select l_suppkey, sum(l_extendedprice * (1 - l_discount)) from lineitem where l_shipdate >= date '1996-08-01' and l_shipdate < date '1996-08-01' + interval '3' month group by l_suppkey"
```
I could not find where this view is created. It is needed to run TPC-H q15. Do you plan to create the view when we run the query?
I have disabled the TPC-H test because it is slow now. Can you try to run this command to see if all the queries pass? I suspect q15 will fail:
```sh
cargo test --test tpch_validation test_tpch_validation_all_queries -- --ignored --nocapture
```
I don't know yet where, in a distributed world, we will keep the catalog metadata that is typically held in the `SessionState`. I will think more about this.
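For reference, a view like the one in `DFRAY_VIEWS` could be registered on the proxy's DataFusion `SessionContext` before queries run. A minimal sketch, assuming env-var plumbing like the above (illustrative only, not this PR's code):

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

/// Sketch only: register each `CREATE VIEW` statement from DFRAY_VIEWS
/// (separated by ';') on the session context before running TPC-H queries.
async fn register_views(ctx: &SessionContext) -> Result<()> {
    if let Ok(views) = std::env::var("DFRAY_VIEWS") {
        for stmt in views.split(';').filter(|s| !s.trim().is_empty()) {
            // CREATE VIEW is plain DDL, so it can go through the normal sql() path.
            ctx.sql(stmt).await?;
        }
    }
    Ok(())
}
```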
```rust
if result.is_empty() {
    result.push_str("No distributed stages generated");
if i < tasks.len() - 1 {
    result.push('\n');
```
This is very minor, mostly to make it easier for us to read the plan if we add one more layer in the display:
```text
Stage 0
  Task 1
  ...
  Task 2
  ...
Stage 1
  Task 3
  ...
  Task 4
  ...
```
Oh, I agree that is better.
```rust
pub mod flight_handlers;
pub mod friendly;
pub mod isolator;
pub mod k8s;
```
👍 I have been thinking about removing this, too. We can always add it back when it is done in a more generic way.
```rust
// add other logical plans for local execution here following the pattern for explain
p @ LogicalPlan::DescribeTable(_) => self.prepare_local(p, ctx).await,
p => self.prepare_query(p, ctx).await,
}
```
👍 This matching logic is a lot better. I should have used this 🙂
LiaCastaneda left a comment
This is a very nice refactor, thanks! 🙇♀️ I've read through the code, but I'll probably need to revisit `DfRayProcessorHandler::make_stream` later, as I didn't fully grasp how it works.
```diff
-    target_stage_ids: &[u64],
-    stages: &[StageData],
-) -> Result<StageAddrs> {
+fn get_stage_addrs_from_tasks(target_stage_ids: &[u64], stages: &[DDTask]) -> Result<StageAddrs> {
```
Suggested change:
```diff
-fn get_stage_addrs_from_tasks(target_stage_ids: &[u64], stages: &[DDTask]) -> Result<StageAddrs> {
+fn get_stage_addrs_from_tasks(target_stage_ids: &[u64], tasks: &[DDTask]) -> Result<StageAddrs> {
```
Aren't these all the tasks from all the stages?
```rust
}
}

impl DisplayAs for DistributedAnalyzeRootExec
```
It would be nice to document both nodes, `DistributedAnalyzeRootExec` and `DistributedAnalyzeExec`, so we can know the difference without having to read the planning code, but I agree all the docs can be included in a follow-up PR.
Summary
This PR started with adding Explain Analyze functionality to distributed queries, but it has grown to clean up some anti-patterns and make some useful refactors.
Refactors
Physical Plan -> Stages -> Tasks
The word `Stage` was overloaded in the code to mean a portion of the physical plan, but it also meant the individual partitions of that plan when we further divided it for distribution. Now we have the notion of:

- `Physical Plan` - the physical plan as generated by DataFusion on the proxy node. It is the same physical plan as in the single query node case.
- `Stage` - the physical plan is chopped up into Stages during execution planning. A Stage is the portion of the physical execution plan that can be executed in a distributed manner.
- `Task` - each Stage has a number of associated partitions. When we choose a number of these partitions to execute on a single worker, this is a Task.

Note that an important parameter in physical planning is the number of desired partitions. In DataFusion this is, by default, the number of physical cores. In Distributed DataFusion it is a free parameter, and at the moment it is hard coded. I would like to change this so that it is first determined by a `SET` variable statement in SQL, and second, able to be intelligently determined by the execution planning step.

Another important parameter in execution planning is `partitions per worker`. It means that if a stage has 10 partitions and partitions per worker is set to 3, we will chop this stage up into 4 `Tasks`, and they will execute partitions `[0,1,2]`, `[3,4,5]`, `[6,7,8]`, `[9]`. This should be renamed to `partitions-per-task`. I would like this to be determinable in SQL by a `SET` variable parameter, and later determined automatically or by some policy.
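A small sketch of that partition-to-task chunking (names here are illustrative, not the actual planning code):

```rust
/// Illustrative only: split a stage's partition indices into per-task groups,
/// e.g. 10 partitions with partitions_per_task = 3 -> [0,1,2] [3,4,5] [6,7,8] [9].
fn partition_groups(num_partitions: usize, partitions_per_task: usize) -> Vec<Vec<usize>> {
    (0..num_partitions)
        .collect::<Vec<_>>()
        .chunks(partitions_per_task)
        .map(|c| c.to_vec())
        .collect()
}

#[test]
fn ten_partitions_three_per_task() {
    assert_eq!(
        partition_groups(10, 3),
        vec![vec![0, 1, 2], vec![3, 4, 5], vec![6, 7, 8], vec![9]]
    );
}
```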
Query Execution Path simplified

All queries will either be:

- executed locally on the proxy, with the results held in a `RecordBatchExec` and distributed per normal, or
- planned and distributed across the workers per normal.

This greatly simplifies query planning and removes much of the conditional logic for specific types of queries. See the `query_planner.rs` file for details, and how it is used in `proxy.rs`. `Explain` functionality has been refactored to follow the above pattern, instead of sending data back to the client via the `Ticket` for this explicit query type. `Describe Table` has been added to show how to extend this pattern to other query types.
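The planner's dispatch then collapses to a single match on the logical plan. A self-contained sketch of the shape (the method bodies are stubs, the return type is simplified, and the exact set of locally executed plans is illustrative):

```rust
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::SessionContext;

// Sketch of the single dispatch point; only the shape of the match is the point.
struct Planner;

impl Planner {
    async fn prepare_local(&self, _p: LogicalPlan, _ctx: &SessionContext) -> Result<()> {
        Ok(()) // run on the proxy; results end up hosted in a RecordBatchExec
    }

    async fn prepare_query(&self, _p: LogicalPlan, _ctx: &SessionContext) -> Result<()> {
        Ok(()) // plan stages/tasks and distribute per normal
    }

    async fn prepare(&self, plan: LogicalPlan, ctx: &SessionContext) -> Result<()> {
        match plan {
            p @ LogicalPlan::Explain(_) => self.prepare_local(p, ctx).await,
            // add other logical plans for local execution here following the pattern for explain
            p @ LogicalPlan::DescribeTable(_) => self.prepare_local(p, ctx).await,
            p => self.prepare_query(p, ctx).await,
        }
    }
}
```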
FlightRequestHandler removed

`FlightRequestHandler` previously had a fair bit of conditional logic to handle different query types. Now that they all work in the same way, this layer of indirection was eliminated and selected code was rolled back into `DFRayProcessorHandler`.
Trailing data optionally included in `DoGet` streams from `DFRayProcessorHandler`

In order to handle `Explain Analyze`, we need to send the results of query execution back per normal, but when we want to send back the annotated plan, we need a mechanism to do so.

gRPC itself has a notion of trailing metadata, but it is not supported well in `tonic`, so it has been added to `DFRayProcessorHandler` in `make_stream` such that if we know we want to send metadata back after exhausting our stream, we can do so. This has been plumbed through so that the metadata is propagated all the way up through the distributed plan. At the moment it is used for explain analyze, but if we need to bubble up more data, we should use this mechanism to do so.
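The shape of the mechanism is roughly the following (a minimal sketch only, not the actual `make_stream` implementation): yield the data stream as usual, then a single optional trailing item once it is exhausted.

```rust
use futures::stream::{self, Stream, StreamExt};

/// Sketch only: yield every item from the data stream and then, once it is
/// exhausted, yield one optional trailing item (e.g. the annotated plan
/// encoded as metadata). Ordinary queries simply pass `None` and pay nothing.
fn with_trailing<T: Send + 'static>(
    data: impl Stream<Item = T> + Send + 'static,
    trailing: Option<T>,
) -> impl Stream<Item = T> + Send + 'static {
    data.chain(stream::iter(trailing))
}
```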
Host names are shared

Proxies and Processors now assign themselves a unique friendly hostname and share it upon discovering one another. This makes plans easier to read and logs easier to parse: we will have lots of hosts floating around, and friendly names are easier than IP addresses to read and keep in your head.
Far fewer tuples
The code base had way too many functions that returned complicated tuple types, `Result<(Vec<String>, String, String)>` or something like that. It's not clear what those strings represent, and to a large degree this PR replaces them with sensible structs. We're not fully tuple-free, but it's much closer and the codebase has improved as a result.
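As a contrived example of the kind of change (the struct and field names below are illustrative, not actual types from this PR):

```rust
// Before: the caller has to remember what each element of the tuple means.
// fn plan_stage(/* ... */) -> Result<(Vec<String>, String, String)>

// After: a named struct makes the same data self-describing.
struct PlannedStage {
    worker_addrs: Vec<String>,
    stage_id: String,
    plan_display: String,
}
// fn plan_stage(/* ... */) -> Result<PlannedStage>
```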
Test coverage is reduced

The drawback of this large PR is that many previous tests no longer applied and have been eliminated. New ones were not yet added, in favor of not making the PR any bigger or any later.
New Functionality
Describe
Describe table has been implemented in a way that shows the pattern of how we can execute queries locally on the proxy while still retaining the same client interaction and execution path.
From the python client shell script:
Explain
Explain shows the distributed plan, with the stage markers indicated, and furthermore shows how the stages are broken into tasks.
From the python client shell script:
Explain Analyze
Explain Analyze shows the results of each Task's execution plan, annotated with metrics produced during execution.
TODO: Also show logical, physical, and stages.
From the python client shell script: