Skip to content

Commit d02268d

Browse files
authored
feat: Add clp_get_json_string UDF. (#82)
1 parent bfa18ff commit d02268d

File tree

4 files changed

+64
-10
lines changed

4 files changed

+64
-10
lines changed

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFunctions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,12 @@ public static boolean clpWildcardBoolColumn()
9797
{
9898
throw new UnsupportedOperationException("CLP_WILDCARD_BOOL_COLUMN is a placeholder function without implementation.");
9999
}
100+
101+
@ScalarFunction(value = "CLP_GET_JSON_STRING", deterministic = false)
102+
@Description("Converts an entire log record into a JSON string.")
103+
@SqlType(StandardTypes.VARCHAR)
104+
public static Slice clpGetJSONString()
105+
{
106+
throw new UnsupportedOperationException("CLP_GET_JSON_STRING is a placeholder function without implementation.");
107+
}
100108
}

presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpUdfRewriter.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
public final class ClpUdfRewriter
5959
implements ConnectorPlanOptimizer
6060
{
61+
public static final String JSON_STRING_PLACEHOLDER = "__json_string";
6162
private final FunctionMetadataManager functionManager;
6263

6364
public ClpUdfRewriter(FunctionMetadataManager functionManager)
@@ -123,7 +124,7 @@ public PlanNode visitProject(ProjectNode node, RewriteContext<Void> context)
123124
for (Map.Entry<VariableReferenceExpression, RowExpression> entry : node.getAssignments().getMap().entrySet()) {
124125
newAssignments.put(
125126
entry.getKey(),
126-
rewriteClpUdfs(entry.getValue(), functionManager, variableAllocator));
127+
rewriteClpUdfs(entry.getValue(), functionManager, variableAllocator, true));
127128
}
128129

129130
PlanNode newSource = rewritePlanSubtree(node.getSource());
@@ -148,20 +149,32 @@ public PlanNode visitFilter(FilterNode node, RewriteContext<Void> context)
148149
* @param expression the input expression to analyze and possibly rewrite
149150
* @param functionManager function manager used to resolve function metadata
150151
* @param variableAllocator variable allocator used to create new variable references
152+
* @param inProjectNode whether the CLP UDFs are in a {@link ProjectNode}
151153
* @return a possibly rewritten {@link RowExpression} with <code>CLP_GET_*</code> calls
152154
* replaced
153155
*/
154156
private RowExpression rewriteClpUdfs(
155157
RowExpression expression,
156158
FunctionMetadataManager functionManager,
157-
VariableAllocator variableAllocator)
159+
VariableAllocator variableAllocator,
160+
boolean inProjectNode)
158161
{
159162
// Handle CLP_GET_* function calls
160163
if (expression instanceof CallExpression) {
161164
CallExpression call = (CallExpression) expression;
162165
String functionName = functionManager.getFunctionMetadata(call.getFunctionHandle()).getName().getObjectName().toUpperCase();
163166

164-
if (functionName.startsWith("CLP_GET_")) {
167+
if (inProjectNode && functionName.equals("CLP_GET_JSON_STRING")) {
168+
VariableReferenceExpression newValue = variableAllocator.newVariable(
169+
expression.getSourceLocation(),
170+
JSON_STRING_PLACEHOLDER,
171+
call.getType());
172+
ClpColumnHandle targetHandle = new ClpColumnHandle(JSON_STRING_PLACEHOLDER, call.getType());
173+
174+
globalColumnVarMap.put(targetHandle, newValue);
175+
return newValue;
176+
}
177+
else if (functionName.startsWith("CLP_GET_")) {
165178
if (call.getArguments().size() != 1 || !(call.getArguments().get(0) instanceof ConstantExpression)) {
166179
throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION,
167180
"CLP_GET_* UDF must have a single constant string argument");
@@ -187,7 +200,7 @@ private RowExpression rewriteClpUdfs(
187200

188201
// Recurse into arguments
189202
List<RowExpression> rewrittenArgs = call.getArguments().stream()
190-
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator))
203+
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator, inProjectNode))
191204
.collect(toImmutableList());
192205

193206
return new CallExpression(call.getDisplayName(), call.getFunctionHandle(), call.getType(), rewrittenArgs);
@@ -198,7 +211,7 @@ private RowExpression rewriteClpUdfs(
198211
SpecialFormExpression special = (SpecialFormExpression) expression;
199212

200213
List<RowExpression> rewrittenArgs = special.getArguments().stream()
201-
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator))
214+
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator, inProjectNode))
202215
.collect(toImmutableList());
203216

