Skip to content

Commit 0fc04bd

Browse files
authored
feat: Add CLP_GET_* UDFs with rewrites for schemaless querying. (#42)
1 parent a07fe2f commit 0fc04bd

File tree

13 files changed

+848
-109
lines changed

13 files changed

+848
-109
lines changed

presto-clp/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,13 @@
135135
<scope>test</scope>
136136
</dependency>
137137

138+
<dependency>
139+
<groupId>com.facebook.presto</groupId>
140+
<artifactId>presto-main-base</artifactId>
141+
<type>test-jar</type>
142+
<scope>test</scope>
143+
</dependency>
144+
138145
<dependency>
139146
<groupId>org.apache.commons</groupId>
140147
<artifactId>commons-math3</artifactId>

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnector.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.facebook.airlift.bootstrap.LifeCycleManager;
1717
import com.facebook.airlift.log.Logger;
18+
import com.facebook.presto.plugin.clp.optimization.ClpPlanOptimizerProvider;
1819
import com.facebook.presto.plugin.clp.split.filter.ClpSplitFilterProvider;
1920
import com.facebook.presto.spi.connector.Connector;
2021
import com.facebook.presto.spi.connector.ConnectorMetadata;
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.plugin.clp;
15+
16+
import com.facebook.presto.common.block.Block;
17+
import com.facebook.presto.common.type.StandardTypes;
18+
import com.facebook.presto.spi.function.Description;
19+
import com.facebook.presto.spi.function.ScalarFunction;
20+
import com.facebook.presto.spi.function.SqlType;
21+
import io.airlift.slice.Slice;
22+
23+
public final class ClpFunctions
24+
{
25+
private ClpFunctions()
26+
{
27+
}
28+
29+
@ScalarFunction(value = "CLP_GET_BIGINT", deterministic = false)
30+
@Description("Retrieves an integer value corresponding to the given JSON path.")
31+
@SqlType(StandardTypes.BIGINT)
32+
public static long clpGetBigint(@SqlType(StandardTypes.VARCHAR) Slice jsonPath)
33+
{
34+
throw new UnsupportedOperationException("CLP_GET_BIGINT is a placeholder function without implementation.");
35+
}
36+
37+
@ScalarFunction(value = "CLP_GET_DOUBLE", deterministic = false)
38+
@Description("Retrieves a floating point value corresponding to the given JSON path.")
39+
@SqlType(StandardTypes.DOUBLE)
40+
public static double clpGetDouble(@SqlType(StandardTypes.VARCHAR) Slice jsonPath)
41+
{
42+
throw new UnsupportedOperationException("CLP_GET_DOUBLE is a placeholder function without implementation.");
43+
}
44+
45+
@ScalarFunction(value = "CLP_GET_BOOL", deterministic = false)
46+
@Description("Retrieves a boolean value corresponding to the given JSON path.")
47+
@SqlType(StandardTypes.BOOLEAN)
48+
public static boolean clpGetBool(@SqlType(StandardTypes.VARCHAR) Slice jsonPath)
49+
{
50+
throw new UnsupportedOperationException("CLP_GET_BOOL is a placeholder function without implementation.");
51+
}
52+
53+
@ScalarFunction(value = "CLP_GET_STRING", deterministic = false)
54+
@Description("Retrieves a string value corresponding to the given JSON path.")
55+
@SqlType(StandardTypes.VARCHAR)
56+
public static Slice clpGetString(@SqlType(StandardTypes.VARCHAR) Slice jsonPath)
57+
{
58+
throw new UnsupportedOperationException("CLP_GET_STRING is a placeholder function without implementation.");
59+
}
60+
61+
@ScalarFunction(value = "CLP_GET_STRING_ARRAY", deterministic = false)
62+
@Description("Retrieves an array value corresponding to the given JSON path and converts each element into a string.")
63+
@SqlType("ARRAY(VARCHAR)")
64+
public static Block clpGetStringArray(@SqlType(StandardTypes.VARCHAR) Slice jsonPath)
65+
{
66+
throw new UnsupportedOperationException("CLP_GET_STRING_ARRAY is a placeholder function without implementation.");
67+
}
68+
}

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
import com.facebook.presto.spi.Plugin;
1717
import com.facebook.presto.spi.connector.ConnectorFactory;
1818
import com.google.common.collect.ImmutableList;
19+
import com.google.common.collect.ImmutableSet;
20+
21+
import java.util.Set;
1922

2023
public class ClpPlugin
2124
implements Plugin
@@ -25,4 +28,10 @@ public Iterable<ConnectorFactory> getConnectorFactories()
2528
{
2629
return ImmutableList.of(new ClpConnectorFactory());
2730
}
31+
32+
@Override
33+
public Set<Class<?>> getFunctions()
34+
{
35+
return ImmutableSet.of(ClpFunctions.class);
36+
}
2837
}

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizer.java renamed to presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpComputePushDown.java

