Skip to content

Commit e530913

Browse files
committed
Address review comments
1 parent 948ccc0 commit e530913

File tree

11 files changed

+335
-237
lines changed

11 files changed

+335
-237
lines changed

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpExpression.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public ClpExpression(String pushDownExpression)
5858
}
5959

6060
/**
61-
* Creates a ClpExpression from a fully translatable KQL string and give it metadata SQL.
61+
* Creates a ClpExpression from a fully translatable KQL string and a metadata SQL string.
6262
*
6363
* @param pushDownExpression
6464
* @param metadataSql

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFilterToKqlConverter.java

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.Optional;
3939
import java.util.Set;
4040

41+
import static com.facebook.presto.common.function.OperatorType.BETWEEN;
4142
import static com.facebook.presto.common.function.OperatorType.EQUAL;
4243
import static com.facebook.presto.common.function.OperatorType.GREATER_THAN;
4344
import static com.facebook.presto.common.function.OperatorType.GREATER_THAN_OR_EQUAL;
@@ -118,35 +119,58 @@ public ClpExpression visitCall(CallExpression node, Void context)
118119
if (operatorType.isComparisonOperator() && operatorType != IS_DISTINCT_FROM) {
119120
return handleLogicalBinary(operatorType, node);
120121
}
121-
else if (OperatorType.BETWEEN == operatorType) {
122-
return handleBetween(operatorType, node);
122+
if (BETWEEN == operatorType) {
123+
return handleBetween(node);
123124
}
124125
}
125126

126127
return new ClpExpression(node);
127128
}
128129

129-
private ClpExpression handleBetween(OperatorType operator, CallExpression node)
130+
/**
131+
* Handles the BETWEEN expression for numeric range, the range must be numeric. If the first
132+
* argument is not a variable reference expression or either range boundaries is not constant
133+
* expressions it won't translate the expression.
134+
*
135+
* <p></p>
136+
* Example: <code>col1 BETWEEN 0 AND 5</code> → <code>col1 >= 0 AND col1 <= 5</code>
137+
*
138+
* @param node the BETWEEN call expression
139+
* @return a ClpExpression containing either the equivalent KQL query, or the original
140+
* expression if it couldn't be translated
141+
*/
142+
private ClpExpression handleBetween(CallExpression node)
130143
{
131144
if (node.getArguments().size() != 3) {
132145
throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION,
133-
"Logical binary operator must have exactly two arguments. Received: " + node);
146+
"Between operator must have exactly three arguments. Received: " + node);
134147
}
135-
Optional<String> variableReferenceDefinition = node.getArguments().get(0).accept(this, null).getPushDownExpression();
136-
Optional<String> lowerBoundConstantDefinition = node.getArguments().get(1).accept(this, null).getPushDownExpression();
137-
Optional<String> upperBoundConstantDefinition = node.getArguments().get(2).accept(this, null).getPushDownExpression();
138-
if (!variableReferenceDefinition.isPresent() || !lowerBoundConstantDefinition.isPresent() || !upperBoundConstantDefinition.isPresent()) {
148+
if (!(node.getArguments().get(0) instanceof VariableReferenceExpression)
149+
|| !(node.getArguments().get(1) instanceof ConstantExpression)
150+
|| !(node.getArguments().get(2) instanceof ConstantExpression)) {
139151
return new ClpExpression(node);
140152
}
153+
Optional<String> variableReferencePushDownExpression = node.getArguments().get(0).accept(this, null).getPushDownExpression();
154+
if (!variableReferencePushDownExpression.isPresent()) {
155+
return new ClpExpression(node);
156+
}
157+
158+
String lowerBoundConstantPushDownExpression = getLiteralString((ConstantExpression) node.getArguments().get(1));
159+
String upperBoundConstantPushDownExpression = getLiteralString((ConstantExpression) node.getArguments().get(2));
141160
String metadataSql = null;
142-
String kql = String.format(
143-
"\"%s\" >= %s AND \"%s\" <= %s",
144-
variableReferenceDefinition.get(),
145-
lowerBoundConstantDefinition.get(),
146-
variableReferenceDefinition.get(),
147-
upperBoundConstantDefinition.get());
148-
if (metadataFilterColumns.contains(variableReferenceDefinition.get())) {
149-
metadataSql = kql;
161+
String kql = format(
162+
"%s >= %s AND %s <= %s",
163+
variableReferencePushDownExpression.get(),
164+
lowerBoundConstantPushDownExpression,
165+
variableReferencePushDownExpression.get(),
166+
upperBoundConstantPushDownExpression);
167+
if (metadataFilterColumns.contains(variableReferencePushDownExpression.get())) {
168+
metadataSql = format(
169+
"\"%s\" >= %s AND \"%s\" <= %s",
170+
variableReferencePushDownExpression.get(),
171+
lowerBoundConstantPushDownExpression,
172+
variableReferencePushDownExpression.get(),
173+
upperBoundConstantPushDownExpression);
150174
}
151175
return new ClpExpression(kql, metadataSql);
152176
}
@@ -235,11 +259,12 @@ private ClpExpression handleNot(CallExpression node)
235259
if (expression.getRemainingExpression().isPresent() || !expression.getPushDownExpression().isPresent()) {
236260
return new ClpExpression(node);
237261
}
262+
String notPushDownExpression = "NOT " + expression.getPushDownExpression().get();
238263
if (expression.getMetadataSql().isPresent()) {
239-
return new ClpExpression("NOT " + expression.getPushDownExpression().get(), "NOT " + expression.getMetadataSql());
264+
return new ClpExpression(notPushDownExpression, "NOT " + expression.getMetadataSql());
240265
}
241266
else {
242-
return new ClpExpression("NOT " + expression.getPushDownExpression().get());
267+
return new ClpExpression(notPushDownExpression);
243268
}
244269
}
245270

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadataFilterProvider.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@
2424

