Skip to content

Commit b5e0373

Browse files
cpwright and Copilot authored
feat: DH-21486: Allow the clock filter to execute in parallel. (#7594)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 484b7b6 commit b5e0373

File tree

8 files changed

+428
-25
lines changed

8 files changed

+428
-25
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1395,6 +1395,14 @@ private QueryTable whereInternal(final WhereFilter... filters) {
13951395
}
13961396
if (reindexingFilter.requiresSorting()) {
13971397
result = (QueryTable) result.sort(reindexingFilter.getSortColumns());
1398+
if (!result.isFlat()) {
1399+
// The only reindexing filters are ClockFilters; and the SortedClockFilter is the
1400+
// only one that requires sorting. It makes the assumption that sort flattens the
1401+
// table, which is generally true for a historical sort - but if the input was
1402+
// already sorted, we don't create a new table but rather just return a copy of the
1403+
// table with the sorted columns set.
1404+
result = (QueryTable) result.flatten();
1405+
}
13981406
reindexingFilter.sortingDone();
13991407
}
14001408
// Execute the current filter (which is or wraps a reindexing filter)

engine/table/src/main/java/io/deephaven/engine/table/impl/select/ClockFilter.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,19 @@ public final WritableRowSet filter(
8080
return initial == null ? RowSetFactory.empty() : initial;
8181
}
8282

83+
/**
84+
* This method is called from the {@link ClockFilter#filter(RowSet, RowSet, Table, boolean)} method to initialize
85+
* the filter's internal data structures.
86+
*
87+
* <p>
88+
* The filter permits parallelization, so it may be called from many initialization threads concurrently.
89+
* </p>
90+
*
91+
* @param selection the selection to be filtered
92+
* @param fullSet the full set of rows in the source table
93+
* @param table the table that is being filtered
94+
* @return the rowset that matches the filter at initialization time
95+
*/
8396
@Nullable
8497
protected abstract WritableRowSet initializeAndGetInitialIndex(@NotNull final RowSet selection,
8598
@NotNull final RowSet fullSet, @NotNull final Table table);
@@ -131,6 +144,13 @@ public final void run() {
131144
}
132145
}
133146

147+
/**
148+
* This method is called from the {@link ClockFilter#run()} method as part of the update graph source refresh.
149+
*
150+
* <p>
151+
* This method is not called concurrently, because there are no updates from the source table.
152+
* </p>
153+
*/
134154
@Nullable
135155
protected abstract WritableRowSet updateAndGetAddedIndex();
136156

