Skip to content

Commit 295b624

Browse files
authored
Merge pull request #531 from mspruc/main
add support for cross joins in the sql-api
2 parents 4df2121 + 1cc53aa commit 295b624

File tree

6 files changed

+153
-0
lines changed

6 files changed

+153
-0
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.api.sql.calcite.converter;
20+
21+
import java.io.Serializable;
22+
23+
import org.apache.wayang.api.sql.calcite.converter.functions.FlattenJoinResult;
24+
import org.apache.wayang.api.sql.calcite.rel.WayangJoin;
25+
import org.apache.wayang.basic.data.Record;
26+
import org.apache.wayang.basic.data.Tuple2;
27+
import org.apache.wayang.basic.operators.CartesianOperator;
28+
import org.apache.wayang.basic.operators.MapOperator;
29+
import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction;
30+
import org.apache.wayang.core.plan.wayangplan.Operator;
31+
import org.apache.wayang.core.util.ReflectionUtils;
32+
33+
public class WayangCrossJoinVisitor extends WayangRelNodeVisitor<WayangJoin> implements Serializable {
34+
35+
WayangCrossJoinVisitor(final WayangRelConverter wayangRelConverter) {
36+
super(wayangRelConverter);
37+
}
38+
39+
@Override
40+
Operator visit(final WayangJoin wayangRelNode) {
41+
final Operator childOpLeft = wayangRelConverter.convert(wayangRelNode.getInput(0));
42+
final Operator childOpRight = wayangRelConverter.convert(wayangRelNode.getInput(1));
43+
44+
final CartesianOperator<Record, Record> join = new CartesianOperator<Record, Record>(
45+
Record.class,
46+
Record.class);
47+
48+
childOpLeft.connectTo(0, join, 0);
49+
childOpRight.connectTo(0, join, 1);
50+
51+
final SerializableFunction<Tuple2<Record, Record>, Record> mp = new FlattenJoinResult();
52+
53+
final MapOperator<Tuple2<Record, Record>, Record> mapOperator = new MapOperator<Tuple2<Record, Record>, Record>(
54+
mp,
55+
ReflectionUtils.specify(Tuple2.class),
56+
Record.class);
57+
58+
join.connectTo(0, mapOperator, 0);
59+
60+
return mapOperator;
61+
}
62+
63+
}

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangRelConverter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public Operator convert(final RelNode node) {
5454
return new WayangProjectVisitor(this).visit((WayangProject) node);
5555
} else if (node instanceof WayangFilter) {
5656
return new WayangFilterVisitor(this).visit((WayangFilter) node);
57+
} else if (node instanceof WayangJoin && WayangJoin.class.cast(node).getCondition().isAlwaysTrue()) {
58+
return new WayangCrossJoinVisitor(this).visit((WayangJoin) node);
5759
} else if (node instanceof WayangJoin) {
5860
return new WayangJoinVisitor(this).visit((WayangJoin) node);
5961
} else if (node instanceof WayangAggregate) {
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.api.sql.calcite.converter.functions;
20+
21+
import org.apache.wayang.basic.data.Record;
22+
import org.apache.wayang.basic.data.Tuple2;
23+
import org.apache.wayang.core.function.FunctionDescriptor;
24+
25+
/**
26+
* Flattens the result of a join i.e. a {@link Tuple2} of a left and a right
27+
* {@link Record} to a single {@link Record}.
28+
*/
29+
public class FlattenJoinResult implements FunctionDescriptor.SerializableFunction<Tuple2<Record, Record>, Record> {
30+
31+
@Override
32+
public Record apply(final Tuple2<Record, Record> tuple2) {
33+
final int length0 = tuple2.getField0().size();
34+
final int length1 = tuple2.getField1().size();
35+
36+
final int totalLength = length0 + length1;
37+
38+
final Object[] fields = new Object[totalLength];
39+
40+
for (int i = 0; i < length0; i++) {
41+
fields[i] = tuple2.getField0().getField(i);
42+
}
43+
44+
for (int i = length0; i < totalLength; i++) {
45+
fields[i] = tuple2.getField1().getField(i - length0);
46+
}
47+
48+
return new Record(fields);
49+
}
50+
}

wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@
5858
import java.sql.SQLException;
5959
import java.util.ArrayList;
6060
import java.util.Collection;
61+
import java.util.List;
62+
import java.util.Map;
6163
import java.util.Properties;
64+
import java.util.stream.Collectors;
6265

6366
public class SqlToWayangRelTest {
6467

@@ -212,6 +215,34 @@ public void filterIsNotNull() throws Exception {
212215
assert (!result.stream().anyMatch(record -> record.getField(0).equals(null)));
213216
}
214217

218+
@Test
219+
public void javaCrossJoin() throws Exception {
220+
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
221+
222+
final Tuple2<Collection<Record>, WayangPlan> t = this.buildCollectorAndWayangPlan(
223+
sqlContext,
224+
"select * from fs.exampleSmallA cross join fs.exampleSmallB");
225+
226+
final Collection<Record> result = t.field0;
227+
final WayangPlan wayangPlan = t.field1;
228+
229+
sqlContext.execute(wayangPlan);
230+
231+
final List<Record> shouldBe = List.of(
232+
new Record("item1", "item2", "item1", "item2", "item3"),
233+
new Record("item1", "item2", "item1", "item2", "item3"),
234+
new Record("item1", "item2", "item1", "item2", "item3"),
235+
new Record("item1", "item2", "item1", "item2", "item3"),
236+
new Record("item1", "item2", "x" , "x" , "x"),
237+
new Record("item1", "item2", "x" , "x" , "x")
238+
);
239+
240+
final Map<Record, Integer> resultTally = result.stream().collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
241+
final Map<Record, Integer> shouldBeTally = shouldBe.stream().collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum));
242+
243+
assert (resultTally.equals(shouldBeTally));
244+
}
245+
215246
@Test
216247
public void filterWithNotLike() throws Exception {
217248
final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv");
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
COLA:string,COLB:string
2+
item1;item2
3+
item1;item2
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
COLA:string,COLB:string,COLC:string
2+
item1;item2;item3
3+
item1;item2;item3
4+
x;x;x

0 commit comments

Comments
 (0)