Skip to content

Commit 3199fd3

Browse files
authored
fix: DH-20680: Reindexing filters to support barriers (#7547)
ReindexingFilters are treated as "serial", but the existing code did not handle barriers properly when a RIF was part of a list of filters. This was due to the independent execution of subsets of filters preceding or following the RIF. This code tracks the encountered filter barriers and provides them to subsequent filter executions so the respected barrier requirements can be met during execution.
1 parent bbe4bdc commit 3199fd3

File tree

4 files changed

+724
-31
lines changed

4 files changed

+724
-31
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java

Lines changed: 75 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,7 @@
3131
import io.deephaven.engine.table.hierarchical.TreeTable;
3232
import io.deephaven.engine.table.impl.MemoizedOperationKey.SelectUpdateViewOrUpdateView.Flavor;
3333
import io.deephaven.engine.table.impl.by.*;
34-
import io.deephaven.engine.table.impl.filter.ExtractBarriers;
35-
import io.deephaven.engine.table.impl.filter.ExtractInnerConjunctiveFilters;
36-
import io.deephaven.engine.table.impl.filter.ExtractShiftedColumnDefinitions;
37-
import io.deephaven.engine.table.impl.filter.ExtractRespectedBarriers;
34+
import io.deephaven.engine.table.impl.filter.*;
3835
import io.deephaven.engine.table.impl.hierarchical.RollupTableImpl;
3936
import io.deephaven.engine.table.impl.hierarchical.TreeTableImpl;
4037
import io.deephaven.engine.table.impl.indexer.DataIndexer;
@@ -1332,6 +1329,21 @@ private void initializeFilters(@NotNull final WhereFilter[] filters) {
13321329
compilationProcessor.compile();
13331330
}
13341331

1332+
/**
1333+
* Create a copy of the array of filters, introducing the declared barriers into the first filter if any are
1334+
* provided.
1335+
*/
1336+
private WhereFilter[] cloneFiltersForReindexing(final WhereFilter[] filters,
1337+
final Collection<Object> declaredBarriers) {
1338+
if (filters.length == 0 || declaredBarriers.isEmpty()) {
1339+
return filters;
1340+
}
1341+
final WhereFilter[] toUse = filters.clone();
1342+
// Force the first filter to declare all the barriers we have already processed.
1343+
toUse[0] = toUse[0].withDeclaredBarriers(declaredBarriers.toArray());
1344+
return toUse;
1345+
}
1346+
13351347
private QueryTable whereInternal(final WhereFilter... filters) {
13361348
if (filters.length == 0) {
13371349
return (QueryTable) prepareReturnThis();
@@ -1346,32 +1358,66 @@ private QueryTable whereInternal(final WhereFilter... filters) {
13461358
() -> {
13471359
initializeFilters(extractedFilters);
13481360

1349-
for (int fi = 0; fi < extractedFilters.length; ++fi) {
1350-
if (!(extractedFilters[fi] instanceof ReindexingFilter)) {
1351-
continue;
1352-
}
1353-
final ReindexingFilter reindexingFilter = (ReindexingFilter) extractedFilters[fi];
1354-
final boolean first = fi == 0;
1355-
final boolean last = fi == extractedFilters.length - 1;
1356-
if (last && !reindexingFilter.requiresSorting()) {
1357-
// If this is the last (or only) filter, we can just run it as normal unless it requires
1358-
// sorting.
1359-
break;
1360-
}
1361-
QueryTable result = this;
1362-
if (!first) {
1363-
result = result.whereInternal(Arrays.copyOf(extractedFilters, fi));
1364-
}
1365-
if (reindexingFilter.requiresSorting()) {
1366-
result = (QueryTable) result.sort(reindexingFilter.getSortColumns());
1367-
reindexingFilter.sortingDone();
1368-
}
1369-
result = result.whereInternal(reindexingFilter);
1370-
if (!last) {
1371-
result = result.whereInternal(
1372-
Arrays.copyOfRange(extractedFilters, fi + 1, extractedFilters.length));
1361+
final boolean hasReindexingFilter = Arrays.stream(extractedFilters)
1362+
.anyMatch(filter -> !ExtractReindexingFilters.of(filter).isEmpty());
1363+
if (hasReindexingFilter) {
1364+
// Filters will be processed in subsets of the provided list. Need to track all the
1365+
// declared barriers that we have encountered.
1366+
final Set<Object> declaredBarriers = new HashSet<>();
1367+
1368+
for (int fi = 0; fi < extractedFilters.length; ++fi) {
1369+
final WhereFilter filter = extractedFilters[fi];
1370+
final Collection<ReindexingFilter> reindexingFilters = ExtractReindexingFilters.of(filter);
1371+
if (reindexingFilters.isEmpty()) {
1372+
continue;
1373+
}
1374+
if (reindexingFilters.size() > 1) {
1375+
throw new IllegalStateException(
1376+
"Expected at most one reindexing filter per component filter: " + filter);
1377+
}
1378+
final ReindexingFilter reindexingFilter = reindexingFilters.iterator().next();
1379+
final boolean first = fi == 0;
1380+
final boolean last = fi == extractedFilters.length - 1;
1381+
if (last && !reindexingFilter.requiresSorting()) {
1382+
// If this is the last (or only) filter, we can just run it as normal unless it requires
1383+
// sorting.
1384+
break;
1385+
}
1386+
QueryTable result = this;
1387+
if (!first) {
1388+
final WhereFilter[] subset = Arrays.copyOf(extractedFilters, fi);
1389+
final WhereFilter[] toUse = cloneFiltersForReindexing(subset, declaredBarriers);
1390+
result = result.whereInternal(toUse);
1391+
// collect all the newly declared barriers
1392+
for (final WhereFilter priorFilter : subset) {
1393+
declaredBarriers.addAll(ExtractBarriers.of(priorFilter));
1394+
}
1395+
}
1396+
if (reindexingFilter.requiresSorting()) {
1397+
result = (QueryTable) result.sort(reindexingFilter.getSortColumns());
1398+
reindexingFilter.sortingDone();
1399+
}
1400+
// Execute the current filter (which is or wraps a reindexing filter)
1401+
// adding all previous encountered barriers as declared barriers.
1402+
if (!declaredBarriers.isEmpty()) {
1403+
result = result.whereInternal(filter.withDeclaredBarriers(declaredBarriers.toArray()));
1404+
} else {
1405+
result = result.whereInternal(filter);
1406+
}
1407+
// Add the barriers from the current filter.
1408+
declaredBarriers.addAll(ExtractBarriers.of(filter));
1409+
if (!last) {
1410+
final WhereFilter[] subset =
1411+
Arrays.copyOfRange(extractedFilters, fi + 1, extractedFilters.length);
1412+
final WhereFilter[] toUse = cloneFiltersForReindexing(subset, declaredBarriers);
1413+
result = result.whereInternal(toUse);
1414+
// collect all the new barriers that may have been processed
1415+
for (final WhereFilter priorFilter : subset) {
1416+
declaredBarriers.addAll(ExtractBarriers.of(priorFilter));
1417+
}
1418+
}
1419+
return result;
13731420
}
1374-
return result;
13751421
}
13761422

13771423
boolean hasConstArrayOffsetFilter = false;
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
//
2+
// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
3+
//
4+
package io.deephaven.engine.table.impl.filter;
5+
6+
import io.deephaven.engine.table.impl.select.*;
7+
8+
import java.util.Collection;
9+
import java.util.List;
10+
import java.util.stream.Collectors;
11+
12+
/**
13+
* This will return the ReindexingFilter instances contained within a WhereFilter, if any exist. Note that this removes
14+
* all inversion, barrier and serial wrappers and the returned filters should not be used directly for filtering.
15+
*/
16+
public enum ExtractReindexingFilters implements WhereFilter.Visitor<Collection<ReindexingFilter>> {
17+
EXTRACT_REINDEXING_FILTERS;
18+
19+
public static Collection<ReindexingFilter> of(final WhereFilter filter) {
20+
return filter.walk(EXTRACT_REINDEXING_FILTERS);
21+
}
22+
23+
@Override
24+
public Collection<ReindexingFilter> visitOther(final WhereFilter filter) {
25+
if (filter instanceof ReindexingFilter) {
26+
return List.of((ReindexingFilter) filter);
27+
}
28+
return List.of();
29+
}
30+
31+
@Override
32+
public Collection<ReindexingFilter> visit(final WhereFilterInvertedImpl filter) {
33+
return of(filter.getWrappedFilter());
34+
}
35+
36+
@Override
37+
public Collection<ReindexingFilter> visit(final WhereFilterSerialImpl filter) {
38+
return of(filter.getWrappedFilter());
39+
}
40+
41+
@Override
42+
public Collection<ReindexingFilter> visit(final WhereFilterWithDeclaredBarriersImpl filter) {
43+
return of(filter.getWrappedFilter());
44+
}
45+
46+
@Override
47+
public Collection<ReindexingFilter> visit(final WhereFilterWithRespectedBarriersImpl filter) {
48+
return of(filter.getWrappedFilter());
49+
}
50+
51+
@Override
52+
public Collection<ReindexingFilter> visit(final DisjunctiveFilter filter) {
53+
// DisjunctiveFilter should disallow reindexing filter components, but visit the children anyway because
54+
// this could change.
55+
return filter.getFilters().stream()
56+
.flatMap(whereFilter -> of(whereFilter).stream())
57+
.collect(Collectors.toList());
58+
}
59+
60+
@Override
61+
public Collection<ReindexingFilter> visit(final ConjunctiveFilter filter) {
62+
// ConjunctiveFilter should disallow reindexing filter components, but visit the children anyway because
63+
// this could change.
64+
return filter.getFilters().stream()
65+
.flatMap(whereFilter -> of(whereFilter).stream())
66+
.collect(Collectors.toList());
67+
}
68+
}

engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilterInvertedImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
public class WhereFilterInvertedImpl extends WhereFilterDelegatingBase {
1313

1414
public static WhereFilter of(WhereFilter filter) {
15+
if (filter instanceof ReindexingFilter) {
16+
throw new UnsupportedOperationException(
17+
"WhereFilterInvertedImpl does not support ReindexingFilters: " + filter);
18+
}
1519
return new WhereFilterInvertedImpl(filter);
1620
}
1721

0 commit comments

Comments
 (0)