Skip to content

Commit 74b9176

Browse files
authored
Merge pull request #523 from mspruc/main
Add support for SQL like operation
2 parents 3a461b5 + 08a7703 commit 74b9176

File tree

4 files changed

+133
-123
lines changed

4 files changed

+133
-123
lines changed

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangFilterVisitor.java

Lines changed: 60 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -18,96 +18,102 @@
1818

1919
package org.apache.wayang.api.sql.calcite.converter;
2020

21-
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
2221
import org.apache.calcite.rel.core.Filter;
23-
import org.apache.calcite.rex.*;
22+
import org.apache.calcite.rex.RexCall;
23+
import org.apache.calcite.rex.RexInputRef;
24+
import org.apache.calcite.rex.RexLiteral;
25+
import org.apache.calcite.rex.RexNode;
26+
import org.apache.calcite.rex.RexVisitorImpl;
27+
import org.apache.calcite.runtime.SqlFunctions;
2428
import org.apache.calcite.sql.SqlKind;
25-
import org.apache.calcite.sql.SqlOperator;
26-
import org.apache.calcite.sql.type.SqlTypeName;
29+
2730
import org.apache.wayang.api.sql.calcite.rel.WayangFilter;
28-
import org.apache.wayang.api.sql.calcite.utils.PrintUtils;
2931
import org.apache.wayang.basic.data.Record;
3032
import org.apache.wayang.basic.operators.FilterOperator;
3133
import org.apache.wayang.core.function.FunctionDescriptor;
3234
import org.apache.wayang.core.plan.wayangplan.Operator;
3335

3436
import java.util.EnumSet;
35-
import java.util.List;
36-
import java.util.Set;
3737

