Skip to content

Commit 873dc4d

Browse files
LantaoJin and xinyual
authored and committed
Support parse command with Calcite (#3474)
Signed-off-by: Lantao Jin <ltjin@amazon.com>
Signed-off-by: xinyual <xinyual@amazon.com>
1 parent 04881a0 commit 873dc4d

File tree

11 files changed

+314
-86
lines changed

11 files changed

+314
-86
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 78 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.calcite.rex.RexLiteral;
3232
import org.apache.calcite.rex.RexNode;
3333
import org.apache.calcite.rex.RexWindowBounds;
34+
import org.apache.calcite.sql.fun.SqlLibraryOperators;
3435
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
3536
import org.apache.calcite.tools.RelBuilder;
3637
import org.apache.calcite.tools.RelBuilder.AggCall;
@@ -42,6 +43,8 @@
4243
import org.opensearch.sql.ast.expression.Argument;
4344
import org.opensearch.sql.ast.expression.Field;
4445
import org.opensearch.sql.ast.expression.Let;
46+
import org.opensearch.sql.ast.expression.Literal;
47+
import org.opensearch.sql.ast.expression.ParseMethod;
4548
import org.opensearch.sql.ast.expression.UnresolvedExpression;
4649
import org.opensearch.sql.ast.expression.subquery.SubqueryExpression;
4750
import org.opensearch.sql.ast.tree.AD;
@@ -73,6 +76,7 @@
7376
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
7477
import org.opensearch.sql.exception.CalciteUnsupportedException;
7578
import org.opensearch.sql.exception.SemanticCheckException;
79+
import org.opensearch.sql.utils.ParseUtils;
7680

7781
public class CalciteRelNodeVisitor extends AbstractNodeVisitor<RelNode, CalcitePlanContext> {
7882

@@ -247,69 +251,86 @@ public RelNode visitHead(Head node, CalcitePlanContext context) {
247251
}
248252

249253
@Override
250-
public RelNode visitEval(Eval node, CalcitePlanContext context) {
254+
public RelNode visitParse(Parse node, CalcitePlanContext context) {
251255
visitChildren(node, context);
252256
List<String> originalFieldNames = context.relBuilder.peek().getRowType().getFieldNames();
253-
List<RexNode> evalList =
254-
node.getExpressionList().stream()
257+
RexNode sourceField = rexVisitor.analyze(node.getSourceField(), context);
258+
ParseMethod parseMethod = node.getParseMethod();
259+
java.util.Map<String, Literal> arguments = node.getArguments();
260+
String pattern = (String) node.getPattern().getValue();
261+
List<String> groupCandidates =
262+
ParseUtils.getNamedGroupCandidates(parseMethod, pattern, arguments);
263+
List<RexNode> newFields =
264+
groupCandidates.stream()
255265
.map(
256-
expr -> {
257-
boolean containsSubqueryExpression = containsSubqueryExpression(expr);
258-
final Holder<@Nullable RexCorrelVariable> v = Holder.empty();
259-
if (containsSubqueryExpression) {
260-
context.relBuilder.variable(v::set);
261-
context.pushCorrelVar(v.get());
262-
}
263-
RexNode eval = rexVisitor.analyze(expr, context);
264-
if (containsSubqueryExpression) {
265-
// RelBuilder.projectPlus doesn't have a parameter with variablesSet:
266-
// projectPlus(Iterable<CorrelationId> variablesSet, RexNode... nodes)
267-
context.relBuilder.project(
268-
Iterables.concat(context.relBuilder.fields(), ImmutableList.of(eval)),
269-
ImmutableList.of(),
270-
false,
271-
ImmutableList.of(v.get().id));
272-
context.popCorrelVar();
273-
} else {
274-
context.relBuilder.projectPlus(eval);
275-
}
276-
return eval;
277-
})
278-
.collect(Collectors.toList());
279-
// Overriding the existing field if the alias has the same name with original field name. For
280-
// example, eval field = 1
281-
List<String> overriding =
282-
evalList.stream()
283-
.filter(expr -> expr.getKind() == AS)
284-
.map(
285-
expr ->
286-
((RexLiteral) ((RexCall) expr).getOperands().get(1)).getValueAs(String.class))
287-
.collect(Collectors.toList());
288-
overriding.retainAll(originalFieldNames);
289-
if (!overriding.isEmpty()) {
290-
List<RexNode> toDrop = context.relBuilder.fields(overriding);
291-
context.relBuilder.projectExcept(toDrop);
292-
293-
// the overriding field in Calcite will add a numeric suffix, for example:
294-
// `| eval SAL = SAL + 1` creates a field SAL0 to replace SAL, so we rename it back to SAL,
295-
// or query `| eval SAL=SAL + 1 | where exists [ source=DEPT | where emp.SAL=HISAL ]` fails.
296-
List<String> newNames =
297-
context.relBuilder.peek().getRowType().getFieldNames().stream()
298-
.map(
299-
cur -> {
300-
String noNumericSuffix = cur.replaceAll("\\d", "");
301-
if (overriding.contains(noNumericSuffix)) {
302-
return noNumericSuffix;
303-
} else {
304-
return cur;
305-
}
306-
})
307-
.collect(Collectors.toList());
308-
context.relBuilder.rename(newNames);
309-
}
266+
group ->
267+
context.rexBuilder.makeCall(
268+
SqlLibraryOperators.REGEXP_EXTRACT,
269+
sourceField,
270+
context.rexBuilder.makeLiteral(pattern)))
271+
.collect(Collectors.toList());
272+
projectPlusOverriding(newFields, groupCandidates, context);
310273
return context.relBuilder.peek();
311274
}
312275

276+
@Override
277+
public RelNode visitEval(Eval node, CalcitePlanContext context) {
278+
visitChildren(node, context);
279+
List<String> originalFieldNames = context.relBuilder.peek().getRowType().getFieldNames();
280+
node.getExpressionList()
281+
.forEach(
282+
expr -> {
283+
boolean containsSubqueryExpression = containsSubqueryExpression(expr);
284+
final Holder<@Nullable RexCorrelVariable> v = Holder.empty();
285+
if (containsSubqueryExpression) {
286+
context.relBuilder.variable(v::set);
287+
context.pushCorrelVar(v.get());
288+
}
289+
RexNode eval = rexVisitor.analyze(expr, context);
290+
if (containsSubqueryExpression) {
291+
// RelBuilder.projectPlus doesn't have a parameter with variablesSet:
292+
// projectPlus(Iterable<CorrelationId> variablesSet, RexNode... nodes)
293+
context.relBuilder.project(
294+
Iterables.concat(context.relBuilder.fields(), ImmutableList.of(eval)),
295+
ImmutableList.of(),
296+
false,
297+
ImmutableList.of(v.get().id));
298+
context.popCorrelVar();
299+
} else {
300+
// Override the existing field if the alias has the same name as an original field.
301+
String alias =
302+
((RexLiteral) ((RexCall) eval).getOperands().get(1)).getValueAs(String.class);
303+
projectPlusOverriding(List.of(eval), List.of(alias), context);
304+
}
305+
});
306+
return context.relBuilder.peek();
307+
}
308+
309+
private void projectPlusOverriding(
310+
List<RexNode> newFields, List<String> newNames, CalcitePlanContext context) {
311+
List<String> originalFieldNames = context.relBuilder.peek().getRowType().getFieldNames();
312+
List<RexNode> toOverrideList =
313+
originalFieldNames.stream()
314+
.filter(newNames::contains)
315+
.map(a -> (RexNode) context.relBuilder.field(a))
316+
.toList();
317+
// 1. Add the new fields, for example "age0, country0".
318+
context.relBuilder.projectPlus(newFields);
319+
// 2. Drop the overridden original fields, which are now duplicated, for example "age, country".
320+
if (!toOverrideList.isEmpty()) {
321+
context.relBuilder.projectExcept(toOverrideList);
322+
}
323+
// 3. Get the current field list; it should now include "age0, country0".
324+
List<String> currentFields = context.relBuilder.peek().getRowType().getFieldNames();
325+
int length = currentFields.size();
326+
// 4. Append the new names "age, country" to the end of the rename list.
327+
List<String> expectedRenameFields =
328+
new ArrayList<>(currentFields.subList(0, length - newNames.size()));
329+
expectedRenameFields.addAll(newNames);
330+
// 5. rename
331+
context.relBuilder.rename(expectedRenameFields);
332+
}
333+
313334
@Override
314335
public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
315336
visitChildren(node, context);
@@ -607,11 +628,6 @@ public RelNode visitFillNull(FillNull fillNull, CalcitePlanContext context) {
607628
throw new CalciteUnsupportedException("FillNull command is unsupported in Calcite");
608629
}
609630

610-
@Override
611-
public RelNode visitParse(Parse node, CalcitePlanContext context) {
612-
throw new CalciteUnsupportedException("Parse command is unsupported in Calcite");
613-
}
614-
615631
@Override
616632
public RelNode visitRareTopN(RareTopN node, CalcitePlanContext context) {
617633
throw new CalciteUnsupportedException("Rare and Top commands are unsupported in Calcite");

core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
import java.math.BigDecimal;
1414
import java.util.List;
15-
import java.util.Map;
1615
import java.util.stream.Collectors;
1716
import lombok.RequiredArgsConstructor;
1817
import org.apache.calcite.rel.RelNode;
@@ -261,16 +260,6 @@ public RexNode visitQualifiedName(QualifiedName node, CalcitePlanContext context
261260
.peekCorrelVar()
262261
.map(correlVar -> context.relBuilder.field(correlVar, qualifiedName))
263262
.orElseGet(() -> context.relBuilder.field(qualifiedName));
264-
}
265-
// 3. resolve overriding fields, for example, `eval SAL = SAL + 1` will delete the original SAL
266-
// and add a SAL0. SAL0 in currentFields, but qualifiedName is SAL.
267-
// TODO now we cannot handle the case using a overriding fields in subquery, for example
268-
// source = EMP | eval DEPTNO = DEPTNO + 1 | where exists [ source = DEPT | where emp.DEPTNO =
269-
// DEPTNO ]
270-
Map<String, String> fieldMap =
271-
currentFields.stream().collect(Collectors.toMap(s -> s.replaceAll("\\d", ""), s -> s));
272-
if (fieldMap.containsKey(qualifiedName)) {
273-
return context.relBuilder.field(fieldMap.get(qualifiedName));
274263
} else {
275264
throw new IllegalArgumentException(
276265
String.format(
@@ -326,13 +315,6 @@ private boolean isTimeBased(SpanUnit unit) {
326315
return !(unit == NONE || unit == UNKNOWN);
327316
}
328317

329-
// @Override
330-
// public RexNode visitAggregateFunction(AggregateFunction node, Context context) {
331-
// RexNode field = analyze(node.getField(), context);
332-
// AggregateCall aggregateCall = translateAggregateCall(node, field, relBuilder);
333-
// return new MyAggregateCall(aggregateCall);
334-
// }
335-
336318
@Override
337319
public RexNode visitLet(Let node, CalcitePlanContext context) {
338320
RexNode expr = analyze(node.getExpression(), context);

integ-test/src/test/java/org/opensearch/sql/calcite/remote/nonfallback/NonFallbackCalciteParseCommandIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@
55

66
package org.opensearch.sql.calcite.remote.nonfallback;
77

8-
import org.junit.Ignore;
98
import org.opensearch.sql.calcite.remote.fallback.CalciteParseCommandIT;
109

11-
@Ignore("https://github.com/opensearch-project/sql/issues/3463")
1210
public class NonFallbackCalciteParseCommandIT extends CalciteParseCommandIT {
1311
@Override
1412
public void init() throws Exception {

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDedupIT.java renamed to integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLDedupIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import org.json.JSONObject;
1616
import org.junit.jupiter.api.Test;
1717

18-
public class CalciteDedupIT extends CalcitePPLIntegTestCase {
18+
public class CalcitePPLDedupIT extends CalcitePPLIntegTestCase {
1919

2020
@Override
2121
public void init() throws IOException {

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalciteDedupPushdownIT.java renamed to integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLDedupPushdownIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import org.opensearch.sql.common.setting.Settings;
99

10-
public class CalciteDedupPushdownIT extends CalciteDedupIT {
10+
public class CalcitePPLDedupPushdownIT extends CalcitePPLDedupIT {
1111

1212
@Override
1313
protected Settings getSettings() {
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.standalone;
7+
8+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
9+
import static org.opensearch.sql.util.MatcherUtils.rows;
10+
import static org.opensearch.sql.util.MatcherUtils.schema;
11+
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
12+
import static org.opensearch.sql.util.MatcherUtils.verifyErrorMessageContains;
13+
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
14+
15+
import java.io.IOException;
16+
import org.json.JSONObject;
17+
import org.junit.Test;
18+
import org.opensearch.client.Request;
19+
20+
public class CalcitePPLParseIT extends CalcitePPLIntegTestCase {
21+
@Override
22+
public void init() throws IOException {
23+
super.init();
24+
25+
loadIndex(Index.BANK);
26+
loadIndex(Index.BANK_WITH_NULL_VALUES);
27+
}
28+
29+
@Test
30+
public void testParseEmail() {
31+
JSONObject result =
32+
executeQuery(
33+
String.format(
34+
"""
35+
source = %s | parse email '.+@(?<host>.+)' | fields email, host
36+
""",
37+
TEST_INDEX_BANK));
38+
verifySchema(result, schema("email", "string"), schema("host", "string"));
39+
verifyDataRows(
40+
result,
41+
rows("amberduke@pyrami.com", "pyrami.com"),
42+
rows("hattiebond@netagy.com", "netagy.com"),
43+
rows("nanettebates@quility.com", "quility.com"),
44+
rows("daleadams@boink.com", "boink.com"),
45+
rows("elinorratliff@scentric.com", "scentric.com"),
46+
rows("virginiaayala@filodyne.com", "filodyne.com"),
47+
rows("dillardmcpherson@quailcom.com", "quailcom.com"));
48+
}
49+
50+
@Test
51+
public void testParseOverriding() {
52+
JSONObject result =
53+
executeQuery(
54+
String.format(
55+
"""
56+
source = %s | parse email '.+@(?<email>.+)' | fields email
57+
""",
58+
TEST_INDEX_BANK));
59+
verifySchema(result, schema("email", "string"));
60+
verifyDataRows(
61+
result,
62+
rows("pyrami.com"),
63+
rows("netagy.com"),
64+
rows("quility.com"),
65+
rows("boink.com"),
66+
rows("scentric.com"),
67+
rows("filodyne.com"),
68+
rows("quailcom.com"));
69+
}
70+
71+
@Test
72+
public void testParseEmailCountByHost() {
73+
JSONObject result =
74+
executeQuery(
75+
String.format(
76+
"""
77+
source = %s | parse email '.+@(?<host>.+)' | stats count() by host
78+
""",
79+
TEST_INDEX_BANK));
80+
verifySchema(result, schema("count()", "long"), schema("host", "string"));
81+
verifyDataRows(
82+
result,
83+
rows(1, "pyrami.com"),
84+
rows(1, "netagy.com"),
85+
rows(1, "quility.com"),
86+
rows(1, "boink.com"),
87+
rows(1, "scentric.com"),
88+
rows(1, "filodyne.com"),
89+
rows(1, "quailcom.com"));
90+
}
91+
92+
@Test
93+
public void testParseStreetNumber() {
94+
JSONObject result =
95+
executeQuery(
96+
String.format(
97+
"""
98+
source = %s | parse address '(?<streetNumber>\\d+)'
99+
| eval streetNumberInt = cast(streetNumber as integer)
100+
| where streetNumberInt > 500
101+
| sort streetNumberInt
102+
| fields streetNumberInt, address
103+
""",
104+
TEST_INDEX_BANK));
105+
verifySchema(result, schema("streetNumberInt", "integer"), schema("address", "string"));
106+
verifyDataRows(
107+
result,
108+
rows(671, "671 Bristol Street"),
109+
rows(702, "702 Quentin Street"),
110+
rows(789, "789 Madison Street"),
111+
rows(880, "880 Holmes Lane"));
112+
}
113+
114+
// TODO: Multiple capturing groups are not allowed in Calcite's REGEXP_EXTRACT function.
115+
// https://github.com/opensearch-project/sql/issues/3472
116+
@Test
117+
public void testParseMultipleGroups() {
118+
RuntimeException e =
119+
assertThrows(
120+
RuntimeException.class,
121+
() ->
122+
executeQuery(
123+
String.format(
124+
"""
125+
source = %s | parse address '(?<streetNumber>\\d+) (?<street>.+)'
126+
| fields streetNumber, street
127+
""",
128+
TEST_INDEX_BANK)));
129+
verifyErrorMessageContains(
130+
e, "Multiple capturing groups (count=2) not allowed in regex input for REGEXP_EXTRACT");
131+
}
132+
133+
@Test
134+
public void testParseOverriding2() throws IOException {
135+
Request request1 = new Request("PUT", "/test/_doc/1?refresh=true");
136+
request1.setJsonEntity(
137+
"{\"email\": \"a@a.com\", \"email0\": \"b@b.com\", \"email1\": \"c@c.com\"}");
138+
client().performRequest(request1);
139+
JSONObject result;
140+
result =
141+
executeQuery(
142+
"source = test | parse email '.+@(?<email0>.+)' | fields email, email0, email1");
143+
verifyDataRows(result, rows("a@a.com", "a.com", "c@c.com"));
144+
result =
145+
executeQuery(
146+
"source = test | parse email '.+@(?<email>.+)' | fields email, email0, email1");
147+
verifyDataRows(result, rows("a.com", "b@b.com", "c@c.com"));
148+
}
149+
}

0 commit comments

Comments
 (0)