@gabotechs
Collaborator

#191.

There are certain nodes we cannot distribute, like the ones related to table introspection. This PR adds a new controlled error to the distributed planner that tells it not to distribute the query at all, and uses it for StreamingTableExec.
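
For readers skimming the thread, here is a minimal sketch of the idea, not the code in this PR. `PlanNode`, `PlanError::NonDistributable`, `Planned`, and `plan` are hypothetical names made up for illustration; the real planner works on DataFusion's `ExecutionPlan` trees and its error type may look different.

```rust
/// Hypothetical, simplified plan node (stand-in for a DataFusion `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
struct PlanNode {
    name: &'static str,
    children: Vec<PlanNode>,
}

/// Hypothetical planner error; the name and shape are assumptions.
#[derive(Debug, PartialEq)]
enum PlanError {
    /// Controlled signal: the plan contains a node that must not be distributed.
    NonDistributable(&'static str),
}

/// Assumption: only `StreamingTableExec` is treated as non-distributable here.
fn is_non_distributable(node: &PlanNode) -> bool {
    node.name == "StreamingTableExec"
}

/// Walks the plan top-down and returns the controlled error on the first
/// non-distributable node it finds.
fn check_distributable(node: &PlanNode) -> Result<(), PlanError> {
    if is_non_distributable(node) {
        return Err(PlanError::NonDistributable(node.name));
    }
    node.children.iter().try_for_each(check_distributable)
}

/// Outcome of planning: either the plan is distributed or left as-is.
#[derive(Debug, PartialEq)]
enum Planned {
    /// A real planner would insert network boundaries here; this sketch does not.
    Distributed(PlanNode),
    /// The original single-node plan, returned untouched.
    SingleNode(PlanNode),
}

/// Catches the controlled error and falls back to the single-node plan
/// instead of failing the query.
fn plan(root: PlanNode) -> Planned {
    match check_distributable(&root) {
        Ok(()) => Planned::Distributed(root),
        Err(PlanError::NonDistributable(_)) => Planned::SingleNode(root),
    }
}
```

The point of the error being "controlled" is that it never reaches the user: it only tells the planner to hand back the original plan unchanged.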

Contributor

@adriangb adriangb left a comment

Looks good to me! In theory I think we could partially distribute plans, e.g. if there's a join and only one subtree has one of these non-distributable plans, we can still distribute the other subtree. But I don't have a use case for that at the moment and it can just be a future improvement.

@gabotechs
Collaborator Author

if there's a join and only one subtree has one of these non-distributable plans, we can still distribute the other subtree

What I found challenging is to send StreamingTableExec over the wire. As it contains those Vec<Arc<dyn PartitionStream>>, I'd expect the effort of serializing those to be too big to justify the work.

CoalesceBatchesExec: target_batch_size=8192
FilterExec: table_name@2 = weather
RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
StreamingTableExec: partition_sizes=1, projection=[table_catalog, table_schema, table_name, column_name, is_nullable, data_type]
Collaborator

So a non-distributed plan can still include NetworkCoalesceExec? I had assumed it was exclusive to distributed plans. Was the intended fix to ensure that plans with StreamingTableExec remain non-distributed?

Collaborator Author

Oh wait, you are right, there's something wrong here. Let me fix it.

Collaborator Author

nice catch! 👍

@gabotechs gabotechs force-pushed the gabrielmusat/add-non-distributable-nodes branch from 8baa2e8 to 3c7e489 Compare October 17, 2025 15:04
@gabotechs gabotechs merged commit 821bc3f into main Oct 17, 2025
4 checks passed
@gabotechs gabotechs deleted the gabrielmusat/add-non-distributable-nodes branch October 17, 2025 15:12
@adriangb
Contributor

What I found challenging is to send StreamingTableExec over the wire. As it contains those Vec<Arc<dyn PartitionStream>>, I'd expect the effort of serializing those to be too big to justify the work.

Right, agreed, we shouldn't even try to serialize a StreamingTableExec over the wire.
I don't think it matters for StreamingTableExec because, as you say, it's only used by information schema, etc.
But since we introduced a general "unserializable" error/handling here, I do think there's a world where we handle that more gracefully. E.g. imagine you have:

HashJoinExec
    RepartitionExec
        AggregateExec
            DataSourceExec
    RepartitionExec
        SortExec
            StreamingTableExec

We could distribute this as:

HashJoinExec
    NetworkRepartitionExec  <- distribute this sub-tree of the plan
        AggregateExec
            DataSourceExec
    RepartitionExec  <- keep this sub-tree
        SortExec
            StreamingTableExec

But I don't think this should be a priority until we have some use case where there is a non-serializable node and a subtree that is expensive / benefits from distributing.
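
A rough sketch of what that per-subtree decision could look like, purely illustrative and not something implemented in this PR. `Node`, `node`, `contains_non_serializable`, and `distribute_partially` are hypothetical names, the plan is a toy tree of strings rather than real `ExecutionPlan` nodes, and swapping `RepartitionExec` for `NetworkRepartitionExec` by name is an assumption standing in for however the planner would actually insert a network boundary.

```rust
/// Hypothetical toy plan node; the real planner works on `ExecutionPlan` trees.
#[derive(Debug, Clone, PartialEq)]
struct Node {
    name: String,
    children: Vec<Node>,
}

/// Small helper to build toy nodes.
fn node(name: &str, children: Vec<Node>) -> Node {
    Node { name: name.to_string(), children }
}

/// True if this subtree contains a node we must not send over the wire.
/// Assumption: only `StreamingTableExec` counts as non-serializable here.
fn contains_non_serializable(n: &Node) -> bool {
    n.name == "StreamingTableExec" || n.children.iter().any(contains_non_serializable)
}

/// Turns a `RepartitionExec` into a network boundary only when everything
/// beneath it can be serialized; otherwise keeps the node local and keeps
/// looking for distributable subtrees further down.
fn distribute_partially(n: Node) -> Node {
    if n.name == "RepartitionExec" && !contains_non_serializable(&n) {
        // The whole subtree is safe to ship to workers.
        return Node {
            name: "NetworkRepartitionExec".to_string(),
            children: n.children,
        };
    }
    Node {
        name: n.name,
        children: n.children.into_iter().map(distribute_partially).collect(),
    }
}
```

Applied to the `HashJoinExec` example above, the `AggregateExec` side would end up under a `NetworkRepartitionExec` while the `SortExec` / `StreamingTableExec` side stays as it was. The repeated subtree scan is quadratic in the worst case, which is fine for a sketch but would want a single bottom-up pass in a real planner.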
