Skip to content

Commit b4cfa0e

Browse files
committed
reaggregate support for groupBy.
1 parent 7390976 commit b4cfa0e

File tree

8 files changed

+634
-8
lines changed

8 files changed

+634
-8
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,12 +1015,6 @@ default void visit(AggSpecFreeze freeze) {
10151015
rollupUnsupported("Freeze");
10161016
}
10171017

1018-
@Override
1019-
@FinalDefault
1020-
default void visit(@NotNull final AggSpecGroup group) {
1021-
rollupUnsupported("Group");
1022-
}
1023-
10241018
@Override
10251019
@FinalDefault
10261020
default void visit(@NotNull final AggSpecFormula formula) {
@@ -1108,6 +1102,13 @@ public void visit(@NotNull final Partition partition) {
11081102
addNoInputOperator(partitionOperator);
11091103
}
11101104

1105+
@Override
1106+
public void visit(AggSpecGroup group) {
1107+
unsupportedForBlinkTables("Group for rollup");
1108+
addNoInputOperator(new GroupByChunkedOperator(table, true, EXPOSED_GROUP_ROW_SETS.name(),
1109+
MatchPair.fromPairs(resultPairs)));
1110+
}
1111+
11111112
// -------------------------------------------------------------------------------------------------------------
11121113
// AggSpec.Visitor
11131114
// -------------------------------------------------------------------------------------------------------------
@@ -1265,6 +1266,17 @@ public void visit(@NotNull final Partition partition) {
12651266
addNoInputOperator(partitionOperator);
12661267
}
12671268

1269+
@Override
1270+
public void visit(AggSpecGroup group) {
1271+
final ColumnSource<?> groupRowSet = table.getColumnSource(EXPOSED_GROUP_ROW_SETS.name());
1272+
final MatchPair[] pairs = new MatchPair[resultPairs.size()];
1273+
for (int ii = 0; ii < resultPairs.size(); ++ii) {
1274+
pairs[ii] = new MatchPair(resultPairs.get(ii).output().name(), resultPairs.get(ii).output().name());
1275+
}
1276+
addOperator(new GroupByReaggreagateOperator(table, true, EXPOSED_GROUP_ROW_SETS.name(), pairs), groupRowSet,
1277+
EXPOSED_GROUP_ROW_SETS.name());
1278+
}
1279+
12681280
// -------------------------------------------------------------------------------------------------------------
12691281
// AggSpec.Visitor
12701282
// -------------------------------------------------------------------------------------------------------------

engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggreagateOperator.java

Lines changed: 493 additions & 0 deletions
Large diffs are not rendered by default.

engine/table/src/main/java/io/deephaven/engine/table/impl/hierarchical/RollupTableImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.deephaven.engine.table.impl.select.WhereFilter;
2828
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
2929
import io.deephaven.engine.table.impl.util.RowRedirection;
30+
import io.deephaven.engine.util.PrintListener;
3031
import io.deephaven.util.annotations.InternalUseOnly;
3132
import io.deephaven.util.type.TypeUtils;
3233
import org.apache.commons.lang3.mutable.MutableObject;

engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/AggregateColumnSource.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public interface AggregateColumnSource<VECTOR_TYPE extends Vector<VECTOR_TYPE>,
2222

2323
UngroupedColumnSource<COMPONENT_TYPE> ungrouped();
2424

25+
ColumnSource<COMPONENT_TYPE> getAggregatedSource();
26+
2527
static <VECTOR_TYPE extends Vector<VECTOR_TYPE>, DATA_TYPE> AggregateColumnSource<VECTOR_TYPE, DATA_TYPE> make(
2628
@NotNull final ColumnSource<DATA_TYPE> aggregatedSource,
2729
@NotNull final ColumnSource<? extends RowSet> groupRowSetSource) {

engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/BaseAggregateColumnSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,4 +290,9 @@ public boolean isStateless() {
290290
public boolean isImmutable() {
291291
return aggregatedSource.isImmutable() && groupRowSetSource.isImmutable();
292292
}
293+
294+
@Override
295+
public ColumnSource<COMPONENT_TYPE> getAggregatedSource() {
296+
return aggregatedSource;
297+
}
293298
}

engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/BaseAggregateSlicedColumnSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,4 +381,9 @@ public boolean isImmutable() {
381381
&& (startSource == null || startSource.isImmutable())
382382
&& (endSource == null || endSource.isImmutable());
383383
}
384+
385+
@Override
386+
public ColumnSource<COMPONENT_TYPE> getAggregatedSource() {
387+
return aggregatedSource;
388+
}
384389
}

engine/table/src/main/java/io/deephaven/engine/table/impl/sources/aggregate/RangeAggregateColumnSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,4 +346,9 @@ public boolean isImmutable() {
346346
&& startPositionsInclusive.isImmutable()
347347
&& endPositionsExclusive.isImmutable();
348348
}
349+
350+
@Override
351+
public ColumnSource<COMPONENT_TYPE> getAggregatedSource() {
352+
return aggregated;
353+
}
349354
}

engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@
88
import io.deephaven.api.agg.Aggregation;
99
import io.deephaven.engine.context.QueryScope;
1010
import io.deephaven.engine.rowset.RowSetFactory;
11+
import io.deephaven.engine.rowset.RowSetShiftData;
12+
import io.deephaven.engine.table.ModifiedColumnSet;
1113
import io.deephaven.engine.table.Table;
1214
import io.deephaven.engine.table.hierarchical.HierarchicalTable;
1315
import io.deephaven.engine.table.hierarchical.RollupTable;
1416
import io.deephaven.engine.testutil.ColumnInfo;
17+
import io.deephaven.engine.testutil.ControlledUpdateGraph;
1518
import io.deephaven.engine.testutil.EvalNuggetInterface;
1619
import io.deephaven.engine.testutil.TstUtils;
1720
import io.deephaven.engine.testutil.generator.IntGenerator;
@@ -22,6 +25,7 @@
2225
import io.deephaven.test.types.OutOfBandTest;
2326
import io.deephaven.vector.IntVector;
2427
import io.deephaven.vector.IntVectorDirect;
28+
import org.jspecify.annotations.NonNull;
2529
import org.junit.Assert;
2630
import org.junit.Test;
2731
import org.junit.experimental.categories.Category;
@@ -62,7 +66,8 @@ public class TestRollupTable extends RefreshingTableTestCase {
6266
AggUnique("unique=intCol"),
6367
AggVar("var=intCol"),
6468
AggWAvg("intCol", "wavg=intCol"),
65-
AggWSum("intCol", "wsum=intCol"));
69+
AggWSum("intCol", "wsum=intCol"),
70+
AggGroup("grp=intCol"));
6671

