Conversation


@gabotechs gabotechs commented Sep 29, 2025

Previously, upon crossing a network boundary, we serialized and deserialized the full plan in order to send it over the wire. This resulted in a good amount of unnecessary serialization/deserialization, for example:

  • The NetworkBoundary implementations do not need to read the input stage plan at all; they only need it present so that it can be sent in the gRPC request to the ArrowFlightEndpoint. Therefore, it does not need to be deserialized at that point.
  • The ArrowFlightEndpoint needs the current stage's subplan deserialized, as it is executed there, but it does not need any of the input stage plans to be deserialized; those can perfectly well live encoded as Bytes, since they are only ever sent over the wire to other ArrowFlightEndpoints.

This PR introduces a new InputStage struct that stores input stages either:

  • Decoded: the input StageExec struct is stored as is
  • Encoded: the input StageExec is stored as Bytes

Storing the InputStage as Encoded allows us to pass the Bytes straight through, without performing any unnecessary decoding + re-encoding in the parts of the application that only need to include it in a protobuf request.
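As a minimal sketch of the two-variant idea described above (type and variant names taken from this description; the payload types are assumed, not the crate's real API):

```rust
use std::sync::Arc;

// Hypothetical stand-in for the real StageExec plan node; the actual
// type lives in src/execution_plans/stage.rs.
struct StageExec;

// An input stage is either decoded (ready to execute locally) or
// still encoded (opaque bytes that are only forwarded over the wire).
enum InputStage {
    Decoded(Arc<StageExec>),
    Encoded(Vec<u8>),
}

fn main() {
    let inputs = vec![
        InputStage::Decoded(Arc::new(StageExec)),
        InputStage::Encoded(vec![0x0A, 0x0B]),
    ];
    // Only the Encoded variant can be forwarded without re-serialization.
    let encoded = inputs
        .iter()
        .filter(|i| matches!(i, InputStage::Encoded(_)))
        .count();
    assert_eq!(encoded, 1);
    println!("ok");
}
```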


Running the TPCH benchmarks at scale factor 0.1 with this PR's changes yields the following:

==== Comparison with the previous benchmark from 2025-09-28 18:57:06 UTC ====
os:        macos
arch:      aarch64
cpu cores: 16
threads:   2 -> 2
workers:   8 -> 8
=============================================================================
 Query 1: prev=  45 ms, new=  44 ms, diff=1.02 faster ✔
 Query 2: prev=  11 ms, new=  11 ms, diff=1.00 none
 Query 3: prev=  23 ms, new=  21 ms, diff=1.10 faster ✔
 Query 4: prev=  20 ms, new=  19 ms, diff=1.05 faster ✔
 Query 5: prev=  23 ms, new=  21 ms, diff=1.10 faster ✔
 Query 6: prev=  11 ms, new=  11 ms, diff=1.00 none
 Query 7: prev=  32 ms, new=  29 ms, diff=1.10 faster ✔
 Query 8: prev=  42 ms, new=  35 ms, diff=1.20 faster ✔
 Query 9: prev=  50 ms, new=  39 ms, diff=1.28 faster ✅
Query 10: prev=  34 ms, new=  31 ms, diff=1.10 faster ✔
Query 11: prev=   8 ms, new=   8 ms, diff=1.00 none
Query 12: prev=  39 ms, new=  36 ms, diff=1.08 faster ✔
Query 13: prev=  36 ms, new=  35 ms, diff=1.03 faster ✔
Query 14: prev=  20 ms, new=  20 ms, diff=1.00 none
Query 15: prev=  28 ms, new=  26 ms, diff=1.08 faster ✔
Query 16: prev=  25 ms, new=  26 ms, diff=1.04 slower ✖
Query 17: prev=  28 ms, new=  27 ms, diff=1.04 faster ✔
Query 18: prev= 142 ms, new=  52 ms, diff=2.73 faster ✅
Query 19: prev=  18 ms, new=  18 ms, diff=1.00 none
Query 20: prev=  16 ms, new=  16 ms, diff=1.00 none
Query 21: prev=  37 ms, new=  32 ms, diff=1.16 faster ✔
Query 22: prev=  13 ms, new=  11 ms, diff=1.18 faster ✔


@NGA-TRAN NGA-TRAN left a comment


This kind of optimization is always needed in distributed systems. Nice work @gabotechs

@gabotechs gabotechs force-pushed the gabrielmusat/remove-unnecessary-stage-proto-serde branch from c303d80 to a1f5d8b Compare September 29, 2025 14:30

@jayshrivastava jayshrivastava left a comment


LGTM :)

I think some comments on stage_from_proto and proto_from_stage might be useful.

  • proto_from_stage never needs to decode stage.inputs because these will be passed through
  • stage_from_proto only sees a decoded InputStage in the coordinator; elsewhere, the inputs should remain encoded.

Having two variants for InputStage adds some complexity, so it would be nice to make this behavior clearer to the reader.
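The passthrough behavior described in those two bullet points could be sketched like this (names and the placeholder encoding are assumed for illustration, not the crate's real proto_from_stage signature):

```rust
// Hypothetical stand-ins; names and payloads are assumed, not the real API.
struct StageExec;

enum InputStage {
    Decoded(StageExec),
    Encoded(Vec<u8>),
}

// Placeholder for real protobuf serialization of a decoded stage.
fn encode_stage(_stage: &StageExec) -> Vec<u8> {
    vec![0xAB]
}

// proto_from_stage-style behavior: an Encoded input is copied into the
// outgoing message verbatim; only a Decoded input (seen on the
// coordinator) needs to be serialized.
fn input_stage_wire_bytes(input: &InputStage) -> Vec<u8> {
    match input {
        InputStage::Encoded(bytes) => bytes.clone(),
        InputStage::Decoded(stage) => encode_stage(stage),
    }
}

fn main() {
    assert_eq!(input_stage_wire_bytes(&InputStage::Encoded(vec![1, 2])), vec![1, 2]);
    assert_eq!(input_stage_wire_bytes(&InputStage::Decoded(StageExec)), vec![0xAB]);
    println!("ok");
}
```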

…serde

# Conflicts:
#	src/distributed_physical_optimizer_rule.rs
#	src/execution_plans/stage.rs
#	src/protobuf/mod.rs
@robtandy

This is the exact improvement that I was hoping we'd get to from the first design of stages; we only need to decode the top of the stage tree. Nice work! I'll review the PR later today.


@robtandy robtandy left a comment


Super happy about this PR. Thank you @gabotechs 🚀 !

Addressing @jayshrivastava's comments about clarity for the library user: perhaps something we could add in a future architecture document is a diagram of the execution stage tree and how it gets sent over the network, showing the decoded root and encoded children. Perhaps that would help with clarity.

The alternative is the way we used to have things structured, which was to send all tasks to workers before execution, which added an additional round of preflight requests and more state to keep track of. I very much prefer this solution of the execution tree arriving in the do_get_statement payload, so I'm glad this optimization arrives.

Regarding quantifying the improvements: scale factor 0.1 is quite small, so the relative importance of communication overhead is amplified. If we had improved the flow of data instead of the communication overhead, the effect might get lost at such a small scale. I'm not sure what the right scale factor is, but I think it is somewhere closer to SF=10, where the benchmark can still be run on a single node but we are also moving a more significant amount of data. WDYT, @gabotechs ?

@gabotechs

I'm not sure what the right scale factor is, but I think it is somewhere closer to SF=10, where the benchmark can still be run on a single node but we are also moving a more significant amount of data. WDYT?

With SF=10, the overhead of moving data is so large that it outweighs the overhead of serializing/deserializing plans over the network. The truth is that TPCH queries are very small compared to the amount of data they move, so they are not the best tool for measuring the changes in this PR.

A more suitable way of benchmarking this would be another set of SQL queries with thousands of lines each, but I don't think we have such benchmarks.

@gabotechs gabotechs merged commit 0d912ba into main Sep 30, 2025
4 checks passed
@gabotechs gabotechs deleted the gabrielmusat/remove-unnecessary-stage-proto-serde branch September 30, 2025 06:20
Comment on lines 293 to 298
```diff
 fn with_new_children(
     self: Arc<Self>,
-    children: Vec<Arc<dyn ExecutionPlan>>,
+    _children: Vec<Arc<dyn ExecutionPlan>>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
-    Ok(Arc::new(StageExec {
-        query_id: self.query_id,
-        num: self.num,
-        name: self.name.clone(),
-        plan: self.plan.clone(),
-        inputs: children,
-        tasks: self.tasks.clone(),
-        depth: self.depth,
-    }))
+    plan_err!("with_new_children() not supported for StageExec")
 }
```
Contributor


FYI, with this change datafusion-distributed is now incompatible with datafusion-tracing, which I believe uses the TreeNode API to wrap every node: https://github.com/datafusion-contrib/datafusion-tracing/blob/f0aee9ed2960fa101570ddc7ad11670c8ee64289/datafusion-tracing/src/instrument_rule.rs#L72

Contributor


And there's no trivial solution: it seems that both this optimizer rule and the instrumentation rule want to be the last / outermost optimizer 😢

Collaborator Author


🤔 this seems challenging...

Even if we were able to return children like before, those children are the child StageExec nodes, not the inner StageExec.plan that actually gets executed, so pretty much the whole plan would be left untraced.

In order to solve this, the children of StageExec would probably need to be its inner plan, and not its InputStages.

Collaborator Author


I'll see if I can draft something up, seems difficult but not impossible
