Compute HashJoinExec dynamic filters progressively #19580

@adriangb

Currently the structure is:

DynamicFilter [
  CASE (hash_repartition % 12)
    WHEN 2 THEN 
        a@0 >= ab AND a@0 <= ab  AND b@1 >= bb AND b@1 <= bb 
        AND
        struct(a@0, b@1) IN (SET) ([{c0: ab, c1: bb}])
    WHEN 4 THEN 
        a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba 
        AND
        struct(a@0, b@1) IN (SET) ([{c0: aa, c1: ba}])
    ELSE false 
  END
]

I think we should be able to rewrite this into:

CASE (hash_repartition % 12)
  WHEN 0 THEN
      -- We can no longer optimize away empty partitions / CASE branches...
      DynamicFilter [ true ]
      AND
      DynamicFilter [ true ]
  -- (the remaining empty branches look the same and are omitted here for brevity)
  WHEN 2 THEN 
      DynamicFilter [a@0 >= ab AND a@0 <= ab  AND b@1 >= bb AND b@1 <= bb ]
      AND
      DynamicFilter [ struct(a@0, b@1) IN (SET) ([{c0: ab, c1: bb}]) ]
  WHEN 4 THEN 
      DynamicFilter [ a@0 >= aa AND a@0 <= aa  AND b@1 >= ba AND b@1 <= ba ]
      AND
      DynamicFilter [ struct(a@0, b@1) IN (SET) ([{c0: aa, c1: ba}]) ]
  ELSE false 
END
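A minimal sketch of the per-partition layout above, to make the evaluation path concrete. Everything here is hypothetical and not DataFusion's API: `Bounds` stands in for the `a@0 >= .. AND a@0 <= ..` predicates on a single `i64` key, `PerPartitionFilters` stands in for the `CASE` expression, and `key % N` stands in for `hash_repartition % N`. A slot that has not been published yet behaves like `DynamicFilter [ true ]`.

```rust
use std::sync::RwLock;

/// Inclusive min/max bounds on one i64 join key (illustrative stand-in
/// for the range predicates shown in the issue).
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Bounds {
    pub min: i64,
    pub max: i64,
}

/// One independently updatable slot per build partition.
/// `None` plays the role of the placeholder `DynamicFilter [ true ]`.
pub struct PerPartitionFilters {
    slots: Vec<RwLock<Option<Bounds>>>,
}

impl PerPartitionFilters {
    pub fn new(num_partitions: usize) -> Self {
        assert!(num_partitions > 0, "need at least one partition");
        Self {
            slots: (0..num_partitions).map(|_| RwLock::new(None)).collect(),
        }
    }

    /// Assumption: a plain modulo replaces the real repartitioning hash.
    fn partition_of(&self, key: i64) -> usize {
        key.rem_euclid(self.slots.len() as i64) as usize
    }

    /// Called by the single producer that owns `partition`.
    pub fn publish(&self, partition: usize, bounds: Bounds) {
        *self.slots[partition].write().unwrap() = Some(bounds);
    }

    /// Probe-side check: route the key to its branch, then apply that
    /// branch's filter if it has been published.
    pub fn may_match(&self, key: i64) -> bool {
        let slot = self.slots[self.partition_of(key)].read().unwrap();
        match *slot {
            None => true, // filter still pending: let the row through
            Some(b) => b.min <= key && key <= b.max,
        }
    }
}
```

In this sketch every branch has to exist before any build partition finishes, which is why the empty-branch elimination no longer applies.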

We could then also remove the tokio::sync::Barrier, but I'm not sure that's a good idea: if the probe side opens a large batch of files with partial filters and the filters complete immediately afterwards, it won't fully pick them up until the next batch of files, which would make the scan less efficient.
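To illustrate the barrier behaviour discussed above, here is a rough sketch in which nothing is published until every build partition has reported. It is an assumption-laden model, not the actual HashJoinExec code: it uses threads and `std::sync::Barrier` as a synchronous stand-in for `tokio::sync::Barrier`, and `publish_after_barrier` is a hypothetical name.

```rust
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

type KeyBounds = Option<(i64, i64)>;

/// Each thread computes the min/max of its partition, waits at the barrier,
/// and only the barrier leader publishes the complete set of bounds.
pub fn publish_after_barrier(per_partition_keys: Vec<Vec<i64>>) -> Option<Vec<KeyBounds>> {
    let n = per_partition_keys.len();
    let barrier = Arc::new(Barrier::new(n));
    let pending: Arc<Mutex<Vec<KeyBounds>>> = Arc::new(Mutex::new(vec![None; n]));
    let published: Arc<Mutex<Option<Vec<KeyBounds>>>> = Arc::new(Mutex::new(None));

    let handles: Vec<_> = per_partition_keys
        .into_iter()
        .enumerate()
        .map(|(partition, keys)| {
            let barrier = Arc::clone(&barrier);
            let pending = Arc::clone(&pending);
            let published = Arc::clone(&published);
            thread::spawn(move || {
                // An empty partition yields `None`.
                let bounds = keys.iter().copied().min().zip(keys.iter().copied().max());
                pending.lock().unwrap()[partition] = bounds;
                // The probe side sees nothing until all partitions arrive here.
                if barrier.wait().is_leader() {
                    let snapshot = pending.lock().unwrap().clone();
                    *published.lock().unwrap() = Some(snapshot);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    let result = published.lock().unwrap().clone();
    result
}
```

Dropping the barrier would correspond to exposing `pending` directly, so a scan that starts early would read whatever subset of slots happens to be filled at that moment.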

But I do feel that this more granular structure would be better for distributed systems at least: there would be only one producer for each dynamic filter, so a simple broadcast of updates would work, without any need to "combine" simultaneous updates from multiple producers on the consumer side.
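The single-producer point can be sketched as follows, with a `std::sync::mpsc` channel standing in for whatever broadcast mechanism a distributed engine would use. All names (`FilterUpdate`, `apply_update`, `run_producers`) are hypothetical. Because each slot has exactly one writer, the consumer applies an update with a plain overwrite and never merges two updates for the same filter.

```rust
use std::sync::mpsc;
use std::thread;

/// Inclusive (min, max) bounds for one partition's join key.
pub type KeyRange = (i64, i64);

/// Message sent by the one producer that owns `partition`.
pub struct FilterUpdate {
    pub partition: usize,
    pub range: KeyRange,
}

/// Consumer side: a plain overwrite, since no other producer writes this slot.
pub fn apply_update(slots: &mut [Option<KeyRange>], update: FilterUpdate) {
    slots[update.partition] = Some(update.range);
}

/// Spawns one producer per partition and applies updates as they arrive.
pub fn run_producers(per_partition_keys: Vec<Vec<i64>>) -> Vec<Option<KeyRange>> {
    let n = per_partition_keys.len();
    let (tx, rx) = mpsc::channel();

    let handles: Vec<_> = per_partition_keys
        .into_iter()
        .enumerate()
        .map(|(partition, keys)| {
            let tx = tx.clone();
            thread::spawn(move || {
                let min = keys.iter().copied().min();
                let max = keys.iter().copied().max();
                // In this sketch an empty partition sends nothing, so its
                // slot keeps the placeholder (`None`).
                if let (Some(min), Some(max)) = (min, max) {
                    tx.send(FilterUpdate { partition, range: (min, max) }).unwrap();
                }
            })
        })
        .collect();
    drop(tx); // close the channel once every producer clone is dropped

    let mut slots: Vec<Option<KeyRange>> = vec![None; n];
    for update in rx {
        apply_update(&mut slots, update);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    slots
}
```

Updates may arrive in any order, yet the final state is the same, which is the property that makes a simple broadcast sufficient here.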

This also means dropping the fancy bit where we eliminate empty branches from the CASE expression.
