diff --git a/presto-clp/pom.xml b/presto-clp/pom.xml index 6e0b35d56bd3e..18edc61257c0a 100644 --- a/presto-clp/pom.xml +++ b/presto-clp/pom.xml @@ -140,5 +140,22 @@ commons-io test + + com.facebook.presto + presto-tests + test + + + + com.facebook.presto + presto-main-base + test-jar + test + + + + com.facebook.presto + presto-expressions + diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFilterToKqlConverter.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFilterToKqlConverter.java index 7e6fae13fdb8f..d9f0ee3884e47 100644 --- a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFilterToKqlConverter.java +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFilterToKqlConverter.java @@ -30,10 +30,12 @@ import com.facebook.presto.spi.relation.RowExpressionVisitor; import com.facebook.presto.spi.relation.SpecialFormExpression; import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import io.airlift.slice.Slice; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -95,7 +97,7 @@ * */ public class ClpFilterToKqlConverter - implements RowExpressionVisitor + implements RowExpressionVisitor> { private static final Set LOGICAL_BINARY_OPS_FILTER = ImmutableSet.of(EQUAL, NOT_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL); @@ -113,70 +115,71 @@ public ClpFilterToKqlConverter( { this.standardFunctionResolution = requireNonNull(standardFunctionResolution, "standardFunctionResolution is null"); this.functionMetadataManager = requireNonNull(functionMetadataManager, "function metadata manager is null"); - this.assignments = requireNonNull(assignments, "assignments is null"); + this.assignments = new HashMap<>(requireNonNull(assignments, "assignments is null")); this.metadataFilterColumns = 
requireNonNull(metadataFilterColumns, "metadataFilterColumns is null"); } @Override - public ClpExpression visitCall(CallExpression node, Void context) + public ClpExpression visitCall(CallExpression node, Set context) { - FunctionHandle functionHandle = node.getFunctionHandle(); + CallExpression newNode = (CallExpression) maybeReplaceClpUdfArgument(node, context); + FunctionHandle functionHandle = newNode.getFunctionHandle(); if (standardFunctionResolution.isNotFunction(functionHandle)) { - return handleNot(node); + return handleNot(newNode, context); } if (standardFunctionResolution.isLikeFunction(functionHandle)) { - return handleLike(node); + return handleLike(newNode, context); } - FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(node.getFunctionHandle()); + FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(newNode.getFunctionHandle()); Optional operatorTypeOptional = functionMetadata.getOperatorType(); if (operatorTypeOptional.isPresent()) { OperatorType operatorType = operatorTypeOptional.get(); if (operatorType.isComparisonOperator() && operatorType != IS_DISTINCT_FROM) { - return handleLogicalBinary(operatorType, node); + return handleLogicalBinary(operatorType, newNode, context); } if (BETWEEN == operatorType) { - return handleBetween(node); + return handleBetween(newNode, context); } } - return new ClpExpression(node); + return new ClpExpression(newNode); } @Override - public ClpExpression visitConstant(ConstantExpression node, Void context) + public ClpExpression visitConstant(ConstantExpression node, Set context) { return new ClpExpression(getLiteralString(node)); } @Override - public ClpExpression visitVariableReference(VariableReferenceExpression node, Void context) + public ClpExpression visitVariableReference(VariableReferenceExpression node, Set context) { return new ClpExpression(getVariableName(node)); } @Override - public ClpExpression visitSpecialForm(SpecialFormExpression node, Void 
context) + public ClpExpression visitSpecialForm(SpecialFormExpression node, Set context) { switch (node.getForm()) { case AND: - return handleAnd(node); + return handleAnd(node, context); case OR: - return handleOr(node); + return handleOr(node, context); case IN: - return handleIn(node); + return handleIn(node, context); case IS_NULL: - return handleIsNull(node); + return handleIsNull(node, context); case DEREFERENCE: - return handleDereference(node); + return handleDereference(node, context); default: return new ClpExpression(node); } } @Override - public ClpExpression visitExpression(RowExpression node, Void context) + public ClpExpression visitExpression(RowExpression node, Set context) { // For all other expressions, return the original expression return new ClpExpression(node); @@ -204,7 +207,77 @@ private String getLiteralString(ConstantExpression literal) */ private String getVariableName(VariableReferenceExpression variable) { - return ((ClpColumnHandle) assignments.get(variable)).getOriginalColumnName(); + Object handle = assignments.get(variable); + if (!(handle instanceof ClpColumnHandle)) { + return null; + } + return ((ClpColumnHandle) handle).getOriginalColumnName(); + } + + /** + * Rewrites CLP UDFs (e.g., CLP_GET_*) in a RowExpression tree into VariableReferenceExpressions, + * enabling them to be used as pushdown filters. + *

+ * Traverses the expression tree recursively, replacing supported CLP UDFs with uniquely named + * variables and tracking these variables in the assignments and context. If the CLP UDF takes + * a constant string argument, that string is used as the new variable name. Unsupported + * argument types (non-constants) or invalid expressions will throw an exception. + *

+ * Examples: + *
    + *
  • CLP_GET_STRING('field') → field (as a VariableReferenceExpression)
  • + *
  • CLP_GET_INT('field') → field (not mapped to a KQL column)
  • + *
+ * + * @param rowExpression the input expression to analyze and possibly rewrite + * @param context a set of VariableReferenceExpressions used for pushdown; newly created ones + * will be added here + * @return a possibly rewritten RowExpression with CLP UDFs replaced + */ + private RowExpression maybeReplaceClpUdfArgument(RowExpression rowExpression, Set context) + { + requireNonNull(context); + if (!(rowExpression instanceof CallExpression)) { + return rowExpression; + } + + CallExpression callExpression = (CallExpression) rowExpression; + + // Recursively process the arguments of this CallExpression + List newArgs = callExpression.getArguments().stream() + .map(childArg -> maybeReplaceClpUdfArgument(childArg, context)) + .collect(ImmutableList.toImmutableList()); + + FunctionMetadata metadata = functionMetadataManager.getFunctionMetadata(callExpression.getFunctionHandle()); + String functionName = metadata.getName().getObjectName().toUpperCase(); + + if (functionName.startsWith("CLP_GET")) { + // Replace CLP UDF with VariableReferenceExpression + int numArguments = callExpression.getArguments().size(); + if (numArguments == 1) { + RowExpression argument = callExpression.getArguments().get(0); + if (!(argument instanceof ConstantExpression)) { + throw new PrestoException( + CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, + "The argument of " + functionName + " must be a ConstantExpression"); + } + Optional definition = argument.accept(this, context).getPushDownExpression(); + if (definition.isPresent()) { + VariableReferenceExpression newVar = new VariableReferenceExpression( + Optional.empty(), + definition.get(), + callExpression.getType()); + context.add(newVar); + assignments.put(newVar, new ClpColumnHandle(definition.get(), callExpression.getType(), true)); + return newVar; + } + else { + throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, "Unrecognized parameter in " + functionName); + } + } + } + + return new 
CallExpression(callExpression.getDisplayName(), callExpression.getFunctionHandle(), callExpression.getType(), newArgs); } /** @@ -219,11 +292,12 @@ private String getVariableName(VariableReferenceExpression variable) *

* Example: col1 BETWEEN 0 AND 5col1 >= 0 AND col1 <= 5 * - * @param node the {@code BETWEEN} call expression + * @param node the BETWEEN call expression + * @param context a set of VariableReferenceExpressions used for pushdown; * @return a ClpExpression containing either the equivalent KQL query, or the original * expression if it couldn't be translated */ - private ClpExpression handleBetween(CallExpression node) + private ClpExpression handleBetween(CallExpression node, Set context) { List arguments = node.getArguments(); if (arguments.size() != 3) { @@ -243,7 +317,7 @@ private ClpExpression handleBetween(CallExpression node) || !isClpCompatibleNumericType(third.getType())) { return new ClpExpression(node); } - Optional variableOpt = first.accept(this, null).getPushDownExpression(); + Optional variableOpt = first.accept(this, context).getPushDownExpression(); if (!variableOpt.isPresent()) { return new ClpExpression(node); } @@ -263,10 +337,11 @@ private ClpExpression handleBetween(CallExpression node) * Example: NOT (col1 = 5)NOT col1: 5 * * @param node the NOT call expression + * @param context a set of VariableReferenceExpressions used for pushdown; * @return a ClpExpression containing either the equivalent KQL query, or the original * expression if it couldn't be translated */ - private ClpExpression handleNot(CallExpression node) + private ClpExpression handleNot(CallExpression node, Set context) { if (node.getArguments().size() != 1) { throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, @@ -274,7 +349,7 @@ private ClpExpression handleNot(CallExpression node) } RowExpression input = node.getArguments().get(0); - ClpExpression expression = input.accept(this, null); + ClpExpression expression = input.accept(this, context); if (expression.getRemainingExpression().isPresent() || !expression.getPushDownExpression().isPresent()) { return new ClpExpression(node); } @@ -297,15 +372,16 @@ private ClpExpression handleNot(CallExpression node) * Example: col1 
LIKE 'a_bc%'col1: "a?bc*" * * @param node the LIKE call expression + * @param context a set of VariableReferenceExpressions used for pushdown; * @return a ClpExpression containing either the equivalent KQL query, or the original * expression if it couldn't be translated */ - private ClpExpression handleLike(CallExpression node) + private ClpExpression handleLike(CallExpression node, Set context) { if (node.getArguments().size() != 2) { throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, "LIKE operator must have exactly two arguments. Received: " + node); } - ClpExpression variable = node.getArguments().get(0).accept(this, null); + ClpExpression variable = node.getArguments().get(0).accept(this, context); if (!variable.getPushDownExpression().isPresent()) { return new ClpExpression(node); } @@ -346,10 +422,11 @@ else if (argument instanceof CallExpression) { * * @param operator the binary operator (e.g., EQUAL, NOT_EQUAL) * @param node the call expression representing the binary operation + * @param context a set of VariableReferenceExpressions used for pushdown; * @return a ClpExpression containing either the equivalent KQL query, or the original * expression if it couldn't be translated */ - private ClpExpression handleLogicalBinary(OperatorType operator, CallExpression node) + private ClpExpression handleLogicalBinary(OperatorType operator, CallExpression node, Set context) { if (node.getArguments().size() != 2) { throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, @@ -358,18 +435,28 @@ private ClpExpression handleLogicalBinary(OperatorType operator, CallExpression RowExpression left = node.getArguments().get(0); RowExpression right = node.getArguments().get(1); - ClpExpression maybeLeftSubstring = tryInterpretSubstringEquality(operator, left, right); - if (maybeLeftSubstring.getPushDownExpression().isPresent()) { - return maybeLeftSubstring; + Optional maybeLeftSubstring = tryInterpretSubstringEquality(operator, left, right, context); + if 
(maybeLeftSubstring.isPresent()) { + return maybeLeftSubstring.get(); + } + + Optional maybeRightSubstring = tryInterpretSubstringEquality(operator, right, left, context); + if (maybeRightSubstring.isPresent()) { + return maybeRightSubstring.get(); + } + + Optional clpWildcardLeft = tryInterpretClpWildcard(left, right, operator, context, node); + if (clpWildcardLeft.isPresent()) { + return clpWildcardLeft.get(); } - ClpExpression maybeRightSubstring = tryInterpretSubstringEquality(operator, right, left); - if (maybeRightSubstring.getPushDownExpression().isPresent()) { - return maybeRightSubstring; + Optional clpWildcardRight = tryInterpretClpWildcard(right, left, operator, context, node); + if (clpWildcardRight.isPresent()) { + return clpWildcardRight.get(); } - ClpExpression leftExpression = left.accept(this, null); - ClpExpression rightExpression = right.accept(this, null); + ClpExpression leftExpression = left.accept(this, context); + ClpExpression rightExpression = right.accept(this, context); Optional leftDefinition = leftExpression.getPushDownExpression(); Optional rightDefinition = rightExpression.getPushDownExpression(); if (!leftDefinition.isPresent() || !rightDefinition.isPresent()) { @@ -471,26 +558,28 @@ else if (LOGICAL_BINARY_OPS_FILTER.contains(operator) && !(literalType instanceo * @param operator the comparison operator (should be EQUAL) * @param possibleSubstring the left or right expression, possibly a SUBSTR call * @param possibleLiteral the opposite expression, possibly a string constant - * @return a ClpExpression containing either the equivalent KQL query, or nothing if it couldn't - * be translated + * @param context a set of VariableReferenceExpressions used for pushdown; + * @return an Optional containing a ClpExpression with the equivalent KQL query, or nothing if + * it couldn't be translated */ - private ClpExpression tryInterpretSubstringEquality( + private Optional tryInterpretSubstringEquality( OperatorType operator, RowExpression 
possibleSubstring, - RowExpression possibleLiteral) + RowExpression possibleLiteral, + Set context) { if (!operator.equals(EQUAL)) { - return new ClpExpression(); + return Optional.empty(); } if (!(possibleSubstring instanceof CallExpression) || !(possibleLiteral instanceof ConstantExpression)) { - return new ClpExpression(); + return Optional.empty(); } - Optional maybeSubstringCall = parseSubstringCall((CallExpression) possibleSubstring); + Optional maybeSubstringCall = parseSubstringCall((CallExpression) possibleSubstring, context); if (!maybeSubstringCall.isPresent()) { - return new ClpExpression(); + return Optional.empty(); } String targetString = getLiteralString((ConstantExpression) possibleLiteral); @@ -501,9 +590,10 @@ private ClpExpression tryInterpretSubstringEquality( * Parses a SUBSTR(x, start [, length]) call into a SubstrInfo object if valid. * * @param callExpression the call expression to inspect + * @param context a set of VariableReferenceExpressions used for pushdown; * @return an Optional containing SubstrInfo if the expression is a valid SUBSTR call */ - private Optional parseSubstringCall(CallExpression callExpression) + private Optional parseSubstringCall(CallExpression callExpression, Set context) { FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(callExpression.getFunctionHandle()); String functionName = functionMetadata.getName().getObjectName(); @@ -516,7 +606,7 @@ private Optional parseSubstringCall(CallExpression callExpression) return Optional.empty(); } - ClpExpression variable = callExpression.getArguments().get(0).accept(this, null); + ClpExpression variable = callExpression.getArguments().get(0).accept(this, context); if (!variable.getPushDownExpression().isPresent()) { return Optional.empty(); } @@ -545,10 +635,10 @@ private Optional parseSubstringCall(CallExpression callExpression) * * @param info parsed SUBSTR call info * @param targetString the literal string being compared to - * @return a 
ClpExpression containing either the equivalent KQL query, or nothing if it couldn't - * be translated + * @return an Optional containing either a ClpExpression with the equivalent KQL query, + * or empty if it couldn't be translated */ - private ClpExpression interpretSubstringEquality(SubstrInfo info, String targetString) + private Optional interpretSubstringEquality(SubstrInfo info, String targetString) { if (info.lengthExpression != null) { Optional maybeStart = parseIntValue(info.startExpression); @@ -564,7 +654,7 @@ private ClpExpression interpretSubstringEquality(SubstrInfo info, String targetS result.append("?"); } result.append(targetString).append("*\""); - return new ClpExpression(result.toString()); + return Optional.of(new ClpExpression(result.toString())); } } } @@ -579,15 +669,15 @@ private ClpExpression interpretSubstringEquality(SubstrInfo info, String targetS result.append("?"); } result.append(targetString).append("\""); - return new ClpExpression(result.toString()); + return Optional.of(new ClpExpression(result.toString())); } if (start == -targetString.length()) { - return new ClpExpression(format("%s: \"*%s\"", info.variableName, targetString)); + return Optional.of(new ClpExpression(format("%s: \"*%s\"", info.variableName, targetString))); } } } - return new ClpExpression(); + return Optional.empty(); } /** @@ -646,6 +736,65 @@ private Optional parseLengthLiteral(RowExpression lengthExpression, Str return Optional.empty(); } + /** + * Attempts to interpret a binary comparison involving a CLP_WILDCARD UDF and a constant value. + *

+ * This method checks whether the possibleCall is a CLP_WILDCARD-style function + * call (e.g., CLP_WILDCARD_STRING_COLUMN()) and whether possibleConst + * is a {@link ConstantExpression}. If both conditions are met, it constructs a corresponding + * {@link ClpExpression} using the wildcard symbol "*" as the variable name. + *

+ * Examples: + *
    + *
  • CLP_WILDCARD_STRING_COLUMN() = 'abc' → * : "abc"
  • + *
  • CLP_WILDCARD_INT_COLUMN() >= 123 → NOT * > 123
  • + *
+ * + * @param possibleCall the left or right side of the comparison, expected to be a CLP_WILDCARD UDF + * @param possibleConst the other side of the comparison, expected to be a constant + * @param operator the binary operator used in the comparison (e.g., =, !=) + * @param context the context set to track introduced variable references (not used directly here) + * @param parentNode the original expression node for fallback purposes + * @return an {@link Optional} containing the translated {@link ClpExpression} if successful, or {@link Optional#empty()} otherwise + * @throws PrestoException if the CLP_WILDCARD UDF is used with a non-constant operand, or if the constant cannot be converted + */ + private Optional tryInterpretClpWildcard( + RowExpression possibleCall, + RowExpression possibleConst, + OperatorType operator, + Set context, + CallExpression parentNode) + { + if (!(possibleCall instanceof CallExpression)) { + return Optional.empty(); + } + + FunctionMetadata metadata = functionMetadataManager.getFunctionMetadata( + ((CallExpression) possibleCall).getFunctionHandle()); + String functionName = metadata.getName().getObjectName().toUpperCase(); + + if (!functionName.startsWith("CLP_WILDCARD")) { + return Optional.empty(); + } + + if (!(possibleConst instanceof ConstantExpression)) { + throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, + "CLP_WILDCARD_COLUMN can only be used in a filter against a constant value"); + } + + ClpExpression constExpression = possibleConst.accept(this, context); + if (!constExpression.getPushDownExpression().isPresent()) { + throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, possibleConst + " is not supported in CLP_WILDCARD UDFs"); + } + + return Optional.of(buildClpExpression( + "*", + constExpression.getPushDownExpression().get(), + operator, + possibleCall.getType(), + parentNode)); + } + /** * Handles the logical AND expression. *

@@ -655,9 +804,10 @@ private Optional parseLengthLiteral(RowExpression lengthExpression, Str * Example: col1 = 5 AND col2 = 'abc'(col1: 5 AND col2: "abc") * * @param node the AND special form expression + * @param context a set of VariableReferenceExpressions used for pushdown; * @return a ClpExpression containing the KQL query and any remaining sub-expressions */ - private ClpExpression handleAnd(SpecialFormExpression node) + private ClpExpression handleAnd(SpecialFormExpression node, Set context) { StringBuilder metadataQueryBuilder = new StringBuilder(); metadataQueryBuilder.append("("); @@ -667,7 +817,7 @@ private ClpExpression handleAnd(SpecialFormExpression node) boolean hasMetadataSql = false; boolean hasPushDownExpression = false; for (RowExpression argument : node.getArguments()) { - ClpExpression expression = argument.accept(this, null); + ClpExpression expression = argument.accept(this, context); if (expression.getPushDownExpression().isPresent()) { hasPushDownExpression = true; queryBuilder.append(expression.getPushDownExpression().get()); @@ -713,20 +863,23 @@ else if (!remainingExpressions.isEmpty()) { * Example: col1 = 5 OR col1 = 10(col1: 5 OR col1: 10) * * @param node the OR special form expression + * @param context a set of VariableReferenceExpressions used for pushdown; * @return a ClpExpression containing either the equivalent KQL query, or the original * expression if it couldn't be fully translated */ - private ClpExpression handleOr(SpecialFormExpression node) + private ClpExpression handleOr(SpecialFormExpression node, Set context) { StringBuilder metadataQueryBuilder = new StringBuilder(); metadataQueryBuilder.append("("); StringBuilder queryBuilder = new StringBuilder(); queryBuilder.append("("); + boolean allPushedDown = true; boolean hasAllMetadataSql = true; for (RowExpression argument : node.getArguments()) { - ClpExpression expression = argument.accept(this, null); + ClpExpression expression = argument.accept(this, context); if 
(expression.getRemainingExpression().isPresent() || !expression.getPushDownExpression().isPresent()) { - return new ClpExpression(node); + allPushedDown = false; + continue; } queryBuilder.append(expression.getPushDownExpression().get()); queryBuilder.append(" OR "); @@ -738,10 +891,13 @@ private ClpExpression handleOr(SpecialFormExpression node) hasAllMetadataSql = false; } } - // Remove the last " OR " from the query - return new ClpExpression( - queryBuilder.substring(0, queryBuilder.length() - 4) + ")", - hasAllMetadataSql ? metadataQueryBuilder.substring(0, metadataQueryBuilder.length() - 4) + ")" : null); + if (allPushedDown) { + // Remove the last " OR " from the query + return new ClpExpression( + queryBuilder.substring(0, queryBuilder.length() - 4) + ")", + hasAllMetadataSql ? metadataQueryBuilder.substring(0, metadataQueryBuilder.length() - 4) + ")" : null); + } + return new ClpExpression(node); } /** @@ -750,12 +906,13 @@ private ClpExpression handleOr(SpecialFormExpression node) * Example: col1 IN (1, 2, 3)(col1: 1 OR col1: 2 OR col1: 3) * * @param node the IN special form expression + * @param context a set of VariableReferenceExpressions used for pushdown; * @return a ClpExpression containing either the equivalent KQL query, or the original * expression if it couldn't be translated */ - private ClpExpression handleIn(SpecialFormExpression node) + private ClpExpression handleIn(SpecialFormExpression node, Set context) { - ClpExpression variable = node.getArguments().get(0).accept(this, null); + ClpExpression variable = node.getArguments().get(0).accept(this, context); if (!variable.getPushDownExpression().isPresent()) { return new ClpExpression(node); } @@ -788,17 +945,18 @@ private ClpExpression handleIn(SpecialFormExpression node) * Example: col1 IS NULLNOT col1: * * * @param node the IS_NULL special form expression + * @param context a set of VariableReferenceExpressions used for pushdown; * @return a ClpExpression containing either the equivalent 
KQL query, or the original * expression if it couldn't be translated */ - private ClpExpression handleIsNull(SpecialFormExpression node) + private ClpExpression handleIsNull(SpecialFormExpression node, Set context) { if (node.getArguments().size() != 1) { throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, "IS NULL operator must have exactly one argument. Received: " + node); } - ClpExpression expression = node.getArguments().get(0).accept(this, null); + ClpExpression expression = node.getArguments().get(0).accept(this, context); if (!expression.getPushDownExpression().isPresent()) { return new ClpExpression(node); } @@ -816,13 +974,14 @@ private ClpExpression handleIsNull(SpecialFormExpression node) * * @param expression the dereference expression ({@link SpecialFormExpression} or * {@link VariableReferenceExpression}) + * @param context a set of VariableReferenceExpressions used for pushdown; * @return a ClpExpression containing either the dot-separated field name, or the original * expression if it couldn't be translated */ - private ClpExpression handleDereference(RowExpression expression) + private ClpExpression handleDereference(RowExpression expression, Set context) { if (expression instanceof VariableReferenceExpression) { - return expression.accept(this, null); + return expression.accept(this, context); } if (!(expression instanceof SpecialFormExpression)) { @@ -862,7 +1021,7 @@ private ClpExpression handleDereference(RowExpression expression) RowType.Field field = rowType.getFields().get(fieldIndex); String fieldName = field.getName().orElse("field" + fieldIndex); - ClpExpression baseString = handleDereference(base); + ClpExpression baseString = handleDereference(base, context); if (!baseString.getPushDownExpression().isPresent()) { return new ClpExpression(expression); } diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFunctions.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFunctions.java new file 
mode 100644 index 0000000000000..69c53810f141e --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFunctions.java @@ -0,0 +1,105 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.block.BlockBuilder; +import com.facebook.presto.common.type.StandardTypes; +import com.facebook.presto.spi.function.Description; +import com.facebook.presto.spi.function.ScalarFunction; +import com.facebook.presto.spi.function.SqlType; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; + +import static com.facebook.presto.common.type.VarcharType.VARCHAR; + +public final class ClpFunctions +{ + private ClpFunctions() + { + } + + @ScalarFunction(value = "CLP_GET_INT", deterministic = false) + @Description("Retrieves an integer value corresponding to the given JSON path.") + @SqlType(StandardTypes.BIGINT) + public static long clpGetInt(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) + { + return 0; + } + + @ScalarFunction(value = "CLP_GET_FLOAT", deterministic = false) + @Description("Retrieves a floating point value corresponding to the given JSON path.") + @SqlType(StandardTypes.DOUBLE) + public static double clpGetFloat(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) + { + return 0.0; + } + + @ScalarFunction(value = "CLP_GET_BOOL", deterministic = false) + @Description("Retrieves a boolean value corresponding to 
the given JSON path.") + @SqlType(StandardTypes.BOOLEAN) + public static boolean clpGetBool(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) + { + return false; + } + + @ScalarFunction(value = "CLP_GET_STRING", deterministic = false) + @Description("Retrieves a string value corresponding to the given JSON path.") + @SqlType(StandardTypes.VARCHAR) + public static Slice clpGetString(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) + { + return Slices.EMPTY_SLICE; + } + + @ScalarFunction(value = "CLP_GET_STRING_ARRAY", deterministic = false) + @Description("Retrieves an array value corresponding to the given JSON path and converts each element into a string.") + @SqlType("ARRAY(VARCHAR)") + public static Block clpGetStringArray(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) + { + BlockBuilder blockBuilder = VARCHAR.createBlockBuilder(null, 0); + return blockBuilder.build(); + } + + @ScalarFunction(value = "CLP_WILDCARD_STRING_COLUMN", deterministic = false) + @Description("Used in filter expressions to allow comparisons with any string column in the log record.") + @SqlType(StandardTypes.VARCHAR) + public static Slice clpWildcardStringColumn() + { + return Slices.EMPTY_SLICE; + } + + @ScalarFunction(value = "CLP_WILDCARD_INT_COLUMN", deterministic = false) + @Description("Used in filter expressions to allow comparisons with any integer column in the log record.") + @SqlType(StandardTypes.BIGINT) + public static long clpWildcardIntColumn() + { + return 0; + } + + @ScalarFunction(value = "CLP_WILDCARD_FLOAT_COLUMN", deterministic = false) + @Description("Used in filter expressions to allow comparisons with any floating point column in the log record.") + @SqlType(StandardTypes.DOUBLE) + public static double clpWildcardFloatColumn() + { + return 0.0; + } + + @ScalarFunction(value = "CLP_WILDCARD_BOOL_COLUMN", deterministic = false) + @Description("Used in filter expressions to allow comparisons with any boolean column in the log record.") + @SqlType(StandardTypes.BOOLEAN) 
+ public static boolean clpWildcardBoolColumn() + { + return false; + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadataFilterProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadataFilterProvider.java index de886392136de..04d5a42bc646e 100644 --- a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadataFilterProvider.java +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadataFilterProvider.java @@ -84,11 +84,16 @@ public class ClpMetadataFilterProvider public ClpMetadataFilterProvider(ClpConfig config) { requireNonNull(config, "config is null"); + String configPath = config.getMetadataFilterConfig(); + if (configPath == null || configPath.isEmpty()) { + filterMap = ImmutableMap.of(); + return; + } ObjectMapper mapper = new ObjectMapper(); try { filterMap = mapper.readValue( - new File(config.getMetadataFilterConfig()), + new File(configPath), new TypeReference>>() {}); } catch (IOException e) { diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizer.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizer.java index 157af459cb80e..20816d9321d0d 100644 --- a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizer.java +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizer.java @@ -14,25 +14,39 @@ package com.facebook.presto.plugin.clp; import com.facebook.airlift.log.Logger; +import com.facebook.presto.expressions.DefaultRowExpressionTraversalVisitor; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorPlanOptimizer; import com.facebook.presto.spi.ConnectorPlanRewriter; import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.TableHandle; import com.facebook.presto.spi.VariableAllocator; import com.facebook.presto.spi.function.FunctionMetadataManager; import 
com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.plan.Assignments; import com.facebook.presto.spi.plan.FilterNode; import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.ProjectNode; import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.relation.CallExpression; +import com.facebook.presto.spi.relation.ConstantExpression; import com.facebook.presto.spi.relation.RowExpression; +import com.facebook.presto.spi.relation.RowExpressionVisitor; import com.facebook.presto.spi.relation.VariableReferenceExpression; +import io.airlift.slice.Slice; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import static com.facebook.presto.plugin.clp.ClpConnectorFactory.CONNECTOR_NAME; +import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION; import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith; import static java.util.Objects.requireNonNull; @@ -67,6 +81,82 @@ public Rewriter(PlanNodeIdAllocator idAllocator) this.idAllocator = idAllocator; } + @Override + public PlanNode visitProject(ProjectNode node, RewriteContext context) + { + Assignments.Builder assignmentsBuilder = new Assignments.Builder(); + Set clpUdfVariablesInProjectNode = new HashSet<>(); + for (Map.Entry entry : node.getAssignments().getMap().entrySet()) { + VariableReferenceExpression oldKey = entry.getKey(); + RowExpression oldValue = entry.getValue(); + + if (!(oldValue instanceof CallExpression)) { + assignmentsBuilder.put(oldKey, oldValue); + continue; + } + + CallExpression callExpression = (CallExpression) oldValue; + String functionName = functionManager.getFunctionMetadata((callExpression).getFunctionHandle()) + .getName().getObjectName().toUpperCase(); + + if 
(!functionName.startsWith("CLP_GET")) { + assignmentsBuilder.put(oldKey, oldValue); + continue; + } + + if (!callExpression.getArguments().isEmpty()) { + RowExpression argument = callExpression.getArguments().get(0); + if (!(argument instanceof ConstantExpression)) { + throw new PrestoException( + CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, + "The argument of " + functionName + " must be a ConstantExpression"); + } + String jsonPath = ((Slice) ((ConstantExpression) argument).getValue()).toStringUtf8(); + + VariableReferenceExpression newValue = new VariableReferenceExpression( + oldValue.getSourceLocation(), jsonPath, oldValue.getType()); + assignmentsBuilder.put(oldKey, newValue); + clpUdfVariablesInProjectNode.add(newValue); + } + } + + PlanNode childNode = node.getSource(); + + // Handle Project -> TableScan + if (childNode instanceof TableScanNode) { + TableScanNode newTableScanNode = buildNewTableScanNode((TableScanNode) childNode, clpUdfVariablesInProjectNode); + return new ProjectNode( + newTableScanNode.getSourceLocation(), + idAllocator.getNextId(), + newTableScanNode, + assignmentsBuilder.build(), + node.getLocality()); + } + + // Handle Project -> Filter -> TableScan + if (childNode instanceof FilterNode && ((FilterNode) childNode).getSource() instanceof TableScanNode) { + FilterNode filterNode = (FilterNode) childNode; + TableScanNode tableScanNode = (TableScanNode) filterNode.getSource(); + + // Build new TableScanNode with CLP_GET projection pushes (even if empty) + TableScanNode newTableScanNode = buildNewTableScanNode(tableScanNode, clpUdfVariablesInProjectNode); + + // Apply KQL pushdown for the FilterNode + PlanNode newSourceNode = processFilter(filterNode, newTableScanNode); + return new ProjectNode( + newSourceNode.getSourceLocation(), + idAllocator.getNextId(), + newSourceNode, + assignmentsBuilder.build(), + node.getLocality()); + } + if (clpUdfVariablesInProjectNode.isEmpty()) { + return node; + } + throw new 
PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, + "Unsupported plan shape for CLP pushdown: " + childNode.getClass().getSimpleName()); + } + @Override public PlanNode visitFilter(FilterNode node, RewriteContext context) { @@ -74,57 +164,116 @@ public PlanNode visitFilter(FilterNode node, RewriteContext context) return node; } - TableScanNode tableScanNode = (TableScanNode) node.getSource(); + return processFilter(node, (TableScanNode) node.getSource()); + } + + private TableScanNode buildNewTableScanNode( + TableScanNode tableScanNode, + Set clpUdfVariables) + { + List newOutputVariables = new ArrayList<>(tableScanNode.getOutputVariables()); + Map newAssignments = new HashMap<>(tableScanNode.getAssignments()); + for (VariableReferenceExpression var : clpUdfVariables) { + newOutputVariables.add(var); + newAssignments.put(var, new ClpColumnHandle(var.getName(), var.getType(), true)); + } + + return new TableScanNode( + tableScanNode.getSourceLocation(), + idAllocator.getNextId(), + tableScanNode.getTable(), + newOutputVariables, + newAssignments, + tableScanNode.getTableConstraints(), + tableScanNode.getCurrentConstraint(), + tableScanNode.getEnforcedConstraint(), + tableScanNode.getCteMaterializationInfo()); + } + + private PlanNode processFilter(FilterNode filterNode, TableScanNode tableScanNode) + { Map assignments = tableScanNode.getAssignments(); TableHandle tableHandle = tableScanNode.getTable(); ClpTableHandle clpTableHandle = (ClpTableHandle) tableHandle.getConnectorHandle(); + + Set clpUdfVariablesInFilterNode = new HashSet<>(); String scope = CONNECTOR_NAME + "." 
+ clpTableHandle.getSchemaTableName().toString(); - ClpExpression clpExpression = node.getPredicate().accept( + ClpExpression clpExpression = filterNode.getPredicate().accept( new ClpFilterToKqlConverter( functionResolution, functionManager, assignments, - metadataFilterProvider.getColumnNames(scope)), null); + metadataFilterProvider.getColumnNames(scope)), + clpUdfVariablesInFilterNode); + Optional kqlQuery = clpExpression.getPushDownExpression(); Optional metadataSqlQuery = clpExpression.getMetadataSqlQuery(); Optional remainingPredicate = clpExpression.getRemainingExpression(); - // Perform required metadata filter checks before handling the KQL query (if kqlQuery - // isn't present, we'll return early, skipping subsequent checks). + if (remainingPredicate.isPresent()) { + // Collect all variables actually present in the remainingPredicate + Set variablesInPredicate = new HashSet<>(); + + RowExpressionVisitor visitor = new DefaultRowExpressionTraversalVisitor() { + @Override + public Void visitVariableReference(VariableReferenceExpression variable, Void context) + { + variablesInPredicate.add(variable); + return null; + } + }; + + remainingPredicate.get().accept(visitor, null); + // Retain only the variables that also exist in the remainingPredicate + clpUdfVariablesInFilterNode.retainAll(variablesInPredicate); + if (!clpUdfVariablesInFilterNode.isEmpty()) { + tableScanNode = buildNewTableScanNode(tableScanNode, clpUdfVariablesInFilterNode); + } + } + + // Perform required metadata filter checks metadataFilterProvider.checkContainsRequiredFilters(clpTableHandle.getSchemaTableName(), metadataSqlQuery.orElse("")); - if (metadataSqlQuery.isPresent()) { + boolean hasMetadataFilter = metadataSqlQuery.isPresent() && !metadataSqlQuery.get().isEmpty(); + if (hasMetadataFilter) { metadataSqlQuery = Optional.of(metadataFilterProvider.remapFilterSql(scope, metadataSqlQuery.get())); log.debug("Metadata SQL query: %s", metadataSqlQuery); } - if (!kqlQuery.isPresent()) { - 
return node; - } - log.debug("KQL query: %s", kqlQuery); + if (kqlQuery.isPresent() || hasMetadataFilter) { + if (kqlQuery.isPresent()) { + log.debug("KQL query: %s", kqlQuery); + } - ClpTableLayoutHandle clpTableLayoutHandle = new ClpTableLayoutHandle(clpTableHandle, kqlQuery, metadataSqlQuery); - TableScanNode newTableScanNode = new TableScanNode( - tableScanNode.getSourceLocation(), - idAllocator.getNextId(), - new TableHandle( - tableHandle.getConnectorId(), - clpTableHandle, - tableHandle.getTransaction(), - Optional.of(clpTableLayoutHandle)), - tableScanNode.getOutputVariables(), - tableScanNode.getAssignments(), - tableScanNode.getTableConstraints(), - tableScanNode.getCurrentConstraint(), - tableScanNode.getEnforcedConstraint(), - tableScanNode.getCteMaterializationInfo()); - if (!remainingPredicate.isPresent()) { - return newTableScanNode; + ClpTableLayoutHandle layoutHandle = new ClpTableLayoutHandle(clpTableHandle, kqlQuery, metadataSqlQuery); + TableHandle newTableHandle = new TableHandle( + tableHandle.getConnectorId(), + clpTableHandle, + tableHandle.getTransaction(), + Optional.of(layoutHandle)); + + tableScanNode = new TableScanNode( + tableScanNode.getSourceLocation(), + idAllocator.getNextId(), + newTableHandle, + tableScanNode.getOutputVariables(), + tableScanNode.getAssignments(), + tableScanNode.getTableConstraints(), + tableScanNode.getCurrentConstraint(), + tableScanNode.getEnforcedConstraint(), + tableScanNode.getCteMaterializationInfo()); } - return new FilterNode(node.getSourceLocation(), - idAllocator.getNextId(), - newTableScanNode, - remainingPredicate.get()); + if (remainingPredicate.isPresent()) { + // Not all predicate pushed down, need new FilterNode + return new FilterNode( + filterNode.getSourceLocation(), + idAllocator.getNextId(), + tableScanNode, + remainingPredicate.get()); + } + else { + return tableScanNode; + } } } } diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java 
b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java index 985c707a32483..ac79ffe1e0452 100644 --- a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java @@ -16,10 +16,19 @@ import com.facebook.presto.spi.Plugin; import com.facebook.presto.spi.connector.ConnectorFactory; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.util.Set; public class ClpPlugin implements Plugin { + @Override + public Set> getFunctions() + { + return ImmutableSet.of(ClpFunctions.class); + } + @Override public Iterable getConnectorFactories() { diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java index 91fc3c780d941..d1d0ee6964c8e 100644 --- a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java @@ -226,6 +226,11 @@ static final class DbHandle { this.dbPath = dbPath; } + + public String getDbPath() + { + return dbPath; + } } static final class ArchivesTableRow diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpFilterToKql.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpFilterToKql.java index 5007a0ff10bd3..ebced0fac3436 100644 --- a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpFilterToKql.java +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpFilterToKql.java @@ -14,9 +14,12 @@ package com.facebook.presto.plugin.clp; import com.facebook.presto.spi.relation.RowExpression; +import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.facebook.presto.sql.planner.TypeProvider; import com.google.common.collect.ImmutableSet; import org.testng.annotations.Test; +import java.util.HashSet; 
import java.util.Optional; import java.util.Set; @@ -266,16 +269,56 @@ public void testMetadataSqlGeneration() testMetadataFilterColumns); } + @Test + public void testClpGetUdf() + { + SessionHolder sessionHolder = new SessionHolder(); + testPushDown(sessionHolder, "CLP_GET_STRING('city.Name') = 'Beijing'", "city.Name: \"Beijing\"", null); + testPushDown(sessionHolder, "CLP_GET_INT('id') = 1", "id: 1", null); + testPushDown(sessionHolder, "CLP_GET_FLOAT('fare') > 0", "fare > 0", null); + testPushDown(sessionHolder, "CLP_GET_BOOL('isHoliday') = true", "isHoliday: true", null); + + testPushDown(sessionHolder, "cardinality(CLP_GET_STRING_ARRAY('clp_array')) = 2", null, "cardinality(clp_array) = 2"); + testPushDown( + sessionHolder, + "CLP_GET_STRING('city.Name') = 'Beijing' AND CLP_GET_INT('id') = 1 AND city.Region.Id = 1", + "((city.Name: \"Beijing\" AND id: 1) AND city.Region.Id: 1)", + null); + testPushDown( + sessionHolder, + "lower(CLP_GET_STRING('user.Name')) = 'John' AND CLP_GET_INT('id') = 1 AND city.Region.Id = 1", + "((id: 1) AND city.Region.Id: 1)", + "lower(\"user.Name\") = 'John'"); + } + + @Test + public void testClpWildcardUdf() + { + SessionHolder sessionHolder = new SessionHolder(); + testPushDown(sessionHolder, "CLP_WILDCARD_STRING_COLUMN() = 'Beijing'", "*: \"Beijing\"", null); + testPushDown(sessionHolder, "CLP_WILDCARD_INT_COLUMN() = 1", "*: 1", null); + testPushDown(sessionHolder, "CLP_WILDCARD_FLOAT_COLUMN() > 0", "* > 0", null); + testPushDown(sessionHolder, "CLP_WILDCARD_BOOL_COLUMN() = true", "*: true", null); + + testPushDown( + sessionHolder, + "CLP_WILDCARD_STRING_COLUMN() = 'Beijing' AND CLP_WILDCARD_INT_COLUMN() = 1 AND city.Region.Id = 1", + "((*: \"Beijing\" AND *: 1) AND city.Region.Id: 1)", + null); + } + private void testPushDown(SessionHolder sessionHolder, String sql, String expectedKql, String expectedRemaining) { - ClpExpression clpExpression = tryPushDown(sql, sessionHolder, ImmutableSet.of()); - testFilter(clpExpression, 
expectedKql, expectedRemaining, sessionHolder); + HashSet clpUdfVariables = new HashSet<>(); + ClpExpression clpExpression = tryPushDown(sql, sessionHolder, ImmutableSet.of(), clpUdfVariables); + testFilter(clpExpression, expectedKql, expectedRemaining, clpUdfVariables, sessionHolder); } private void testPushDown(SessionHolder sessionHolder, String sql, String expectedKql, String expectedMetadataSqlQuery, Set metadataFilterColumns) { - ClpExpression clpExpression = tryPushDown(sql, sessionHolder, metadataFilterColumns); - testFilter(clpExpression, expectedKql, null, sessionHolder); + HashSet clpUdfVariables = new HashSet<>(); + ClpExpression clpExpression = tryPushDown(sql, sessionHolder, metadataFilterColumns, clpUdfVariables); + testFilter(clpExpression, expectedKql, null, clpUdfVariables, sessionHolder); if (expectedMetadataSqlQuery != null) { assertTrue(clpExpression.getMetadataSqlQuery().isPresent()); assertEquals(clpExpression.getMetadataSqlQuery().get(), expectedMetadataSqlQuery); @@ -285,22 +328,26 @@ private void testPushDown(SessionHolder sessionHolder, String sql, String expect } } - private ClpExpression tryPushDown(String sqlExpression, SessionHolder sessionHolder, Set metadataFilterColumns) + private ClpExpression tryPushDown( + String sqlExpression, + SessionHolder sessionHolder, + Set metadataFilterColumns, + Set clpUdfVariables) { RowExpression pushDownExpression = getRowExpression(sqlExpression, sessionHolder); - return pushDownExpression.accept( - new ClpFilterToKqlConverter( - standardFunctionResolution, - functionAndTypeManager, - variableToColumnHandleMap, - metadataFilterColumns), - null); + ClpFilterToKqlConverter converter = new ClpFilterToKqlConverter( + standardFunctionResolution, + functionAndTypeManager, + variableToColumnHandleMap, + metadataFilterColumns); + return pushDownExpression.accept(converter, clpUdfVariables); } private void testFilter( ClpExpression clpExpression, String expectedKqlExpression, String 
expectedRemainingExpression, + Set clpUdfVariables, SessionHolder sessionHolder) { Optional kqlExpression = clpExpression.getPushDownExpression(); @@ -315,7 +362,15 @@ private void testFilter( if (expectedRemainingExpression != null) { assertTrue(remainingExpression.isPresent()); - assertEquals(remainingExpression.get(), getRowExpression(expectedRemainingExpression, sessionHolder)); + if (!clpUdfVariables.isEmpty()) { + Set newVariableSet = new HashSet<>(variableToColumnHandleMap.keySet()); + newVariableSet.addAll(clpUdfVariables); + TypeProvider newTypeProvider = TypeProvider.fromVariables(newVariableSet); + assertEquals(remainingExpression.get(), getRowExpression(expectedRemainingExpression, newTypeProvider, sessionHolder)); + } + else { + assertEquals(remainingExpression.get(), getRowExpression(expectedRemainingExpression, sessionHolder)); + } } else { assertFalse(remainingExpression.isPresent()); diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpPlanOptimizer.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpPlanOptimizer.java new file mode 100644 index 0000000000000..432b93c952eed --- /dev/null +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpPlanOptimizer.java @@ -0,0 +1,308 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.Session; +import com.facebook.presto.common.transaction.TransactionId; +import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.cost.PlanNodeStatsEstimate; +import com.facebook.presto.cost.StatsAndCosts; +import com.facebook.presto.cost.StatsProvider; +import com.facebook.presto.metadata.FunctionAndTypeManager; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.facebook.presto.sql.planner.Plan; +import com.facebook.presto.sql.planner.assertions.MatchResult; +import com.facebook.presto.sql.planner.assertions.Matcher; +import com.facebook.presto.sql.planner.assertions.PlanAssert; +import com.facebook.presto.sql.planner.assertions.PlanMatchPattern; +import com.facebook.presto.sql.planner.assertions.SymbolAliases; +import com.facebook.presto.sql.relational.FunctionResolution; +import com.facebook.presto.sql.tree.SymbolReference; +import com.facebook.presto.testing.LocalQueryRunner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.commons.math3.util.Pair; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.facebook.presto.common.Utils.checkState; +import static com.facebook.presto.metadata.FunctionExtractor.extractFunctions; +import static 
com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.ARCHIVES_STORAGE_DIRECTORY_BASE; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.METADATA_DB_PASSWORD; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.METADATA_DB_TABLE_PREFIX; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.METADATA_DB_URL_TEMPLATE; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.METADATA_DB_USER; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.getDbHandle; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.setupMetadata; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Boolean; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.ClpString; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Float; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Integer; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.VarString; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.filter; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.node; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.project; +import static com.facebook.presto.testing.TestingSession.testSessionBuilder; +import static java.lang.String.format; + +@Test(singleThreaded = true) +public class TestClpPlanOptimizer + extends TestClpQueryBase +{ + private static final Logger log = Logger.get(TestClpPlanOptimizer.class); + private final Session defaultSession = testSessionBuilder() + .setCatalog("clp") + .setSchema(ClpMetadata.DEFAULT_SCHEMA_NAME) + .build(); + + private ClpMetadataDbSetUp.DbHandle dbHandle; + ClpTableHandle table; + + private LocalQueryRunner localQueryRunner; + private FunctionAndTypeManager functionAndTypeManager; + private FunctionResolution 
functionResolution; + private ClpMetadataFilterProvider metadataFilterProvider; + private PlanNodeIdAllocator planNodeIdAllocator; + + @BeforeMethod + public void setUp() + { + dbHandle = getDbHandle("metadata_query_testdb"); + final String tableName = "test"; + final String tablePath = ARCHIVES_STORAGE_DIRECTORY_BASE + tableName; + table = new ClpTableHandle(new SchemaTableName("default", tableName), tablePath); + + setupMetadata(dbHandle, + ImmutableMap.of( + tableName, + ImmutableList.of( + new Pair<>("city.Name", ClpString), + new Pair<>("city.Region.Id", Integer), + new Pair<>("city.Region.Name", VarString), + new Pair<>("fare", Float), + new Pair<>("isHoliday", Boolean)))); + + localQueryRunner = new LocalQueryRunner(defaultSession); + localQueryRunner.createCatalog("clp", new ClpConnectorFactory(), ImmutableMap.of( + "clp.metadata-db-url", format(METADATA_DB_URL_TEMPLATE, dbHandle.getDbPath()), + "clp.metadata-db-user", METADATA_DB_USER, + "clp.metadata-db-password", METADATA_DB_PASSWORD, + "clp.metadata-table-prefix", METADATA_DB_TABLE_PREFIX)); + localQueryRunner.getMetadata().registerBuiltInFunctions(extractFunctions(new ClpPlugin().getFunctions())); + functionAndTypeManager = localQueryRunner.getMetadata().getFunctionAndTypeManager(); + functionResolution = new FunctionResolution(functionAndTypeManager.getFunctionAndTypeResolver()); + metadataFilterProvider = new ClpMetadataFilterProvider(new ClpConfig()); + planNodeIdAllocator = new PlanNodeIdAllocator(); + } + + @AfterMethod + public void tearDown() + { + localQueryRunner.close(); + ClpMetadataDbSetUp.tearDown(dbHandle); + } + + @Test + public void testScanProjectFilter() + { + TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false); + Session session = testSessionBuilder() + .setCatalog("clp") + .setSchema("default") + .setTransactionId(transactionId) + .build(); + + Plan plan = localQueryRunner.createPlan( + session, + "SELECT CLP_GET_STRING('user') from test 
WHERE CLP_GET_INT('user_id') = 0 AND LOWER(city.Name) = 'BEIJING'", + WarningCollector.NOOP); + ClpPlanOptimizer optimizer = new ClpPlanOptimizer(functionAndTypeManager, functionResolution, metadataFilterProvider); + PlanNode optimizedPlan = optimizer.optimize( + plan.getRoot(), + session.toConnectorSession(), + null, + new PlanNodeIdAllocator()); + log.info(plan.toString()); + PlanAssert.assertPlan( + session, + localQueryRunner.getMetadata(), + (node, sourceStats, lookup, s, types) -> PlanNodeStatsEstimate.unknown(), + new Plan(optimizedPlan, plan.getTypes(), StatsAndCosts.empty()), + anyTree( + project( + ImmutableMap.of( + "clp_get_string", + PlanMatchPattern.expression("user")), + filter( + expression("lower(city.Name) = 'BEIJING'"), + ClpTableScanMatcher.clpTableScanPattern( + new ClpTableLayoutHandle(table, Optional.of("(user_id: 0)"), Optional.empty()), + ImmutableSet.of( + new ClpColumnHandle( + "user", + VarcharType.VARCHAR, + true), + city)))))); + } + + @Test + public void testScanProject() + { + TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false); + Session session = testSessionBuilder() + .setCatalog("clp") + .setSchema("default") + .setTransactionId(transactionId) + .build(); + + Plan plan = localQueryRunner.createPlan( + session, + "SELECT CLP_GET_STRING('user') FROM test", + WarningCollector.NOOP); + ClpPlanOptimizer optimizer = new ClpPlanOptimizer(functionAndTypeManager, functionResolution, metadataFilterProvider); + PlanNode optimizedPlan = optimizer.optimize( + plan.getRoot(), + session.toConnectorSession(), + null, + planNodeIdAllocator); + + PlanAssert.assertPlan( + session, + localQueryRunner.getMetadata(), + (node, sourceStats, lookup, s, types) -> PlanNodeStatsEstimate.unknown(), + new Plan(optimizedPlan, plan.getTypes(), StatsAndCosts.empty()), + anyTree( + project( + ImmutableMap.of( + "clp_get_string", + PlanMatchPattern.expression("user")), + ClpTableScanMatcher.clpTableScanPattern( + new 
ClpTableLayoutHandle( + table, + Optional.empty(), + Optional.empty()), + ImmutableSet.of( + new ClpColumnHandle( + "user", + VarcharType.VARCHAR, + true)))))); + } + + @Test + public void testScanFilter() + { + TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false); + Session session = testSessionBuilder() + .setCatalog("clp") + .setSchema("default") + .setTransactionId(transactionId) + .build(); + + Plan plan = localQueryRunner.createPlan( + session, + "SELECT * FROM test WHERE CLP_GET_INT('user_id') = 0 AND LOWER(city.Name) = 'BEIJING'", + WarningCollector.NOOP); + ClpPlanOptimizer optimizer = new ClpPlanOptimizer(functionAndTypeManager, functionResolution, metadataFilterProvider); + PlanNode optimizedPlan = optimizer.optimize( + plan.getRoot(), + session.toConnectorSession(), + null, + planNodeIdAllocator); + + PlanAssert.assertPlan( + session, + localQueryRunner.getMetadata(), + (node, sourceStats, lookup, s, types) -> PlanNodeStatsEstimate.unknown(), + new Plan(optimizedPlan, plan.getTypes(), StatsAndCosts.empty()), + anyTree( + filter( + expression("lower(city.Name) = 'BEIJING'"), + ClpTableScanMatcher.clpTableScanPattern( + new ClpTableLayoutHandle(table, Optional.of("(user_id: 0)"), Optional.empty()), + ImmutableSet.of(city, fare, isHoliday))))); + } + + private static final class ClpTableScanMatcher + implements Matcher + { + private final ClpTableLayoutHandle expectedLayoutHandle; + private final Set expectedColumns; + + private ClpTableScanMatcher(ClpTableLayoutHandle expectedLayoutHandle, Set expectedColumns) + { + this.expectedLayoutHandle = expectedLayoutHandle; + this.expectedColumns = expectedColumns; + } + + static PlanMatchPattern clpTableScanPattern(ClpTableLayoutHandle layoutHandle, Set columns) + { + return node(TableScanNode.class).with(new ClpTableScanMatcher(layoutHandle, columns)); + } + + @Override + public boolean shapeMatches(PlanNode node) + { + return node instanceof TableScanNode; + } + + 
@Override + public MatchResult detailMatches( + PlanNode node, + StatsProvider stats, + Session session, + Metadata metadata, + SymbolAliases symbolAliases) + { + checkState(shapeMatches(node), "Plan testing framework error: shapeMatches returned false"); + TableScanNode tableScanNode = (TableScanNode) node; + ClpTableLayoutHandle actualLayoutHandle = (ClpTableLayoutHandle) tableScanNode.getTable().getLayout().get(); + + // Check layout handle + if (!expectedLayoutHandle.equals(actualLayoutHandle)) { + return MatchResult.NO_MATCH; + } + + // Check assignments contain expected columns + Map actualAssignments = tableScanNode.getAssignments(); + Set actualColumns = new HashSet<>(actualAssignments.values()); + + if (!expectedColumns.equals(actualColumns)) { + return MatchResult.NO_MATCH; + } + + SymbolAliases.Builder aliasesBuilder = SymbolAliases.builder(); + for (VariableReferenceExpression variable : tableScanNode.getOutputVariables()) { + aliasesBuilder.put(variable.getName(), new SymbolReference(variable.getName())); + } + + return MatchResult.match(aliasesBuilder.build()); + } + } +} diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpQueryBase.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpQueryBase.java index e476c439eaa1f..fc8f4cd3cb7be 100644 --- a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpQueryBase.java +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpQueryBase.java @@ -22,13 +22,13 @@ import com.facebook.presto.metadata.CatalogManager; import com.facebook.presto.metadata.ColumnPropertyManager; import com.facebook.presto.metadata.FunctionAndTypeManager; +import com.facebook.presto.metadata.FunctionExtractor; import com.facebook.presto.metadata.Metadata; import com.facebook.presto.metadata.MetadataManager; import com.facebook.presto.metadata.SchemaPropertyManager; import com.facebook.presto.metadata.TablePropertyManager; import com.facebook.presto.spi.ColumnHandle; import 
com.facebook.presto.spi.ConnectorSession; -import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.relation.RowExpression; import com.facebook.presto.spi.relation.VariableReferenceExpression; @@ -64,6 +64,9 @@ public class TestClpQueryBase { protected static final FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager(); + static { + functionAndTypeManager.registerBuiltInFunctions(FunctionExtractor.extractFunctions(ClpFunctions.class)); + } protected static final StandardFunctionResolution standardFunctionResolution = new FunctionResolution(functionAndTypeManager.getFunctionAndTypeResolver()); protected static final Metadata metadata = new MetadataManager( functionAndTypeManager, @@ -75,14 +78,13 @@ public class TestClpQueryBase new AnalyzePropertyManager(), createTestTransactionManager(new CatalogManager())); - protected static final ClpTableHandle table = new ClpTableHandle(new SchemaTableName("default", "test"), ""); protected static final ClpColumnHandle city = new ClpColumnHandle( "city", RowType.from(ImmutableList.of( + RowType.field("Name", VARCHAR), RowType.field("Region", RowType.from(ImmutableList.of( RowType.field("Id", BIGINT), - RowType.field("Name", VARCHAR)))), - RowType.field("Name", VARCHAR))), + RowType.field("Name", VARCHAR)))))), true); protected static final ClpColumnHandle fare = new ClpColumnHandle("fare", DOUBLE, true); protected static final ClpColumnHandle isHoliday = new ClpColumnHandle("isHoliday", BOOLEAN, true); diff --git a/presto-docs/src/main/sphinx/connector/clp.rst b/presto-docs/src/main/sphinx/connector/clp.rst index 666c601b85f5d..cc88d9b4b4df1 100644 --- a/presto-docs/src/main/sphinx/connector/clp.rst +++ b/presto-docs/src/main/sphinx/connector/clp.rst @@ -286,6 +286,124 @@ Each JSON log maps to this unified ``ROW`` type, with absent fields represented ``status``, ``thread_num``, ``backtrace``) become fields 
within the ``ROW``, clearly reflecting the nested and varying structures of the original JSON logs. +CLP Functions +------------- + +In semi-structured logs, the number of potential keys can grow significantly, resulting in extremely wide Presto tables +with many columns. To manage this complexity, the metadata provider may expose only a subset of the full schema, +typically the static fields or those most relevant to expected queries. + +To enable access to dynamic or less common fields not present in the exposed schema, CLP provides two sets of functions +to help users query flexible log schemas while keeping the table metadata definition concise. These functions are only +available in the CLP connector and are not part of standard Presto SQL. + +- JSON path functions (e.g., ``CLP_GET_STRING``) +- Wildcard column matching functions for use in filter predicates (e.g., ``CLP_WILDCARD_STRING_COLUMN``) + +There is **no performance penalty** for using these functions. During query optimization, they are rewritten into +references to actual schema-backed columns or valid symbols in KQL queries. This avoids additional parsing overhead and +delivers performance comparable to querying standard columns. + +Path-Based Functions +^^^^^^^^^^^^^^^^^^^^ + +.. function:: CLP_GET_STRING(varchar) -> varchar + + Returns the string value of the given JSON path, where the column type is one of: ``ClpString``, ``VarString``, or + ``DateString``. Returns a Presto ``VARCHAR``. + +.. function:: CLP_GET_INT(varchar) -> bigint + + Returns the integer value of the given JSON path, where the column type is ``Integer``. Returns a Presto ``BIGINT``. + +.. function:: CLP_GET_FLOAT(varchar) -> double + + Returns the floating-point value of the given JSON path, where the column type is ``Float``. Returns a Presto + ``DOUBLE``. + +.. function:: CLP_GET_BOOL(varchar) -> boolean + + Returns the boolean value of the given JSON path, where the column type is ``Boolean``. Returns a Presto + ``BOOLEAN``. + +..
function:: CLP_GET_STRING_ARRAY(varchar) -> array(varchar) + + Returns the array value of the given JSON path, where the column type is ``UnstructuredArray``, converting each + element into a string. Returns a Presto ``ARRAY(VARCHAR)``. + +.. note:: + + - JSON paths must be **constant string literals**; variables are not supported. + - Wildcards (e.g., ``msg.*.ts``) are **not supported**. + - If a path is invalid or missing, the function returns ``NULL`` rather than raising an error. + +Examples: + +.. code-block:: sql + + SELECT CLP_GET_STRING('msg.author') AS author + FROM clp.default.table_1 + WHERE CLP_GET_INT('msg.timestamp') > 1620000000; + + SELECT CLP_GET_STRING_ARRAY('msg.tags') AS tags + FROM clp.default.table_2 + WHERE CLP_GET_BOOL('msg.is_active') = true; + + +Wildcard Column Functions +^^^^^^^^^^^^^^^^^^^^^^^^^ + +These functions are used to apply filter predicates across all columns of a certain type. They are useful for searching +across unknown or dynamic schemas without specifying exact column names. Similar to the path-based functions, these +functions are rewritten during query optimization to a KQL query that matches the appropriate columns. + +.. function:: CLP_WILDCARD_STRING_COLUMN() -> varchar + + Represents all columns of CLP types: ``ClpString``, ``VarString``, and ``DateString``. + +.. function:: CLP_WILDCARD_INT_COLUMN() -> bigint + + Represents all columns of CLP type: ``Integer``. + +.. function:: CLP_WILDCARD_FLOAT_COLUMN() -> double + + Represents all columns of CLP type: ``Float``. + +.. function:: CLP_WILDCARD_BOOL_COLUMN() -> boolean + + Represents all columns of CLP type: ``Boolean``. + +.. note:: + + - They must appear **only in filter conditions** (``WHERE`` clause). They cannot be selected or passed as arguments + to other functions.
+ - Supported operators are limited to **logical binary comparisons**, including: + + :: + + = (EQUAL) + != (NOT_EQUAL) + < (LESS_THAN) + <= (LESS_THAN_OR_EQUAL) + > (GREATER_THAN) + >= (GREATER_THAN_OR_EQUAL) + + Use of other operators (e.g., arithmetic, ``LIKE``, or function calls) with wildcard functions is not allowed and + will result in a query error. + +Examples: + +.. code-block:: sql + + -- Matches if any string column equals "Beijing" + SELECT * + FROM clp.default.table_1 + WHERE CLP_WILDCARD_STRING_COLUMN() = 'Beijing'; + + -- Matches if any integer column equals 1 + SELECT * + FROM clp.default.table_2 + WHERE CLP_WILDCARD_INT_COLUMN() = 1; + SQL support -----------