Skip to content

Commit fda3419

Browse files
IGNITE-27907 SQL Calcite: Fix EXCEPT set op operator wrong result - Fixes #12768.
Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
1 parent 37b5a95 commit fda3419

File tree

5 files changed

+97
-2
lines changed

5 files changed

+97
-2
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ else if (all) {
7878

7979
/** {@inheritDoc} */
8080
@Override protected boolean affectResult(int[] cntrs) {
81-
return cntrs[0] != cntrs[1];
81+
return !all || cntrs[0] != cntrs[1];
8282
}
8383

8484
/** {@inheritDoc} */

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpExecutionTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.internal.processors.query.calcite.exec.rel;
1919

20+
import java.util.ArrayList;
2021
import java.util.Arrays;
2122
import java.util.Collections;
2223
import java.util.Comparator;
@@ -169,6 +170,75 @@ protected void checkSetOp(boolean single, boolean all, List<List<Object[]>> data
169170
assertFalse(root.hasNext());
170171
}
171172

173+
/** */
174+
@Test
175+
public void testDistributedInputs() {
176+
// Check all variants containing from 0 to 2 duplicated rows for 2 inputs.
177+
int[][] inputVariants = {
178+
{0, 0}, {0, 1}, {0, 2},
179+
{1, 0}, {1, 1}, {1, 2},
180+
{2, 0}, {2, 1}, {2, 2}
181+
};
182+
183+
// Check 2 cluster nodes.
184+
int[][] rowsCnt = new int[2][];
185+
186+
for (int node0input = 0; node0input < inputVariants.length; node0input++) {
187+
for (int node1input = 0; node1input < inputVariants.length; node1input++) {
188+
rowsCnt[0] = inputVariants[node0input];
189+
rowsCnt[1] = inputVariants[node1input];
190+
checkDistributedSetOp(false, rowsCnt);
191+
checkDistributedSetOp(true, rowsCnt);
192+
}
193+
}
194+
}
195+
196+
/**
197+
* @param all All.
198+
* @param rowsCnt Count of duplicated rows per node per input.
199+
*/
200+
protected void checkDistributedSetOp(boolean all, int[][] rowsCnt) {
201+
ExecutionContext<Object[]> ctx = executionContext();
202+
RelDataType rowType = TypeUtils.createRowType(ctx.getTypeFactory(), String.class);
203+
204+
List<Node<Object[]>> mapNodes = new ArrayList<>();
205+
int[] totalRowsCnt = new int[rowsCnt[0].length];
206+
207+
for (int i = 0; i < rowsCnt.length; i++) {
208+
for (int j = 0; j < rowsCnt[i].length; j++)
209+
totalRowsCnt[j] += rowsCnt[i][j];
210+
211+
List<Node<Object[]>> inputs = Arrays.stream(rowsCnt[i])
212+
.mapToObj(cnt -> new ScanNode<>(ctx, rowType, new TestTable(cnt, rowType, r -> "test")))
213+
.collect(Collectors.toList());
214+
215+
AbstractSetOpNode<Object[]> mapNode;
216+
217+
mapNode = setOpNodeFactory(ctx, rowType, MAP, all, inputs.size());
218+
219+
mapNode.register(inputs);
220+
221+
mapNodes.add(mapNode);
222+
}
223+
224+
// Use union all to emulate streams from different cluster nodes.
225+
Node<Object[]> unionNode = new UnionAllNode<>(ctx, rowType);
226+
unionNode.register(mapNodes);
227+
228+
AbstractSetOpNode<Object[]> reduceNode = setOpNodeFactory(ctx, rowType, REDUCE, all, 1);
229+
230+
reduceNode.register(Collections.singletonList(unionNode));
231+
232+
RootNode<Object[]> root = new RootNode<>(ctx, rowType);
233+
root.register(reduceNode);
234+
235+
assertEquals("Unexpected result [rowsCnt=" + Arrays.deepToString(rowsCnt) + ", all=" + all + ']',
236+
expectedResultSize(totalRowsCnt, all), F.size(root));
237+
}
238+
239+
/** */
240+
protected abstract int expectedResultSize(int[] totalRowsCnt, boolean all);
241+
172242
/** */
173243
protected abstract AbstractSetOpNode<Object[]> setOpNodeFactory(ExecutionContext<Object[]> ctx, RelDataType rowType,
174244
AggregateType type, boolean all, int inputsCnt);

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectExecutionTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,17 @@ public class IntersectExecutionTest extends AbstractSetOpExecutionTest {
7979

8080
checkSetOp(single, all, Arrays.asList(ds1, ds2, ds3), expectedResult);
8181
}
82+
83+
/** {@inheritDoc} */
84+
@Override protected int expectedResultSize(int[] totalRowsCnt, boolean all) {
85+
int min = totalRowsCnt[0];
86+
87+
for (int i = 1; i < totalRowsCnt.length; i++)
88+
min = Math.min(totalRowsCnt[i], min);
89+
90+
if (all)
91+
return min;
92+
else
93+
return min > 0 ? 1 : 0;
94+
}
8295
}

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusExecutionTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,16 @@ public class MinusExecutionTest extends AbstractSetOpExecutionTest {
7979

8080
checkSetOp(single, all, Arrays.asList(ds1, ds2, ds3), expectedResult);
8181
}
82+
83+
/** {@inheritDoc} */
84+
@Override protected int expectedResultSize(int[] totalRowsCnt, boolean all) {
85+
int sum1 = 0;
86+
for (int i = 1; i < totalRowsCnt.length; i++)
87+
sum1 += totalRowsCnt[i];
88+
89+
if (all)
90+
return Math.max(totalRowsCnt[0] - sum1, 0);
91+
else
92+
return (totalRowsCnt[0] > 0 && sum1 == 0) ? 1 : 0;
93+
}
8294
}

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MemoryQuotasIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public void testMinusNode() {
116116
sql("INSERT INTO tbl3 VALUES (?, ?)", i, new byte[1000]);
117117

118118
assertQuery("SELECT /*+ DISABLE_RULE('ColocatedMinusConverterRule') */ * FROM " +
119-
"(SELECT id, b FROM tbl2 EXCEPT SELECT id, b FROM tbl3 WHERE id < 800)")
119+
"(SELECT id, b FROM tbl2 WHERE id < 800 EXCEPT SELECT id, b FROM tbl3 WHERE id < 600)")
120120
.matches(QueryChecker.containsSubPlan("IgniteMapMinus"))
121121
.resultSize(200)
122122
.check();

0 commit comments

Comments
 (0)