6772
// Companion list of columns to compare between rollup root and the zero-key equivalent
6873
private final String[] columnsToCompare = new String[] {
@@ -83,7 +88,8 @@ public class TestRollupTable extends RefreshingTableTestCase {
8388
"unique",
8489
"var",
8590
"wavg",
86-
"wsum"
91+
"wsum",
92+
"grp"
8793
};
8894

8995
/**
@@ -339,4 +345,101 @@ public void testVectorKeyColumn() {
339345
snapshot);
340346
freeSnapshotTableChunks(snapshot);
341347
}
348+
349+
@Test
350+
public void testRollupGroupStatic() {
351+
final Table source = TableTools.newTable(
352+
stringCol("Key1", "Alpha", "Bravo", "Alpha", "Charlie", "Charlie", "Bravo", "Bravo"),
353+
stringCol("Key2", "Delta", "Delta", "Echo", "Echo", "Echo", "Echo", "Echo"),
354+
intCol("Sentinel", 1, 2, 3, 4, 5, 6, 7));
355+
356+
final RollupTable rollup1 =
357+
source.rollup(List.of(AggGroup("Sentinel"), AggSum("Sum=Sentinel")), "Key1", "Key2");
358+
359+
final String[] arrayWithNull = new String[1];
360+
final Table keyTable = newTable(
361+
intCol(rollup1.getRowDepthColumn().name(), 0),
362+
stringCol("Key1", arrayWithNull),
363+
stringCol("Key2", arrayWithNull),
364+
byteCol("Action", HierarchicalTable.KEY_TABLE_ACTION_EXPAND_ALL));
365+
366+
final HierarchicalTable.SnapshotState ss1 = rollup1.makeSnapshotState();
367+
final Table snapshot =
368+
snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30));
369+
TableTools.showWithRowSet(snapshot);
370+
371+
final Table expected = initialExpectedGrouped(rollup1);
372+
assertTableEquals(expected, snapshot.dropColumns("__EXPOSED_GROUP_ROW_SETS__"));
373+
freeSnapshotTableChunks(snapshot);
374+
}
375+
376+
private static Table initialExpectedGrouped(RollupTable rollup1) {
377+
return TableTools.newTable(intCol(rollup1.getRowDepthColumn().name(), 1, 2, 3, 3, 2, 3, 3, 2, 3),
378+
booleanCol(rollup1.getRowExpandedColumn().name(), true, true, null, null, true, null, null,
379+
true, null),
380+
col("Key1", null, "Alpha", "Alpha", "Alpha", "Bravo", "Bravo", "Bravo", "Charlie", "Charlie"),
381+
col("Key2", null, null, "Delta", "Echo", null, "Delta", "Echo", null, "Echo"),
382+
col("Sentinel", iv(1, 2, 3, 4, 5, 6, 7), iv(1, 3), iv(1), iv(3), iv(2, 6, 7), iv(2), iv(6, 7),
383+
iv(4, 5), iv(4, 5)))
384+
.update("Sum=sum(Sentinel)");
385+
}
386+
387+
private static Table secondExpectedGrouped(RollupTable rollup1) {
388+
return TableTools.newTable(intCol(rollup1.getRowDepthColumn().name(), 1, 2, 3, 3, 2, 3, 3, 2, 3),
389+
booleanCol(rollup1.getRowExpandedColumn().name(), true, true, null, null, true, null, null,
390+
true, null),
391+
col("Key1", null, "Alpha", "Alpha", "Alpha", "Bravo", "Bravo", "Bravo", "Charlie", "Charlie"),
392+
col("Key2", null, null, "Delta", "Echo", null, "Delta", "Echo", null, "Echo"),
393+
col("Sentinel", iv(1, 2, 3, 4, 5, 7, 8, 9), iv(1, 3, 8), iv(1), iv(3, 8), iv(2, 7), iv(2), iv(7),
394+
iv(4, 5, 9), iv(4, 5, 9)))
395+
.update("Sum=sum(Sentinel)");
396+
}
397+
398+
private static @NonNull IntVector iv(final int... ints) {
399+
return new IntVectorDirect(ints);
400+
}
401+
402+
@Test
403+
public void testRollupGroupIncremental() {
404+
final QueryTable source = TstUtils.testRefreshingTable(
405+
stringCol("Key1", "Alpha", "Bravo", "Alpha", "Charlie", "Charlie", "Bravo", "Bravo"),
406+
stringCol("Key2", "Delta", "Delta", "Echo", "Echo", "Echo", "Echo", "Echo"),
407+
intCol("Sentinel", 1, 2, 3, 4, 5, 6, 7));
408+
409+
final RollupTable rollup1 =
410+
source.rollup(List.of(AggGroup("Sentinel"), AggSum("Sum=Sentinel")), "Key1", "Key2");
411+
412+
final String[] arrayWithNull = new String[1];
413+
final Table keyTable = newTable(
414+
intCol(rollup1.getRowDepthColumn().name(), 0),
415+
stringCol("Key1", arrayWithNull),
416+
stringCol("Key2", arrayWithNull),
417+
byteCol("Action", HierarchicalTable.KEY_TABLE_ACTION_EXPAND_ALL));
418+
419+
final HierarchicalTable.SnapshotState ss1 = rollup1.makeSnapshotState();
420+
final Table snapshot =
421+
snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30));
422+
TableTools.showWithRowSet(snapshot);
423+
424+
final Table expected = initialExpectedGrouped(rollup1);
425+
assertTableEquals(expected, snapshot.dropColumns("__EXPOSED_GROUP_ROW_SETS__"));
426+
freeSnapshotTableChunks(snapshot);
427+
428+
final ControlledUpdateGraph cug = source.getUpdateGraph().cast();
429+
cug.runWithinUnitTestCycle(() -> {
430+
addToTable(source, i(10, 11), stringCol("Key1", "Alpha", "Charlie"), stringCol("Key2", "Echo", "Echo"),
431+
intCol("Sentinel", 8, 9));
432+
removeRows(source, i(5));
433+
source.notifyListeners(
434+
new TableUpdateImpl(i(10, 11), i(5), i(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY));
435+
});
436+
437+
final Table snapshot2 =
438+
snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30));
439+
TableTools.showWithRowSet(snapshot2);
440+
Table expected2 = secondExpectedGrouped(rollup1);
441+
TableTools.showWithRowSet(expected2);
442+
assertTableEquals(expected2, snapshot2.dropColumns("__EXPOSED_GROUP_ROW_SETS__"));
443+
freeSnapshotTableChunks(snapshot2);
444+
}
342445
}

0 commit comments

Comments
 (0)