|
98 | 98 | import static com.facebook.presto.sql.planner.assertions.MatchResult.match; |
99 | 99 | import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyNot; |
100 | 100 | import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; |
| 101 | +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.callDistributedProcedure; |
101 | 102 | import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange; |
102 | 103 | import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.expression; |
103 | 104 | import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.filter; |
|
107 | 108 | import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.project; |
108 | 109 | import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.strictProject; |
109 | 110 | import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.strictTableScan; |
| 111 | +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableFinish; |
110 | 112 | import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values; |
111 | 113 | import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; |
| 114 | +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL; |
| 115 | +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; |
| 116 | +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER; |
| 117 | +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION; |
112 | 118 | import static com.google.common.base.MoreObjects.toStringHelper; |
113 | 119 | import static com.google.common.collect.ImmutableList.toImmutableList; |
114 | 120 | import static com.google.common.collect.ImmutableMap.toImmutableMap; |
@@ -730,6 +736,50 @@ public void testThoroughlyPushdownForTableWithUnsupportedSpecsWhoseDataAllDelete |
730 | 736 | } |
731 | 737 | } |
732 | 738 |
|
| 739 | + @Test |
| 740 | + public void testCallDistributedProcedureOnPartitionedTable() |
| 741 | + { |
| 742 | + String tableName = "partition_table_for_call_distributed_procedure"; |
| 743 | + try { |
| 744 | + assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar) with (partitioning = ARRAY['c1'])"); |
| 745 | + assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2); |
| 746 | + assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2); |
| 747 | + assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2); |
| 748 | + |
| 749 | + assertPlan(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s')", tableName, getSession().getSchema().get()), |
| 750 | + output(tableFinish(exchange(REMOTE_STREAMING, GATHER, |
| 751 | + callDistributedProcedure( |
| 752 | + exchange(LOCAL, GATHER, |
| 753 | + exchange(REMOTE_STREAMING, REPARTITION, |
| 754 | + strictTableScan(tableName, identityMap("c1", "c2"))))))))); |
| 755 | + |
| 756 | + // Do not support the filter that couldn't be enforced totally by tableScan |
| 757 | + assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c2 > ''bar''')", tableName, getSession().getSchema().get()), |
| 758 | + "Unexpected FilterNode found in plan; probably connector was not able to handle provided WHERE expression"); |
| 759 | + |
| 760 | + // Support the filter that could be enforced totally by tableScan |
| 761 | + assertPlan(getSession(), format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c1 > 3')", tableName, getSession().getSchema().get()), |
| 762 | + output(tableFinish(exchange(REMOTE_STREAMING, GATHER, |
| 763 | + callDistributedProcedure( |
| 764 | + exchange(LOCAL, GATHER, |
| 765 | + exchange(REMOTE_STREAMING, REPARTITION, |
| 766 | + strictTableScan(tableName, identityMap("c1", "c2")))))))), |
| 767 | + plan -> assertTableLayout( |
| 768 | + plan, |
| 769 | + tableName, |
| 770 | + withColumnDomains(ImmutableMap.of( |
| 771 | + new Subfield( |
| 772 | + "c1", |
| 773 | + ImmutableList.of()), |
| 774 | + Domain.create(ValueSet.ofRanges(greaterThan(INTEGER, 3L)), false))), |
| 775 | + TRUE_CONSTANT, |
| 776 | + ImmutableSet.of("c1"))); |
| 777 | + } |
| 778 | + finally { |
| 779 | + assertUpdate("DROP TABLE " + tableName); |
| 780 | + } |
| 781 | + } |
| 782 | + |
733 | 783 | @DataProvider(name = "timezones") |
734 | 784 | public Object[][] timezones() |
735 | 785 | { |
|
0 commit comments