204217
return new SpecialFormExpression(special.getSourceLocation(), special.getForm(), special.getType(), rewrittenArgs);
@@ -298,7 +311,7 @@ private TableScanNode buildNewTableScanNode(TableScanNode node)
298311
*/
299312
private FilterNode buildNewFilterNode(FilterNode node)
300313
{
301-
RowExpression newPredicate = rewriteClpUdfs(node.getPredicate(), functionManager, variableAllocator);
314+
RowExpression newPredicate = rewriteClpUdfs(node.getPredicate(), functionManager, variableAllocator, false);
302315
PlanNode newSource = rewritePlanSubtree(node.getSource());
303316
return new FilterNode(node.getSourceLocation(), idAllocator.getNextId(), newSource, newPredicate);
304317
}

presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpUdfRewriter.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Float;
7373
import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Integer;
7474
import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.VarString;
75+
import static com.facebook.presto.plugin.clp.optimization.ClpUdfRewriter.JSON_STRING_PLACEHOLDER;
7576
import static com.facebook.presto.sql.planner.assertions.MatchResult.NO_MATCH;
7677
import static com.facebook.presto.sql.planner.assertions.MatchResult.match;
7778
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
@@ -140,7 +141,7 @@ public void tearDown()
140141
}
141142

142143
@Test
143-
public void testScanFilter()
144+
public void testClpGetScanFilter()
144145
{
145146
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
146147
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();
@@ -179,7 +180,7 @@ public void testScanFilter()
179180
}
180181

181182
@Test
182-
public void testScanProject()
183+
public void testClpGetScanProject()
183184
{
184185
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
185186
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();
@@ -229,7 +230,7 @@ public void testScanProject()
229230
}
230231

231232
@Test
232-
public void testScanProjectFilter()
233+
public void testClpGetScanProjectFilter()
233234
{
234235
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
235236
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();
@@ -265,6 +266,38 @@ public void testScanProjectFilter()
265266
city))))));
266267
}
267268

269+
@Test
270+
public void testClpGetJsonString()
271+
{
272+
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
273+
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();
274+
275+
Plan plan = localQueryRunner.createPlan(
276+
session,
277+
"SELECT CLP_GET_JSON_STRING() from test WHERE CLP_GET_BIGINT('user_id') = 0",
278+
WarningCollector.NOOP);
279+
ClpUdfRewriter udfRewriter = new ClpUdfRewriter(functionAndTypeManager);
280+
PlanNode optimizedPlan = udfRewriter.optimize(plan.getRoot(), session.toConnectorSession(), variableAllocator, planNodeIdAllocator);
281+
ClpComputePushDown optimizer = new ClpComputePushDown(functionAndTypeManager, functionResolution, splitFilterProvider);
282+
optimizedPlan = optimizer.optimize(optimizedPlan, session.toConnectorSession(), variableAllocator, planNodeIdAllocator);
283+
284+
PlanAssert.assertPlan(
285+
session,
286+
localQueryRunner.getMetadata(),
287+
(node, sourceStats, lookup, s, types) -> PlanNodeStatsEstimate.unknown(),
288+
new Plan(optimizedPlan, plan.getTypes(), StatsAndCosts.empty()),
289+
anyTree(
290+
project(
291+
ImmutableMap.of(
292+
"clp_get_json_string",
293+
PlanMatchPattern.expression(JSON_STRING_PLACEHOLDER)),
294+
ClpTableScanMatcher.clpTableScanPattern(
295+
new ClpTableLayoutHandle(table, Optional.of("user_id: 0"), Optional.empty()),
296+
ImmutableSet.of(
297+
new ClpColumnHandle("user_id", BIGINT),
298+
new ClpColumnHandle(JSON_STRING_PLACEHOLDER, VARCHAR))))));
299+
}
300+
268301
private static final class ClpTableScanMatcher
269302
implements Matcher
270303
{

0 commit comments

Comments
 (0)