Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,12 @@ public static boolean clpWildcardBoolColumn()
{
throw new UnsupportedOperationException("CLP_WILDCARD_BOOL_COLUMN is a placeholder function without implementation.");
}

@ScalarFunction(value = "CLP_GET_JSON_STRING", deterministic = false)
@Description("Converts an entire log record into a JSON string.")
@SqlType(StandardTypes.VARCHAR)
public static Slice clpGetJSONString()
{
throw new UnsupportedOperationException("CLP_GET_JSON_STRING is a placeholder function without implementation.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
public final class ClpUdfRewriter
implements ConnectorPlanOptimizer
{
public static final String JSON_STRING_PLACEHOLDER = "__json_string";
private final FunctionMetadataManager functionManager;

public ClpUdfRewriter(FunctionMetadataManager functionManager)
Expand Down Expand Up @@ -123,7 +124,7 @@ public PlanNode visitProject(ProjectNode node, RewriteContext<Void> context)
for (Map.Entry<VariableReferenceExpression, RowExpression> entry : node.getAssignments().getMap().entrySet()) {
newAssignments.put(
entry.getKey(),
rewriteClpUdfs(entry.getValue(), functionManager, variableAllocator));
rewriteClpUdfs(entry.getValue(), functionManager, variableAllocator, true));
}

PlanNode newSource = rewritePlanSubtree(node.getSource());
Expand All @@ -148,20 +149,32 @@ public PlanNode visitFilter(FilterNode node, RewriteContext<Void> context)
* @param expression the input expression to analyze and possibly rewrite
* @param functionManager function manager used to resolve function metadata
* @param variableAllocator variable allocator used to create new variable references
* @param inProjectNode whether the CLP UDFs are in a {@link ProjectNode}
* @return a possibly rewritten {@link RowExpression} with <code>CLP_GET_*</code> calls
* replaced
*/
private RowExpression rewriteClpUdfs(
RowExpression expression,
FunctionMetadataManager functionManager,
VariableAllocator variableAllocator)
VariableAllocator variableAllocator,
boolean inProjectNode)
{
// Handle CLP_GET_* function calls
if (expression instanceof CallExpression) {
CallExpression call = (CallExpression) expression;
String functionName = functionManager.getFunctionMetadata(call.getFunctionHandle()).getName().getObjectName().toUpperCase();

if (functionName.startsWith("CLP_GET_")) {
if (inProjectNode && functionName.equals("CLP_GET_JSON_STRING")) {
VariableReferenceExpression newValue = variableAllocator.newVariable(
expression.getSourceLocation(),
JSON_STRING_PLACEHOLDER,
call.getType());
ClpColumnHandle targetHandle = new ClpColumnHandle(JSON_STRING_PLACEHOLDER, call.getType());

globalColumnVarMap.put(targetHandle, newValue);
return newValue;
}
else if (functionName.startsWith("CLP_GET_")) {
if (call.getArguments().size() != 1 || !(call.getArguments().get(0) instanceof ConstantExpression)) {
throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION,
"CLP_GET_* UDF must have a single constant string argument");
Expand All @@ -187,7 +200,7 @@ private RowExpression rewriteClpUdfs(

// Recurse into arguments
List<RowExpression> rewrittenArgs = call.getArguments().stream()
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator))
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator, inProjectNode))
.collect(toImmutableList());

return new CallExpression(call.getDisplayName(), call.getFunctionHandle(), call.getType(), rewrittenArgs);
Expand All @@ -198,7 +211,7 @@ private RowExpression rewriteClpUdfs(
SpecialFormExpression special = (SpecialFormExpression) expression;

List<RowExpression> rewrittenArgs = special.getArguments().stream()
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator))
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator, inProjectNode))
.collect(toImmutableList());

