Skip to content
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
27978f0
Changed RegionedColumnSourceBase to handle single column filters and …
lbooker42 Feb 5, 2026
d44e00d
Lots more work, allowing column region and table location pushdown op…
lbooker42 Feb 10, 2026
73adb07
Addressed copilot PR comments.
lbooker42 Feb 10, 2026
3b67e34
Improved and consolidated RowKeyAgnosticChunkSource filtering.
lbooker42 Feb 12, 2026
163f4c7
Cleanup and spotless
lbooker42 Feb 12, 2026
557eae2
Support for ColumnRegion Constant and Null regions and tests.
lbooker42 Feb 17, 2026
43e895d
Merge branch 'main' into nightly/DH-21522-parquettablelocation
lbooker42 Feb 17, 2026
8dc65e6
Updated the new test to use QueryTable disables rather than burying u…
lbooker42 Feb 17, 2026
0450746
Update test to reflect new reality.
lbooker42 Feb 17, 2026
e5e9e1f
Coordinated changes to allow Core+ optimizations.
lbooker42 Feb 24, 2026
5afc2ae
Merge branch 'main' into nightly/DH-21522-parquettablelocation
lbooker42 Feb 24, 2026
fda03ee
Final co-pilot suggestion.
lbooker42 Feb 24, 2026
4c8f2de
Use createNull for new null regions.
lbooker42 Feb 25, 2026
49dcf57
Minor changes from co-pilot.
lbooker42 Feb 25, 2026
3464b4a
Consolidation of repeated code.
lbooker42 Feb 25, 2026
ece9334
More cleanup
lbooker42 Feb 25, 2026
bcb14f0
WIP
lbooker42 Feb 25, 2026
7358204
Lots of self-review.
lbooker42 Feb 25, 2026
d9f08a8
Changes to the column region classes
lbooker42 Feb 26, 2026
76e8df6
Rework filter contexts.
lbooker42 Feb 26, 2026
9eb02f7
Correct failing test, more cleanup in the filter contexts.
lbooker42 Feb 26, 2026
f94919f
Revert RCF to not include ExposesChunkFilter.
lbooker42 Feb 26, 2026
7a6c143
Correct test and verify coverage.
lbooker42 Feb 27, 2026
0ef8684
Final copilot PR comments addressed.
lbooker42 Feb 27, 2026
5117682
Changes to improve coverage.
lbooker42 Feb 28, 2026
62c13dc
Spotless :(
lbooker42 Feb 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import io.deephaven.chunk.attributes.Any;
import io.deephaven.chunk.util.pools.MultiChunkPool;

import io.deephaven.function.ArraySort;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You've added imports without any actual code changes here and in WritableBooleanChunk.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were auto-added when I executed the replication code. I'll look deeper and try to understand how/why

import io.deephaven.util.type.TypeUtils;
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
// region FillWithNullValueImports
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import io.deephaven.chunk.util.pools.MultiChunkPool;

import io.deephaven.function.ArraySort;
import io.deephaven.util.type.TypeUtils;
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
// region FillWithNullValueImports
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,25 @@
//
package io.deephaven.engine.table.impl;

import io.deephaven.api.Strings;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.chunkfilter.ChunkFilter;
import io.deephaven.engine.table.impl.select.*;
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
import io.deephaven.engine.util.TableTools;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.MustBeInvokedByOverriders;

import java.util.List;
import java.util.Map;

/**
* Base class for {@link PushdownFilterContext} to help with execution cost tracking.
*/
public class BasePushdownFilterContext implements PushdownFilterContext {
public interface BasePushdownFilterContext extends PushdownFilterContext {

/**
* Enum for the behavior of a filter when applied to null values.
*/
public enum FilterNullBehavior {
enum FilterNullBehavior {
/**
* The filter includes nulls in its results, like {@code x == null}.
*/
Expand All @@ -49,270 +38,58 @@ public enum FilterNullBehavior {
FAILS_ON_NULLS
}

protected final WhereFilter filter;
private final List<ColumnSource<?>> columnSources;

private final boolean isRangeFilter;
private final boolean isMatchFilter;
private final boolean supportsDictionaryFiltering;

private long executedFilterCost;

/**
* The behavior of this filter when applied to null values. This is lazily computed on first access. Should be
* accessed via {@link #filterNullBehavior()}.
*/
private volatile FilterNullBehavior filterNullBehavior;

/**
* A dummy table to use for initializing {@link ConditionFilter}. This is lazily computed on first access. Should be
* accessed via {@link #conditionalFilterInitTable()}.
*/
private volatile Table conditionalFilterInitTable;

/**
* Interface for a unified chunk filter that can be used to apply a filter to a chunk of data, whether the
* underlying filter is a {@link ExposesChunkFilter} or a {@link ConditionFilter}.
*/
public interface UnifiedChunkFilter extends SafeCloseable {
interface UnifiedChunkFilter extends SafeCloseable {
LongChunk<OrderedRowKeys> filter(Chunk<? extends Values> values, LongChunk<OrderedRowKeys> keys);
}

/**
 * Create a pushdown filter context for a stateless {@link WhereFilter} over the supplied column sources.
 *
 * @param filter the filter being pushed down; must permit parallelization (i.e. be stateless)
 * @param columnSources the column sources the filter will read
 * @throws IllegalArgumentException if the filter does not permit parallelization
 */
public BasePushdownFilterContext(
final WhereFilter filter,
final List<ColumnSource<?>> columnSources) {
if (!filter.permitParallelization()) {
throw new IllegalArgumentException(
"filter must be stateless, but does not permit parallelization: " + filter);
}
this.filter = filter;
this.columnSources = columnSources;

executedFilterCost = 0;

// A RangeFilter qualifies only when its underlying "real" filter is an AbstractRangeFilter.
isRangeFilter = filter instanceof RangeFilter
&& ((RangeFilter) filter).getRealFilter() instanceof AbstractRangeFilter;
// A MatchFilter qualifies only when it has no cached failover filter.
isMatchFilter = filter instanceof MatchFilter &&
((MatchFilter) filter).getFailoverFilterIfCached() == null;
final boolean isConditionFilter = filter instanceof ConditionFilter;

// TODO (DH-19666): Multi column filters are not supported yet
// Do not use columnSources.size(), multiple logical columns may alias (rename) the same physical column,
// yielding a single entry.
supportsDictionaryFiltering = (isRangeFilter || isMatchFilter
|| (isConditionFilter && ((ConditionFilter) filter).getNumInputsUsed() == 1))
&& ((filter instanceof ExposesChunkFilter && ((ExposesChunkFilter) filter).chunkFilter().isPresent())
|| isConditionFilter);

filterNullBehavior = null; // lazily initialized
}

// Returns the most recently recorded execution cost (set via updateExecutedFilterCost).
@Override
public long executedFilterCost() {
return executedFilterCost;
}

// Records the execution cost; overwrites (does not accumulate onto) the previous value.
@Override
public void updateExecutedFilterCost(long executedFilterCost) {
this.executedFilterCost = executedFilterCost;
}
/**
* Gets the filter associated with this context.
*/
WhereFilter filter();

/**
* Get the column sources this filter will use.
*/
// Returns the column sources supplied at construction.
public List<ColumnSource<?>> columnSources() {
return columnSources;
}
List<ColumnSource<?>> columnSources();

/**
* Whether this filter supports parquet dictionary filtering, which necessitates direct chunk filtering, i.e., it
can be applied to a chunk of data rather than a table. This includes any filter that implements {@link
* ExposesChunkFilter} or {@link ConditionFilter} with exactly one column.
* Whether this filter supports direct chunk filtering, i.e., it can be applied to a chunk of data rather than a
* table. This includes any filter that implements {@link ExposesChunkFilter} or {@link ConditionFilter} with
* exactly one column.
*/
// Computed once in the constructor: true only for qualifying range/match filters or single-input
// condition filters that can supply a chunk filter.
public final boolean supportsDictionaryFiltering() {
return supportsDictionaryFiltering;
}
boolean supportsChunkFiltering();

/**
* Whether this filter supports filtering based on parquet metadata.
*/
// Only the range and match filters classified in the constructor can be evaluated against metadata.
public final boolean supportsMetadataFiltering() {
return isRangeFilter || isMatchFilter;
}
boolean supportsMetadataFiltering();

/**
 * Whether this filter supports filtering via an in-memory data index. Currently always {@code true}.
 */
public final boolean supportsInMemoryDataIndexFiltering() {
// Note: if there is a cheap way to check if the filter will never be applicable for an in-memory data index, we
// would like to add that check here.
return true;
}
boolean supportsInMemoryDataIndexFiltering();

/**
 * Whether this filter supports filtering via a deferred data index. Currently always {@code true}.
 */
public final boolean supportsDeferredDataIndexFiltering() {
// Note: if there is a cheap way to check if the filter will never be applicable for a deferred data index, we
// would like to add that check here.
return true;
}
boolean supportsDeferredDataIndexFiltering();

/**
* The filter to use for parquet metadata filtering. Can only call when {@link #supportsMetadataFiltering()} is
* {@code true}.
*/
public final WhereFilter filterForMetadataFiltering() {
    // Guard first: callers are required to have checked supportsMetadataFiltering().
    if (!isRangeFilter && !isMatchFilter) {
        throw new IllegalStateException("Should only use when supportsMetadataFiltering is true");
    }
    // A range filter delegates to its underlying real filter; a match filter is usable directly.
    return isRangeFilter ? ((RangeFilter) filter).getRealFilter() : filter;
}
WhereFilter filterForMetadataFiltering();

/**
* Get the behavior of this filter when applied to null values. This is lazily computed on first access.
*/
public FilterNullBehavior filterNullBehavior() {
// Double-checked locking over the volatile filterNullBehavior field: compute at most once, and
// skip the lock entirely on the common already-initialized path.
FilterNullBehavior local = filterNullBehavior;
if (local == null) {
synchronized (this) {
local = filterNullBehavior;
if (local == null) {
local = computeFilterNullBehavior();
filterNullBehavior = local;
}
}
}
return local;
}

private FilterNullBehavior computeFilterNullBehavior() {
// Create a dummy table with a single row and column, and `null` entry, and apply the filter to see
// if the filter includes nulls.
final ColumnSource<?> columnSource = columnSources.get(0);
final NullValueColumnSource<?> nullValueColumnSource =
NullValueColumnSource.getInstance(columnSource.getType(), columnSource.getComponentType());
// Bind the all-null source to the filter's first column name.
final Map<String, ColumnSource<?>> columnSourceMap =
Map.of(filter.getColumns().get(0), nullValueColumnSource);
try (final SafeCloseable ignored = LivenessScopeStack.open()) {
final Table nullTestDummyTable = TableTools.newTable(1, columnSourceMap);
final TrackingRowSet rowSet = nullTestDummyTable.getRowSet();
try (final RowSet result = filter.filter(rowSet, rowSet, nullTestDummyTable, false)) {
// Empty result: the single null row was rejected; non-empty: nulls pass the filter.
return result.isEmpty() ? FilterNullBehavior.EXCLUDES_NULLS : FilterNullBehavior.INCLUDES_NULLS;
} catch (final Exception e) {
// The filter threw when evaluated on a null input; callers must special-case nulls.
return FilterNullBehavior.FAILS_ON_NULLS;
}
}
}
FilterNullBehavior filterNullBehavior();

/**
* Create a {@link UnifiedChunkFilter} for the {@link WhereFilter} that efficiently filters chunks of data. Every
* thread that uses this should create its own instance and must close it after use. Can only call when
* {@link #supportsDictionaryFiltering()} is {@code true}
* {@link #supportsChunkFiltering()} is {@code true}
*
* @param maxChunkSize the maximum size of the chunk that will be filtered
* @return the initialized {@link UnifiedChunkFilter}
*/
public final UnifiedChunkFilter createChunkFilter(final int maxChunkSize) {
// Callers must have verified chunk-filtering support before asking for a chunk filter.
if (!supportsDictionaryFiltering) {
throw new IllegalStateException("Filter does not support chunk filtering: " + Strings.of(filter));
}
if (filter instanceof ExposesChunkFilter) {
final ChunkFilter chunkFilter = ((ExposesChunkFilter) filter).chunkFilter()
// Fix: the Optional is empty (not null) here; report the condition accurately.
.orElseThrow(() -> new IllegalStateException("ExposesChunkFilter#chunkFilter() returned empty."));
return new DirectChunkFilter(chunkFilter, maxChunkSize);
} else if (filter instanceof ConditionFilter) {
// Create a dummy table with no rows and single column of the correct type and name as the filter. This is
// used to extract a chunk filter kernel from the conditional filter and bind it to the correct name and
// type without capturing references to the actual table or its column sources.
final Table initTable = conditionalFilterInitTable();
try {
final ConditionFilter conditionFilter = (ConditionFilter) filter;
final AbstractConditionFilter.Filter acfFilter =
conditionFilter.getFilter(initTable, initTable.getRowSet());
return new ConditionKernelChunkFilter(acfFilter, maxChunkSize);
} catch (final Exception e) {
throw new IllegalArgumentException("Error creating condition filter in BasePushdownFilterContext", e);
}
} else {
// Defensive: supportsDictionaryFiltering should imply one of the two branches above.
throw new UnsupportedOperationException(
"Filter does not support chunk filtering: " + Strings.of(filter));
}
}

private Table conditionalFilterInitTable() {
// Lazily create (double-checked locking on the volatile field) an empty table whose single column
// carries the filter's column name and the source's type; used only to initialize ConditionFilter
// kernels without referencing the real table.
Table local = conditionalFilterInitTable;
if (local == null) {
synchronized (this) {
local = conditionalFilterInitTable;
if (local == null) {
final Map<String, ColumnSource<?>> columnSourceMap = Map.of(filter.getColumns().get(0),
NullValueColumnSource.getInstance(
columnSources.get(0).getType(),
columnSources.get(0).getComponentType()));
local = TableTools.newTable(0, columnSourceMap);
conditionalFilterInitTable = local;
}
}
}
return local;
}


/**
* A {@link UnifiedChunkFilter} that wraps a {@link ChunkFilter} directly.
*/
private static final class DirectChunkFilter implements UnifiedChunkFilter {
    // The wrapped filter, plus a single result chunk reused across every filter(...) call.
    private final ChunkFilter wrapped;
    private final WritableLongChunk<OrderedRowKeys> matches;

    private DirectChunkFilter(final ChunkFilter chunkFilter, final int maxChunkSize) {
        wrapped = chunkFilter;
        // Allocate once, sized for the largest chunk this instance will ever be asked to filter.
        matches = WritableLongChunk.makeWritableChunk(maxChunkSize);
    }

    @Override
    public LongChunk<OrderedRowKeys> filter(Chunk<? extends Values> values, LongChunk<OrderedRowKeys> keys) {
        wrapped.filter(values, keys, matches);
        return matches;
    }

    @Override
    public void close() {
        matches.close();
    }
}

/**
* A {@link UnifiedChunkFilter} that wraps a {@link ConditionFilter} by extracting its kernel and context.
*/
private static final class ConditionKernelChunkFilter implements UnifiedChunkFilter {
    // Kernel extracted from the ConditionFilter, and the context it executes against.
    private final AbstractConditionFilter.Filter kernel;
    private final ConditionFilter.FilterKernel.Context kernelContext;

    private ConditionKernelChunkFilter(
            final AbstractConditionFilter.Filter acfFilter,
            final int maxChunkSize) {
        kernel = acfFilter;
        // Context sized for the largest chunk this instance will filter; owned (and closed) by us.
        kernelContext = acfFilter.getContext(maxChunkSize);
    }

    @Override
    public LongChunk<OrderedRowKeys> filter(
            Chunk<? extends Values> values,
            LongChunk<OrderedRowKeys> keys) {
        // noinspection unchecked
        return kernel.filter(kernelContext, keys, new Chunk[] {values});
    }

    @Override
    public void close() {
        kernelContext.close();
    }
}

@MustBeInvokedByOverriders
@Override
public void close() {
// Drop the lazily-created init table reference so it can be collected; overriders must call super.close().
conditionalFilterInitTable = null;
}
UnifiedChunkFilter createChunkFilter(final int maxChunkSize);
}
Loading