@@ -385,6 +385,79 @@ mod tests {
385385 " ) ;
386386 }
387387
388+ #[ tokio:: test]
389+ async fn test_left_join_distributed ( ) {
390+ let query = r#"
391+ WITH a AS (
392+ SELECT
393+ AVG("MinTemp") as "MinTemp",
394+ "RainTomorrow"
395+ FROM weather
396+ WHERE "RainToday" = 'yes'
397+ GROUP BY "RainTomorrow"
398+ ), b AS (
399+ SELECT
400+ AVG("MaxTemp") as "MaxTemp",
401+ "RainTomorrow"
402+ FROM weather
403+ WHERE "RainToday" = 'no'
404+ GROUP BY "RainTomorrow"
405+ )
406+ SELECT
407+ a."MinTemp",
408+ b."MaxTemp"
409+ FROM a
410+ LEFT JOIN b
411+ ON a."RainTomorrow" = b."RainTomorrow"
412+
413+ "# ;
414+ let plan = sql_to_explain ( query) . await . unwrap ( ) ;
415+ assert_snapshot ! ( plan, @r"
416+ ┌───── Stage 5 Task: partitions: 0..3,unassigned]
417+ │partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192
418+ │partitions [out:4 <-- in:1 ] HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2]
419+ │partitions [out:1 <-- in:4 ] CoalescePartitionsExec
420+ │partitions [out:4 <-- in:4 ] ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
421+ │partitions [out:4 <-- in:4 ] AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)]
422+ │partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192
423+ │partitions [out:4 ] ArrowFlightReadExec: Stage 2
424+ │partitions [out:4 <-- in:4 ] ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow]
425+ │partitions [out:4 <-- in:4 ] AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
426+ │partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192
427+ │partitions [out:4 ] ArrowFlightReadExec: Stage 4
428+ │
429+ └──────────────────────────────────────────────────
430+ ┌───── Stage 4 Task: partitions: 0..3,unassigned]
431+ │partitions [out:4 <-- in:4 ] RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
432+ │partitions [out:4 <-- in:4 ] AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
433+ │partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192
434+ │partitions [out:4 <-- in:4 ] FilterExec: RainToday@1 = no, projection=[MaxTemp@0, RainTomorrow@2]
435+ │partitions [out:4 ] ArrowFlightReadExec: Stage 3
436+ │
437+ └──────────────────────────────────────────────────
438+ ┌───── Stage 3 Task: partitions: 0..3,unassigned]
439+ │partitions [out:4 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
440+ │partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MaxTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = no, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= no AND no <= RainToday_max@1, required_guarantees=[RainToday in (no)]
441+ │
442+ │
443+ └──────────────────────────────────────────────────
444+ ┌───── Stage 2 Task: partitions: 0..3,unassigned]
445+ │partitions [out:4 <-- in:4 ] RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
446+ │partitions [out:4 <-- in:4 ] AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
447+ │partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192
448+ │partitions [out:4 <-- in:4 ] FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2]
449+ │partitions [out:4 ] ArrowFlightReadExec: Stage 1
450+ │
451+ └──────────────────────────────────────────────────
452+ ┌───── Stage 1 Task: partitions: 0..3,unassigned]
453+ │partitions [out:4 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
454+ │partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
455+ │
456+ │
457+ └──────────────────────────────────────────────────
458+ " ) ;
459+ }
460+
388461 #[ tokio:: test]
389462 async fn test_sort ( ) {
390463 let query = r#"SELECT * FROM weather ORDER BY "MinTemp" DESC "# ;
0 commit comments