Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
27978f0
Changed RegionedColumnSourceBase to handle single column filters and …
lbooker42 Feb 5, 2026
d44e00d
Lots more work, allowing column region and table location pushdown op…
lbooker42 Feb 10, 2026
73adb07
Addressed copilot PR comments.
lbooker42 Feb 10, 2026
3b67e34
Improved and consolidated RowKeyAgnosticChunkSource filtering.
lbooker42 Feb 12, 2026
163f4c7
Cleanup and spotless
lbooker42 Feb 12, 2026
557eae2
Support for ColumnRegion Constant and Null regions and tests.
lbooker42 Feb 17, 2026
43e895d
Merge branch 'main' into nightly/DH-21522-parquettablelocation
lbooker42 Feb 17, 2026
8dc65e6
Updated the new test to use QueryTable disables rather than burying u…
lbooker42 Feb 17, 2026
0450746
Update test to reflect new reality.
lbooker42 Feb 17, 2026
e5e9e1f
Coordinated changes to allow Core+ optimizations.
lbooker42 Feb 24, 2026
5afc2ae
Merge branch 'main' into nightly/DH-21522-parquettablelocation
lbooker42 Feb 24, 2026
fda03ee
Final co-pilot suggestion.
lbooker42 Feb 24, 2026
4c8f2de
Use createNull for new null regions.
lbooker42 Feb 25, 2026
49dcf57
Minor changes from co-pilot.
lbooker42 Feb 25, 2026
3464b4a
Consolidation of repeated code.
lbooker42 Feb 25, 2026
ece9334
More cleanup
lbooker42 Feb 25, 2026
bcb14f0
WIP
lbooker42 Feb 25, 2026
7358204
Lots of self-review.
lbooker42 Feb 25, 2026
d9f08a8
Changes to the column region classes
lbooker42 Feb 26, 2026
76e8df6
Rework filter contexts.
lbooker42 Feb 26, 2026
9eb02f7
Correct failing test, more cleanup in the filter contexts.
lbooker42 Feb 26, 2026
f94919f
Revert RCF to not include ExposesChunkFilter.
lbooker42 Feb 26, 2026
7a6c143
Correct test and verify coverage.
lbooker42 Feb 27, 2026
0ef8684
Final copilot PR comments addressed.
lbooker42 Feb 27, 2026
5117682
Changes to improve coverage.
lbooker42 Feb 28, 2026
62c13dc
Spotless :(
lbooker42 Feb 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import io.deephaven.chunk.attributes.Any;
import io.deephaven.chunk.util.pools.MultiChunkPool;

import io.deephaven.function.ArraySort;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You've added imports without any actual code changes here and in WritableBooleanChunk.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were auto-added when I executed the replication code. I'll look deeper and try to understand how/why

import io.deephaven.util.type.TypeUtils;
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
// region FillWithNullValueImports
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import io.deephaven.chunk.util.pools.MultiChunkPool;

import io.deephaven.function.ArraySort;
import io.deephaven.util.type.TypeUtils;
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
// region FillWithNullValueImports
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
/**
* Base class for {@link PushdownFilterContext} to help with execution cost tracking.
*/
public class BasePushdownFilterContext implements PushdownFilterContext {
public abstract class BasePushdownFilterContext implements PushdownFilterContext {

/**
* Enum for the behavior of a filter when applied to null values.
Expand All @@ -49,12 +49,13 @@ public enum FilterNullBehavior {
FAILS_ON_NULLS
}

protected final WhereFilter filter;
private final WhereFilter filter;

private final List<ColumnSource<?>> columnSources;

private final boolean isRangeFilter;
private final boolean isMatchFilter;
private final boolean supportsDictionaryFiltering;
private final boolean supportsChunkFiltering;

private long executedFilterCost;

Expand Down Expand Up @@ -99,7 +100,7 @@ public BasePushdownFilterContext(
// TODO (DH-19666): Multi column filters are not supported yet
// Do not use columnSources.size(), multiple logical columns may alias (rename) the same physical column,
// yielding a single entry.
supportsDictionaryFiltering = (isRangeFilter || isMatchFilter
supportsChunkFiltering = (isRangeFilter || isMatchFilter
|| (isConditionFilter && ((ConditionFilter) filter).getNumInputsUsed() == 1))
&& ((filter instanceof ExposesChunkFilter && ((ExposesChunkFilter) filter).chunkFilter().isPresent())
|| isConditionFilter);
Expand All @@ -117,20 +118,27 @@ public void updateExecutedFilterCost(long executedFilterCost) {
this.executedFilterCost = executedFilterCost;
}

/**
* Gets the filter associated with this context.
*/
public final WhereFilter filter() {
return filter;
}

/**
* Get the column sources this filter will use.
*/
public List<ColumnSource<?>> columnSources() {
public final List<ColumnSource<?>> columnSources() {
return columnSources;
}

/**
* Whether this filter supports parquet dictionary filtering, which necessitates direct chunk filtering, i.e., it
* can be applied to a chunk of data rather than a table. This includes any filter that implements {#@link
* ExposesChunkFilter} or {@link ConditionFilter} with exactly one column.
* Whether this filter supports direct chunk filtering, i.e., it can be applied to a chunk of data rather than a
* table. This includes any filter that implements {@link ExposesChunkFilter} or {@link ConditionFilter} with
* exactly one column.
*/
public final boolean supportsDictionaryFiltering() {
return supportsDictionaryFiltering;
public final boolean supportsChunkFiltering() {
return supportsChunkFiltering;
}

/**
Expand Down Expand Up @@ -205,13 +213,13 @@ private FilterNullBehavior computeFilterNullBehavior() {
/**
* Create a {@link UnifiedChunkFilter} for the {@link WhereFilter} that efficiently filters chunks of data. Every
* thread that uses this should create its own instance and must close it after use. Can only call when
* {@link #supportsDictionaryFiltering()} is {@code true}
* {@link #supportsChunkFiltering()} is {@code true}
*
* @param maxChunkSize the maximum size of the chunk that will be filtered
* @return the initialized {@link UnifiedChunkFilter}
*/
public final UnifiedChunkFilter createChunkFilter(final int maxChunkSize) {
if (!supportsDictionaryFiltering) {
if (!supportsChunkFiltering) {
throw new IllegalStateException("Filter does not support chunk filtering: " + Strings.of(filter));
}
if (filter instanceof ExposesChunkFilter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@ public interface PushdownFilterMatcher {
* that the filter cannot be pushed down.
* @param onError Consumer of any exceptions that occur during the estimate operation
*/
void estimatePushdownFilterCost(
default void estimatePushdownFilterCost(
final WhereFilter filter,
final RowSet selection,
final boolean usePrev,
final PushdownFilterContext context,
final JobScheduler jobScheduler,
final LongConsumer onComplete,
final Consumer<Exception> onError);
final Consumer<Exception> onError) {
// Default to having no benefit by pushing down.
onComplete.accept(Long.MAX_VALUE);
}

/**
* Push down the given filter to the underlying table and pass the result to the consumer. This method is expected
Expand All @@ -76,15 +79,18 @@ void estimatePushdownFilterCost(
* @param onComplete Consumer of the output rowsets for added and modified rows that pass the filter
* @param onError Consumer of any exceptions that occur during the pushdown operation
*/
void pushdownFilter(
default void pushdownFilter(
final WhereFilter filter,
final RowSet selection,
final boolean usePrev,
final PushdownFilterContext context,
final long costCeiling,
final JobScheduler jobScheduler,
final Consumer<PushdownResult> onComplete,
final Consumer<Exception> onError);
final Consumer<Exception> onError) {
// Default to returning all results as "maybe"
onComplete.accept(PushdownResult.allMaybeMatch(selection));
}

/**
* Create a pushdown filter context for this entity.
Expand All @@ -94,9 +100,11 @@ void pushdownFilter(
*
* @return the created filter context
*/
PushdownFilterContext makePushdownFilterContext(
default PushdownFilterContext makePushdownFilterContext(
final WhereFilter filter,
final List<ColumnSource<?>> filterSources);
final List<ColumnSource<?>> filterSources) {
return PushdownFilterContext.NO_PUSHDOWN_CONTEXT;
}

/**
* Given a filter and a list of column sources, return the appropriate {@link PushdownFilterMatcher} to use for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public final class PushdownResult implements SafeCloseable {
* The entire column contains a single value, so a single read is sufficient to determine matches.
*/
public static final long SINGLE_VALUE_COLUMN_COST = 1_000L;
/**
* The entire region contains a single value, so a single read is sufficient to determine matches.
*/
public static final long SINGLE_VALUE_REGION_COST = 2_000L;
/**
* Only table/row-group statistics are checked, assuming the metadata is already loaded
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.table.BasicDataIndex;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.PushdownFilterMatcher;
import io.deephaven.engine.table.impl.sources.regioned.RegionedPushdownFilterMatcher;
import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.util.annotations.FinalDefault;
import io.deephaven.util.type.NamedImplementation;
Expand All @@ -24,7 +24,8 @@
* source table.
*/
public interface TableLocation
extends NamedImplementation, LogOutputAppendable, TableLocationState, LivenessReferent, PushdownFilterMatcher {
extends NamedImplementation, LogOutputAppendable, TableLocationState, LivenessReferent,
RegionedPushdownFilterMatcher {

/**
* Listener interface for anything that wants to know about changes to a location.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import io.deephaven.engine.table.impl.PushdownFilterContext;
import io.deephaven.engine.table.impl.PushdownResult;
import io.deephaven.engine.table.impl.select.WhereFilter;
import io.deephaven.engine.table.impl.sources.regioned.RegionedPushdownAction;
import io.deephaven.engine.table.impl.sources.regioned.RegionedPushdownFilterContext;
import io.deephaven.engine.table.impl.util.FieldUtils;
import io.deephaven.engine.table.impl.util.JobScheduler;
import io.deephaven.engine.util.string.StringUtils;
Expand All @@ -25,10 +27,12 @@
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;

/**
* Partial TableLocation implementation for use by TableDataService implementations.
Expand Down Expand Up @@ -300,6 +304,10 @@ public final BasicDataIndex getDataIndex(@NotNull final String... columns) {
@Nullable
public abstract BasicDataIndex loadDataIndex(@NotNull String... columns);

// ------------------------------------------------------------------------------------------------------------------
// PushdownFilterMatcher default implementation
// ------------------------------------------------------------------------------------------------------------------

@Override
public void estimatePushdownFilterCost(
final WhereFilter filter,
Expand All @@ -309,8 +317,31 @@ public void estimatePushdownFilterCost(
final JobScheduler jobScheduler,
final LongConsumer onComplete,
final Consumer<Exception> onError) {
// Default to having no benefit by pushing down.
onComplete.accept(Long.MAX_VALUE);
if (selection.isEmpty()) {
// If the selection is empty, we can skip all pushdown filtering.
onComplete.accept(Long.MAX_VALUE);
return;
}

final RegionedPushdownFilterContext filterCtx = (RegionedPushdownFilterContext) context;

// Generate a list of all the allowed actions, sorted by minimum cost.
final List<RegionedPushdownAction> sorted = supportedActions()
.stream()
.filter(action -> action.allows(this, filterCtx))
.sorted(Comparator.comparingLong(RegionedPushdownAction::filterCost))
.collect(Collectors.toList());

// If no actions are allowed, we can skip all pushdown filtering.
if (sorted.isEmpty()) {
onComplete.accept(Long.MAX_VALUE);
return;
}

// Delegate to TableLocation to determine which of the supported actions applies to this particular location.
try (final RegionedPushdownAction.EstimateContext estimateCtx = makeEstimateContext(filter, filterCtx)) {
onComplete.accept(estimatePushdownAction(sorted, filter, selection, usePrev, filterCtx, estimateCtx));
}
}

@Override
Expand All @@ -323,8 +354,44 @@ public void pushdownFilter(
final JobScheduler jobScheduler,
final Consumer<PushdownResult> onComplete,
final Consumer<Exception> onError) {
// Default to returning all results as "maybe"
onComplete.accept(PushdownResult.allMaybeMatch(selection));
if (selection.isEmpty()) {
// If the selection is empty, we can skip all pushdown filtering.
onComplete.accept(PushdownResult.allMaybeMatch(selection));
return;
}

final RegionedPushdownFilterContext filterCtx = (RegionedPushdownFilterContext) context;

// Generate a list of all the allowed actions, sorted by minimum cost.
final List<RegionedPushdownAction> sorted = supportedActions()
.stream()
.filter(action -> action.allows(this, filterCtx, costCeiling))
.sorted(Comparator.comparingLong(RegionedPushdownAction::filterCost))
.collect(Collectors.toList());

// If no actions are allowed, we can skip all pushdown filtering.
if (sorted.isEmpty()) {
onComplete.accept(PushdownResult.allMaybeMatch(selection));
return;
}

// Initialize the pushdown result with the selection rowset as "maybe" rows
PushdownResult result = PushdownResult.allMaybeMatch(selection);

// Delegate to TableLocation and perform each supported action.
try (final RegionedPushdownAction.ActionContext actionCtx = makeActionContext(filter, filterCtx)) {
for (final RegionedPushdownAction action : sorted) {
try (final PushdownResult ignored = result) {
result = performPushdownAction(action, filter, selection, result, usePrev, filterCtx, actionCtx);
}
if (result.maybeMatch().isEmpty()) {
// No maybe rows remaining, so no reason to continue filtering.
break;
}
}
// Return the final result
onComplete.accept(result);
}
}

@Override
Expand All @@ -335,6 +402,20 @@ public PushdownFilterContext makePushdownFilterContext(
"makePushdownFilterContext() not supported for AbstractTableLocation");
}

@Override
public RegionedPushdownAction.EstimateContext makeEstimateContext(
final WhereFilter filter,
final PushdownFilterContext context) {
return RegionedPushdownAction.DEFAULT_ESTIMATE_CONTEXT;
}

@Override
public RegionedPushdownAction.ActionContext makeActionContext(
final WhereFilter filter,
final PushdownFilterContext context) {
return RegionedPushdownAction.DEFAULT_ACTION_CONTEXT;
}

// ------------------------------------------------------------------------------------------------------------------
// Reference counting implementation
// ------------------------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,13 @@

import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.impl.MutableColumnSourceGetDefaults;
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.LongChunk;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.impl.PushdownFilterContext;
import io.deephaven.engine.table.impl.PushdownResult;
import io.deephaven.engine.table.impl.select.WhereFilter;
import io.deephaven.engine.table.impl.util.JobScheduler;
import org.jetbrains.annotations.NotNull;

import java.util.function.Consumer;
import java.util.function.LongConsumer;

import static io.deephaven.util.QueryConstants.NULL_BOOLEAN;

/**
Expand Down Expand Up @@ -159,31 +151,13 @@ public boolean providesFillUnordered() {
}

@Override
public void estimatePushdownFilterCost(
final WhereFilter filter,
final RowSet selection,
final boolean usePrev,
final PushdownFilterContext context,
final JobScheduler jobScheduler,
final LongConsumer onComplete,
final Consumer<Exception> onError) {
// Delegate to the shared code for RowKeyAgnosticChunkSource
RowKeyAgnosticChunkSource.estimatePushdownFilterCostHelper(
filter, selection, usePrev, context, jobScheduler, onComplete, onError);
public Chunk<Values> getValueChunk() {
return SingleValuePushdownHelper.makeChunk(get(0));
}

@Override
public void pushdownFilter(
final WhereFilter filter,
final RowSet selection,
final boolean usePrev,
final PushdownFilterContext context,
final long costCeiling,
final JobScheduler jobScheduler,
final Consumer<PushdownResult> onComplete,
final Consumer<Exception> onError) {
// Delegate to the shared code for RowKeyAgnosticChunkSource
RowKeyAgnosticChunkSource.pushdownFilterHelper(this, filter, selection, usePrev, context, costCeiling,
jobScheduler, onComplete, onError);
public Chunk<Values> getPrevValueChunk() {
// avoid duplicating the current vs prev logic in getPrev
return SingleValuePushdownHelper.makeChunk(getPrev(0));
}
}
Loading
Loading