-
Notifications
You must be signed in to change notification settings - Fork 3
feat: Add CLP_GET_* UDFs with rewrites for schemaless querying.
#42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 17 commits
d840972
a6f7870
a1f37c1
06eb758
f5556a8
2bd80ff
52de22c
445dee2
206b809
eae59b7
ecdaa3f
db8aab4
8598686
ba2262f
8e864bf
a6ea16e
7e10ca9
1175af3
644f77b
c88f1ff
080ab18
13bf115
d501fb5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| /* | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package com.facebook.presto.plugin.clp; | ||
|
|
||
| import com.facebook.presto.common.block.Block; | ||
| import com.facebook.presto.common.type.StandardTypes; | ||
| import com.facebook.presto.spi.function.Description; | ||
| import com.facebook.presto.spi.function.ScalarFunction; | ||
| import com.facebook.presto.spi.function.SqlType; | ||
| import io.airlift.slice.Slice; | ||
|
|
||
| public final class ClpFunctions | ||
| { | ||
| private ClpFunctions() | ||
| { | ||
| } | ||
|
|
||
|
Comment on lines
+23
to
+28
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Centralise placeholder throwing and improve messages Consolidate the placeholder exception and use consistent, actionable messages so accidental execution fails clearly. public final class ClpFunctions
{
private ClpFunctions()
{
}
+ private static RuntimeException placeholderInvocation(String name)
+ {
+ return new PrestoException(
+ NOT_SUPPORTED,
+ name + " must be rewritten by the CLP query rewriter and should not be executed at runtime.");
+ }
+
@ScalarFunction(value = "CLP_GET_BIGINT", deterministic = false)
@Description("Retrieves an integer value corresponding to the given JSON path.")
@SqlType(StandardTypes.BIGINT)
public static long clpGetBigint(@SqlType(StandardTypes.VARCHAR) Slice jsonPath)
{
- throw new UnsupportedOperationException("CLP_GET_BIGINT is a placeholder function without implementation.");
+ throw placeholderInvocation("CLP_GET_BIGINT");
}
@ScalarFunction(value = "CLP_GET_DOUBLE", deterministic = false)
- @Description("Retrieves a floating point value corresponding to the given JSON path.")
+ @Description("Retrieves a double value corresponding to the given JSON path.")
@SqlType(StandardTypes.DOUBLE)
public static double clpGetDouble(@SqlType(StandardTypes.VARCHAR) Slice jsonPath)
{
- throw new UnsupportedOperationException("CLP_GET_DOUBLE is a placeholder function without implementation.");
+ throw placeholderInvocation("CLP_GET_DOUBLE");
}
@ScalarFunction(value = "CLP_GET_BOOL", deterministic = false)
@Description("Retrieves a boolean value corresponding to the given JSON path.")
@SqlType(StandardTypes.BOOLEAN)
public static boolean clpGetBool(@SqlType(StandardTypes.VARCHAR) Slice jsonPath)
{
- throw new UnsupportedOperationException("CLP_GET_BOOL is a placeholder function without implementation.");
+ throw placeholderInvocation("CLP_GET_BOOL");
}
@ScalarFunction(value = "CLP_GET_STRING", deterministic = false)
@Description("Retrieves a string value corresponding to the given JSON path.")
@SqlType(StandardTypes.VARCHAR)
public static Slice clpGetString(@SqlType(StandardTypes.VARCHAR) Slice jsonPath)
{
- throw new UnsupportedOperationException("CLP_GET_STRING is a placeholder function without implementation.");
+ throw placeholderInvocation("CLP_GET_STRING");
}
@ScalarFunction(value = "CLP_GET_STRING_ARRAY", deterministic = false)
@Description("Retrieves an array value corresponding to the given JSON path and converts each element into a string.")
- @SqlType("ARRAY(VARCHAR)")
+ @SqlType("array(varchar)")
public static Block clpGetStringArray(@SqlType(StandardTypes.VARCHAR) Slice jsonPath)
{
- throw new UnsupportedOperationException("CLP_GET_STRING_ARRAY is a placeholder function without implementation.");
+ throw placeholderInvocation("CLP_GET_STRING_ARRAY");
}
}Also applies to: 29-36, 37-44, 45-52, 53-60, 61-67 🤖 Prompt for AI Agents |
||
| @ScalarFunction(value = "CLP_GET_INT", deterministic = false) | ||
anlowee marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| @Description("Retrieves an integer value corresponding to the given JSON path.") | ||
| @SqlType(StandardTypes.BIGINT) | ||
| public static long clpGetInt(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) | ||
| { | ||
| throw new UnsupportedOperationException("CLP_GET_INT is a placeholder function without implementation."); | ||
| } | ||
|
|
||
| @ScalarFunction(value = "CLP_GET_FLOAT", deterministic = false) | ||
|
||
| @Description("Retrieves a floating point value corresponding to the given JSON path.") | ||
anlowee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| @SqlType(StandardTypes.DOUBLE) | ||
| public static double clpGetFloat(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) | ||
| { | ||
| throw new UnsupportedOperationException("CLP_GET_FLOAT is a placeholder function without implementation."); | ||
| } | ||
|
|
||
| @ScalarFunction(value = "CLP_GET_BOOL", deterministic = false) | ||
| @Description("Retrieves a boolean value corresponding to the given JSON path.") | ||
| @SqlType(StandardTypes.BOOLEAN) | ||
| public static boolean clpGetBool(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) | ||
| { | ||
| throw new UnsupportedOperationException("CLP_GET_BOOL is a placeholder function without implementation."); | ||
| } | ||
|
|
||
| @ScalarFunction(value = "CLP_GET_STRING", deterministic = false) | ||
| @Description("Retrieves a string value corresponding to the given JSON path.") | ||
| @SqlType(StandardTypes.VARCHAR) | ||
| public static Slice clpGetString(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) | ||
| { | ||
| throw new UnsupportedOperationException("CLP_GET_STRING is a placeholder function without implementation."); | ||
| } | ||
|
|
||
| @ScalarFunction(value = "CLP_GET_STRING_ARRAY", deterministic = false) | ||
| @Description("Retrieves an array value corresponding to the given JSON path and converts each element into a string.") | ||
anlowee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| @SqlType("ARRAY(VARCHAR)") | ||
| public static Block clpGetStringArray(@SqlType(StandardTypes.VARCHAR) Slice jsonPath) | ||
|
Comment on lines
+61
to
+64
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) Normalise type literal casing in @SqlType Presto annotations typically use lower-case type literals (array(varchar)). The parser is case-insensitive, but lower-case improves consistency. 🤖 Prompt for AI Agents |
||
| { | ||
| throw new UnsupportedOperationException("CLP_GET_STRING_ARRAY is a placeholder function without implementation."); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -11,9 +11,12 @@ | |||||||||||||||||||||||||||||||||
| * See the License for the specific language governing permissions and | ||||||||||||||||||||||||||||||||||
| * limitations under the License. | ||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||
| package com.facebook.presto.plugin.clp; | ||||||||||||||||||||||||||||||||||
| package com.facebook.presto.plugin.clp.optimization; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| import com.facebook.airlift.log.Logger; | ||||||||||||||||||||||||||||||||||
| import com.facebook.presto.plugin.clp.ClpExpression; | ||||||||||||||||||||||||||||||||||
| import com.facebook.presto.plugin.clp.ClpTableHandle; | ||||||||||||||||||||||||||||||||||
| import com.facebook.presto.plugin.clp.ClpTableLayoutHandle; | ||||||||||||||||||||||||||||||||||
| import com.facebook.presto.plugin.clp.split.filter.ClpSplitFilterProvider; | ||||||||||||||||||||||||||||||||||
| import com.facebook.presto.spi.ColumnHandle; | ||||||||||||||||||||||||||||||||||
| import com.facebook.presto.spi.ConnectorPlanOptimizer; | ||||||||||||||||||||||||||||||||||
|
|
@@ -31,7 +34,6 @@ | |||||||||||||||||||||||||||||||||
| import com.facebook.presto.spi.relation.VariableReferenceExpression; | ||||||||||||||||||||||||||||||||||
| import com.google.common.collect.ImmutableSet; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| import java.util.HashMap; | ||||||||||||||||||||||||||||||||||
| import java.util.HashSet; | ||||||||||||||||||||||||||||||||||
| import java.util.Map; | ||||||||||||||||||||||||||||||||||
| import java.util.Optional; | ||||||||||||||||||||||||||||||||||
|
|
@@ -42,15 +44,15 @@ | |||||||||||||||||||||||||||||||||
| import static java.lang.String.format; | ||||||||||||||||||||||||||||||||||
| import static java.util.Objects.requireNonNull; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| public class ClpPlanOptimizer | ||||||||||||||||||||||||||||||||||
| public class ClpComputePushDown | ||||||||||||||||||||||||||||||||||
| implements ConnectorPlanOptimizer | ||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| private static final Logger log = Logger.get(ClpPlanOptimizer.class); | ||||||||||||||||||||||||||||||||||
| private static final Logger log = Logger.get(ClpComputePushDown.class); | ||||||||||||||||||||||||||||||||||
| private final FunctionMetadataManager functionManager; | ||||||||||||||||||||||||||||||||||
| private final StandardFunctionResolution functionResolution; | ||||||||||||||||||||||||||||||||||
| private final ClpSplitFilterProvider splitFilterProvider; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| public ClpPlanOptimizer(FunctionMetadataManager functionManager, StandardFunctionResolution functionResolution, ClpSplitFilterProvider splitFilterProvider) | ||||||||||||||||||||||||||||||||||
| public ClpComputePushDown(FunctionMetadataManager functionManager, StandardFunctionResolution functionResolution, ClpSplitFilterProvider splitFilterProvider) | ||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| this.functionManager = requireNonNull(functionManager, "functionManager is null"); | ||||||||||||||||||||||||||||||||||
| this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); | ||||||||||||||||||||||||||||||||||
|
|
@@ -99,59 +101,75 @@ public PlanNode visitFilter(FilterNode node, RewriteContext<Void> context) | |||||||||||||||||||||||||||||||||
| if (!(node.getSource() instanceof TableScanNode)) { | ||||||||||||||||||||||||||||||||||
| return node; | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| return processFilter(node, (TableScanNode) node.getSource()); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
wraymo marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| private PlanNode processFilter(FilterNode filterNode, TableScanNode tableScanNode) | ||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| hasVisitedFilter = true; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| TableScanNode tableScanNode = (TableScanNode) node.getSource(); | ||||||||||||||||||||||||||||||||||
| Map<VariableReferenceExpression, ColumnHandle> assignments = new HashMap<>(tableScanNode.getAssignments()); | ||||||||||||||||||||||||||||||||||
| TableHandle tableHandle = tableScanNode.getTable(); | ||||||||||||||||||||||||||||||||||
| ClpTableHandle clpTableHandle = (ClpTableHandle) tableHandle.getConnectorHandle(); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| String tableScope = CONNECTOR_NAME + "." + clpTableHandle.getSchemaTableName().toString(); | ||||||||||||||||||||||||||||||||||
| ClpExpression clpExpression = node.getPredicate().accept( | ||||||||||||||||||||||||||||||||||
| Map<VariableReferenceExpression, ColumnHandle> assignments = tableScanNode.getAssignments(); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
Comment on lines
110
to
+117
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Remove boolean gate and mark the table as “handled” in processFilter Drop hasVisitedFilter and remove the tableScope from tablesWithoutFilter. Apply this diff: - hasVisitedFilter = true;
-
TableHandle tableHandle = tableScanNode.getTable();
ClpTableHandle clpTableHandle = (ClpTableHandle) tableHandle.getConnectorHandle();
- String tableScope = CONNECTOR_NAME + "." + clpTableHandle.getSchemaTableName().toString();
+ String tableScope = format("%s.%s", CONNECTOR_NAME, clpTableHandle.getSchemaTableName());
+ tablesWithoutFilter.remove(tableScope);
Map<VariableReferenceExpression, ColumnHandle> assignments = tableScanNode.getAssignments();📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||
| ClpExpression clpExpression = filterNode.getPredicate().accept( | ||||||||||||||||||||||||||||||||||
| new ClpFilterToKqlConverter( | ||||||||||||||||||||||||||||||||||
| functionResolution, | ||||||||||||||||||||||||||||||||||
| functionManager, | ||||||||||||||||||||||||||||||||||
| assignments, | ||||||||||||||||||||||||||||||||||
| splitFilterProvider.getColumnNames(tableScope)), | ||||||||||||||||||||||||||||||||||
| assignments); | ||||||||||||||||||||||||||||||||||
| null); | ||||||||||||||||||||||||||||||||||
| Optional<String> kqlQuery = clpExpression.getPushDownExpression(); | ||||||||||||||||||||||||||||||||||
| Optional<String> metadataSqlQuery = clpExpression.getMetadataSqlQuery(); | ||||||||||||||||||||||||||||||||||
| Optional<RowExpression> remainingPredicate = clpExpression.getRemainingExpression(); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Perform required metadata filter checks before handling the KQL query (if kqlQuery | ||||||||||||||||||||||||||||||||||
| // isn't present, we'll return early, skipping subsequent checks). | ||||||||||||||||||||||||||||||||||
| splitFilterProvider.checkContainsRequiredFilters(ImmutableSet.of(tableScope), metadataSqlQuery.orElse("")); | ||||||||||||||||||||||||||||||||||
| if (metadataSqlQuery.isPresent()) { | ||||||||||||||||||||||||||||||||||
| boolean hasMetadataFilter = metadataSqlQuery.isPresent() && !metadataSqlQuery.get().isEmpty(); | ||||||||||||||||||||||||||||||||||
anlowee marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||
| if (hasMetadataFilter) { | ||||||||||||||||||||||||||||||||||
| metadataSqlQuery = Optional.of(splitFilterProvider.remapSplitFilterPushDownExpression(tableScope, metadataSqlQuery.get())); | ||||||||||||||||||||||||||||||||||
| log.debug("Metadata SQL query: %s", metadataSqlQuery); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| if (!kqlQuery.isPresent()) { | ||||||||||||||||||||||||||||||||||
| return node; | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| log.debug("KQL query: %s", kqlQuery); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| ClpTableLayoutHandle clpTableLayoutHandle = new ClpTableLayoutHandle(clpTableHandle, kqlQuery, metadataSqlQuery); | ||||||||||||||||||||||||||||||||||
| TableScanNode newTableScanNode = new TableScanNode( | ||||||||||||||||||||||||||||||||||
| tableScanNode.getSourceLocation(), | ||||||||||||||||||||||||||||||||||
| idAllocator.getNextId(), | ||||||||||||||||||||||||||||||||||
| new TableHandle( | ||||||||||||||||||||||||||||||||||
| tableHandle.getConnectorId(), | ||||||||||||||||||||||||||||||||||
| clpTableHandle, | ||||||||||||||||||||||||||||||||||
| tableHandle.getTransaction(), | ||||||||||||||||||||||||||||||||||
| Optional.of(clpTableLayoutHandle)), | ||||||||||||||||||||||||||||||||||
| tableScanNode.getOutputVariables(), | ||||||||||||||||||||||||||||||||||
| tableScanNode.getAssignments(), | ||||||||||||||||||||||||||||||||||
| tableScanNode.getTableConstraints(), | ||||||||||||||||||||||||||||||||||
| tableScanNode.getCurrentConstraint(), | ||||||||||||||||||||||||||||||||||
| tableScanNode.getEnforcedConstraint(), | ||||||||||||||||||||||||||||||||||
| tableScanNode.getCteMaterializationInfo()); | ||||||||||||||||||||||||||||||||||
| if (!remainingPredicate.isPresent()) { | ||||||||||||||||||||||||||||||||||
| return newTableScanNode; | ||||||||||||||||||||||||||||||||||
| if (kqlQuery.isPresent() || hasMetadataFilter) { | ||||||||||||||||||||||||||||||||||
| if (kqlQuery.isPresent()) { | ||||||||||||||||||||||||||||||||||
| log.debug("KQL query: %s", kqlQuery); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
wraymo marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||
| ClpTableLayoutHandle layoutHandle = new ClpTableLayoutHandle(clpTableHandle, kqlQuery, metadataSqlQuery); | ||||||||||||||||||||||||||||||||||
| TableHandle newTableHandle = new TableHandle( | ||||||||||||||||||||||||||||||||||
| tableHandle.getConnectorId(), | ||||||||||||||||||||||||||||||||||
| clpTableHandle, | ||||||||||||||||||||||||||||||||||
| tableHandle.getTransaction(), | ||||||||||||||||||||||||||||||||||
| Optional.of(layoutHandle)); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| tableScanNode = new TableScanNode( | ||||||||||||||||||||||||||||||||||
| tableScanNode.getSourceLocation(), | ||||||||||||||||||||||||||||||||||
| idAllocator.getNextId(), | ||||||||||||||||||||||||||||||||||
| newTableHandle, | ||||||||||||||||||||||||||||||||||
| tableScanNode.getOutputVariables(), | ||||||||||||||||||||||||||||||||||
| tableScanNode.getAssignments(), | ||||||||||||||||||||||||||||||||||
| tableScanNode.getTableConstraints(), | ||||||||||||||||||||||||||||||||||
| tableScanNode.getCurrentConstraint(), | ||||||||||||||||||||||||||||||||||
| tableScanNode.getEnforcedConstraint(), | ||||||||||||||||||||||||||||||||||
| tableScanNode.getCteMaterializationInfo()); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| return new FilterNode(node.getSourceLocation(), | ||||||||||||||||||||||||||||||||||
| idAllocator.getNextId(), | ||||||||||||||||||||||||||||||||||
| newTableScanNode, | ||||||||||||||||||||||||||||||||||
| remainingPredicate.get()); | ||||||||||||||||||||||||||||||||||
| if (remainingPredicate.isPresent()) { | ||||||||||||||||||||||||||||||||||
| // Not all predicate pushed down, need new FilterNode | ||||||||||||||||||||||||||||||||||
| return new FilterNode( | ||||||||||||||||||||||||||||||||||
| filterNode.getSourceLocation(), | ||||||||||||||||||||||||||||||||||
| idAllocator.getNextId(), | ||||||||||||||||||||||||||||||||||
| tableScanNode, | ||||||||||||||||||||||||||||||||||
| remainingPredicate.get()); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| else { | ||||||||||||||||||||||||||||||||||
| return tableScanNode; | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Use PrestoException with NOT_SUPPORTED for clearer user-facing errors
Throwing UnsupportedOperationException surfaces as an internal error to users. Prefer a PrestoException with NOT_SUPPORTED and a clear action message.
📝 Committable suggestion
🤖 Prompt for AI Agents