Skip to content

Commit 2ca4ae4

Browse files
committed
formula stuff
1 parent fae01cb commit 2ca4ae4

File tree

8 files changed

+277
-49
lines changed

8 files changed

+277
-49
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/by/AggregationProcessor.java

Lines changed: 159 additions & 33 deletions
Large diffs are not rendered by default.

engine/table/src/main/java/io/deephaven/engine/table/impl/by/FormulaMultiColumnChunkedOperator.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
2020
import io.deephaven.util.SafeCloseable;
2121
import org.jetbrains.annotations.NotNull;
22+
import org.jetbrains.annotations.Nullable;
2223

2324
import java.util.Arrays;
2425
import java.util.HashMap;
@@ -34,11 +35,13 @@ class FormulaMultiColumnChunkedOperator implements IterativeChunkedAggregationOp
3435

3536
private final QueryTable inputTable;
3637

37-
private final GroupByChunkedOperator groupBy;
38+
private final GroupByOperator groupBy;
3839
private final boolean delegateToBy;
3940
private final SelectColumn selectColumn;
4041
private final WritableColumnSource<?> resultColumn;
4142
private final String[] inputKeyColumns;
43+
@Nullable
44+
private final ColumnSource<Integer> formulaDepthSource;
4245

4346
private ChunkSource<Values> formulaDataSource;
4447