Lines changed: 55 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package com.facebook.presto.plugin.clp;
14+
package com.facebook.presto.plugin.clp.optimization;
1515

1616
import com.facebook.airlift.log.Logger;
17+
import com.facebook.presto.plugin.clp.ClpExpression;
18+
import com.facebook.presto.plugin.clp.ClpTableHandle;
19+
import com.facebook.presto.plugin.clp.ClpTableLayoutHandle;
1720
import com.facebook.presto.plugin.clp.split.filter.ClpSplitFilterProvider;
1821
import com.facebook.presto.spi.ColumnHandle;
1922
import com.facebook.presto.spi.ConnectorPlanOptimizer;
@@ -31,7 +34,6 @@
3134
import com.facebook.presto.spi.relation.VariableReferenceExpression;
3235
import com.google.common.collect.ImmutableSet;
3336

34-
import java.util.HashMap;
3537
import java.util.HashSet;
3638
import java.util.Map;
3739
import java.util.Optional;
@@ -42,15 +44,15 @@
4244
import static java.lang.String.format;
4345
import static java.util.Objects.requireNonNull;
4446

45-
public class ClpPlanOptimizer
47+
public class ClpComputePushDown
4648
implements ConnectorPlanOptimizer
4749
{
48-
private static final Logger log = Logger.get(ClpPlanOptimizer.class);
50+
private static final Logger log = Logger.get(ClpComputePushDown.class);
4951
private final FunctionMetadataManager functionManager;
5052
private final StandardFunctionResolution functionResolution;
5153
private final ClpSplitFilterProvider splitFilterProvider;
5254

53-
public ClpPlanOptimizer(FunctionMetadataManager functionManager, StandardFunctionResolution functionResolution, ClpSplitFilterProvider splitFilterProvider)
55+
public ClpComputePushDown(FunctionMetadataManager functionManager, StandardFunctionResolution functionResolution, ClpSplitFilterProvider splitFilterProvider)
5456
{
5557
this.functionManager = requireNonNull(functionManager, "functionManager is null");
5658
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
@@ -99,59 +101,75 @@ public PlanNode visitFilter(FilterNode node, RewriteContext<Void> context)
99101
if (!(node.getSource() instanceof TableScanNode)) {
100102
return node;
101103
}
104+
105+
return processFilter(node, (TableScanNode) node.getSource());
106+
}
107+
108+
private PlanNode processFilter(FilterNode filterNode, TableScanNode tableScanNode)
109+
{
102110
hasVisitedFilter = true;
103111

104-
TableScanNode tableScanNode = (TableScanNode) node.getSource();
105-
Map<VariableReferenceExpression, ColumnHandle> assignments = new HashMap<>(tableScanNode.getAssignments());
106112
TableHandle tableHandle = tableScanNode.getTable();
107113
ClpTableHandle clpTableHandle = (ClpTableHandle) tableHandle.getConnectorHandle();
114+
108115
String tableScope = CONNECTOR_NAME + "." + clpTableHandle.getSchemaTableName().toString();
109-
ClpExpression clpExpression = node.getPredicate().accept(
116+
Map<VariableReferenceExpression, ColumnHandle> assignments = tableScanNode.getAssignments();
117+
118+
ClpExpression clpExpression = filterNode.getPredicate().accept(
110119
new ClpFilterToKqlConverter(
111120
functionResolution,
112121
functionManager,
122+
assignments,
113123
splitFilterProvider.getColumnNames(tableScope)),
114-
assignments);
124+
null);
115125
Optional<String> kqlQuery = clpExpression.getPushDownExpression();
116126
Optional<String> metadataSqlQuery = clpExpression.getMetadataSqlQuery();
117127
Optional<RowExpression> remainingPredicate = clpExpression.getRemainingExpression();
118128

119129
// Perform required metadata filter checks before handling the KQL query (if kqlQuery
120130
// isn't present, we'll return early, skipping subsequent checks).
121131
splitFilterProvider.checkContainsRequiredFilters(ImmutableSet.of(tableScope), metadataSqlQuery.orElse(""));
122-
if (metadataSqlQuery.isPresent()) {
132+
boolean hasMetadataFilter = metadataSqlQuery.isPresent() && !metadataSqlQuery.get().isEmpty();
133+
if (hasMetadataFilter) {
123134
metadataSqlQuery = Optional.of(splitFilterProvider.remapSplitFilterPushDownExpression(tableScope, metadataSqlQuery.get()));
124-
log.debug("Metadata SQL query: %s", metadataSqlQuery);
135+
log.debug("Metadata SQL query: %s", metadataSqlQuery.get());
125136
}
126137

127-
if (!kqlQuery.isPresent()) {
128-
return node;
129-
}
130-
log.debug("KQL query: %s", kqlQuery);
131-
132-
ClpTableLayoutHandle clpTableLayoutHandle = new ClpTableLayoutHandle(clpTableHandle, kqlQuery, metadataSqlQuery);
133-
TableScanNode newTableScanNode = new TableScanNode(
134-
tableScanNode.getSourceLocation(),
135-
idAllocator.getNextId(),
136-
new TableHandle(
137-
tableHandle.getConnectorId(),
138-
clpTableHandle,
139-
tableHandle.getTransaction(),
140-
Optional.of(clpTableLayoutHandle)),
141-
tableScanNode.getOutputVariables(),
142-
tableScanNode.getAssignments(),
143-
tableScanNode.getTableConstraints(),
144-
tableScanNode.getCurrentConstraint(),
145-
tableScanNode.getEnforcedConstraint(),
146-
tableScanNode.getCteMaterializationInfo());
147-
if (!remainingPredicate.isPresent()) {
148-
return newTableScanNode;
138+
if (kqlQuery.isPresent() || hasMetadataFilter) {
139+
if (kqlQuery.isPresent()) {
140+
log.debug("KQL query: %s", kqlQuery.get());
141+
}
142+
143+
ClpTableLayoutHandle layoutHandle = new ClpTableLayoutHandle(clpTableHandle, kqlQuery, metadataSqlQuery);
144+
TableHandle newTableHandle = new TableHandle(
145+
tableHandle.getConnectorId(),
146+
clpTableHandle,
147+
tableHandle.getTransaction(),
148+
Optional.of(layoutHandle));
149+
150+
tableScanNode = new TableScanNode(
151+
tableScanNode.getSourceLocation(),
152+
idAllocator.getNextId(),
153+
newTableHandle,
154+
tableScanNode.getOutputVariables(),
155+
tableScanNode.getAssignments(),
156+
tableScanNode.getTableConstraints(),
157+
tableScanNode.getCurrentConstraint(),
158+
tableScanNode.getEnforcedConstraint(),
159+
tableScanNode.getCteMaterializationInfo());
149160
}
150161

151-
return new FilterNode(node.getSourceLocation(),
152-
idAllocator.getNextId(),
153-
newTableScanNode,
154-
remainingPredicate.get());
162+
if (remainingPredicate.isPresent()) {
163+
// Not all predicate pushed down, need new FilterNode
164+
return new FilterNode(
165+
filterNode.getSourceLocation(),
166+
idAllocator.getNextId(),
167+
tableScanNode,
168+
remainingPredicate.get());
169+
}
170+
else {
171+
return tableScanNode;
172+
}
155173
}
156174
}
157175
}

0 commit comments

Comments
 (0)