Skip to content

Commit 8d53e4d

Browse files
committed
[Address Comment] Add logical planner test for call distributed procedure
1 parent 850c05a commit 8d53e4d

File tree

3 files changed

+71
-2
lines changed

3 files changed

+71
-2
lines changed

presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import static com.facebook.presto.sql.planner.assertions.MatchResult.match;
9999
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyNot;
100100
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
101+
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.callDistributedProcedure;
101102
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange;
102103
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.expression;
103104
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.filter;
@@ -107,8 +108,13 @@
107108
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.project;
108109
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.strictProject;
109110
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.strictTableScan;
111+
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableFinish;
110112
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values;
111113
import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
114+
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL;
115+
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING;
116+
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER;
117+
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION;
112118
import static com.google.common.base.MoreObjects.toStringHelper;
113119
import static com.google.common.collect.ImmutableList.toImmutableList;
114120
import static com.google.common.collect.ImmutableMap.toImmutableMap;
@@ -730,6 +736,50 @@ public void testThoroughlyPushdownForTableWithUnsupportedSpecsWhoseDataAllDelete
730736
}
731737
}
732738

739+
@Test
740+
public void testCallDistributedProcedureOnPartitionedTable()
741+
{
742+
String tableName = "partition_table_for_call_distributed_procedure";
743+
try {
744+
assertUpdate("CREATE TABLE " + tableName + " (c1 integer, c2 varchar) with (partitioning = ARRAY['c1'])");
745+
assertUpdate("INSERT INTO " + tableName + " values(1, 'foo'), (2, 'bar')", 2);
746+
assertUpdate("INSERT INTO " + tableName + " values(3, 'foo'), (4, 'bar')", 2);
747+
assertUpdate("INSERT INTO " + tableName + " values(5, 'foo'), (6, 'bar')", 2);
748+
749+
assertPlan(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s')", tableName, getSession().getSchema().get()),
750+
output(tableFinish(exchange(REMOTE_STREAMING, GATHER,
751+
callDistributedProcedure(
752+
exchange(LOCAL, GATHER,
753+
exchange(REMOTE_STREAMING, REPARTITION,
754+
strictTableScan(tableName, identityMap("c1", "c2")))))))));
755+
756+
// Do not support the filter that couldn't be enforced totally by tableScan
757+
assertQueryFails(format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c2 > ''bar''')", tableName, getSession().getSchema().get()),
758+
"Unexpected FilterNode found in plan; probably connector was not able to handle provided WHERE expression");
759+
760+
// Support the filter that could be enforced totally by tableScan
761+
assertPlan(getSession(), format("CALL system.rewrite_data_files(table_name => '%s', schema => '%s', filter => 'c1 > 3')", tableName, getSession().getSchema().get()),
762+
output(tableFinish(exchange(REMOTE_STREAMING, GATHER,
763+
callDistributedProcedure(
764+
exchange(LOCAL, GATHER,
765+
exchange(REMOTE_STREAMING, REPARTITION,
766+
strictTableScan(tableName, identityMap("c1", "c2")))))))),
767+
plan -> assertTableLayout(
768+
plan,
769+
tableName,
770+
withColumnDomains(ImmutableMap.of(
771+
new Subfield(
772+
"c1",
773+
ImmutableList.of()),
774+
Domain.create(ValueSet.ofRanges(greaterThan(INTEGER, 3L)), false))),
775+
TRUE_CONSTANT,
776+
ImmutableSet.of("c1")));
777+
}
778+
finally {
779+
assertUpdate("DROP TABLE " + tableName);
780+
}
781+
}
782+
733783
@DataProvider(name = "timezones")
734784
public Object[][] timezones()
735785
{

presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1198,8 +1198,15 @@ protected Scope visitCall(Call call, Optional<Scope> scope)
11981198
if (analysis.isDescribe()) {
11991199
return createAndAssignScope(call, scope);
12001200
}
1201-
QualifiedObjectName procedureName = analysis.getProcedureName()
1202-
.orElse(createQualifiedObjectName(session, call, call.getName(), metadata));
1201+
Optional<QualifiedObjectName> procedureNameOptional = analysis.getProcedureName();
1202+
QualifiedObjectName procedureName;
1203+
if (!procedureNameOptional.isPresent()) {
1204+
procedureName = createQualifiedObjectName(session, call, call.getName(), metadata);
1205+
analysis.setProcedureName(Optional.of(procedureName));
1206+
}
1207+
else {
1208+
procedureName = procedureNameOptional.get();
1209+
}
12031210
ConnectorId connectorId = metadata.getCatalogHandle(session, procedureName.getCatalogName())
12041211
.orElseThrow(() -> new SemanticException(MISSING_CATALOG, call, "Catalog %s does not exist", procedureName.getCatalogName()));
12051212

presto-main-base/src/test/java/com/facebook/presto/sql/planner/assertions/PlanMatchPattern.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.facebook.presto.spi.plan.SemiJoinNode;
4444
import com.facebook.presto.spi.plan.SortNode;
4545
import com.facebook.presto.spi.plan.SpatialJoinNode;
46+
import com.facebook.presto.spi.plan.TableFinishNode;
4647
import com.facebook.presto.spi.plan.TableWriterNode;
4748
import com.facebook.presto.spi.plan.TopNNode;
4849
import com.facebook.presto.spi.plan.UnionNode;
@@ -60,6 +61,7 @@
6061
import com.facebook.presto.sql.planner.iterative.GroupReference;
6162
import com.facebook.presto.sql.planner.plan.ApplyNode;
6263
import com.facebook.presto.sql.planner.plan.AssignUniqueId;
64+
import com.facebook.presto.sql.planner.plan.CallDistributedProcedureNode;
6365
import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode;
6466
import com.facebook.presto.sql.planner.plan.ExchangeNode;
6567
import com.facebook.presto.sql.planner.plan.GroupIdNode;
@@ -691,6 +693,16 @@ public static PlanMatchPattern enforceSingleRow(PlanMatchPattern source)
691693
return node(EnforceSingleRowNode.class, source);
692694
}
693695

696+
public static PlanMatchPattern callDistributedProcedure(PlanMatchPattern source)
697+
{
698+
return node(CallDistributedProcedureNode.class, source);
699+
}
700+
701+
public static PlanMatchPattern tableFinish(PlanMatchPattern source)
702+
{
703+
return node(TableFinishNode.class, source);
704+
}
705+
694706
public static PlanMatchPattern tableWriter(List<String> columns, List<String> columnNames, PlanMatchPattern source)
695707
{
696708
return node(TableWriterNode.class, source).with(new TableWriterMatcher(columns, columnNames));

0 commit comments

Comments
 (0)