2525
import java.io.File;
2626
import java.io.IOException;
27-
import java.util.Collections;
2827
import java.util.HashMap;
29-
import java.util.HashSet;
3028
import java.util.List;
3129
import java.util.Map;
3230
import java.util.Objects;
@@ -37,8 +35,37 @@
3735
import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_METADATA_FILTER_CONFIG_NOT_FOUND;
3836
import static com.google.common.collect.ImmutableMap.toImmutableMap;
3937
import static com.google.common.collect.ImmutableSet.toImmutableSet;
38+
import static java.lang.String.format;
4039
import static java.util.Objects.requireNonNull;
4140

41+
/**
42+
* Provides metadata filter configuration and utilities for remapping and validating metadata
43+
* filters based on a JSON configuration file.
44+
* <p>
45+
* The configuration file locates at {@code clp.metadata-filter-config} and defines metadata filters
46+
* for different scopes:
47+
* <ul>
48+
* <li><b>Catalog-level</b>: applies to all schemas and tables under the catalog.</li>
49+
* <li><b>Schema-level</b>: applies to all tables under the specified catalog and schema.</li>
50+
* <li><b>Table-level</b>: applies to the fully qualified catalog.schema.table.</li>
51+
* </ul>
52+
* <p>
53+
* Each scope maps to a list of filter definitions. Each filter includes:
54+
* <ul>
55+
* <li>{@code filterName}: must match a column name in the table's schema. Note that only numeric
56+
* type column can be used as metadata filter now.</li>
57+
* <li>{@code rangeMapping} (optional): specifies how the filter should be remapped when it
58+
* targets metadata-only columns. Note that this option is valid only if the column is numeric
59+
* type.
60+
* For example, a condition like {@code "msg.timestamp" > 1234} will be rewritten as
61+
* {@code end_timestamp > 1234} to ensure metadata-based filtering produces a superset of the
62+
* actual result.</li>
63+
* </ul>
64+
* <p>
65+
* This provider is used by {@code ClpFilterToKqlConverter} to determine which columns are eligible
66+
* for metadata filter push down, and by {@code ClpMySqlSplitProvider} to construct metadata filter
67+
* queries that determine which splits to read.
68+
*/
4269
public class ClpMetadataFilterProvider
4370
{
4471
private final Map<String, TableConfig> filterMap;
@@ -68,8 +95,7 @@ public void checkContainsAllFilters(SchemaTableName schemaTableName, String meta
6895
{
6996
boolean hasAllMetadataFilterColumns = true;
7097
String notFoundFilterColumnName = "";
71-
for (String columnName : getFilterNames(
72-
CONNECTOR_NAME + "." + schemaTableName.toString())) {
98+
for (String columnName : getFilterNames(format("%s.%s", CONNECTOR_NAME, schemaTableName))) {
7399
if (!metadataFilterKqlQuery.contains(columnName)) {
74100
hasAllMetadataFilterColumns = false;
75101
notFoundFilterColumnName = columnName;
@@ -101,12 +127,14 @@ public String remapFilterSql(String scope, String sql)
101127
String remappedSql = sql;
102128
for (Map.Entry<String, RangeMapping> entry : mappings.entrySet()) {
103129
remappedSql = remappedSql.replaceAll(
104-
"\"(" + entry.getKey() + ")\"\\s(>=?)\\s([0-9]*)", entry.getValue().upperBound + " $2 $3");
130+
format("\"(%s)\"\\s(>=?)\\s([0-9]*)", entry.getKey()),
131+
format("%s $2 $3", entry.getValue().upperBound));
105132
remappedSql = remappedSql.replaceAll(
106-
"\"(" + entry.getKey() + ")\"\\s(<=?)\\s([0-9]*)", entry.getValue().lowerBound + " $2 $3");
133+
format("\"(%s)\"\\s(<=?)\\s([0-9]*)", entry.getKey()),
134+
format("%s $2 $3", entry.getValue().lowerBound));
107135
remappedSql = remappedSql.replaceAll(
108-
"\"(" + entry.getKey() + ")\"\\s(=)\\s([0-9]*)",
109-
"(" + entry.getValue().lowerBound + " <= $3 AND " + entry.getValue().upperBound + " >= $3)");
136+
format("\"(%s)\"\\s(=)\\s([0-9]*)", entry.getKey()),
137+
format("(%s <= $3 AND %s >= $3)", entry.getValue().lowerBound, entry.getValue().upperBound));
110138
}
111139
return remappedSql;
112140
}
@@ -115,18 +143,19 @@ public Set<String> getFilterNames(String scope)
115143
{
116144
String[] splitScope = scope.split("\\.");
117145
if (0 == splitScope.length) {
118-
return Collections.emptySet();
146+
return ImmutableSet.of();
119147
}
120-
Set<String> filterNames = new HashSet<>(getAllFilerNamesFromTableConfig(filterMap.get(splitScope[0])));
148+
ImmutableSet.Builder<String> builder = ImmutableSet.builder();
149+
builder.addAll(getAllFilerNamesFromTableConfig(filterMap.get(splitScope[0])));
121150

122151
if (1 < splitScope.length) {
123-
filterNames.addAll(getAllFilerNamesFromTableConfig(filterMap.get(splitScope[0] + "." + splitScope[1])));
152+
builder.addAll(getAllFilerNamesFromTableConfig(filterMap.get(splitScope[0] + "." + splitScope[1])));
124153
}
125154

126155
if (3 == splitScope.length) {
127-
filterNames.addAll(getAllFilerNamesFromTableConfig(filterMap.get(scope)));
156+
builder.addAll(getAllFilerNamesFromTableConfig(filterMap.get(scope)));
128157
}
129-
return ImmutableSet.copyOf(filterNames);
158+
return builder.build();
130159
}
131160

132161
private Set<String> getAllFilerNamesFromTableConfig(TableConfig tableConfig)

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,7 @@ public PlanNode visitFilter(FilterNode node, RewriteContext<Void> context)
8989
Optional<String> metadataSql = clpExpression.getMetadataSql();
9090
Optional<RowExpression> remainingPredicate = clpExpression.getRemainingExpression();
9191

92-
// This must be checked before check if the KQL is present, otherwise KQL can be emptry then this function
93-
// directly exits.
92+
// Check metadata filters before the KQL query
9493
metadataFilterProvider.checkContainsAllFilters(clpTableHandle.getSchemaTableName(), metadataSql.orElse(""));
9594
if (metadataSql.isPresent()) {
9695
metadataSql = Optional.of(metadataFilterProvider.remapFilterSql(scope, metadataSql.get()));

presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpUtils.java

Lines changed: 0 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -15,84 +15,12 @@
1515

1616
import com.facebook.airlift.log.Logger;
1717

18-
import java.util.regex.Matcher;
19-
import java.util.regex.Pattern;
20-
2118
public class ClpUtils
2219
{
2320
public static class KqlUtils
2421
{
2522
private static final Logger log = Logger.get(KqlUtils.class);
2623

27-
public static String toSql(String kqlQuery)
28-
{
29-
String sqlQuery = kqlQuery;
30-
31-
// Translate string filters
32-
Pattern stringFilterPattern = Pattern.compile("([a-zA-Z_][a-zA-Z0-9_]*):\\s*\"([^\"]+)\"");
33-
Matcher stringFilterMatcher = stringFilterPattern.matcher(sqlQuery);
34-
StringBuilder translateStringFiltersSb = new StringBuilder();
35-
36-
int lastStringFilterEnd = 0;
37-
while (stringFilterMatcher.find()) {
38-
String columnName = stringFilterMatcher.group(1);
39-
String value = stringFilterMatcher.group(2);
40-
41-
// Append everything between last match and this one
42-
translateStringFiltersSb.append(sqlQuery, lastStringFilterEnd, stringFilterMatcher.start());
43-
44-
if (value.contains("*")) {
45-
translateStringFiltersSb.append("\"")
46-
.append(columnName)
47-
.append("\" LIKE '")
48-
.append(value.replace('*', '%'))
49-
.append("'");
50-
}
51-
else {
52-
translateStringFiltersSb.append("\"")
53-
.append(columnName)
54-
.append("\" = '")
55-
.append(value)
56-
.append("'");
57-
}
58-
59-
lastStringFilterEnd = stringFilterMatcher.end();
60-
}
61-
translateStringFiltersSb.append(sqlQuery.substring(lastStringFilterEnd));
62-
sqlQuery = translateStringFiltersSb.toString();
63-
64-
// Translate numeric filters
65-
Pattern numericFilterPattern = Pattern.compile("([a-zA-Z_][a-zA-Z0-9_]*)\\s*([><]=?|:)\\s*(-?\\d+(?:\\.\\d+)?)");
66-
Matcher numericFilterMatcher = numericFilterPattern.matcher(sqlQuery);
67-
StringBuilder translateNumericFilterSb = new StringBuilder();
68-
69-
int lastNumericFilterEnd = 0;
70-
while (numericFilterMatcher.find()) {
71-
String columnName = numericFilterMatcher.group(1);
72-
String operator = numericFilterMatcher.group(2);
73-
if (":".equals(operator)) {
74-
operator = "=";
75-
}
76-
String value = numericFilterMatcher.group(3);
77-
78-
// Append everything between last match and this one
79-
translateNumericFilterSb.append(sqlQuery, lastNumericFilterEnd, numericFilterMatcher.start());
80-
81-
translateNumericFilterSb.append("\"")
82-
.append(columnName)
83-
.append("\" ")
84-
.append(operator)
85-
.append(" ")
86-
.append(value);
87-
88-
lastNumericFilterEnd = numericFilterMatcher.end();
89-
}
90-
translateNumericFilterSb.append(sqlQuery.substring(lastNumericFilterEnd));
91-
sqlQuery = translateNumericFilterSb.toString();
92-
93-
return sqlQuery;
94-
}
95-
9624
public static String escapeKqlSpecialCharsForStringValue(String literalString)
9725
{
9826
String escaped = literalString;

presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpMySqlSplitProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public List<ClpSplit> listSplits(ClpTableLayoutHandle clpTableLayoutHandle)
7373
String metadataFilterQuery = clpTableLayoutHandle.getMetadataSql().get();
7474
archivePathQuery += " AND (" + metadataFilterQuery + ")";
7575
}
76-
log.info("Query for archive: %s", archivePathQuery);
76+
log.debug("Query for archive: %s", archivePathQuery);
7777

7878
try (Connection connection = getConnection()) {
7979
// Fetch archive IDs and create splits

presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ public final class ClpMetadataDbSetUp
4848
public static final String METADATA_DB_TABLE_PREFIX = "clp_";
4949
public static final String METADATA_DB_URL_TEMPLATE = "jdbc:h2:file:%s;MODE=MySQL;DATABASE_TO_UPPER=FALSE";
5050
public static final String METADATA_DB_USER = "sa";
51-
public static final String ARCHIVE_STORAGE_DIRECTORY_BASE = "/tmp/archives/";
51+
public static final String ARCHIVES_STORAGE_DIRECTORY_BASE = "/tmp/archives/";
5252

5353
private static final Logger log = Logger.get(ClpMetadataDbSetUp.class);
5454
private static final String DATASETS_TABLE_NAME = METADATA_DB_TABLE_PREFIX + DATASETS_TABLE_SUFFIX;
55-
private static final String ARCHIVE_TABLE_COLUMN_BEGIN_TIMESTAMP = "begin_timestamp";
55+
private static final String ARCHIVES_TABLE_COLUMN_BEGIN_TIMESTAMP = "begin_timestamp";
5656
private static final String ARCHIVES_TABLE_COLUMN_PAGINATION_ID = "pagination_id";
57-
private static final String ARCHIVE_TABLE_COLUMN_END_TIMESTAMP = "end_timestamp";
57+
private static final String ARCHIVES_TABLE_COLUMN_END_TIMESTAMP = "end_timestamp";
5858

5959
private ClpMetadataDbSetUp()
6060
{
@@ -143,17 +143,17 @@ public static ClpMySqlSplitProvider setupSplit(DbHandle dbHandle, Map<String, Li
143143
archiveTableName,
144144
ARCHIVES_TABLE_COLUMN_PAGINATION_ID,
145145
ARCHIVES_TABLE_COLUMN_ID,
146-
ARCHIVE_TABLE_COLUMN_BEGIN_TIMESTAMP,
147-
ARCHIVE_TABLE_COLUMN_END_TIMESTAMP);
146+
ARCHIVES_TABLE_COLUMN_BEGIN_TIMESTAMP,
147+
ARCHIVES_TABLE_COLUMN_END_TIMESTAMP);
148148

149149
stmt.execute(createArchiveTableSQL);
150150

151151
String insertArchiveTableSQL = format(
152152
"INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)",
153153
archiveTableName,
154154
ARCHIVES_TABLE_COLUMN_ID,
155-
ARCHIVE_TABLE_COLUMN_BEGIN_TIMESTAMP,
156-
ARCHIVE_TABLE_COLUMN_END_TIMESTAMP);
155+
ARCHIVES_TABLE_COLUMN_BEGIN_TIMESTAMP,
156+
ARCHIVES_TABLE_COLUMN_END_TIMESTAMP);
157157
try (PreparedStatement pstmt = conn.prepareStatement(insertArchiveTableSQL)) {
158158
for (ArchiveTableRow split : tableSplits.getValue()) {
159159
pstmt.setString(1, split.id);
@@ -213,7 +213,7 @@ private static void updateDatasetsTable(Connection conn, String tableName)
213213
DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY);
214214
try (PreparedStatement pstmt = conn.prepareStatement(insertDatasetsTableSql)) {
215215
pstmt.setString(1, tableName);
216-
pstmt.setString(2, ARCHIVE_STORAGE_DIRECTORY_BASE + tableName);
216+
pstmt.setString(2, ARCHIVES_STORAGE_DIRECTORY_BASE + tableName);
217217
pstmt.executeUpdate();
218218
}
219219
}

0 commit comments

Comments
 (0)