Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,12 @@ public static boolean clpWildcardBoolColumn()
{
throw new UnsupportedOperationException("CLP_WILDCARD_BOOL_COLUMN is a placeholder function without implementation.");
}

/**
 * Placeholder declaration for the {@code CLP_GET_JSON_STRING} scalar function.
 * <p>
 * The body never produces a value; invoking it directly always fails.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@ScalarFunction(value = "CLP_GET_JSON_STRING", deterministic = false)
@Description("Converts an entire log record into a JSON string.")
@SqlType(StandardTypes.VARCHAR)
public static Slice clpGetJSONString()
{
    final String message = "CLP_GET_JSON_STRING is a placeholder function without implementation.";
    throw new UnsupportedOperationException(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
public final class ClpUdfRewriter
implements ConnectorPlanOptimizer
{
public static final String JSON_STRING_PLACEHOLDER = "__json_string";
private final FunctionMetadataManager functionManager;

public ClpUdfRewriter(FunctionMetadataManager functionManager)
Expand Down Expand Up @@ -123,17 +124,43 @@ public PlanNode visitProject(ProjectNode node, RewriteContext<Void> context)
for (Map.Entry<VariableReferenceExpression, RowExpression> entry : node.getAssignments().getMap().entrySet()) {
newAssignments.put(
entry.getKey(),
rewriteClpUdfs(entry.getValue(), functionManager, variableAllocator));
rewriteClpUdfs(entry.getValue(), functionManager, variableAllocator, true));
}

PlanNode newSource = rewritePlanSubtree(node.getSource());
PlanNode newSource = node.getSource().accept(this, context);
return new ProjectNode(node.getSourceLocation(), idAllocator.getNextId(), newSource, newAssignments.build(), node.getLocality());
}

/**
 * Visits a {@link FilterNode} and returns its rewritten replacement.
 * <p>
 * NOTE(review): this body appears to carry both sides of a diff hunk. The single
 * {@code return buildNewFilterNode(node);} statement looks like the removed line and the
 * three statements after it look like its replacement; confirm against the real file
 * before relying on either.
 *
 * @param node the filter node being visited
 * @param context the rewrite context, forwarded unchanged to the source node
 * @return the filter node produced by the rewrite
 */
@Override
public PlanNode visitFilter(FilterNode node, RewriteContext<Void> context)
{
return buildNewFilterNode(node);
// Rewrite the predicate with inProjectNode = false.
RowExpression newPredicate = rewriteClpUdfs(node.getPredicate(), functionManager, variableAllocator, false);
// Visit the source with this same visitor and context.
PlanNode newSource = node.getSource().accept(this, context);
// The rebuilt filter takes a fresh id from idAllocator.
return new FilterNode(node.getSourceLocation(), idAllocator.getNextId(), newSource, newPredicate);
}

/**
 * Rebuilds a {@link TableScanNode} so that it also exposes every variable/handle pair
 * currently recorded in {@code globalColumnVarMap}.
 *
 * @param node the table scan being visited
 * @param context the rewrite context (not used here)
 * @return a new table scan with a fresh id, the merged output variables and assignments,
 * and the original table, constraints and CTE materialization info
 */
@Override
public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
{
    // Seed both collections from the existing scan; the LinkedHashSet keeps the original
    // output order and ignores variables that are already present.
    Map<VariableReferenceExpression, ColumnHandle> mergedAssignments = new HashMap<>(node.getAssignments());
    Set<VariableReferenceExpression> mergedOutputs = new LinkedHashSet<>(node.getOutputVariables());

    // Layer every recorded (handle, variable) pair on top of the scan's own columns.
    globalColumnVarMap.forEach((columnHandle, variable) -> {
        mergedAssignments.put(variable, columnHandle);
        mergedOutputs.add(variable);
    });

    return new TableScanNode(
            node.getSourceLocation(),
            idAllocator.getNextId(),
            node.getTable(),
            new ArrayList<>(mergedOutputs),
            mergedAssignments,
            node.getTableConstraints(),
            node.getCurrentConstraint(),
            node.getEnforcedConstraint(),
            node.getCteMaterializationInfo());
}

