Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,96 +18,102 @@

package org.apache.wayang.api.sql.calcite.converter;

import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rex.*;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.runtime.SqlFunctions;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.SqlTypeName;

import org.apache.wayang.api.sql.calcite.rel.WayangFilter;
import org.apache.wayang.api.sql.calcite.utils.PrintUtils;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.operators.FilterOperator;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.plan.wayangplan.Operator;

import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class WayangFilterVisitor extends WayangRelNodeVisitor<WayangFilter> {
WayangFilterVisitor(WayangRelConverter wayangRelConverter) {
WayangFilterVisitor(final WayangRelConverter wayangRelConverter) {
super(wayangRelConverter);
}

@Override
Operator visit(WayangFilter wayangRelNode) {
Operator visit(final WayangFilter wayangRelNode) {

Operator childOp = wayangRelConverter.convert(wayangRelNode.getInput(0));
final Operator childOp = wayangRelConverter.convert(wayangRelNode.getInput(0));

RexNode condition = ((Filter) wayangRelNode).getCondition();
final RexNode condition = ((Filter) wayangRelNode).getCondition();

FilterOperator<Record> filter = new FilterOperator(
final FilterOperator<Record> filter = new FilterOperator<>(
new FilterPredicateImpl(condition),
Record.class
);
Record.class);

childOp.connectTo(0,filter,0);
childOp.connectTo(0, filter, 0);

return filter;
}


private class FilterPredicateImpl implements FunctionDescriptor.SerializablePredicate<Record> {

private final RexNode condition;

private FilterPredicateImpl(RexNode condition) {
private FilterPredicateImpl(final RexNode condition) {
this.condition = condition;
}

@Override
public boolean test(Record record) {
public boolean test(final Record record) {
return condition.accept(new EvaluateFilterCondition(true, record));
}
}


private class EvaluateFilterCondition extends RexVisitorImpl<Boolean> {

final Record record;
protected EvaluateFilterCondition(boolean deep, Record record) {

protected EvaluateFilterCondition(final boolean deep, final Record record) {
super(deep);
this.record = record;
}

@Override
public Boolean visitCall(RexCall call) {
SqlKind kind = call.getKind();
if(!kind.belongsTo(SUPPORTED_OPS)) {
throw new IllegalStateException("Cannot handle this filter predicate yet");
}

RexNode leftOperand = call.getOperands().get(0);
RexNode rightOperand = call.getOperands().get(1);

if(kind == SqlKind.AND) {
return leftOperand.accept(this) && rightOperand.accept(this);
} else if(kind == SqlKind.OR) {
return leftOperand.accept(this) || rightOperand.accept(this);
} else {
return eval(record, kind, leftOperand, rightOperand);
public Boolean visitCall(final RexCall call) {
final SqlKind kind = call.getKind();

if (!kind.belongsTo(WayangFilterVisitor.SUPPORTED_OPS))
throw new IllegalStateException(
"Cannot handle this filter predicate yet: " + kind + " during RexCall: " + call);

switch (kind) {
// Since NOT captures only one operand we just get
// the first
case NOT:
assert (call.getOperands().size() == 1) : "SqlKind.NOT should only have 1 operand in call got: " + call.getOperands().size() + ", call: " + call;
return !(call.getOperands().get(0).accept(this));
case AND:
return call.getOperands().stream().allMatch(operator -> operator.accept(this));
case OR:
return call.getOperands().stream().anyMatch(operator -> operator.accept(this));
default:
assert (call.getOperands().size() == 2);
return eval(record, kind, call.getOperands().get(0), call.getOperands().get(1));
}
}

public boolean eval(Record record, SqlKind kind, RexNode leftOperand, RexNode rightOperand) {
public boolean eval(final Record record, final SqlKind kind, final RexNode leftOperand,
final RexNode rightOperand) {

if(leftOperand instanceof RexInputRef && rightOperand instanceof RexLiteral) {
RexInputRef rexInputRef = (RexInputRef)leftOperand;
int index = rexInputRef.getIndex();
Object field = record.getField(index);
RexLiteral rexLiteral = (RexLiteral) rightOperand;
if (leftOperand instanceof RexInputRef && rightOperand instanceof RexLiteral) {
final RexInputRef rexInputRef = (RexInputRef) leftOperand;
final int index = rexInputRef.getIndex();
final Object field = record.getField(index);
final RexLiteral rexLiteral = (RexLiteral) rightOperand;
switch (kind) {
case LIKE:
return SqlFunctions.like(field.toString(), rexLiteral.toString().replace("'", ""));
case GREATER_THAN:
return isGreaterThan(field, rexLiteral);
case LESS_THAN:
Expand All @@ -129,37 +135,29 @@ public boolean eval(Record record, SqlKind kind, RexNode leftOperand, RexNode ri

}

private boolean isGreaterThan(Object o, RexLiteral rexLiteral) {
// return rexLiteral.getValue().compareTo(o)< 0;
return ((Comparable)o).compareTo(rexLiteral.getValueAs(o.getClass())) > 0;
private boolean isGreaterThan(final Object o, final RexLiteral rexLiteral) {
// return rexLiteral.getValue().compareTo(o)< 0;
return ((Comparable) o).compareTo(rexLiteral.getValueAs(o.getClass())) > 0;

}

private boolean isLessThan(Object o, RexLiteral rexLiteral) {
return ((Comparable)o).compareTo(rexLiteral.getValueAs(o.getClass())) < 0;
private boolean isLessThan(final Object o, final RexLiteral rexLiteral) {
return ((Comparable) o).compareTo(rexLiteral.getValueAs(o.getClass())) < 0;
}

private boolean isEqualTo(Object o, RexLiteral rexLiteral) {
private boolean isEqualTo(final Object o, final RexLiteral rexLiteral) {
try {
return ((Comparable)o).compareTo(rexLiteral.getValueAs(o.getClass())) == 0;
} catch (Exception e) {
return ((Comparable) o).compareTo(rexLiteral.getValueAs(o.getClass())) == 0;
} catch (final Exception e) {
throw new IllegalStateException("Predicate not supported yet");
}
}
}

/**for quick sanity check **/
private static final EnumSet<SqlKind> SUPPORTED_OPS =
EnumSet.of(SqlKind.AND, SqlKind.OR,
SqlKind.EQUALS, SqlKind.NOT_EQUALS,
SqlKind.LESS_THAN, SqlKind.GREATER_THAN,
SqlKind.GREATER_THAN_OR_EQUAL, SqlKind.LESS_THAN_OR_EQUAL);







/** for quick sanity check **/
private static final EnumSet<SqlKind> SUPPORTED_OPS = EnumSet.of(SqlKind.AND, SqlKind.OR, SqlKind.NOT,
SqlKind.EQUALS, SqlKind.NOT_EQUALS,
SqlKind.LESS_THAN, SqlKind.GREATER_THAN,
SqlKind.GREATER_THAN_OR_EQUAL, SqlKind.LESS_THAN_OR_EQUAL, SqlKind.LIKE);

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public ModelParser(Configuration configuration) throws IOException, ParseExcepti

this.configuration = configuration;
Object obj = new JSONParser().parse(calciteModel);
System.out.println("obj: " + obj);
this.json = (JSONObject) obj;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.wayang.api.sql.calcite.schema.WayangTable;
import org.apache.wayang.api.sql.calcite.schema.WayangTableBuilder;
import org.apache.wayang.api.sql.calcite.utils.ModelParser;
import org.apache.wayang.api.sql.calcite.utils.PrintUtils;
import org.apache.wayang.api.sql.context.SqlContext;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
Expand Down Expand Up @@ -208,37 +207,44 @@ public void filterIsNotNull() throws Exception {
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
"SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA IS NOT NULL)" //
);

final Collection<Record> result = t.field0;
final WayangPlan wayangPlan = t.field1;
sqlContext.execute(wayangPlan);

assert (!result.stream().anyMatch(record -> record.getField(0).equals(null)));
}

// @Test
@Test
public void filterWithNotLike() throws Exception {
final SqlContext sqlContext = createSqlContext("/model-example-min.json",
"/data/largeLeftTableIndex.csv");

final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
"SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA NOT LIKE '_est1')" //
);

final Collection<Record> result = t.field0;
final WayangPlan wayangPlan = t.field1;
sqlContext.execute(wayangPlan);

assert (!result.stream().anyMatch(record -> record.getString(0).equals("test1")));
}

// @Test
@Test
public void filterWithLike() throws Exception {
final SqlContext sqlContext = createSqlContext("/model-example-min.json",
"/data/largeLeftTableIndex.csv");

final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext,
"SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA LIKE '_est1' OR largeLeftTableIndex.NAMEA LIKE 't%')" //
"SELECT * FROM fs.largeLeftTableIndex WHERE largeLeftTableIndex.NAMEA LIKE '_est1'" //
);

final Collection<Record> result = t.field0;
final WayangPlan wayangPlan = t.field1;
sqlContext.execute(wayangPlan);

assert (result.stream().findFirst().get().equals(new Record("test1", "test1", "test2")));
}

// @Test
Expand Down Expand Up @@ -354,7 +360,7 @@ public void test_simple_sql() throws Exception {
private SqlContext createSqlContext(final String calciteResourceName, final String tableResourceName)
throws IOException, ParseException, SQLException {

final String calciteModelPath = SqlAPI.class.getResource(calciteResourceName).getPath();
final String calciteModelPath = this.getClass().getResource(calciteResourceName).getPath();
assert (calciteModelPath != null && calciteModelPath != "")
: "Could not get calcite model resource from path: " + calciteResourceName;

Expand All @@ -363,7 +369,7 @@ private SqlContext createSqlContext(final String calciteResourceName, final Stri
assert (configuration != null)
: "Could not get configuration with calcite model path: " + calciteModelPath;

final String dataPath = SqlAPI.class.getResource(tableResourceName).getPath();
final String dataPath = this.getClass().getResource(tableResourceName).getPath();
assert (dataPath != null && dataPath != "")
: "Could not get table resource from path: " + tableResourceName;

Expand Down
Loading