3838
public class WayangFilterVisitor extends WayangRelNodeVisitor<WayangFilter> {
39-
WayangFilterVisitor(WayangRelConverter wayangRelConverter) {
39+
WayangFilterVisitor(final WayangRelConverter wayangRelConverter) {
4040
super(wayangRelConverter);
4141
}
4242

4343
@Override
44-
Operator visit(WayangFilter wayangRelNode) {
44+
Operator visit(final WayangFilter wayangRelNode) {
4545

46-
Operator childOp = wayangRelConverter.convert(wayangRelNode.getInput(0));
46+
final Operator childOp = wayangRelConverter.convert(wayangRelNode.getInput(0));
4747

48-
RexNode condition = ((Filter) wayangRelNode).getCondition();
48+
final RexNode condition = ((Filter) wayangRelNode).getCondition();
4949

50-
FilterOperator<Record> filter = new FilterOperator(
50+
final FilterOperator<Record> filter = new FilterOperator<>(
5151
new FilterPredicateImpl(condition),
52-
Record.class
53-
);
52+
Record.class);
5453

55-
childOp.connectTo(0,filter,0);
54+
childOp.connectTo(0, filter, 0);
5655

5756
return filter;
5857
}
5958

60-
6159
private class FilterPredicateImpl implements FunctionDescriptor.SerializablePredicate<Record> {
6260

6361
private final RexNode condition;
6462

65-
private FilterPredicateImpl(RexNode condition) {
63+
private FilterPredicateImpl(final RexNode condition) {
6664
this.condition = condition;
6765
}
6866

6967
@Override
70-
public boolean test(Record record) {
68+
public boolean test(final Record record) {
7169
return condition.accept(new EvaluateFilterCondition(true, record));
7270
}
7371
}
7472

75-
7673
private class EvaluateFilterCondition extends RexVisitorImpl<Boolean> {
7774

7875
final Record record;
79-
protected EvaluateFilterCondition(boolean deep, Record record) {
76+
77+
protected EvaluateFilterCondition(final boolean deep, final Record record) {
8078
super(deep);
8179
this.record = record;
8280
}
8381

8482
@Override
85-
public Boolean visitCall(RexCall call) {
86-
SqlKind kind = call.getKind();
87-
if(!kind.belongsTo(SUPPORTED_OPS)) {
88-
throw new IllegalStateException("Cannot handle this filter predicate yet");
89-
}
90-
91-
RexNode leftOperand = call.getOperands().get(0);
92-
RexNode rightOperand = call.getOperands().get(1);
93-
94-
if(kind == SqlKind.AND) {
95-
return leftOperand.accept(this) && rightOperand.accept(this);
96-
} else if(kind == SqlKind.OR) {
97-
return leftOperand.accept(this) || rightOperand.accept(this);
98-
} else {
99-
return eval(record, kind, leftOperand, rightOperand);
83+
public Boolean visitCall(final RexCall call) {
84+
final SqlKind kind = call.getKind();
85+
86+
if (!kind.belongsTo(WayangFilterVisitor.SUPPORTED_OPS))
87+
throw new IllegalStateException(
88+
"Cannot handle this filter predicate yet: " + kind + " during RexCall: " + call);
89+
90+
switch (kind) {
91+
// Since NOT captures only one operand we just get
92+
// the first
93+
case NOT:
94+
assert (call.getOperands().size() == 1) : "SqlKind.NOT should only have 1 operand in call got: " + call.getOperands().size() + ", call: " + call;
95+
return !(call.getOperands().get(0).accept(this));
96+
case AND:
97+
return call.getOperands().stream().allMatch(operator -> operator.accept(this));
98+
case OR:
99+
return call.getOperands().stream().anyMatch(operator -> operator.accept(this));
100+
default:
101+
assert (call.getOperands().size() == 2);
102+
return eval(record, kind, call.getOperands().get(0), call.getOperands().get(1));
100103
}
101104
}
102105

103-
public boolean eval(Record record, SqlKind kind, RexNode leftOperand, RexNode rightOperand) {
106+
public boolean eval(final Record record, final SqlKind kind, final RexNode leftOperand,
107+
final RexNode rightOperand) {
104108

105-
if(leftOperand instanceof RexInputRef && rightOperand instanceof RexLiteral) {
106-
RexInputRef rexInputRef = (RexInputRef)leftOperand;
107-
int index = rexInputRef.getIndex();
108-
Object field = record.getField(index);
109-
RexLiteral rexLiteral = (RexLiteral) rightOperand;
109+
if (leftOperand instanceof RexInputRef && rightOperand instanceof RexLiteral) {
110+
final RexInputRef rexInputRef = (RexInputRef) leftOperand;
111+
final int index = rexInputRef.getIndex();
112+
final Object field = record.getField(index);
113+
final RexLiteral rexLiteral = (RexLiteral) rightOperand;
110114
switch (kind) {
115+
case LIKE:
116+
return SqlFunctions.like(field.toString(), rexLiteral.toString().replace("'", ""));
111117
case GREATER_THAN:
112118
return isGreaterThan(field, rexLiteral);
113119
case LESS_THAN:
@@ -129,37 +135,29 @@ public boolean eval(Record record, SqlKind kind, RexNode leftOperand, RexNode ri
129135

130136
}
131137

132-
private boolean isGreaterThan(Object o, RexLiteral rexLiteral) {
133-
// return rexLiteral.getValue().compareTo(o)< 0;
134-
return ((Comparable)o).compareTo(rexLiteral.getValueAs(o.getClass())) > 0;
138+
private boolean isGreaterThan(final Object o, final RexLiteral rexLiteral) {
139+
// return rexLiteral.getValue().compareTo(o)< 0;
140+
return ((Comparable) o).compareTo(rexLiteral.getValueAs(o.getClass())) > 0;
135141

136142
}
137143

138-
private boolean isLessThan(Object o, RexLiteral rexLiteral) {
139-
return ((Comparable)o).compareTo(rexLiteral.getValueAs(o.getClass())) < 0;
144+
private boolean isLessThan(final Object o, final RexLiteral rexLiteral) {
145+
return ((Comparable) o).compareTo(rexLiteral.getValueAs(o.getClass())) < 0;
140146
}
141147

142-
private boolean isEqualTo(Object o, RexLiteral rexLiteral) {
148+
private boolean isEqualTo(final Object o, final RexLiteral rexLiteral) {
143149
try {
144-
return ((Comparable)o).compareTo(rexLiteral.getValueAs(o.getClass())) == 0;
145-
} catch (Exception e) {
150+
return ((Comparable) o).compareTo(rexLiteral.getValueAs(o.getClass())) == 0;
151+
} catch (final Exception e) {
146152
throw new IllegalStateException("Predicate not supported yet");
147153
}
148154
}
149155
}
150156

151-
/**for quick sanity check **/
152-
private static final EnumSet<SqlKind> SUPPORTED_OPS =
153-
EnumSet.of(SqlKind.AND, SqlKind.OR,
154-
SqlKind.EQUALS, SqlKind.NOT_EQUALS,
155-
SqlKind.LESS_THAN, SqlKind.GREATER_THAN,
156-
SqlKind.GREATER_THAN_OR_EQUAL, SqlKind.LESS_THAN_OR_EQUAL);
157-
158-
159-
160-
161-
162-
163-
157+
/** for quick sanity check **/
158+
private static final EnumSet<SqlKind> SUPPORTED_OPS = EnumSet.of(SqlKind.AND, SqlKind.OR, SqlKind.NOT,
159+
SqlKind.EQUALS, SqlKind.NOT_EQUALS,
160+
SqlKind.LESS_THAN, SqlKind.GREATER_THAN,
161+
SqlKind.GREATER_THAN_OR_EQUAL, SqlKind.LESS_THAN_OR_EQUAL, SqlKind.LIKE);
164162

165163
}

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/utils/ModelParser.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ public ModelParser(Configuration configuration) throws IOException, ParseExcepti
4343

4444
this.configuration = configuration;
4545
Object obj = new JSONParser().parse(calciteModel);
46-
System.out.println("obj: " + obj);
4746
this.json = (JSONObject) obj;
4847
}
4948

@@ -67,6 +66,23 @@ public ModelParser(Configuration configuration, String calciteModelPath) throws
6766
this.json = (JSONObject) obj;
6867
}
6968

69+
/**
70+
* This method allows you to specify the Calcite path, useful for testing.
71+
* See also {@link #ModelParser(Configuration)} and {@link #ModelParser()}.
72+
*
73+
* @param configuration An empty configuration. Usage:
74+
* {@code Configuration configuration = new ModelParser(new Configuration(), calciteModelPath).setProperties();}
75+
* @param calciteModel JSONized object of your calcite model
76+
* @throws IOException If an I/O error occurs.
77+
* @throws ParseException If unable to parse the file at
78+
* {@code calciteModelPath}.
79+
*/
80+
public ModelParser(Configuration configuration, JSONObject calciteModel) throws IOException, ParseException {
81+
this.configuration = configuration;
82+
this.json = calciteModel;
83+
}
84+
85+
7086
public Configuration setProperties() {
7187
JSONObject calciteObj = (JSONObject) json.get("calcite");
7288
String calciteModel = calciteObj.toString();

0 commit comments

Comments
 (0)