Skip to content

Commit 17771bf

Browse files
remove size checks from IncrementalIndex to simplify logic (#18219) (#18276)
Co-authored-by: Clint Wylie <cwylie@apache.org>
1 parent ae252d9 commit 17771bf

File tree

32 files changed

+122
-258
lines changed

32 files changed

+122
-258
lines changed

benchmarks/src/test/java/org/apache/druid/benchmark/indexing/OnheapIncrementalIndexBenchmark.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.apache.druid.segment.Segment;
5252
import org.apache.druid.segment.incremental.IncrementalIndex;
5353
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
54-
import org.apache.druid.segment.incremental.IndexSizeExceededException;
5554
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
5655
import org.joda.time.Interval;
5756
import org.junit.Assert;
@@ -253,13 +252,8 @@ public void concurrentAddRead() throws InterruptedException, ExecutionException
253252
indexExecutor.submit(
254253
() -> {
255254
currentlyRunning.incrementAndGet();
256-
try {
257-
for (int i = 0; i < elementsPerAddTask; i++) {
258-
incrementalIndex.add(getLongRow(timestamp + i, 1, DIMENSION_COUNT));
259-
}
260-
}
261-
catch (IndexSizeExceededException e) {
262-
throw new RuntimeException(e);
255+
for (int i = 0; i < elementsPerAddTask; i++) {
256+
incrementalIndex.add(getLongRow(timestamp + i, 1, DIMENSION_COUNT));
263257
}
264258
currentlyRunning.decrementAndGet();
265259
someoneRan.set(true);

indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public SamplerResponse sample(
171171

172172
//keep the index of the row to be added to responseRows for further use
173173
final int rowIndex = responseRows.size();
174-
IncrementalIndexAddResult addResult = index.add(new SamplerInputRow(row, rowIndex), true);
174+
IncrementalIndexAddResult addResult = index.add(new SamplerInputRow(row, rowIndex));
175175
if (addResult.hasParseException()) {
176176
responseRows.add(new SamplerResponseRow(
177177
rawColumns,

processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.druid.data.input.impl.TimestampSpec;
2828
import org.apache.druid.java.util.common.logger.Logger;
2929
import org.apache.druid.segment.incremental.IncrementalIndex;
30-
import org.apache.druid.segment.incremental.IndexSizeExceededException;
3130
import org.joda.time.Interval;
3231

3332
import java.util.ArrayList;
@@ -207,12 +206,7 @@ public InputRow get()
207206
public static void addStreamToIndex(Stream<InputRow> stream, IncrementalIndex index)
208207
{
209208
stream.forEachOrdered(row -> {
210-
try {
211-
index.add(row);
212-
}
213-
catch (IndexSizeExceededException e) {
214-
throw new RuntimeException(e);
215-
}
209+
index.add(row);
216210
});
217211
}
218212

processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java

Lines changed: 8 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,10 @@
102102
* In-memory, row-based data structure used to hold data during ingestion. Realtime tasks query this index using
103103
* {@link IncrementalIndexCursorFactory}.
104104
*
105-
* Concurrency model: {@link #add(InputRow)} and {@link #add(InputRow, boolean)} are not thread-safe, and must be
106-
* called from a single thread or externally synchronized. However, the methods that support
107-
* {@link IncrementalIndexCursorFactory} are thread-safe, and may be called concurrently with each other, and with
108-
* the "add" methods. This concurrency model supports real-time queries of the data in the index.
105+
* Concurrency model: {@link #add(InputRow)} is not thread-safe, and must be called from a single thread or externally
106+
* synchronized. However, the methods that support {@link IncrementalIndexCursorFactory} are thread-safe, and may be
107+
* called concurrently with each other, and with the "add" methods. This concurrency model supports real-time queries
108+
* of the data in the index.
109109
*/
110110
public abstract class IncrementalIndex implements IncrementalIndexRowSelector, ColumnInspector, Iterable<Row>, Closeable
111111
{
@@ -383,9 +383,8 @@ protected abstract void initAggs(
383383
// Note: This method does not need to be thread safe.
384384
protected abstract AddToFactsResult addToFacts(
385385
IncrementalIndexRow key,
386-
InputRowHolder inputRowHolder,
387-
boolean skipMaxRowsInMemoryCheck
388-
) throws IndexSizeExceededException;
386+
InputRowHolder inputRowHolder
387+
);
389388

390389

391390
public abstract Iterable<Row> iterableWithPostAggregations(
@@ -453,45 +452,23 @@ public ColumnFormat getColumnFormat(String columnName)
453452
}
454453
}
455454

456-
/**
457-
* Adds a new row. The row might correspond with another row that already exists, in which case this will
458-
* update that row instead of inserting a new one.
459-
*
460-
* Not thread-safe.
461-
*
462-
* @param row the row of data to add
463-
*
464-
* @return the number of rows in the data set after adding the InputRow. If any parse failure occurs, a {@link ParseException} is returned in {@link IncrementalIndexAddResult}.
465-
*
466-
* @throws IndexSizeExceededException this exception is thrown once it reaches max rows limit and skipMaxRowsInMemoryCheck is set to false.
467-
*/
468-
public IncrementalIndexAddResult add(InputRow row) throws IndexSizeExceededException
469-
{
470-
return add(row, false);
471-
}
472-
473455
/**
474456
* Adds a new row. The row might correspond with another row that already exists, in which case this will
475457
* update that row instead of inserting a new one.
476458
*
477459
* Not thread-safe.
478460
*
479461
* @param row the row of data to add
480-
* @param skipMaxRowsInMemoryCheck whether or not to skip the check of rows exceeding the max rows or bytes limit
481462
*
482463
* @return the number of rows in the data set after adding the InputRow. If any parse failure occurs, a {@link ParseException} is returned in {@link IncrementalIndexAddResult}.
483-
*
484-
* @throws IndexSizeExceededException this exception is thrown once it reaches max rows limit and skipMaxRowsInMemoryCheck is set to false.
485464
*/
486-
public IncrementalIndexAddResult add(InputRow row, boolean skipMaxRowsInMemoryCheck)
487-
throws IndexSizeExceededException
465+
public IncrementalIndexAddResult add(InputRow row)
488466
{
489467
IncrementalIndexRowResult incrementalIndexRowResult = toIncrementalIndexRow(row);
490468
inputRowHolder.set(row);
491469
final AddToFactsResult addToFactsResult = addToFacts(
492470
incrementalIndexRowResult.getIncrementalIndexRow(),
493-
inputRowHolder,
494-
skipMaxRowsInMemoryCheck
471+
inputRowHolder
495472
);
496473
updateMaxIngestedTime(row.getTimestamp());
497474
@Nullable ParseException parseException = getCombinedParseException(

processing/src/main/java/org/apache/druid/segment/incremental/IndexSizeExceededException.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,8 @@ protected void initAggs(
224224
@Override
225225
protected AddToFactsResult addToFacts(
226226
IncrementalIndexRow key,
227-
InputRowHolder inputRowHolder,
228-
boolean skipMaxRowsInMemoryCheck
229-
) throws IndexSizeExceededException
227+
InputRowHolder inputRowHolder
228+
)
230229
{
231230
final List<String> parseExceptionMessages = new ArrayList<>();
232231
final AtomicLong totalSizeInBytes = getBytesInMemory();
@@ -258,16 +257,6 @@ protected AddToFactsResult addToFacts(
258257
final int rowIndex = indexIncrement.getAndIncrement();
259258
aggregators.put(rowIndex, aggs);
260259

261-
// Last ditch sanity checks
262-
if ((numEntries.get() >= maxRowCount || totalSizeInBytes.get() >= maxBytesInMemory)
263-
&& facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX
264-
&& !skipMaxRowsInMemoryCheck) {
265-
throw new IndexSizeExceededException(
266-
"Maximum number of rows [%d] or max size in bytes [%d] reached",
267-
maxRowCount,
268-
maxBytesInMemory
269-
);
270-
}
271260
final int prev = facts.putIfAbsent(key, rowIndex);
272261
if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) {
273262
numEntries.incrementAndGet();

processing/src/test/java/org/apache/druid/frame/processor/test/TestFrameProcessorUtils.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.druid.segment.incremental.IncrementalIndex;
3131
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
3232
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
33-
import org.apache.druid.segment.incremental.IndexSizeExceededException;
3433
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
3534

3635
import java.util.List;
@@ -54,13 +53,8 @@ public static CursorFactory toCursorFactory(List<InputRow> inputRows)
5453
.setMaxRowCount(1000)
5554
.build();
5655

57-
try {
58-
for (InputRow inputRow : inputRows) {
59-
index.add(inputRow);
60-
}
61-
}
62-
catch (IndexSizeExceededException e) {
63-
throw new RuntimeException(e);
56+
for (InputRow inputRow : inputRows) {
57+
index.add(inputRow);
6458
}
6559

6660
return new IncrementalIndexCursorFactory(index);

processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import org.apache.druid.segment.column.ValueType;
5757
import org.apache.druid.segment.incremental.IncrementalIndex;
5858
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
59-
import org.apache.druid.segment.incremental.IndexSizeExceededException;
6059
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
6160
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
6261
import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -343,12 +342,7 @@ private static QueryableIndex buildIndex(String storeDoubleAsFloat) throws IOExc
343342

344343

345344
getStreamOfEvents().forEach(o -> {
346-
try {
347-
index.add(ROW_PARSER.parseBatch((Map<String, Object>) o).get(0));
348-
}
349-
catch (IndexSizeExceededException e) {
350-
throw new RuntimeException(e);
351-
}
345+
index.add(ROW_PARSER.parseBatch((Map<String, Object>) o).get(0));
352346
});
353347

354348
if (oldValue == null) {

processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,7 @@ public static IncrementalIndex createIncrementalIndex(
593593
Granularity gran,
594594
int maxRowCount,
595595
boolean rollup
596-
) throws Exception
596+
)
597597
{
598598
IncrementalIndex index = new OnheapIncrementalIndex.Builder()
599599
.setIndexSchema(
@@ -633,7 +633,7 @@ public static IncrementalIndex createIncrementalIndex(
633633
Granularity gran,
634634
int maxRowCount,
635635
boolean rollup
636-
) throws Exception
636+
)
637637
{
638638
return createIncrementalIndex(
639639
rows,

processing/src/test/java/org/apache/druid/query/aggregation/firstlast/first/StringFirstTimeseriesQueryTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.apache.druid.segment.incremental.IncrementalIndex;
4545
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
4646
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
47-
import org.apache.druid.segment.incremental.IndexSizeExceededException;
4847
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
4948
import org.apache.druid.segment.serde.ComplexMetrics;
5049
import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -68,7 +67,7 @@ public class StringFirstTimeseriesQueryTest extends InitializedNullHandlingTest
6867
private QueryableIndex queryableIndex;
6968

7069
@Before
71-
public void setUp() throws IndexSizeExceededException
70+
public void setUp()
7271
{
7372
final SerializablePairLongStringComplexMetricSerde serde = new SerializablePairLongStringComplexMetricSerde();
7473
ComplexMetrics.registerSerde(serde.getTypeName(), serde);

0 commit comments

Comments (0)