/**
Expand All @@ -148,20 +175,32 @@ public PlanNode visitFilter(FilterNode node, RewriteContext<Void> context)
* @param expression the input expression to analyze and possibly rewrite
* @param functionManager function manager used to resolve function metadata
* @param variableAllocator variable allocator used to create new variable references
* @param inProjectNode whether the CLP UDFs are in a {@link ProjectNode}
* @return a possibly rewritten {@link RowExpression} with <code>CLP_GET_*</code> calls
* replaced
*/
private RowExpression rewriteClpUdfs(
RowExpression expression,
FunctionMetadataManager functionManager,
VariableAllocator variableAllocator)
VariableAllocator variableAllocator,
boolean inProjectNode)
{
// Handle CLP_GET_* function calls
if (expression instanceof CallExpression) {
CallExpression call = (CallExpression) expression;
String functionName = functionManager.getFunctionMetadata(call.getFunctionHandle()).getName().getObjectName().toUpperCase();

if (functionName.startsWith("CLP_GET_")) {
if (inProjectNode && functionName.equals("CLP_GET_JSON_STRING")) {
VariableReferenceExpression newValue = variableAllocator.newVariable(
expression.getSourceLocation(),
JSON_STRING_PLACEHOLDER,
call.getType());
ClpColumnHandle targetHandle = new ClpColumnHandle(JSON_STRING_PLACEHOLDER, call.getType());

globalColumnVarMap.put(targetHandle, newValue);
return newValue;
}
else if (functionName.startsWith("CLP_GET_")) {
if (call.getArguments().size() != 1 || !(call.getArguments().get(0) instanceof ConstantExpression)) {
throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION,
"CLP_GET_* UDF must have a single constant string argument");
Expand All @@ -187,7 +226,7 @@ private RowExpression rewriteClpUdfs(

// Recurse into arguments
List<RowExpression> rewrittenArgs = call.getArguments().stream()
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator))
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator, inProjectNode))
.collect(toImmutableList());

return new CallExpression(call.getDisplayName(), call.getFunctionHandle(), call.getType(), rewrittenArgs);
Expand All @@ -198,7 +237,7 @@ private RowExpression rewriteClpUdfs(
SpecialFormExpression special = (SpecialFormExpression) expression;

List<RowExpression> rewrittenArgs = special.getArguments().stream()
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator))
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator, inProjectNode))
.collect(toImmutableList());

return new SpecialFormExpression(special.getSourceLocation(), special.getForm(), special.getType(), rewrittenArgs);
Expand All @@ -207,29 +246,6 @@ private RowExpression rewriteClpUdfs(
return expression;
}

/**
 * Walks a plan subtree and rebuilds it so that table scans and filters are replaced by
 * their rewritten counterparts.
 * <p>
 * Table scans and filters are handed to their dedicated builders; every other node has
 * each of its sources rewritten recursively and is then re-created through
 * {@code replaceChildren}.
 *
 * @param node the plan node to rewrite
 * @return the rewritten plan node
 */
private PlanNode rewritePlanSubtree(PlanNode node)
{
    if (node instanceof TableScanNode) {
        TableScanNode tableScan = (TableScanNode) node;
        return buildNewTableScanNode(tableScan);
    }
    if (node instanceof FilterNode) {
        FilterNode filter = (FilterNode) node;
        return buildNewFilterNode(filter);
    }

    // Generic node: rewrite each child in order, then swap the children in.
    return node.replaceChildren(
            node.getSources().stream()
                    .map(this::rewritePlanSubtree)
                    .collect(toImmutableList()));
}

/**
* Encodes a JSON path into a valid variable name by replacing uppercase letters with
* "_ux<lowercase letter>", dots with "_dot_", and underscores with "_und_".
Expand Down Expand Up @@ -259,48 +275,5 @@ else if (c == '_') {
}
return sb.toString();
}

/**
 * Creates a replacement {@link TableScanNode} whose outputs and assignments are the
 * original ones plus every variable/handle pair recorded in {@code globalColumnVarMap}.
 *
 * @param node the original table scan node
 * @return a table scan with a fresh id, the merged outputs and assignments, and the
 * original table, constraints and CTE materialization info
 */
private TableScanNode buildNewTableScanNode(TableScanNode node)
{
    // Seed both collections from the existing scan; the LinkedHashSet keeps the original
    // output order and ignores variables that are already present.
    Map<VariableReferenceExpression, ColumnHandle> mergedAssignments = new HashMap<>(node.getAssignments());
    Set<VariableReferenceExpression> mergedOutputs = new LinkedHashSet<>(node.getOutputVariables());

    // Layer every recorded (handle, variable) pair on top of the scan's own columns.
    globalColumnVarMap.forEach((columnHandle, variable) -> {
        mergedAssignments.put(variable, columnHandle);
        mergedOutputs.add(variable);
    });

    return new TableScanNode(
            node.getSourceLocation(),
            idAllocator.getNextId(),
            node.getTable(),
            new ArrayList<>(mergedOutputs),
            mergedAssignments,
            node.getTableConstraints(),
            node.getCurrentConstraint(),
            node.getEnforcedConstraint(),
            node.getCteMaterializationInfo());
}

