diff --git a/presto-clp/pom.xml b/presto-clp/pom.xml index 2a4be7f9df24f..d13b59968aff5 100644 --- a/presto-clp/pom.xml +++ b/presto-clp/pom.xml @@ -135,6 +135,13 @@ test + + com.facebook.presto + presto-main-base + test-jar + test + + org.apache.commons commons-math3 diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnector.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnector.java index 7b24e7a650e3c..3b81d6bb2062f 100644 --- a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnector.java +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnector.java @@ -15,6 +15,7 @@ import com.facebook.airlift.bootstrap.LifeCycleManager; import com.facebook.airlift.log.Logger; +import com.facebook.presto.plugin.clp.optimization.ClpPlanOptimizerProvider; import com.facebook.presto.plugin.clp.split.filter.ClpSplitFilterProvider; import com.facebook.presto.spi.connector.Connector; import com.facebook.presto.spi.connector.ConnectorMetadata; diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFunctions.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFunctions.java new file mode 100644 index 0000000000000..39adbe1172b33 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFunctions.java @@ -0,0 +1,68 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.type.StandardTypes; +import com.facebook.presto.spi.function.Description; +import com.facebook.presto.spi.function.ScalarFunction; +import com.facebook.presto.spi.function.SqlType; +import io.airlift.slice.Slice; + +public final class ClpFunctions +{ + private ClpFunctions() + { + } + + @ScalarFunction(value = "CLP_GET_BIGINT", deterministic = false) + @Description("Retrieves an integer value corresponding to the given JSON path.") + @SqlType(StandardTypes.BIGINT) + public static long clpGetBigint(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) + { + throw new UnsupportedOperationException("CLP_GET_BIGINT is a placeholder function without implementation."); + } + + @ScalarFunction(value = "CLP_GET_DOUBLE", deterministic = false) + @Description("Retrieves a floating point value corresponding to the given JSON path.") + @SqlType(StandardTypes.DOUBLE) + public static double clpGetDouble(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) + { + throw new UnsupportedOperationException("CLP_GET_DOUBLE is a placeholder function without implementation."); + } + + @ScalarFunction(value = "CLP_GET_BOOL", deterministic = false) + @Description("Retrieves a boolean value corresponding to the given JSON path.") + @SqlType(StandardTypes.BOOLEAN) + public static boolean clpGetBool(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) + { + throw new UnsupportedOperationException("CLP_GET_BOOL is a placeholder function without implementation."); + } + + @ScalarFunction(value = "CLP_GET_STRING", deterministic = false) + @Description("Retrieves a string value corresponding to the given JSON path.") + @SqlType(StandardTypes.VARCHAR) + public static Slice clpGetString(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) + { + throw new UnsupportedOperationException("CLP_GET_STRING is a placeholder function without implementation."); + } + + @ScalarFunction(value = 
"CLP_GET_STRING_ARRAY", deterministic = false) + @Description("Retrieves an array value corresponding to the given JSON path and converts each element into a string.") + @SqlType("ARRAY(VARCHAR)") + public static Block clpGetStringArray(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) + { + throw new UnsupportedOperationException("CLP_GET_STRING_ARRAY is a placeholder function without implementation."); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java index 985c707a32483..3cd0594a04d15 100644 --- a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java @@ -16,6 +16,9 @@ import com.facebook.presto.spi.Plugin; import com.facebook.presto.spi.connector.ConnectorFactory; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.util.Set; public class ClpPlugin implements Plugin @@ -25,4 +28,10 @@ public Iterable getConnectorFactories() { return ImmutableList.of(new ClpConnectorFactory()); } + + @Override + public Set> getFunctions() + { + return ImmutableSet.of(ClpFunctions.class); + } } diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizer.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpComputePushDown.java similarity index 67% rename from presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizer.java rename to presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpComputePushDown.java index f9b4f611fe7a3..2c216614af10f 100644 --- a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizer.java +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpComputePushDown.java @@ -11,9 +11,12 @@ * See the License for the specific language governing permissions and * limitations under the 
License. */ -package com.facebook.presto.plugin.clp; +package com.facebook.presto.plugin.clp.optimization; import com.facebook.airlift.log.Logger; +import com.facebook.presto.plugin.clp.ClpExpression; +import com.facebook.presto.plugin.clp.ClpTableHandle; +import com.facebook.presto.plugin.clp.ClpTableLayoutHandle; import com.facebook.presto.plugin.clp.split.filter.ClpSplitFilterProvider; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorPlanOptimizer; @@ -31,7 +34,6 @@ import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.google.common.collect.ImmutableSet; -import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Optional; @@ -42,15 +44,15 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; -public class ClpPlanOptimizer +public class ClpComputePushDown implements ConnectorPlanOptimizer { - private static final Logger log = Logger.get(ClpPlanOptimizer.class); + private static final Logger log = Logger.get(ClpComputePushDown.class); private final FunctionMetadataManager functionManager; private final StandardFunctionResolution functionResolution; private final ClpSplitFilterProvider splitFilterProvider; - public ClpPlanOptimizer(FunctionMetadataManager functionManager, StandardFunctionResolution functionResolution, ClpSplitFilterProvider splitFilterProvider) + public ClpComputePushDown(FunctionMetadataManager functionManager, StandardFunctionResolution functionResolution, ClpSplitFilterProvider splitFilterProvider) { this.functionManager = requireNonNull(functionManager, "functionManager is null"); this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); @@ -99,19 +101,27 @@ public PlanNode visitFilter(FilterNode node, RewriteContext context) if (!(node.getSource() instanceof TableScanNode)) { return node; } + + return processFilter(node, (TableScanNode) node.getSource()); + } + + private PlanNode 
processFilter(FilterNode filterNode, TableScanNode tableScanNode) + { hasVisitedFilter = true; - TableScanNode tableScanNode = (TableScanNode) node.getSource(); - Map assignments = new HashMap<>(tableScanNode.getAssignments()); TableHandle tableHandle = tableScanNode.getTable(); ClpTableHandle clpTableHandle = (ClpTableHandle) tableHandle.getConnectorHandle(); + String tableScope = CONNECTOR_NAME + "." + clpTableHandle.getSchemaTableName().toString(); - ClpExpression clpExpression = node.getPredicate().accept( + Map assignments = tableScanNode.getAssignments(); + + ClpExpression clpExpression = filterNode.getPredicate().accept( new ClpFilterToKqlConverter( functionResolution, functionManager, + assignments, splitFilterProvider.getColumnNames(tableScope)), - assignments); + null); Optional kqlQuery = clpExpression.getPushDownExpression(); Optional metadataSqlQuery = clpExpression.getMetadataSqlQuery(); Optional remainingPredicate = clpExpression.getRemainingExpression(); @@ -119,39 +129,47 @@ public PlanNode visitFilter(FilterNode node, RewriteContext context) // Perform required metadata filter checks before handling the KQL query (if kqlQuery // isn't present, we'll return early, skipping subsequent checks). 
splitFilterProvider.checkContainsRequiredFilters(ImmutableSet.of(tableScope), metadataSqlQuery.orElse("")); - if (metadataSqlQuery.isPresent()) { + boolean hasMetadataFilter = metadataSqlQuery.isPresent() && !metadataSqlQuery.get().isEmpty(); + if (hasMetadataFilter) { metadataSqlQuery = Optional.of(splitFilterProvider.remapSplitFilterPushDownExpression(tableScope, metadataSqlQuery.get())); - log.debug("Metadata SQL query: %s", metadataSqlQuery); + log.debug("Metadata SQL query: %s", metadataSqlQuery.get()); } - if (!kqlQuery.isPresent()) { - return node; - } - log.debug("KQL query: %s", kqlQuery); - - ClpTableLayoutHandle clpTableLayoutHandle = new ClpTableLayoutHandle(clpTableHandle, kqlQuery, metadataSqlQuery); - TableScanNode newTableScanNode = new TableScanNode( - tableScanNode.getSourceLocation(), - idAllocator.getNextId(), - new TableHandle( - tableHandle.getConnectorId(), - clpTableHandle, - tableHandle.getTransaction(), - Optional.of(clpTableLayoutHandle)), - tableScanNode.getOutputVariables(), - tableScanNode.getAssignments(), - tableScanNode.getTableConstraints(), - tableScanNode.getCurrentConstraint(), - tableScanNode.getEnforcedConstraint(), - tableScanNode.getCteMaterializationInfo()); - if (!remainingPredicate.isPresent()) { - return newTableScanNode; + if (kqlQuery.isPresent() || hasMetadataFilter) { + if (kqlQuery.isPresent()) { + log.debug("KQL query: %s", kqlQuery.get()); + } + + ClpTableLayoutHandle layoutHandle = new ClpTableLayoutHandle(clpTableHandle, kqlQuery, metadataSqlQuery); + TableHandle newTableHandle = new TableHandle( + tableHandle.getConnectorId(), + clpTableHandle, + tableHandle.getTransaction(), + Optional.of(layoutHandle)); + + tableScanNode = new TableScanNode( + tableScanNode.getSourceLocation(), + idAllocator.getNextId(), + newTableHandle, + tableScanNode.getOutputVariables(), + tableScanNode.getAssignments(), + tableScanNode.getTableConstraints(), + tableScanNode.getCurrentConstraint(), + 
tableScanNode.getEnforcedConstraint(), + tableScanNode.getCteMaterializationInfo()); } - return new FilterNode(node.getSourceLocation(), - idAllocator.getNextId(), - newTableScanNode, - remainingPredicate.get()); + if (remainingPredicate.isPresent()) { + // Not all predicate pushed down, need new FilterNode + return new FilterNode( + filterNode.getSourceLocation(), + idAllocator.getNextId(), + tableScanNode, + remainingPredicate.get()); + } + else { + return tableScanNode; + } } } } diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFilterToKqlConverter.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpFilterToKqlConverter.java similarity index 90% rename from presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFilterToKqlConverter.java rename to presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpFilterToKqlConverter.java index 9c8b8ab1c300f..c82883d25ec17 100644 --- a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFilterToKqlConverter.java +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpFilterToKqlConverter.java @@ -11,13 +11,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.facebook.presto.plugin.clp; +package com.facebook.presto.plugin.clp.optimization; import com.facebook.presto.common.function.OperatorType; import com.facebook.presto.common.type.DecimalType; import com.facebook.presto.common.type.RowType; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.plugin.clp.ClpColumnHandle; +import com.facebook.presto.plugin.clp.ClpExpression; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.function.FunctionHandle; @@ -85,6 +87,7 @@ * constant. *
  • Dereferencing fields from row-typed variables.
  • *
  • Logical operators AND, OR, and NOT.
  • + *
  • CLP_GET_* UDFs.
  • * *

    * Supported translations for SQL include: @@ -95,35 +98,38 @@ * */ public class ClpFilterToKqlConverter - implements RowExpressionVisitor> + implements RowExpressionVisitor { private static final Set LOGICAL_BINARY_OPS_FILTER = ImmutableSet.of(EQUAL, NOT_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL); private final StandardFunctionResolution standardFunctionResolution; private final FunctionMetadataManager functionMetadataManager; + private final Map assignments; private final Set metadataFilterColumns; public ClpFilterToKqlConverter( StandardFunctionResolution standardFunctionResolution, FunctionMetadataManager functionMetadataManager, + Map assignments, Set metadataFilterColumns) { this.standardFunctionResolution = requireNonNull(standardFunctionResolution, "standardFunctionResolution is null"); this.functionMetadataManager = requireNonNull(functionMetadataManager, "function metadata manager is null"); + this.assignments = requireNonNull(assignments, "assignments is null"); this.metadataFilterColumns = requireNonNull(metadataFilterColumns, "metadataFilterColumns is null"); } @Override - public ClpExpression visitCall(CallExpression node, Map context) + public ClpExpression visitCall(CallExpression node, Void context) { FunctionHandle functionHandle = node.getFunctionHandle(); if (standardFunctionResolution.isNotFunction(functionHandle)) { - return handleNot(node, context); + return handleNot(node); } if (standardFunctionResolution.isLikeFunction(functionHandle)) { - return handleLike(node, context); + return handleLike(node); } FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(node.getFunctionHandle()); @@ -131,10 +137,10 @@ public ClpExpression visitCall(CallExpression node, Map context) + public ClpExpression visitConstant(ConstantExpression node, Void context) { return new ClpExpression(getLiteralString(node)); } @Override - public ClpExpression visitVariableReference(VariableReferenceExpression 
node, Map context) + public ClpExpression visitVariableReference(VariableReferenceExpression node, Void context) { - return new ClpExpression(getVariableName(node, context)); + return new ClpExpression(getVariableName(node)); } @Override - public ClpExpression visitSpecialForm(SpecialFormExpression node, Map context) + public ClpExpression visitSpecialForm(SpecialFormExpression node, Void context) { switch (node.getForm()) { case AND: - return handleAnd(node, context); + return handleAnd(node); case OR: - return handleOr(node, context); + return handleOr(node); case IN: - return handleIn(node, context); + return handleIn(node); case IS_NULL: - return handleIsNull(node, context); + return handleIsNull(node); case DEREFERENCE: - return handleDereference(node, context); + return handleDereference(node); default: return new ClpExpression(node); } } @Override - public ClpExpression visitExpression(RowExpression node, Map context) + public ClpExpression visitExpression(RowExpression node, Void context) { // For all other expressions, return the original expression return new ClpExpression(node); @@ -197,12 +203,11 @@ private String getLiteralString(ConstantExpression literal) * Retrieves the original column name from a variable reference. 
* * @param variable the variable reference expression - * @param context a mapping from variable references to column handles used for pushdown * @return the original column name as a string */ - private String getVariableName(VariableReferenceExpression variable, Map context) + private String getVariableName(VariableReferenceExpression variable) { - return ((ClpColumnHandle) context.get(variable)).getOriginalColumnName(); + return ((ClpColumnHandle) assignments.get(variable)).getOriginalColumnName(); } /** @@ -218,11 +223,10 @@ private String getVariableName(VariableReferenceExpression variable, Mapcol1 BETWEEN 0 AND 5 → col1 >= 0 AND col1 <= 5 * * @param node the BETWEEN call expression - * @param context a mapping from variable references to column handles used for pushdown * @return a ClpExpression containing either the equivalent KQL query, or the original * expression if it couldn't be translated */ - private ClpExpression handleBetween(CallExpression node, Map context) + private ClpExpression handleBetween(CallExpression node) { List arguments = node.getArguments(); if (arguments.size() != 3) { @@ -242,7 +246,7 @@ private ClpExpression handleBetween(CallExpression node, Map variableOpt = first.accept(this, context).getPushDownExpression(); + Optional variableOpt = first.accept(this, null).getPushDownExpression(); if (!variableOpt.isPresent()) { return new ClpExpression(node); } @@ -262,11 +266,10 @@ private ClpExpression handleBetween(CallExpression node, MapNOT (col1 = 5) → NOT col1: 5 * * @param node the NOT call expression - * @param context a mapping from variable references to column handles used for pushdown * @return a ClpExpression containing either the equivalent KQL query, or the original * expression if it couldn't be translated */ - private ClpExpression handleNot(CallExpression node, Map context) + private ClpExpression handleNot(CallExpression node) { if (node.getArguments().size() != 1) { throw new 
PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, @@ -274,7 +277,7 @@ private ClpExpression handleNot(CallExpression node, Mapcol1 LIKE 'a_bc%' → col1: "a?bc*" * * @param node the LIKE call expression - * @param context a mapping from variable references to column handles used for pushdown * @return a ClpExpression containing either the equivalent KQL query, or the original * expression if it couldn't be translated */ - private ClpExpression handleLike(CallExpression node, Map context) + private ClpExpression handleLike(CallExpression node) { if (node.getArguments().size() != 2) { throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, "LIKE operator must have exactly two arguments. Received: " + node); } - ClpExpression variable = node.getArguments().get(0).accept(this, context); + ClpExpression variable = node.getArguments().get(0).accept(this, null); if (!variable.getPushDownExpression().isPresent()) { return new ClpExpression(node); } @@ -347,11 +349,10 @@ else if (argument instanceof CallExpression) { * * @param operator the binary operator (e.g., EQUAL, NOT_EQUAL) * @param node the call expression representing the binary operation - * @param context a mapping from variable references to column handles used for pushdown * @return a ClpExpression containing either the equivalent KQL query, or the original * expression if it couldn't be translated */ - private ClpExpression handleLogicalBinary(OperatorType operator, CallExpression node, Map context) + private ClpExpression handleLogicalBinary(OperatorType operator, CallExpression node) { if (node.getArguments().size() != 2) { throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, @@ -360,18 +361,18 @@ private ClpExpression handleLogicalBinary(OperatorType operator, CallExpression RowExpression left = node.getArguments().get(0); RowExpression right = node.getArguments().get(1); - Optional maybeLeftSubstring = tryInterpretSubstringEquality(operator, left, right, context); + Optional 
maybeLeftSubstring = tryInterpretSubstringEquality(operator, left, right); if (maybeLeftSubstring.isPresent()) { return maybeLeftSubstring.get(); } - Optional maybeRightSubstring = tryInterpretSubstringEquality(operator, right, left, context); + Optional maybeRightSubstring = tryInterpretSubstringEquality(operator, right, left); if (maybeRightSubstring.isPresent()) { return maybeRightSubstring.get(); } - ClpExpression leftExpression = left.accept(this, context); - ClpExpression rightExpression = right.accept(this, context); + ClpExpression leftExpression = left.accept(this, null); + ClpExpression rightExpression = right.accept(this, null); Optional leftPushDownExpression = leftExpression.getPushDownExpression(); Optional rightPushDownExpression = rightExpression.getPushDownExpression(); if (!leftPushDownExpression.isPresent() || !rightPushDownExpression.isPresent()) { @@ -473,14 +474,12 @@ else if (LOGICAL_BINARY_OPS_FILTER.contains(operator) && !(literalType instanceo * @param operator the comparison operator (should be EQUAL) * @param possibleSubstring the left or right expression, possibly a SUBSTR call * @param possibleLiteral the opposite expression, possibly a string constant - * @param context a mapping from variable references to column handles used for pushdown * @return an Optional containing a ClpExpression with the equivalent KQL query */ private Optional tryInterpretSubstringEquality( OperatorType operator, RowExpression possibleSubstring, - RowExpression possibleLiteral, - Map context) + RowExpression possibleLiteral) { if (!operator.equals(EQUAL)) { return Optional.empty(); @@ -491,7 +490,7 @@ private Optional tryInterpretSubstringEquality( return Optional.empty(); } - Optional maybeSubstringCall = parseSubstringCall((CallExpression) possibleSubstring, context); + Optional maybeSubstringCall = parseSubstringCall((CallExpression) possibleSubstring); if (!maybeSubstringCall.isPresent()) { return Optional.empty(); } @@ -504,10 +503,9 @@ private Optional 
tryInterpretSubstringEquality( * Parses a SUBSTR(x, start [, length]) call into a SubstrInfo object if valid. * * @param callExpression the call expression to inspect - * @param context a mapping from variable references to column handles used for pushdown * @return an Optional containing SubstrInfo if the expression is a valid SUBSTR call */ - private Optional parseSubstringCall(CallExpression callExpression, Map context) + private Optional parseSubstringCall(CallExpression callExpression) { FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(callExpression.getFunctionHandle()); String functionName = functionMetadata.getName().getObjectName(); @@ -520,7 +518,7 @@ private Optional parseSubstringCall(CallExpression callExpression, M return Optional.empty(); } - ClpExpression variable = callExpression.getArguments().get(0).accept(this, context); + ClpExpression variable = callExpression.getArguments().get(0).accept(this, null); if (!variable.getPushDownExpression().isPresent()) { return Optional.empty(); } @@ -658,10 +656,9 @@ private Optional parseLengthLiteral(RowExpression lengthExpression, Str * Example: col1 = 5 AND col2 = 'abc'(col1: 5 AND col2: "abc") * * @param node the AND special form expression - * @param context a mapping from variable references to column handles used for pushdown * @return a ClpExpression containing the KQL query and any remaining sub-expressions */ - private ClpExpression handleAnd(SpecialFormExpression node, Map context) + private ClpExpression handleAnd(SpecialFormExpression node) { StringBuilder metadataQueryBuilder = new StringBuilder(); metadataQueryBuilder.append("("); @@ -671,7 +668,7 @@ private ClpExpression handleAnd(SpecialFormExpression node, Mapcol1 = 5 OR col1 = 10 → (col1: 5 OR col1: 10) * * @param node the OR special form expression - * @param context a mapping from variable references to column handles used for pushdown * @return a ClpExpression containing either the equivalent KQL query, or 
the original * expression if it couldn't be fully translated */ - private ClpExpression handleOr(SpecialFormExpression node, Map context) + private ClpExpression handleOr(SpecialFormExpression node) { StringBuilder metadataQueryBuilder = new StringBuilder(); metadataQueryBuilder.append("("); @@ -730,7 +726,7 @@ private ClpExpression handleOr(SpecialFormExpression node, Mapcol1 IN (1, 2, 3) → (col1: 1 OR col1: 2 OR col1: 3) * * @param node the IN special form expression - * @param context a mapping from variable references to column handles used for pushdown * @return a ClpExpression containing either the equivalent KQL query, or the original * expression if it couldn't be translated */ - private ClpExpression handleIn(SpecialFormExpression node, Map context) + private ClpExpression handleIn(SpecialFormExpression node) { - ClpExpression variable = node.getArguments().get(0).accept(this, context); + ClpExpression variable = node.getArguments().get(0).accept(this, null); if (!variable.getPushDownExpression().isPresent()) { return new ClpExpression(node); } @@ -801,18 +796,17 @@ private ClpExpression handleIn(SpecialFormExpression node, Mapcol1 IS NULL → NOT col1: * * * @param node the IS_NULL special form expression - * @param context a mapping from variable references to column handles used for pushdown * @return a ClpExpression containing either the equivalent KQL query, or the original * expression if it couldn't be translated */ - private ClpExpression handleIsNull(SpecialFormExpression node, Map context) + private ClpExpression handleIsNull(SpecialFormExpression node) { if (node.getArguments().size() != 1) { throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, "IS NULL operator must have exactly one argument. 
Received: " + node); } - ClpExpression expression = node.getArguments().get(0).accept(this, context); + ClpExpression expression = node.getArguments().get(0).accept(this, null); if (!expression.getPushDownExpression().isPresent()) { return new ClpExpression(node); } @@ -830,14 +824,13 @@ private ClpExpression handleIsNull(SpecialFormExpression node, Map context) + private ClpExpression handleDereference(RowExpression expression) { if (expression instanceof VariableReferenceExpression) { - return expression.accept(this, context); + return expression.accept(this, null); } if (!(expression instanceof SpecialFormExpression)) { @@ -877,7 +870,7 @@ private ClpExpression handleDereference(RowExpression expression, Map getLogicalPlanOptimizers() { - return ImmutableSet.of(); + return ImmutableSet.of(new ClpUdfRewriter(functionManager)); } @Override public Set getPhysicalPlanOptimizers() { - return ImmutableSet.of(new ClpPlanOptimizer(functionManager, functionResolution, splitFilterProvider)); + return ImmutableSet.of(new ClpComputePushDown(functionManager, functionResolution, splitFilterProvider)); } } diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpUdfRewriter.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpUdfRewriter.java new file mode 100644 index 0000000000000..aff40041e2d58 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpUdfRewriter.java @@ -0,0 +1,306 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp.optimization; + +import com.facebook.presto.plugin.clp.ClpColumnHandle; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorPlanOptimizer; +import com.facebook.presto.spi.ConnectorPlanRewriter; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.VariableAllocator; +import com.facebook.presto.spi.function.FunctionMetadataManager; +import com.facebook.presto.spi.plan.Assignments; +import com.facebook.presto.spi.plan.FilterNode; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.ProjectNode; +import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.relation.CallExpression; +import com.facebook.presto.spi.relation.ConstantExpression; +import com.facebook.presto.spi.relation.RowExpression; +import com.facebook.presto.spi.relation.SpecialFormExpression; +import com.facebook.presto.spi.relation.VariableReferenceExpression; +import io.airlift.slice.Slice; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION; +import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Objects.requireNonNull; + +/** + * Utility for rewriting CLP UDFs (e.g., CLP_GET_*) in {@link RowExpression} trees. + *

    + * Traverses a query plan and rewrites calls to CLP_GET_* UDFs into + * {@link VariableReferenceExpression}s with meaningful names derived from their arguments. + *

    + * This enables querying fields that are not part of the original table schema but are available + * in CLP. + */ +public final class ClpUdfRewriter + implements ConnectorPlanOptimizer +{ + private final FunctionMetadataManager functionManager; + + public ClpUdfRewriter(FunctionMetadataManager functionManager) + { + this.functionManager = requireNonNull(functionManager, "functionManager is null"); + } + + @Override + public PlanNode optimize(PlanNode maxSubplan, ConnectorSession session, VariableAllocator allocator, PlanNodeIdAllocator idAllocator) + { + return rewriteWith(new Rewriter(idAllocator, allocator, collectExistingScanAssignments(maxSubplan)), maxSubplan); + } + + /** + * Collects all existing variable assignments from {@link TableScanNode} instances that map + * {@link ColumnHandle}s to {@link VariableReferenceExpression}s. + *

    + * This method traverses the given plan subtree, visiting the {@link TableScanNode} and + * extracting its assignments. The resulting map allows tracking which scan-level variables + * already exist for specific columns, so that subsequent optimizer passes (e.g., CLP UDF + * rewriting) can reuse them instead of creating duplicates. + *

    + * The key of the returned map is the {@link ColumnHandle}, and the value is the + * {@link VariableReferenceExpression} assigned to it in the scan node. + * + * @param root the root {@link PlanNode} of the plan subtree + * @return a map from column handle to its corresponding scan-level variable + */ + private Map collectExistingScanAssignments(PlanNode root) + { + Map map = new HashMap<>(); + root.getSources().forEach(source -> map.putAll(collectExistingScanAssignments(source))); + if (root instanceof TableScanNode) { + TableScanNode scan = (TableScanNode) root; + scan.getAssignments().forEach((var, handle) -> { + map.putIfAbsent(handle, var); + }); + } + return map; + } + + private class Rewriter + extends ConnectorPlanRewriter + { + private final PlanNodeIdAllocator idAllocator; + private final VariableAllocator variableAllocator; + private final Map globalColumnVarMap; + + public Rewriter( + PlanNodeIdAllocator idAllocator, + VariableAllocator variableAllocator, + Map globalColumnVarMap) + { + this.idAllocator = idAllocator; + this.variableAllocator = variableAllocator; + this.globalColumnVarMap = globalColumnVarMap; + } + + @Override + public PlanNode visitProject(ProjectNode node, RewriteContext context) + { + Assignments.Builder newAssignments = Assignments.builder(); + for (Map.Entry entry : node.getAssignments().getMap().entrySet()) { + newAssignments.put( + entry.getKey(), + rewriteClpUdfs(entry.getValue(), functionManager, variableAllocator)); + } + + PlanNode newSource = rewritePlanSubtree(node.getSource()); + return new ProjectNode(node.getSourceLocation(), idAllocator.getNextId(), newSource, newAssignments.build(), node.getLocality()); + } + + @Override + public PlanNode visitFilter(FilterNode node, RewriteContext context) + { + return buildNewFilterNode(node); + } + + /** + * Rewrites CLP_GET_* UDFs in a {@link RowExpression}, collecting each + * resulting variable into the given map along with its associated {@link ColumnHandle}. + *

    + * Each CLP_GET_* UDF must take a single constant string argument, which is + * used to construct the name of the variable reference (e.g. + * CLP_GET_STRING('foo') becomes a variable name foo). Invalid + * usages (non-constant, null, or non-string arguments) will throw a {@link PrestoException}. + * + * @param expression the input expression to analyze and possibly rewrite + * @param functionManager function manager used to resolve function metadata + * @param variableAllocator variable allocator used to create new variable references + * @return a possibly rewritten {@link RowExpression} with CLP_GET_* calls + * replaced + * @throws PrestoException if a CLP_GET_* call does not have exactly one non-null constant + * string argument + */ + private RowExpression rewriteClpUdfs( + RowExpression expression, + FunctionMetadataManager functionManager, + VariableAllocator variableAllocator) + { + // Handle CLP_GET_* function calls + if (expression instanceof CallExpression) { + CallExpression call = (CallExpression) expression; + String functionName = functionManager.getFunctionMetadata(call.getFunctionHandle()).getName().getObjectName().toUpperCase(); + + if (functionName.startsWith("CLP_GET_")) { + // A constant whose value is null (e.g. a NULL literal) is still a ConstantExpression, so the + // value itself is checked too; otherwise the Slice cast below fails with a + // NullPointerException or ClassCastException instead of a PrestoException. + if (call.getArguments().size() != 1 + || !(call.getArguments().get(0) instanceof ConstantExpression) + || !(((ConstantExpression) call.getArguments().get(0)).getValue() instanceof Slice)) { + throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, + "CLP_GET_* UDF must have a single constant string argument"); + } + + ConstantExpression constant = (ConstantExpression) call.getArguments().get(0); + String jsonPath = ((Slice) constant.getValue()).toStringUtf8(); + ClpColumnHandle targetHandle = new ClpColumnHandle(jsonPath, call.getType()); + + // Reuse the variable already registered for the same ClpColumnHandle, if any + VariableReferenceExpression existingVar = globalColumnVarMap.get(targetHandle); + if (existingVar != null) { + return existingVar; + } + + // Otherwise allocate a variable named after the encoded path and register it + VariableReferenceExpression newVar = variableAllocator.newVariable( + expression.getSourceLocation(), + encodeJsonPath(jsonPath), + call.getType()); + globalColumnVarMap.put(targetHandle, newVar); + return newVar; + } + +
// Recurse into arguments + List rewrittenArgs = call.getArguments().stream() + .map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator)) + .collect(toImmutableList()); + + // Keep the call's source location, as the special-form branch below does + return new CallExpression(call.getSourceLocation(), call.getDisplayName(), call.getFunctionHandle(), call.getType(), rewrittenArgs); + } + + // Handle special forms (e.g., AND, OR, etc.) + if (expression instanceof SpecialFormExpression) { + SpecialFormExpression special = (SpecialFormExpression) expression; + + List rewrittenArgs = special.getArguments().stream() + .map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator)) + .collect(toImmutableList()); + + return new SpecialFormExpression(special.getSourceLocation(), special.getForm(), special.getType(), rewrittenArgs); + } + + // Any other expression kind is returned unchanged + return expression; + } + + /** + * Recursively rewrites the subtree of a plan node to include any new variables produced by + * CLP UDF rewrites. Table scans and filters are rebuilt; any other node only has its + * children replaced by their rewritten versions. + * <p> + * NOTE(review): CLP_GET_* calls held by other node types inside the subtree (e.g. a nested + * {@link ProjectNode}) are not rewritten here; confirm such plan shapes cannot reach this + * rewriter. + * + * @param node the plan node to rewrite + * @return the rewritten plan node + */ + private PlanNode rewritePlanSubtree(PlanNode node) + { + if (node instanceof TableScanNode) { + return buildNewTableScanNode((TableScanNode) node); + } + else if (node instanceof FilterNode) { + return buildNewFilterNode((FilterNode) node); + } + + List rewrittenChildren = node.getSources().stream() + .map(this::rewritePlanSubtree) + .collect(toImmutableList()); + + return node.replaceChildren(rewrittenChildren); + } + + /** + * Encodes a JSON path into a valid variable name by replacing each uppercase letter with + * "_ux" followed by its lowercase form, dots with "_dot_", and underscores with "_und_". + *

    + * This is only used internally to ensure that the variable names generated from JSON paths + * are valid and do not conflict with other variable names in the expression. + * + * @param jsonPath the JSON path to encode + * @return the encoded variable name + */ + private String encodeJsonPath(String jsonPath) + { + StringBuilder sb = new StringBuilder(); + for (char c : jsonPath.toCharArray()) { + if (Character.isUpperCase(c)) { + /* e.g. 'H' becomes "_uxh" */ sb.append("_ux").append(Character.toLowerCase(c)); + } + else if (c == '.') { + sb.append("_dot_"); + } + else if (c == '_') { + /* '_' is escaped as well; presumably so a literal underscore cannot be read as the start of an escape sequence */ sb.append("_und_"); + } + else { + sb.append(c); + } + } + return sb.toString(); + } + + /** + * Builds a new {@link TableScanNode} that includes additional + * {@link VariableReferenceExpression}s and {@link ColumnHandle}s for rewritten CLP UDFs. + * Every entry of {@code globalColumnVarMap} is added to the output variables and to the + * assignments, and the new node is given a fresh plan node id. + * + * @param node the original table scan node + * @return the updated table scan node + */ + private TableScanNode buildNewTableScanNode(TableScanNode node) + { + /* LinkedHashSet keeps the original output order and ignores variables the scan already outputs */ Set outputVars = new LinkedHashSet<>(node.getOutputVariables()); + Map newAssignments = new HashMap<>(node.getAssignments()); + + // Add any missing variables for known handles + globalColumnVarMap.forEach((handle, var) -> { + outputVars.add(var); + newAssignments.put(var, handle); + }); + + return new TableScanNode( + node.getSourceLocation(), + idAllocator.getNextId(), + node.getTable(), + new ArrayList<>(outputVars), + newAssignments, + node.getTableConstraints(), + node.getCurrentConstraint(), + node.getEnforcedConstraint(), + node.getCteMaterializationInfo()); + } + + /** + * Builds a new {@link FilterNode} with its predicate rewritten to replace CLP UDF calls.
+ * The source subtree is rewritten as well, and the new node is given a fresh plan node id. + * + * @param node the original filter node + * @return the updated filter node + */ + private FilterNode buildNewFilterNode(FilterNode node) + { + /* The predicate is rewritten before the source subtree so that variables created for CLP_GET_* calls are already in globalColumnVarMap when the scan underneath is rebuilt. */ RowExpression newPredicate = rewriteClpUdfs(node.getPredicate(), functionManager, variableAllocator); + PlanNode newSource = rewritePlanSubtree(node.getSource()); + return new FilterNode(node.getSourceLocation(), idAllocator.getNextId(), newSource, newPredicate); + } + } +} diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java index 91fc3c780d941..d1d0ee6964c8e 100644 --- a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java @@ -226,6 +226,11 @@ static final class DbHandle { this.dbPath = dbPath; } + + public String getDbPath() + { + return dbPath; + } } static final class ArchivesTableRow diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpFilterToKql.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpFilterToKql.java index 9e9e9db13a665..979c9cbfbcf69 100644 --- a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpFilterToKql.java +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpFilterToKql.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.plugin.clp; +import com.facebook.presto.plugin.clp.optimization.ClpFilterToKqlConverter; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.relation.RowExpression; import com.facebook.presto.spi.relation.VariableReferenceExpression; @@ -289,7 +290,10 @@ private void testPushDown(SessionHolder sessionHolder, String sql, String expect } } - private ClpExpression tryPushDown(String sqlExpression, SessionHolder sessionHolder, Set metadataFilterColumns) + private ClpExpression tryPushDown( + String sqlExpression, + SessionHolder sessionHolder, + Set
metadataFilterColumns) { RowExpression pushDownExpression = getRowExpression(sqlExpression, sessionHolder); Map assignments = new HashMap<>(variableToColumnHandleMap); @@ -297,11 +301,16 @@ private ClpExpression tryPushDown(String sqlExpression, SessionHolder sessionHol new ClpFilterToKqlConverter( standardFunctionResolution, functionAndTypeManager, + assignments, metadataFilterColumns), - assignments); + null); } - private void testFilter(ClpExpression clpExpression, String expectedKqlExpression, String expectedRemainingExpression, SessionHolder sessionHolder) + private void testFilter( + ClpExpression clpExpression, + String expectedKqlExpression, + String expectedRemainingExpression, + SessionHolder sessionHolder) { Optional kqlExpression = clpExpression.getPushDownExpression(); Optional remainingExpression = clpExpression.getRemainingExpression(); diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpQueryBase.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpQueryBase.java index 87cc35d6d2b4e..ee259d67e17c9 100644 --- a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpQueryBase.java +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpQueryBase.java @@ -22,6 +22,7 @@ import com.facebook.presto.metadata.CatalogManager; import com.facebook.presto.metadata.ColumnPropertyManager; import com.facebook.presto.metadata.FunctionAndTypeManager; +import com.facebook.presto.metadata.FunctionExtractor; import com.facebook.presto.metadata.Metadata; import com.facebook.presto.metadata.MetadataManager; import com.facebook.presto.metadata.SchemaPropertyManager; @@ -64,6 +65,9 @@ public class TestClpQueryBase { protected static final FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager(); + static { + functionAndTypeManager.registerBuiltInFunctions(FunctionExtractor.extractFunctions(ClpFunctions.class)); + } protected static final StandardFunctionResolution 
standardFunctionResolution = new FunctionResolution(functionAndTypeManager.getFunctionAndTypeResolver()); protected static final Metadata metadata = new MetadataManager( functionAndTypeManager, @@ -79,10 +83,10 @@ public class TestClpQueryBase protected static final ClpColumnHandle city = new ClpColumnHandle( "city", RowType.from(ImmutableList.of( + RowType.field("Name", VARCHAR), RowType.field("Region", RowType.from(ImmutableList.of( RowType.field("Id", BIGINT), - RowType.field("Name", VARCHAR)))), - RowType.field("Name", VARCHAR)))); + RowType.field("Name", VARCHAR))))))); protected static final ClpColumnHandle fare = new ClpColumnHandle("fare", DOUBLE); protected static final ClpColumnHandle isHoliday = new ClpColumnHandle("isHoliday", BOOLEAN); protected static final Map variableToColumnHandleMap = @@ -116,11 +120,6 @@ protected RowExpression getRowExpression(String sqlExpression, SessionHolder ses return toRowExpression(expression(sqlExpression), typeProvider, sessionHolder.getSession()); } - protected RowExpression getRowExpression(String sqlExpression, TypeProvider typeProvider, SessionHolder sessionHolder) - { - return toRowExpression(expression(sqlExpression), typeProvider, sessionHolder.getSession()); - } - protected static class SessionHolder { private final ConnectorSession connectorSession; diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpUdfRewriter.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpUdfRewriter.java new file mode 100644 index 0000000000000..b82866f3d8dd9 --- /dev/null +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpUdfRewriter.java @@ -0,0 +1,324 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.Session; +import com.facebook.presto.common.transaction.TransactionId; +import com.facebook.presto.common.type.ArrayType; +import com.facebook.presto.cost.PlanNodeStatsEstimate; +import com.facebook.presto.cost.StatsAndCosts; +import com.facebook.presto.cost.StatsProvider; +import com.facebook.presto.metadata.FunctionAndTypeManager; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.plugin.clp.optimization.ClpComputePushDown; +import com.facebook.presto.plugin.clp.optimization.ClpUdfRewriter; +import com.facebook.presto.plugin.clp.split.filter.ClpMySqlSplitFilterProvider; +import com.facebook.presto.plugin.clp.split.filter.ClpSplitFilterProvider; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.VariableAllocator; +import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.facebook.presto.sql.planner.Plan; +import com.facebook.presto.sql.planner.assertions.MatchResult; +import com.facebook.presto.sql.planner.assertions.Matcher; +import com.facebook.presto.sql.planner.assertions.PlanAssert; +import com.facebook.presto.sql.planner.assertions.PlanMatchPattern; +import com.facebook.presto.sql.planner.assertions.SymbolAliases; +import 
com.facebook.presto.sql.relational.FunctionResolution; +import com.facebook.presto.sql.tree.SymbolReference; +import com.facebook.presto.testing.LocalQueryRunner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.commons.math3.util.Pair; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.facebook.presto.common.Utils.checkState; +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.DoubleType.DOUBLE; +import static com.facebook.presto.common.type.VarcharType.VARCHAR; +import static com.facebook.presto.metadata.FunctionExtractor.extractFunctions; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.ARCHIVES_STORAGE_DIRECTORY_BASE; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.METADATA_DB_PASSWORD; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.METADATA_DB_TABLE_PREFIX; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.METADATA_DB_URL_TEMPLATE; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.METADATA_DB_USER; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.getDbHandle; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.setupMetadata; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Boolean; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.ClpString; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Float; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Integer; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.VarString; +import static 
com.facebook.presto.sql.planner.assertions.MatchResult.NO_MATCH; +import static com.facebook.presto.sql.planner.assertions.MatchResult.match; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.filter; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.node; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.project; +import static com.facebook.presto.testing.TestingSession.testSessionBuilder; +import static java.lang.String.format; + +/** Plan-level tests for {@link ClpUdfRewriter}: each test plans a query with a {@link LocalQueryRunner}, applies {@link ClpUdfRewriter} and then {@link ClpComputePushDown} to the plan root, and checks the resulting plan with {@link PlanAssert}. */ @Test(singleThreaded = true) +public class TestClpUdfRewriter + extends TestClpQueryBase +{ + private final Session defaultSession = testSessionBuilder() + .setCatalog("clp") + .setSchema(ClpMetadata.DEFAULT_SCHEMA_NAME) + .build(); + + private ClpMetadataDbSetUp.DbHandle dbHandle; + ClpTableHandle table; + + private LocalQueryRunner localQueryRunner; + private FunctionAndTypeManager functionAndTypeManager; + private FunctionResolution functionResolution; + private ClpSplitFilterProvider splitFilterProvider; + private PlanNodeIdAllocator planNodeIdAllocator; + private VariableAllocator variableAllocator; + + @BeforeMethod + public void setUp() + { + /* Runs before every test method: sets up the metadata DB for table "test", creates the "clp" catalog on a new LocalQueryRunner, registers the CLP plugin functions, and creates fresh allocators. */ dbHandle = getDbHandle("metadata_query_testdb"); + final String tableName = "test"; + final String tablePath = ARCHIVES_STORAGE_DIRECTORY_BASE + tableName; + table = new ClpTableHandle(new SchemaTableName("default", tableName), tablePath); + + setupMetadata(dbHandle, + ImmutableMap.of( + tableName, + ImmutableList.of( + new Pair<>("city.Name", ClpString), + new Pair<>("city.Region.Id", Integer), + new Pair<>("city.Region.Name", VarString), + new Pair<>("fare", Float), + new Pair<>("isHoliday", Boolean)))); + + localQueryRunner = new LocalQueryRunner(defaultSession); + localQueryRunner.createCatalog("clp", new ClpConnectorFactory(), ImmutableMap.of( + "clp.metadata-db-url", format(METADATA_DB_URL_TEMPLATE, dbHandle.getDbPath()), +
"clp.metadata-db-user", METADATA_DB_USER, + "clp.metadata-db-password", METADATA_DB_PASSWORD, + "clp.metadata-table-prefix", METADATA_DB_TABLE_PREFIX)); + localQueryRunner.getMetadata().registerBuiltInFunctions(extractFunctions(new ClpPlugin().getFunctions())); + functionAndTypeManager = localQueryRunner.getMetadata().getFunctionAndTypeManager(); + functionResolution = new FunctionResolution(functionAndTypeManager.getFunctionAndTypeResolver()); + splitFilterProvider = new ClpMySqlSplitFilterProvider(new ClpConfig()); + planNodeIdAllocator = new PlanNodeIdAllocator(); + variableAllocator = new VariableAllocator(); + } + + @AfterMethod + public void tearDown() + { + localQueryRunner.close(); + ClpMetadataDbSetUp.tearDown(dbHandle); + } + + @Test + public void testScanFilter() + { + TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false); + Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build(); + + Plan plan = localQueryRunner.createPlan( + session, + "SELECT * FROM test WHERE CLP_GET_BIGINT('user_id') = 0 AND CLP_GET_DOUBLE('fare') < 50.0 AND CLP_GET_STRING('city') = 'SF' AND " + + "CLP_GET_BOOL('isHoliday') = true AND cardinality(CLP_GET_STRING_ARRAY('tags')) > 0 AND LOWER(city.Name) = 'beijing'", + WarningCollector.NOOP); + ClpUdfRewriter udfRewriter = new ClpUdfRewriter(functionAndTypeManager); + PlanNode optimizedPlan = udfRewriter.optimize(plan.getRoot(), session.toConnectorSession(), variableAllocator, planNodeIdAllocator); + ClpComputePushDown optimizer = new ClpComputePushDown(functionAndTypeManager, functionResolution, splitFilterProvider); + optimizedPlan = optimizer.optimize(optimizedPlan, session.toConnectorSession(), variableAllocator, planNodeIdAllocator); + + PlanAssert.assertPlan( + session, + localQueryRunner.getMetadata(), + (node, sourceStats, lookup, s, types) -> PlanNodeStatsEstimate.unknown(), + new Plan(optimizedPlan, plan.getTypes(),
StatsAndCosts.empty()), + anyTree( + filter( + expression("lower(city.Name) = 'beijing' AND cardinality(tags) > 0"), + ClpTableScanMatcher.clpTableScanPattern( + new ClpTableLayoutHandle( + table, + Optional.of( + "(((user_id: 0 AND fare < 50.0) AND (city: \"SF\" AND isHoliday: true)))"), + Optional.empty()), + ImmutableSet.of( + city, + fare, + isHoliday, + new ClpColumnHandle("user_id", BIGINT), + new ClpColumnHandle("city", VARCHAR), + new ClpColumnHandle("tags", new ArrayType(VARCHAR))))))); + } + + @Test + public void testScanProject() + { + TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false); + Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build(); + + Plan plan = localQueryRunner.createPlan( + session, + "SELECT CLP_GET_BIGINT('user_id'), CLP_GET_DOUBLE('fare'), CLP_GET_STRING('user'), " + + "CLP_GET_BOOL('isHoliday'), CLP_GET_STRING_ARRAY('tags'), city.Name FROM test", + WarningCollector.NOOP); + ClpUdfRewriter udfRewriter = new ClpUdfRewriter(functionAndTypeManager); + PlanNode optimizedPlan = udfRewriter.optimize(plan.getRoot(), session.toConnectorSession(), variableAllocator, planNodeIdAllocator); + ClpComputePushDown optimizer = new ClpComputePushDown(functionAndTypeManager, functionResolution, splitFilterProvider); + optimizedPlan = optimizer.optimize(optimizedPlan, session.toConnectorSession(), variableAllocator, planNodeIdAllocator); + + PlanAssert.assertPlan( + session, + localQueryRunner.getMetadata(), + (node, sourceStats, lookup, s, types) -> PlanNodeStatsEstimate.unknown(), + new Plan(optimizedPlan, plan.getTypes(), StatsAndCosts.empty()), + anyTree( + project( + ImmutableMap.of( + "clp_get_bigint", + PlanMatchPattern.expression("user_und_id"), + "clp_get_double", + PlanMatchPattern.expression("fare"), + "clp_get_string", + PlanMatchPattern.expression("user"), + "clp_get_bool", + PlanMatchPattern.expression("is_uxholiday"), +
"clp_get_string_array", + PlanMatchPattern.expression("tags"), + "expr", + PlanMatchPattern.expression("city.Name")), + ClpTableScanMatcher.clpTableScanPattern( + new ClpTableLayoutHandle( + table, + Optional.empty(), + Optional.empty()), + ImmutableSet.of( + new ClpColumnHandle("user_id", BIGINT), + new ClpColumnHandle("fare", DOUBLE), + new ClpColumnHandle("user", VARCHAR), + isHoliday, + new ClpColumnHandle("tags", new ArrayType(VARCHAR)), + city))))); + } + + @Test + public void testScanProjectFilter() + { + TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false); + Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build(); + + Plan plan = localQueryRunner.createPlan( + session, + "SELECT LOWER(city.Name), LOWER(CLP_GET_STRING('city.Name')) from test WHERE CLP_GET_BIGINT('user_id') = 0 AND LOWER(city.Name) = 'beijing'", + WarningCollector.NOOP); + ClpUdfRewriter udfRewriter = new ClpUdfRewriter(functionAndTypeManager); + PlanNode optimizedPlan = udfRewriter.optimize(plan.getRoot(), session.toConnectorSession(), variableAllocator, planNodeIdAllocator); + ClpComputePushDown optimizer = new ClpComputePushDown(functionAndTypeManager, functionResolution, splitFilterProvider); + optimizedPlan = optimizer.optimize(optimizedPlan, session.toConnectorSession(), variableAllocator, planNodeIdAllocator); + + PlanAssert.assertPlan( + session, + localQueryRunner.getMetadata(), + (node, sourceStats, lookup, s, types) -> PlanNodeStatsEstimate.unknown(), + new Plan(optimizedPlan, plan.getTypes(), StatsAndCosts.empty()), + anyTree( + project( + ImmutableMap.of( + "lower", + PlanMatchPattern.expression("lower(city.Name)"), + "lower_0", + PlanMatchPattern.expression("lower(city_dot__uxname)")), + filter( + expression("lower(city.Name) = 'beijing'"), + ClpTableScanMatcher.clpTableScanPattern( + new ClpTableLayoutHandle(table, Optional.of("(user_id: 0)"), Optional.empty()), +
ImmutableSet.of( + new ClpColumnHandle("city.Name", VARCHAR), + new ClpColumnHandle("user_id", BIGINT), + city)))))); + } + + /** Matcher for a {@link TableScanNode} whose table layout handle equals the expected {@link ClpTableLayoutHandle} and whose assigned column handles equal the expected set; on a match, every output variable is aliased to its own name. */ private static final class ClpTableScanMatcher + implements Matcher + { + private final ClpTableLayoutHandle expectedLayoutHandle; + private final Set expectedColumns; + + private ClpTableScanMatcher(ClpTableLayoutHandle expectedLayoutHandle, Set expectedColumns) + { + this.expectedLayoutHandle = expectedLayoutHandle; + this.expectedColumns = expectedColumns; + } + + static PlanMatchPattern clpTableScanPattern(ClpTableLayoutHandle layoutHandle, Set columns) + { + return node(TableScanNode.class).with(new ClpTableScanMatcher(layoutHandle, columns)); + } + + @Override + public boolean shapeMatches(PlanNode node) + { + return node instanceof TableScanNode; + } + + @Override + public MatchResult detailMatches( + PlanNode node, + StatsProvider stats, + Session session, + Metadata metadata, + SymbolAliases symbolAliases) + { + checkState(shapeMatches(node), "Plan testing framework error: shapeMatches returned false"); + TableScanNode tableScanNode = (TableScanNode) node; + ClpTableLayoutHandle actualLayoutHandle = (ClpTableLayoutHandle) tableScanNode.getTable().getLayout().get(); + + // Check layout handle + if (!expectedLayoutHandle.equals(actualLayoutHandle)) { + return NO_MATCH; + } + + // Check the assigned column handles are exactly the expected set + Map actualAssignments = tableScanNode.getAssignments(); + Set actualColumns = new HashSet<>(actualAssignments.values()); + + if (!expectedColumns.equals(actualColumns)) { + return NO_MATCH; + } + + SymbolAliases.Builder aliasesBuilder = SymbolAliases.builder(); + for (VariableReferenceExpression variable : tableScanNode.getOutputVariables()) { + aliasesBuilder.put(variable.getName(), new SymbolReference(variable.getName())); + } + + return match(aliasesBuilder.build()); + } + } +} diff --git a/presto-native-execution/velox b/presto-native-execution/velox index 7c55762bc37ed..ab8f44627de6a 160000 ---
a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit 7c55762bc37edc3f93cb8f5f1ff625e06cc10deb +Subproject commit ab8f44627de6ac2fe46fb5ca8ed3af09ee477b42