Skip to content

Commit be21002

Browse files
committed
demonstrate broken modifications in unit test.
1 parent 41739fd commit be21002

File tree

6 files changed

+124
-12
lines changed

6 files changed

+124
-12
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1148,15 +1148,12 @@ private void maybeInitializeVectorColumns(Set<String> groupByColumnSet, final Ta
11481148
private @NotNull GroupByChunkedOperator makeGroupByOperatorForFormula(String[] inputNonKeyColumns,
11491149
final QueryTable table, final String exposedRowsets) {
11501150
final MatchPair[] pairs;
1151-
final List<String> hiddenResults;
11521151
final boolean register;
11531152
if (exposedRowsets == null) {
1154-
hiddenResults = null;
11551153
register = false;
11561154
pairs = Arrays.stream(inputNonKeyColumns).map(col -> MatchPair.of(Pair.parse(col)))
11571155
.toArray(MatchPair[]::new);
11581156
} else {
1159-
hiddenResults = null;// Arrays.stream(pairs).map(mp -> mp.output().name()).collect(Collectors.toList());
11601157
register = true;
11611158
pairs = Arrays
11621159
.stream(inputNonKeyColumns).map(col -> MatchPair.of(
@@ -1165,7 +1162,7 @@ private void maybeInitializeVectorColumns(Set<String> groupByColumnSet, final Ta
11651162
ColumnName.of(col + ROLLUP_GRP_COLUMN_ID + ROLLUP_COLUMN_SUFFIX))))
11661163
.toArray(MatchPair[]::new);
11671164
}
1168-
return new GroupByChunkedOperator(table, register, exposedRowsets, hiddenResults, pairs);
1165+
return new GroupByChunkedOperator(table, register, exposedRowsets, null, pairs);
11691166
}
11701167

11711168
// -----------------------------------------------------------------------------------------------------------------

engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByChunkedOperator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ public final class GroupByChunkedOperator implements GroupByOperator {
4343
private final ObjectArraySource<Object> addedBuilders;
4444
private final ObjectArraySource<Object> removedBuilders;
4545

46-
private final String[] inputColumnNames;
4746
private final Map<String, AggregateColumnSource<?, ?>> inputAggregatedColumns;
4847
private final String[] inputColumnNamesForResults;
4948
private final Map<String, AggregateColumnSource<?, ?>> resultAggregatedColumns;
@@ -104,7 +103,7 @@ public GroupByChunkedOperator(
104103
"Exposing group RowSets as %s, but this conflicts with a requested grouped output column name",
105104
exposeRowSetsAs));
106105
}
107-
inputColumnNames = MatchPair.getRightColumns(aggregatedColumnPairs);
106+
final String[] inputColumnNames = MatchPair.getRightColumns(aggregatedColumnPairs);
108107
if (live) {
109108
aggregationInputsModifiedColumnSet = inputTable.newModifiedColumnSet(inputColumnNames);
110109
removedBuilders = new ObjectArraySource<>(Object.class);

engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public final class GroupByReaggregateOperator implements GroupByOperator {
6161
private final Map<String, AggregateColumnSource<?, ?>> resultAggregatedColumns;
6262

6363
private RowSetBuilderRandom stepDestinationsModified;
64+
private boolean rowsetsModified = false;
6465

6566
private boolean initialized;
6667

@@ -326,8 +327,7 @@ public UnaryOperator<ModifiedColumnSet> initializeRefreshing(
326327

327328
@Override
328329
public boolean hasModifications(boolean columnsModified) {
329-
/* TODO: FIX THIS. */
330-
return true;
330+
return columnsModified || rowsetsModified;
331331
}
332332

333333
private class InputToResultModifiedColumnSetFactory implements UnaryOperator<ModifiedColumnSet> {
@@ -362,6 +362,7 @@ public ModifiedColumnSet apply(@NotNull final ModifiedColumnSet upstreamModified
362362
@Override
363363
public void resetForStep(@NotNull final TableUpdate upstream, final int startingDestinationsCount) {
364364
stepDestinationsModified = new BitmapRandomBuilder(startingDestinationsCount);
365+
rowsetsModified = false;
365366
}
366367

367368
@Override
@@ -455,6 +456,9 @@ public void propagateUpdates(@NotNull final TableUpdate downstream, @NotNull fin
455456
// use the addRowSet as the new rowset
456457
final WritableRowSet addRowSet = nullToEmpty(
457458
extractAndClearBuilderRandom(addedBuildersBackingChunk, backingChunkOffset));
459+
if (!addRowSet.isEmpty()) {
460+
rowsetsModified = true;
461+
}
458462
rowSetBackingChunk.set(backingChunkOffset, live ? addRowSet.toTracking() : addRowSet);
459463
} else {
460464
try (final WritableRowSet addRowSet =
@@ -465,6 +469,9 @@ public void propagateUpdates(@NotNull final TableUpdate downstream, @NotNull fin
465469
backingChunkOffset))) {
466470
workingRowSet.remove(removeRowSet);
467471
workingRowSet.insert(addRowSet);
472+
if (!addRowSet.isEmpty() || !removeRowSet.isEmpty()) {
473+
rowsetsModified = true;
474+
}
468475
}
469476
}
470477
});
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
//
2+
// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
3+
//
4+
package io.deephaven.engine.table.impl;
5+
6+
import io.deephaven.api.ColumnName;
7+
import io.deephaven.api.agg.Aggregation;
8+
import io.deephaven.engine.context.ExecutionContext;
9+
import io.deephaven.engine.rowset.RowSetShiftData;
10+
import io.deephaven.engine.table.ColumnDefinition;
11+
import io.deephaven.engine.table.ModifiedColumnSet;
12+
import io.deephaven.engine.table.Table;
13+
import io.deephaven.engine.table.impl.by.AggregationProcessor;
14+
import io.deephaven.engine.testutil.ControlledUpdateGraph;
15+
import io.deephaven.engine.testutil.TstUtils;
16+
import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase;
17+
import io.deephaven.engine.util.TableTools;
18+
import io.deephaven.test.types.OutOfBandTest;
19+
import org.junit.Test;
20+
import org.junit.experimental.categories.Category;
21+
22+
import java.util.*;
23+
24+
import static io.deephaven.api.agg.Aggregation.*;
25+
import static io.deephaven.engine.table.impl.by.RollupConstants.ROLLUP_COLUMN_SUFFIX;
26+
import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
27+
import static io.deephaven.engine.testutil.TstUtils.i;
28+
import static io.deephaven.engine.util.TableTools.*;
29+
30+
@Category(OutOfBandTest.class)
31+
public class TestAggGroup extends RefreshingTableTestCase {
32+
@Test
33+
public void testGroupModifications() {
34+
final QueryTable source = TstUtils.testRefreshingTable(
35+
stringCol("Key1", "Alpha", "Bravo", "Alpha", "Charlie", "Charlie", "Bravo", "Bravo"),
36+
stringCol("Key2", "Delta", "Delta", "Echo", "Echo", "Echo", "Echo", "Echo"),
37+
intCol("Sentinel", 1, 2, 3, 4, 5, 6, 7),
38+
intCol("Sentinel2", 101, 102, 103, 104, 105, 106, 107));
39+
40+
final List<Aggregation> aggs =
41+
List.of(AggGroup("Sentinel"), AggSum("Sum=Sentinel"), AggGroup("Sentinel2"), AggSum("Sum2=Sentinel2"));
42+
43+
final QueryTable normal = source.aggNoMemo(AggregationProcessor.forAggregation(aggs), false, null,
44+
List.of(ColumnName.of("Key1")));
45+
final ColumnName rollupColumn = ColumnName.of(ROLLUP_COLUMN_SUFFIX);
46+
final QueryTable base = source.aggNoMemo(AggregationProcessor.forRollupBase(aggs, false, rollupColumn), false,
47+
null, List.of(ColumnName.of("Key1"), ColumnName.of("Key2")));
48+
final QueryTable reaggregated = base.aggNoMemo(AggregationProcessor.forRollupReaggregated(aggs,
49+
List.of(ColumnDefinition.ofString("Key2")), rollupColumn, source), false, null,
50+
List.of(ColumnName.of("Key1")));
51+
52+
TableTools.show(normal);
53+
TableTools.show(base);
54+
TableTools.show(reaggregated);
55+
56+
doCheck(normal, base, reaggregated);
57+
58+
final SimpleListener normalListener = new SimpleListener(normal);
59+
normal.addUpdateListener(normalListener);
60+
final SimpleListener baseListener = new SimpleListener(base);
61+
base.addUpdateListener(baseListener);
62+
final SimpleListener reaggListener = new SimpleListener(reaggregated);
63+
reaggregated.addUpdateListener(reaggListener);
64+
65+
final ControlledUpdateGraph cug = ExecutionContext.getContext().getUpdateGraph().cast();
66+
// modify the value of a Sentinel; check the updates
67+
cug.runWithinUnitTestCycle(() -> {
68+
TstUtils.addToTable(source, i(0), stringCol("Key1", "Alpha"), stringCol("Key2", "Delta"),
69+
intCol("Sentinel", 8), intCol("Sentinel2", 101));
70+
final ModifiedColumnSet mcs = source.getModifiedColumnSetForUpdates();
71+
mcs.clear();
72+
mcs.setAll("Sentinel");
73+
source.notifyListeners(new TableUpdateImpl(i(), i(), i(0), RowSetShiftData.EMPTY, mcs));
74+
});
75+
76+
TableTools.show(normal);
77+
TableTools.show(base);
78+
TableTools.show(reaggregated);
79+
80+
// make sure the aggregation is still consistent
81+
doCheck(normal, base, reaggregated);
82+
83+
// we should have gotten an update from each of our listeners
84+
checkModified(normalListener, normal, "Sentinel", "Sentinel2");
85+
checkModified(baseListener, base, "Sentinel", "Sentinel2");
86+
checkModified(reaggListener, reaggregated, "Sentinel", "Sentinel2");
87+
}
88+
89+
private static void checkModified(SimpleListener listener, QueryTable table, final String modColumn,
90+
final String noModColumn) {
91+
System.out.println("update = " + listener.update);
92+
assertEquals(1, listener.count);
93+
assertTrue(listener.update.added().isEmpty());
94+
assertTrue(listener.update.removed().isEmpty());
95+
assertEquals(1, listener.update.modified().size());
96+
assertTrue(listener.update.modifiedColumnSet().containsAll(table.newModifiedColumnSet(modColumn)));
97+
assertFalse(listener.update.modifiedColumnSet().containsAny(table.newModifiedColumnSet(noModColumn)));
98+
}
99+
100+
private static void doCheck(Table normal, QueryTable base, QueryTable reaggregated) {
101+
assertEquals(0, normal.update("CheckSum=sum(Sentinel)", "CheckSum2=sum(Sentinel2)")
102+
.where("Sum != CheckSum || Sum2 != CheckSum2").size());
103+
assertEquals(0, base.update("CheckSum=sum(Sentinel)", "CheckSum2=sum(Sentinel2)")
104+
.where("Sum != CheckSum || Sum2 != CheckSum2").size());
105+
assertEquals(0, reaggregated.update("CheckSum=sum(Sentinel)", "CheckSum2=sum(Sentinel2)")
106+
.where("Sum != CheckSum || Sum2 != CheckSum2").size());
107+
assertTableEquals(normal.view("Key1", "Sentinel", "Sum", "Sentinel2", "Sum2"),
108+
reaggregated.view("Key1", "Sentinel", "Sum", "Sentinel2", "Sum2"));
109+
}
110+
}

engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ public void testRollupFormulaStatic2() {
428428

429429
final RollupTable rollup1 =
430430
source.updateView("qty=(long)qty").rollup(
431-
List.of(AggFormula("qty", "__FORMULA_DEPTH__ > 0 ? first(qty) : sum(qty)").asReggregating(),
431+
List.of(AggFormula("qty", "__FORMULA_DEPTH__ > 0 ? first(qty) : sum(qty)").asReaggregating(),
432432
AggSum("Dollars")),
433433
"Account", "Sym");
434434

@@ -469,7 +469,7 @@ private void testRollupFormulaStatic3(boolean hasGroup) {
469469
if (hasGroup) {
470470
aggList.add(AggGroup("gqty=qty"));
471471
}
472-
aggList.add(AggFormula("qty", "__FORMULA_DEPTH__ == 2 ? min(1000, sum(qty)) : sum(qty)").asReggregating());
472+
aggList.add(AggFormula("qty", "__FORMULA_DEPTH__ == 2 ? min(1000, sum(qty)) : sum(qty)").asReaggregating());
473473
aggList.add(AggSum("sqty=qty"));
474474

475475
final RollupTable rollup1 =
@@ -610,7 +610,6 @@ public void testReusedGrouping() {
610610
TableTools.showWithRowSet(expected2);
611611
assertTableEquals(expected2, snapshot2.dropColumns("__EXPOSED_GROUP_ROW_SETS__"));
612612
freeSnapshotTableChunks(snapshot2);
613-
614613
// TODO: modify only one column, validate that we get results that we expect without excess modifications
615614
}
616615
}

table-api/src/main/java/io/deephaven/api/agg/Formula.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public static Formula of(Selectable selectable) {
3636
return ImmutableFormula.of(selectable, false);
3737
}
3838

39-
public Formula asReggregating() {
39+
public Formula asReaggregating() {
4040
return ImmutableFormula.of(selectable(), true);
4141
}
4242

0 commit comments

Comments
 (0)