return new SpecialFormExpression(special.getSourceLocation(), special.getForm(), special.getType(), rewrittenArgs);
Expand Down Expand Up @@ -298,7 +311,7 @@ private TableScanNode buildNewTableScanNode(TableScanNode node)
*/
private FilterNode buildNewFilterNode(FilterNode node)
{
RowExpression newPredicate = rewriteClpUdfs(node.getPredicate(), functionManager, variableAllocator);
RowExpression newPredicate = rewriteClpUdfs(node.getPredicate(), functionManager, variableAllocator, false);
PlanNode newSource = rewritePlanSubtree(node.getSource());
return new FilterNode(node.getSourceLocation(), idAllocator.getNextId(), newSource, newPredicate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Float;
import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Integer;
import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.VarString;
import static com.facebook.presto.plugin.clp.optimization.ClpUdfRewriter.JSON_STRING_PLACEHOLDER;
import static com.facebook.presto.sql.planner.assertions.MatchResult.NO_MATCH;
import static com.facebook.presto.sql.planner.assertions.MatchResult.match;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
Expand Down Expand Up @@ -140,7 +141,7 @@ public void tearDown()
}

@Test
public void testScanFilter()
public void testClpGetScanFilter()
{
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();
Expand Down Expand Up @@ -179,7 +180,7 @@ public void testScanFilter()
}

@Test
public void testScanProject()
public void testClpGetScanProject()
{
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();
Expand Down Expand Up @@ -229,7 +230,7 @@ public void testScanProject()
}

@Test
public void testScanProjectFilter()
public void testClpGetScanProjectFilter()
{
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();
Expand Down Expand Up @@ -265,6 +266,38 @@ public void testScanProjectFilter()
city))))));
}

@Test
public void testClpGetJsonString()
{
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();

Plan plan = localQueryRunner.createPlan(
session,
"SELECT CLP_GET_JSON_STRING() from test WHERE CLP_GET_BIGINT('user_id') = 0",
WarningCollector.NOOP);
ClpUdfRewriter udfRewriter = new ClpUdfRewriter(functionAndTypeManager);
PlanNode optimizedPlan = udfRewriter.optimize(plan.getRoot(), session.toConnectorSession(), variableAllocator, planNodeIdAllocator);
ClpComputePushDown optimizer = new ClpComputePushDown(functionAndTypeManager, functionResolution, splitFilterProvider);
optimizedPlan = optimizer.optimize(optimizedPlan, session.toConnectorSession(), variableAllocator, planNodeIdAllocator);

PlanAssert.assertPlan(
session,
localQueryRunner.getMetadata(),
(node, sourceStats, lookup, s, types) -> PlanNodeStatsEstimate.unknown(),
new Plan(optimizedPlan, plan.getTypes(), StatsAndCosts.empty()),
anyTree(
project(
ImmutableMap.of(
"clp_get_json_string",
PlanMatchPattern.expression(JSON_STRING_PLACEHOLDER)),
ClpTableScanMatcher.clpTableScanPattern(
new ClpTableLayoutHandle(table, Optional.of("user_id: 0"), Optional.empty()),
ImmutableSet.of(
new ClpColumnHandle("user_id", BIGINT),
new ClpColumnHandle(JSON_STRING_PLACEHOLDER, VARCHAR))))));
}

private static final class ClpTableScanMatcher
implements Matcher
{
Expand Down
119 changes: 119 additions & 0 deletions presto-docs/src/main/sphinx/connector/clp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,125 @@ Each JSON log maps to this unified ``ROW`` type, with absent fields represented
``status``, ``thread_num``, ``backtrace``) become fields within the ``ROW``, clearly reflecting the nested and varying
structures of the original JSON logs.

*************
CLP Functions
*************

Semi-structured logs can have many potential keys, which can lead to very wide Presto tables. To keep table metadata
concise and still preserve access to dynamic fields, the connector provides two sets of functions that are specific to
the CLP connector. These functions are not part of standard Presto SQL.

- JSON path functions (e.g., ``CLP_GET_STRING``)
- Wildcard column matching functions for use in filter predicates (e.g., ``CLP_WILDCARD_STRING_COLUMN``)