/**
 * Builds a new {@link FilterNode} with its predicate rewritten to replace CLP UDF calls.
 * <p>
 * The node's source subtree is rewritten through {@code rewritePlanSubtree}, and the
 * returned node takes a fresh id from {@code idAllocator} while keeping the original
 * source location.
 *
 * @param node the original filter node
 * @return the updated filter node
 */
private FilterNode buildNewFilterNode(FilterNode node)
{
// NOTE(review): the predicate is rewritten before the source subtree. The subtree rewrite
// rebuilds table scans from globalColumnVarMap, and rewriteClpUdfs presumably records new
// variables there, so this order looks load-bearing -- confirm before reordering.
RowExpression newPredicate = rewriteClpUdfs(node.getPredicate(), functionManager, variableAllocator);
PlanNode newSource = rewritePlanSubtree(node.getSource());
// The id is allocated only after the source subtree has been rebuilt.
return new FilterNode(node.getSourceLocation(), idAllocator.getNextId(), newSource, newPredicate);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Float;
import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Integer;
import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.VarString;
import static com.facebook.presto.plugin.clp.optimization.ClpUdfRewriter.JSON_STRING_PLACEHOLDER;
import static com.facebook.presto.sql.planner.assertions.MatchResult.NO_MATCH;
import static com.facebook.presto.sql.planner.assertions.MatchResult.match;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
Expand Down Expand Up @@ -140,7 +141,7 @@ public void tearDown()
}

@Test
public void testScanFilter()
public void testClpGetScanFilter()
{
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();
Expand Down Expand Up @@ -179,7 +180,7 @@ public void testScanFilter()
}

@Test
public void testScanProject()
public void testClpGetScanProject()
{
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();
Expand Down Expand Up @@ -229,7 +230,7 @@ public void testScanProject()
}

@Test
public void testScanProjectFilter()
public void testClpGetScanProjectFilter()
{
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();
Expand Down Expand Up @@ -265,6 +266,39 @@ public void testScanProjectFilter()
city))))));
}

/**
 * Plans a query that selects {@code CLP_GET_JSON_STRING()} with a {@code CLP_GET_BIGINT}
 * filter, runs {@link ClpUdfRewriter} followed by {@link ClpComputePushDown}, and asserts
 * that the result contains a projection of the JSON-string placeholder over a CLP table
 * scan that exposes {@code fare}, {@code user_id} and the placeholder column.
 */
@Test
public void testClpGetJsonString()
{
    TransactionId txnId = localQueryRunner.getTransactionManager().beginTransaction(false);
    Session querySession = testSessionBuilder()
            .setCatalog("clp")
            .setSchema("default")
            .setTransactionId(txnId)
            .build();

    String sql = "SELECT CLP_GET_JSON_STRING() from test WHERE CLP_GET_BIGINT('user_id') = 0 ORDER BY fare";
    Plan originalPlan = localQueryRunner.createPlan(querySession, sql, WarningCollector.NOOP);

    // Pass 1: replace the CLP UDF calls with variable references.
    PlanNode rewrittenRoot = new ClpUdfRewriter(functionAndTypeManager)
            .optimize(originalPlan.getRoot(), querySession.toConnectorSession(), variableAllocator, planNodeIdAllocator);

    // Pass 2: run the compute push-down optimizer on the rewritten plan.
    PlanNode pushedDownRoot = new ClpComputePushDown(functionAndTypeManager, functionResolution, splitFilterProvider)
            .optimize(rewrittenRoot, querySession.toConnectorSession(), variableAllocator, planNodeIdAllocator);

    PlanMatchPattern expectedShape = anyTree(
            project(
                    ImmutableMap.of(
                            "clp_get_json_string",
                            PlanMatchPattern.expression(JSON_STRING_PLACEHOLDER)),
                    ClpTableScanMatcher.clpTableScanPattern(
                            new ClpTableLayoutHandle(table, Optional.of("user_id: 0"), Optional.empty()),
                            ImmutableSet.of(
                                    fare,
                                    new ClpColumnHandle("user_id", BIGINT),
                                    new ClpColumnHandle(JSON_STRING_PLACEHOLDER, VARCHAR)))));

    PlanAssert.assertPlan(
            querySession,
            localQueryRunner.getMetadata(),
            (node, sourceStats, lookup, s, types) -> PlanNodeStatsEstimate.unknown(),
            new Plan(pushedDownRoot, originalPlan.getTypes(), StatsAndCosts.empty()),
            expectedShape);
}

private static final class ClpTableScanMatcher
implements Matcher
{
Expand Down
Loading
Loading