@@ -140,7 +160,7 @@ public final void run() {
140160
protected final static class Range {
141161

142162
long nextKey;
143-
private final long lastKey;
163+
private long lastKey;
144164

145165
protected Range(final long firstKey, final long lastKey) {
146166
nextKey = Require.leq(firstKey, "firstRowKey", lastKey, "lastRowKey");
@@ -170,5 +190,39 @@ RowSetBuilderRandom consumeKeysAndAppendAdded(final ColumnSource<Long> nanosColu
170190
addedBuilder.addRange(firstKeyAdded, lastKeyAdded);
171191
return addedBuilder;
172192
}
193+
194+
/**
195+
* Returns true if this range is before the given key.
196+
*/
197+
boolean isBefore(final long key) {
198+
return lastKey < key;
199+
}
200+
201+
/**
202+
* Merges range if contiguous; returns true if the range was merged into this range
203+
*/
204+
boolean merge(final long firstKey, final long lastKey) {
205+
if (isEmpty()) {
206+
this.nextKey = firstKey;
207+
this.lastKey = lastKey;
208+
return true;
209+
}
210+
if (firstKey == this.lastKey + 1) {
211+
this.lastKey = lastKey;
212+
return true;
213+
}
214+
if (lastKey == this.nextKey - 1) {
215+
this.nextKey = firstKey;
216+
return true;
217+
}
218+
return false;
219+
}
220+
221+
/**
222+
* @return the first key in this range
223+
*/
224+
long firstKey() {
225+
return nextKey;
226+
}
173227
}
174228
}

engine/table/src/main/java/io/deephaven/engine/table/impl/select/SortedClockFilter.java

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
import org.jetbrains.annotations.NotNull;
1313
import org.jetbrains.annotations.Nullable;
1414

15+
import java.util.Comparator;
16+
import java.util.LinkedList;
17+
import java.util.List;
18+
1519
/**
1620
* This will filter a table on an Instant column for all rows greater than "now" according to a supplied clock. It
1721
* requires sorting of the input table according to the specified timestamp column, leveraging this for a very efficient
@@ -21,7 +25,17 @@
2125
public class SortedClockFilter extends ClockFilter {
2226

2327
private boolean sortingDone;
28+
/**
29+
* the currently active range we are working on; this must be the first range in the table (because we are
30+
* guaranteed to be sorted).
31+
*/
2432
private Range range;
33+
/**
34+
* A list of ranges that were not immediately mergeable with the first range. They are ordered according to key, and
35+
* we pop one off at a time until all the rows are consumed. We can expect at most one range per initialization
36+
* thread to be in this list.
37+
*/
38+
private final List<Range> pendingRanges = new LinkedList<>();
2539

2640
public SortedClockFilter(@NotNull final String columnName,
2741
@NotNull final Clock clock,
@@ -59,31 +73,41 @@ protected WritableRowSet initializeAndGetInitialIndex(@NotNull final RowSet sele
5973
// because the input table must be historical, and the historical sort implementation uses a
6074
// ContiguousRowRedirection.
6175
Require.requirement(table.isFlat(), "table.isFlat()");
62-
// This must be the first filter in a where-clause of its own, again because of the sort, hence selection must
63-
// be equal to fullSet.
64-
// This test as implemented only works because the table is flat.
65-
Require.requirement(selection.size() == fullSet.size()
66-
&& selection.size() == selection.lastRowKey() - selection.firstRowKey() + 1
67-
&& fullSet.size() == fullSet.lastRowKey() - fullSet.firstRowKey() + 1,
68-
"selection.size() == fullSet.size() && selection.size() == selection.lastRowKey() - selection.firstRowKey() + 1 && fullSet.size() == fullSet.lastRowKey() - fullSet.firstRowKey() + 1");
6976

70-
range = new Range(selection.firstRowKey(), selection.lastRowKey());
71-
return updateAndGetAddedIndex();
77+
// We must have a contiguous region in our rowset. Multiple ranges may be created, but they
78+
// should all merge together, with only the first range actually producing any useful result.
79+
Require.requirement(selection.isContiguous(), "selection.isContiguous()");
80+
81+
synchronized (this) {
82+
if (range == null) {
83+
range = new Range(selection.firstRowKey(), selection.lastRowKey());
84+
} else if (!range.merge(selection.firstRowKey(), selection.lastRowKey())) {
85+
if (range.isBefore(selection.firstRowKey())) {
86+
pendingRanges.add(new Range(selection.firstRowKey(), selection.lastRowKey()));
87+
} else {
88+
pendingRanges.add(range);
89+
range = new Range(selection.firstRowKey(), selection.lastRowKey());
90+
}
91+
}
92+
return updateAndGetAddedIndex();
93+
}
7294
}
7395

7496
@Override
7597
@Nullable
76-
protected WritableRowSet updateAndGetAddedIndex() {
98+
protected synchronized WritableRowSet updateAndGetAddedIndex() {
7799
if (range == null || range.isEmpty()) {
78100
return null;
79101
}
80102
final RowSetBuilderRandom addedBuilder =
81103
range.consumeKeysAndAppendAdded(nanosColumnSource, clock.currentTimeNanos(), null);
104+
if (range.isEmpty()) {
105+
pendingRanges.sort(Comparator.comparingLong(Range::firstKey));
106+
while (range.isEmpty() && !pendingRanges.isEmpty()) {
107+
range = pendingRanges.remove(0);
108+
range.consumeKeysAndAppendAdded(nanosColumnSource, clock.currentTimeNanos(), addedBuilder);
109+
}
110+
}
82111
return addedBuilder == null ? null : addedBuilder.build();
83112
}
84-
85-
@Override
86-
public boolean permitParallelization() {
87-
return false;
88-
}
89113
}

engine/table/src/main/java/io/deephaven/engine/table/impl/select/UnsortedClockFilter.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,16 @@ public int compare(final Range r1, final Range r2) {
6969
protected WritableRowSet initializeAndGetInitialIndex(@NotNull final RowSet selection,
7070
@NotNull final RowSet fullSet,
7171
@NotNull final Table table) {
72-
rangesByNextTime = new PriorityQueue<>(INITIAL_RANGE_QUEUE_CAPACITY, new RangeComparator());
73-
7472
if (selection.isEmpty()) {
7573
return null;
7674
}
7775

76+
synchronized (this) {
77+
if (rangesByNextTime == null) {
78+
rangesByNextTime = new PriorityQueue<>(INITIAL_RANGE_QUEUE_CAPACITY, new RangeComparator());
79+
}
80+
}
81+
7882
final RowSetBuilderSequential addedBuilder = RowSetFactory.builderSequential();
7983

8084
final long nowNanos = clock.currentTimeNanos();
@@ -98,7 +102,9 @@ protected WritableRowSet initializeAndGetInitialIndex(@NotNull final RowSet sele
98102
|| QueryLanguageFunctionUtils.less(currentValue, previousValue)) {
99103
// Add the current range, as appropriate
100104
if (activeRangeIsDeferred) {
101-
rangesByNextTime.add(new Range(activeRangeFirstKey, activeRangeLastKey));
105+
synchronized (this) {
106+
rangesByNextTime.add(new Range(activeRangeFirstKey, activeRangeLastKey));
107+
}
102108
} else {
103109
addedBuilder.appendRange(activeRangeFirstKey, activeRangeLastKey);
104110
}
@@ -113,7 +119,9 @@ protected WritableRowSet initializeAndGetInitialIndex(@NotNull final RowSet sele
113119

114120
// Add the final range, as appropriate
115121
if (activeRangeIsDeferred) {
116-
rangesByNextTime.add(new Range(activeRangeFirstKey, activeRangeLastKey));
122+
synchronized (this) {
123+
rangesByNextTime.add(new Range(activeRangeFirstKey, activeRangeLastKey));
124+
}
117125
} else {
118126
addedBuilder.appendRange(activeRangeFirstKey, activeRangeLastKey);
119127
}
@@ -141,9 +149,4 @@ protected WritableRowSet updateAndGetAddedIndex() {
141149
}
142150
return addedBuilder == null ? null : addedBuilder.build();
143151
}
144-
145-
@Override
146-
public boolean permitParallelization() {
147-
return false;
148-
}
149152
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
//
2+
// Copyright (c) 2016-2026 Deephaven Data Labs and Patent Pending
3+
//
4+
package io.deephaven.engine.table.impl;
5+
6+
import io.deephaven.util.thread.ThreadInitializationFactory;
7+
8+
import java.util.Random;
9+
import java.util.concurrent.Future;
10+
11+
/**
12+
* A version of the operation initialization thread pool that sleeps for a random amount of time before executing the
13+
* operation initializer; this is intended for unit tests to allow some perturbation for parallel tasks to get better
14+
* coverage.
15+
*/
16+
public class RandomizedOperationInitializationThreadPool extends OperationInitializationThreadPool {
17+
private final Random random = new Random();
18+
private final int sleepMillis;
19+
20+
/**
21+
* Create the thread pool.
22+
*
23+
* @param factory the thread initialization factory
24+
* @param sleepMillis the maximum number of milliseconds to sleep before executing the operation initializer. Values
25+
* are randomly distributed between 0 and this value.
26+
*/
27+
public RandomizedOperationInitializationThreadPool(ThreadInitializationFactory factory, int sleepMillis) {
28+
super(factory);
29+
this.sleepMillis = sleepMillis;
30+
}
31+
32+
@Override
33+
public Future<?> submit(Runnable runnable) {
34+
return super.submit(() -> {
35+
final int toSleep = random.nextInt(sleepMillis);
36+
try {
37+
if (toSleep > 0) {
38+
Thread.sleep(toSleep);
39+
}
40+
} catch (InterruptedException ignored) {
41+
}
42+
runnable.run();
43+
});
44+
}
45+
}

0 commit comments

Comments
 (0)