@@ -63,15 +66,17 @@ class FormulaMultiColumnChunkedOperator implements IterativeChunkedAggregationOp
6366
*/
6467
FormulaMultiColumnChunkedOperator(
6568
@NotNull final QueryTable inputTable,
66-
@NotNull final GroupByChunkedOperator groupBy,
69+
@NotNull final GroupByOperator groupBy,
6770
final boolean delegateToBy,
6871
@NotNull final SelectColumn selectColumn,
69-
@NotNull final String[] inputKeyColumns) {
72+
@NotNull final String[] inputKeyColumns,
73+
@Nullable final ColumnSource<Integer> formulaDepthSource) {
7074
this.inputTable = inputTable;
7175
this.groupBy = groupBy;
7276
this.delegateToBy = delegateToBy;
7377
this.selectColumn = selectColumn;
7478
this.inputKeyColumns = inputKeyColumns;
79+
this.formulaDepthSource = formulaDepthSource;
7580

7681
resultColumn = ArrayBackedColumnSource.getMemoryColumnSource(
7782
0, selectColumn.getReturnedType(), selectColumn.getReturnedComponentType());
@@ -199,7 +204,7 @@ public boolean modifyRowKeys(final SingletonContext context,
199204

200205
@Override
201206
public boolean requiresRowKeys() {
202-
return delegateToBy;
207+
return delegateToBy && groupBy.requiresRowKeys();
203208
}
204209

205210
@Override
@@ -222,13 +227,14 @@ public void propagateInitialState(@NotNull final QueryTable resultTable, int sta
222227
}
223228

224229
final Map<String, ColumnSource<?>> sourceColumns;
225-
if (inputKeyColumns.length == 0) {
230+
if (inputKeyColumns.length == 0 && formulaDepthSource == null) {
226231
// noinspection unchecked
227232
sourceColumns = (Map<String, ColumnSource<?>>) groupBy.getInputResultColumns();
228233
} else {
229234
final Map<String, ColumnSource<?>> columnSourceMap = resultTable.getColumnSourceMap();
230235
sourceColumns = new HashMap<>(groupBy.getInputResultColumns());
231236
Arrays.stream(inputKeyColumns).forEach(col -> sourceColumns.put(col, columnSourceMap.get(col)));
237+
sourceColumns.put(AggregationProcessor.ROLLUP_FORMULA_DEPTH.name(), formulaDepthSource);
232238
}
233239
selectColumn.initInputs(resultTable.getRowSet(), sourceColumns);
234240
formulaDataSource = selectColumn.getDataView();
@@ -263,8 +269,7 @@ public UnaryOperator<ModifiedColumnSet> initializeRefreshing(@NotNull final Quer
263269
final String[] inputColumnNames = selectColumn.getColumns().toArray(String[]::new);
264270
final ModifiedColumnSet inputMCS = inputTable.newModifiedColumnSet(inputColumnNames);
265271
return inputToResultModifiedColumnSetFactory = input -> {
266-
if (groupBy.getSomeKeyHasAddsOrRemoves() ||
267-
(groupBy.getSomeKeyHasModifies() && input.containsAny(inputMCS))) {
272+
if (groupBy.hasModifications(input.containsAny(inputMCS))) {
268273
return resultMCS;
269274
}
270275
return ModifiedColumnSet.EMPTY;

engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByChunkedOperator.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* An {@link IterativeChunkedAggregationOperator} used in the implementation of {@link Table#groupBy},
3434
* {@link io.deephaven.api.agg.spec.AggSpecGroup}, and {@link io.deephaven.api.agg.Aggregation#AggGroup(String...)}.
3535
*/
36-
public final class GroupByChunkedOperator implements IterativeChunkedAggregationOperator {
36+
public final class GroupByChunkedOperator implements GroupByOperator {
3737

3838
private final QueryTable inputTable;
3939
private final boolean registeredWithHelper;
@@ -392,9 +392,7 @@ public void ensureCapacity(final long tableSize) {
392392
return resultAggregatedColumns;
393393
}
394394

395-
/**
396-
* Get a map from input column names to the corresponding output {@link ColumnSource}.
397-
*/
395+
@Override
398396
public Map<String, ? extends ColumnSource<?>> getInputResultColumns() {
399397
return inputAggregatedColumns;
400398
}
@@ -635,4 +633,9 @@ boolean getSomeKeyHasAddsOrRemoves() {
635633
boolean getSomeKeyHasModifies() {
636634
return someKeyHasModifies;
637635
}
636+
637+
@Override
638+
public boolean hasModifications(boolean columnsModified) {
639+
return getSomeKeyHasAddsOrRemoves() || (getSomeKeyHasModifies() && columnsModified);
640+
}
638641
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
//
2+
// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
3+
//
4+
package io.deephaven.engine.table.impl.by;
5+
6+
import io.deephaven.engine.table.ColumnSource;
7+
8+
import java.util.Map;
9+
10+
public interface GroupByOperator extends IterativeChunkedAggregationOperator {
11+
/**
12+
* Get a map from input column names to the corresponding output {@link ColumnSource}.
13+
*/
14+
Map<String, ? extends ColumnSource<?>> getInputResultColumns();
15+
16+
/**
17+
* Given that there have been modified input columns, should we propagate changes?
18+
*
19+
* @param columnsModified have any of the input columns been modified (as per the MCS)?
20+
* @return true if we have modified our output (e.g., because of additions or modifications)
21+
*/
22+
boolean hasModifications(final boolean columnsModified);
23+
}

engine/table/src/main/java/io/deephaven/engine/table/impl/by/GroupByReaggregateOperator.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
* AggregateColumnSources -- not the immediately prior level).
4444
* </p>
4545
*/
46-
public final class GroupByReaggregateOperator implements IterativeChunkedAggregationOperator {
46+
public final class GroupByReaggregateOperator implements GroupByOperator {
4747

4848
private final QueryTable inputTable;
4949
private final boolean registeredWithHelper;
@@ -56,6 +56,7 @@ public final class GroupByReaggregateOperator implements IterativeChunkedAggrega
5656

5757
private final String[] inputColumnNames;
5858

59+
private final Map<String, AggregateColumnSource<?, ?>> inputAggregatedColumns;
5960
private final Map<String, AggregateColumnSource<?, ?>> resultAggregatedColumns;
6061

6162
private RowSetBuilderRandom stepDestinationsModified;
@@ -79,6 +80,7 @@ public GroupByReaggregateOperator(
7980
rowSets = new ObjectArraySource<>(WritableRowSet.class);
8081
addedBuilders = new ObjectArraySource<>(Object.class);
8182

83+
inputAggregatedColumns = new LinkedHashMap<>(aggregatedColumnPairs.length);
8284
resultAggregatedColumns = new LinkedHashMap<>(aggregatedColumnPairs.length);
8385
Arrays.stream(aggregatedColumnPairs).forEach(pair -> {
8486
final ColumnSource<Object> source = inputTable.getColumnSource(pair.rightColumn());
@@ -89,6 +91,7 @@ public GroupByReaggregateOperator(
8991
final ColumnSource<?> realSource = ((AggregateColumnSource) source).getAggregatedSource();
9092
final AggregateColumnSource<?, ?> aggregateColumnSource = AggregateColumnSource.make(realSource, rowSets);
9193
resultAggregatedColumns.put(pair.leftColumn(), aggregateColumnSource);
94+
inputAggregatedColumns.put(pair.rightColumn(), aggregateColumnSource);
9295
});
9396

9497
if (resultAggregatedColumns.containsKey(exposeRowSetsAs)) {
@@ -306,6 +309,17 @@ public UnaryOperator<ModifiedColumnSet> initializeRefreshing(
306309
: null;
307310
}
308311

312+
@Override
313+
public Map<String, ? extends ColumnSource<?>> getInputResultColumns() {
314+
return inputAggregatedColumns;
315+
}
316+
317+
@Override
318+
public boolean hasModifications(boolean columnsModified) {
319+
/* TODO: FIX THIS. */
320+
return true;
321+
}
322+
309323
private class InputToResultModifiedColumnSetFactory implements UnaryOperator<ModifiedColumnSet> {
310324

311325
private final ModifiedColumnSet updateModifiedColumnSet;

engine/table/src/main/java/io/deephaven/engine/table/impl/hierarchical/RollupTableImpl.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ public RollupTable withFilter(@NotNull final Filter filter) {
272272
final AggregationRowLookup[] levelRowLookups = makeLevelRowLookupsArray(numLevels, filteredBaseLevelRowLookup);
273273
final ColumnSource<Table>[] levelNodeTableSources = makeLevelNodeTableSourcesArray(
274274
numLevels, filteredBaseLevel.getColumnSource(ROLLUP_COLUMN.name(), Table.class));
275-
rollupFromBase(levelTables, levelRowLookups, levelNodeTableSources, aggregations, groupByColumns);
275+
rollupFromBase(levelTables, levelRowLookups, levelNodeTableSources, aggregations, groupByColumns, source);
276276

277277
final WhereFilter[] newFilters;
278278
if (rollupKeyFilters == null) {
@@ -589,7 +589,7 @@ public static RollupTable makeRollup(
589589
final AggregationRowLookup[] levelRowLookups = makeLevelRowLookupsArray(numLevels, getRowLookup(baseLevel));
590590
final ColumnSource<Table>[] levelNodeTableSources = makeLevelNodeTableSourcesArray(
591591
numLevels, baseLevel.getColumnSource(ROLLUP_COLUMN.name(), Table.class));
592-
rollupFromBase(levelTables, levelRowLookups, levelNodeTableSources, aggregations, groupByColumns);
592+
rollupFromBase(levelTables, levelRowLookups, levelNodeTableSources, aggregations, groupByColumns, source);
593593

594594
return new RollupTableImpl(
595595
attributes,
@@ -670,13 +670,15 @@ private static ColumnSource<Table>[] makeLevelNodeTableSourcesArray(
670670
* already filled
671671
* @param aggregations The aggregations
672672
* @param groupByColumns The group-by columns
673+
* @param source
673674
*/
674675
private static void rollupFromBase(
675676
@NotNull final QueryTable[] levelTables,
676677
@NotNull final AggregationRowLookup[] levelRowLookups,
677678
@NotNull final ColumnSource<Table>[] levelNodeTableSources,
678679
@NotNull final Collection<? extends Aggregation> aggregations,
679-
@NotNull final Collection<? extends ColumnName> groupByColumns) {
680+
@NotNull final Collection<? extends ColumnName> groupByColumns,
681+
@NotNull final QueryTable source) {
680682
final Deque<ColumnName> columnsToReaggregateBy = new ArrayDeque<>(groupByColumns);
681683
final Deque<String> nullColumnNames = new ArrayDeque<>(groupByColumns.size());
682684
int lastLevelIndex = levelTables.length - 1;
@@ -688,7 +690,7 @@ private static void rollupFromBase(
688690
nullColumnNames.stream().map(lastLevelDefinition::getColumn).collect(Collectors.toList());
689691

690692
lastLevel = lastLevel.aggNoMemo(
691-
AggregationProcessor.forRollupReaggregated(aggregations, nullColumns, ROLLUP_COLUMN),
693+
AggregationProcessor.forRollupReaggregated(aggregations, nullColumns, ROLLUP_COLUMN, source),
692694
false, null, new ArrayList<>(columnsToReaggregateBy));
693695
--lastLevelIndex;
694696
levelTables[lastLevelIndex] = lastLevel;

engine/table/src/test/java/io/deephaven/engine/table/impl/TestAggBy.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,27 @@ public void setUp() throws Exception {
6262
super.setUp();
6363
}
6464

65+
@Test
66+
public void testDoubleFormula() {
67+
ColumnHolder<?> aHolder = col("A", 0, 0, 1, 1, 0, 0, 1, 1, 0, 0);
68+
ColumnHolder<?> bHolder = col("B", 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
69+
ColumnHolder<?> cHolder = col("C", 1, 1, 1, 1, 1, 1, 1, 1, 1, 1);
70+
Table table = TableTools.newTable(aHolder, bHolder, cHolder);
71+
show(table);
72+
assertEquals(10, table.size());
73+
assertEquals(2, table.groupBy("A").size());
74+
75+
Table minMax = table.aggBy(
76+
List.of(
77+
AggFormula("f_const=6.0 + 3"),
78+
AggFormula("f_max=max(B)"),
79+
AggFormula("f_sum_two_col=sum(B) + sum(C)")),
80+
"A");
81+
show(minMax);
82+
83+
assertEquals(2, minMax.size());
84+
}
85+
6586
@Test
6687
public void testBy() {
6788
ColumnHolder<?> aHolder = col("A", 0, 0, 1, 1, 0, 0, 1, 1, 0, 0);

engine/table/src/test/java/io/deephaven/engine/table/impl/TestRollupTable.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,40 @@ public void testRollupGroupStatic() {
373373
freeSnapshotTableChunks(snapshot);
374374
}
375375

376+
@Test
377+
public void testRollupFormulaStatic() {
378+
final Table source = TableTools.newTable(
379+
stringCol("Key1", "Alpha", "Bravo", "Alpha", "Charlie", "Charlie", "Bravo", "Bravo"),
380+
stringCol("Key2", "Delta", "Delta", "Echo", "Echo", "Echo", "Echo", "Echo"),
381+
intCol("Sentinel", 1, 2, 3, 4, 5, 6, 7));
382+
TableTools.show(source);
383+
384+
final RollupTable rollup1 =
385+
source.rollup(
386+
List.of(AggGroup("Sentinel"), AggSum("Sum=Sentinel"),
387+
AggFormula("FSum", "__FORMULA_DEPTH__ == 0 ? max(Sentinel) : 1 + sum(Sentinel)")),
388+
"Key1", "Key2");
389+
390+
final String[] arrayWithNull = new String[1];
391+
final Table keyTable = newTable(
392+
intCol(rollup1.getRowDepthColumn().name(), 0),
393+
stringCol("Key1", arrayWithNull),
394+
stringCol("Key2", arrayWithNull),
395+
byteCol("Action", HierarchicalTable.KEY_TABLE_ACTION_EXPAND_ALL));
396+
397+
final HierarchicalTable.SnapshotState ss1 = rollup1.makeSnapshotState();
398+
final Table snapshot =
399+
snapshotToTable(rollup1, ss1, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30));
400+
TableTools.showWithRowSet(snapshot);
401+
402+
TableTools.show(snapshot.view(rollup1.getRowDepthColumn().name(), rollup1.getRowExpandedColumn().name(), "Key1",
403+
"Key2", "Sentinel", "Sum", "FSum"));
404+
405+
final Table expected = initialExpectedGrouped(rollup1).update("FSum=ii == 0 ? 7 : 1 + Sum");
406+
assertTableEquals(expected, snapshot.dropColumns("__EXPOSED_GROUP_ROW_SETS__"));
407+
freeSnapshotTableChunks(snapshot);
408+
}
409+
376410
private static Table initialExpectedGrouped(RollupTable rollup1) {
377411
return TableTools.newTable(intCol(rollup1.getRowDepthColumn().name(), 1, 2, 3, 3, 2, 3, 3, 2, 3),
378412
booleanCol(rollup1.getRowExpandedColumn().name(), true, true, null, null, true, null, null,

0 commit comments

Comments
 (0)