There is **no performance penalty** when using these functions. During query optimization, the connector rewrites these
functions into references to concrete schema-backed columns or valid symbols in KQL queries. This avoids additional
parsing overhead and delivers performance comparable to querying standard columns.

Path-based Functions
====================

.. function:: CLP_GET_STRING(varchar) -> varchar

Returns the string value at the given JSON path, where the column type is one of: ``ClpString``, ``VarString``, or
``DateString``. Returns a Presto ``VARCHAR``.

.. function:: CLP_GET_BIGINT(varchar) -> bigint

Returns the integer value at the given JSON path, where the column type is ``Integer``. Returns a Presto ``BIGINT``.

.. function:: CLP_GET_DOUBLE(varchar) -> double

Returns the double value at the given JSON path, where the column type is ``Float``. Returns a Presto ``DOUBLE``.

.. function:: CLP_GET_BOOL(varchar) -> boolean

Returns the boolean value at the given JSON path, where the column type is ``Boolean``. Returns a Presto ``BOOLEAN``.

.. function:: CLP_GET_STRING_ARRAY(varchar) -> array(varchar)

Returns the array value at the given JSON path, where the column type is ``UnstructuredArray`` and converts each
element into a string. Returns a Presto ``ARRAY(VARCHAR)``.

.. note::

- JSON paths must be **constant string literals**; variables are not supported.
- Wildcards (e.g., ``msg.*.ts``) are **not supported**.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Wildcards (e.g., ``msg.*.ts``) are **not supported**.
- Wildcards (e.g., ``msg.*.ts``) are **not supported**, as paths use dot notation.

clarity

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is supported in CLP-S KQL, which also uses dot notation, but it isn’t supported here.

- If a path is invalid or missing, the function returns ``NULL`` rather than raising an error.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think its worth elaborationg? i.e. "If a path is invalid or missing in a given record, "?


Examples
--------

.. code-block:: sql

SELECT CLP_GET_STRING(msg.author) AS author
FROM clp.default.table_1
WHERE CLP_GET_INT('msg.timestamp') > 1620000000;

SELECT CLP_GET_STRING_ARRAY(msg.tags) AS tags
FROM clp.default.table_2
WHERE CLP_GET_BOOL('msg.is_active') = true;


Wildcard Column Functions
=========================

These functions are used to apply filter predicates across all columns of a certain type. They are useful for searching
across unknown or dynamic schemas without specifying exact column names. Similar to the path-based functions, these
functions are rewritten during query optimization to a KQL query that matches the appropriate columns.

.. function:: CLP_WILDCARD_STRING_COLUMN() -> varchar

Represents all columns whose CLP types are ``ClpString``, ``VarString``, or ``DateString``.

.. function:: CLP_WILDCARD_INT_COLUMN() -> bigint

Represents all columns whose CLP type is ``Integer``.

.. function:: CLP_WILDCARD_FLOAT_COLUMN() -> double

Represents all columns whose CLP type is ``Float``.

.. function:: CLP_WILDCARD_BOOL_COLUMN() -> boolean

Represents all columns whose CLP type is ``Boolean``.

.. note::

- Wildcard functions must appear **only in filter conditions** (`WHERE` clause). They cannot be selected and cannot
be passed as arguments to other functions.
- Supported operators include:

::

= (EQUAL)
!= (NOT_EQUAL)
< (LESS_THAN)
<= (LESS_THAN_OR_EQUAL)
> (GREATER_THAN)
>= (GREATER_THAN_OR_EQUAL)
LIKE
BETWEEN
IN

Use of other operators (e.g., arithmetic or function calls) with wildcard functions is not allowed and will result
in a query error.

Examples
--------

.. code-block:: sql

-- Matches if any string column contains "Beijing"
SELECT *
FROM clp.default.table_1
WHERE CLP_WILDCARD_STRING_COLUMN() = 'Beijing';

-- Matches if any integer column equals 1
SELECT *
FROM clp.default.table_2
WHERE CLP_WILDCARD_INT_COLUMN() = 1;

***********
SQL support
***********
Expand Down
Loading