
Commit 9063e61

andygrove and claude committed
Remove prefetch feature, V2 scan dead code, and deprecate BatchReader
BatchReader is annotated @IcebergApi and kept for Iceberg compatibility, but Comet's own production code no longer uses it. The prefetch feature was entirely built on BatchReader and is dead code now that the native_iceberg_compat path uses NativeBatchReader. V2 Parquet scan acceleration (CometParquetScan) is also no longer active.

This commit:

- Marks BatchReader as @deprecated (since 0.14.0)
- Removes all prefetch internals from BatchReader (fields, methods, inner class)
- Removes COMET_SCAN_PREFETCH_ENABLED and COMET_SCAN_PREFETCH_THREAD_NUM configs
- Removes CometPrefetchThreadPool
- Deletes CometParquetPartitionReaderFactory and CometParquetScan
- Simplifies CometScanExec.prepareRDD to always use newFileScanRDD
- Removes dead BatchReader code path from CometParquetFileFormat
- Cleans up EliminateRedundantTransitions V2 dead code path
- Removes prefetch tests, BatchReader-only tests, and BatchReader benchmark case
- Cleans up CometParquetScan references in tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
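For Iceberg integrations that still go through the deprecated reader, the read loop documented in BatchReader's Javadoc continues to work unchanged. A minimal sketch (how the reader instance is obtained is elided and depends on the integration, e.g. IcebergCometBatchReader; readAllBatches is a hypothetical helper, not part of Comet):

import org.apache.comet.parquet.BatchReader
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical helper illustrating the init/nextBatch/currentBatch/close cycle
// that remains supported for Iceberg callers after this commit.
def readAllBatches(reader: BatchReader): Unit = {
  try {
    reader.init()
    while (reader.nextBatch()) { // returns false once all rows have been read
      val batch: ColumnarBatch = reader.currentBatch()
      // hand `batch` to downstream processing here
    }
  } finally {
    reader.close()
  }
}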
1 parent 020d982 commit 9063e61

File tree

13 files changed (+56, -841 lines)


common/src/main/java/org/apache/comet/parquet/BatchReader.java

Lines changed: 5 additions & 126 deletions
@@ -24,10 +24,6 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import scala.Option;
 
@@ -36,9 +32,7 @@
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -87,7 +81,10 @@
  * reader.close();
  * }
  * </pre>
+ *
+ * @deprecated since 0.14.0. This class is kept for Iceberg compatibility only.
  */
+@Deprecated
 @IcebergApi
 public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
@@ -110,8 +107,6 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
   protected AbstractColumnReader[] columnReaders;
   private CometSchemaImporter importer;
   protected ColumnarBatch currentBatch;
-  private Future<Option<Throwable>> prefetchTask;
-  private LinkedBlockingQueue<Pair<PageReadStore, Long>> prefetchQueue;
   private FileReader fileReader;
   private boolean[] missingColumns;
   protected boolean isInitialized;
@@ -363,26 +358,7 @@ public void init() throws URISyntaxException, IOException {
       }
     }
 
-    // Pre-fetching
-    boolean preFetchEnabled =
-        conf.getBoolean(
-            CometConf.COMET_SCAN_PREFETCH_ENABLED().key(),
-            (boolean) CometConf.COMET_SCAN_PREFETCH_ENABLED().defaultValue().get());
-
-    if (preFetchEnabled) {
-      LOG.info("Prefetch enabled for BatchReader.");
-      this.prefetchQueue = new LinkedBlockingQueue<>();
-    }
-
     isInitialized = true;
-    synchronized (this) {
-      // if prefetch is enabled, `init()` is called in separate thread. When
-      // `BatchReader.nextBatch()` is called asynchronously, it is possibly that
-      // `init()` is not called or finished. We need to hold on `nextBatch` until
-      // initialization of `BatchReader` is done. Once we are close to finish
-      // initialization, we notify the waiting thread of `nextBatch` to continue.
-      notifyAll();
-    }
   }
 
   /**
@@ -436,51 +412,13 @@ public ColumnarBatch currentBatch() {
     return currentBatch;
   }
 
-  // Only for testing
-  public Future<Option<Throwable>> getPrefetchTask() {
-    return this.prefetchTask;
-  }
-
-  // Only for testing
-  public LinkedBlockingQueue<Pair<PageReadStore, Long>> getPrefetchQueue() {
-    return this.prefetchQueue;
-  }
-
   /**
    * Loads the next batch of rows.
    *
    * @return true if there are no more rows to read, false otherwise.
    */
   public boolean nextBatch() throws IOException {
-    if (this.prefetchTask == null) {
-      Preconditions.checkState(isInitialized, "init() should be called first!");
-    } else {
-      // If prefetch is enabled, this reader will be initialized asynchronously from a
-      // different thread. Wait until it is initialized
-      while (!isInitialized) {
-        synchronized (this) {
-          try {
-            // Wait until initialization of current `BatchReader` is finished (i.e., `init()`),
-            // is done. It is possibly that `init()` is done after entering this while loop,
-            // so a short timeout is given.
-            wait(100);
-
-            // Checks if prefetch task is finished. If so, tries to get exception if any.
-            if (prefetchTask.isDone()) {
-              Option<Throwable> exception = prefetchTask.get();
-              if (exception.isDefined()) {
-                throw exception.get();
-              }
-            }
-          } catch (RuntimeException e) {
-            // Spark will check certain exception e.g. `SchemaColumnConvertNotSupportedException`.
-            throw e;
-          } catch (Throwable e) {
-            throw new IOException(e);
-          }
-        }
-      }
-    }
+    Preconditions.checkState(isInitialized, "init() should be called first!");
 
     if (rowsRead >= totalRowCount) return false;
     boolean hasMore;
@@ -547,7 +485,6 @@ public void close() throws IOException {
     }
   }
 
-  @SuppressWarnings("deprecation")
   private boolean loadNextRowGroupIfNecessary() throws Throwable {
     // More rows can be read from loaded row group. No need to load next one.
     if (rowsRead != totalRowsLoaded) return true;
@@ -556,21 +493,7 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
     SQLMetric numRowGroupsMetric = metrics.get("ParquetRowGroups");
     long startNs = System.nanoTime();
 
-    PageReadStore rowGroupReader = null;
-    if (prefetchTask != null && prefetchQueue != null) {
-      // Wait for pre-fetch task to finish.
-      Pair<PageReadStore, Long> rowGroupReaderPair = prefetchQueue.take();
-      rowGroupReader = rowGroupReaderPair.getLeft();
-
-      // Update incremental byte read metric. Because this metric in Spark is maintained
-      // by thread local variable, we need to manually update it.
-      // TODO: We may expose metrics from `FileReader` and get from it directly.
-      long incBytesRead = rowGroupReaderPair.getRight();
-      FileSystem.getAllStatistics().stream()
-          .forEach(statistic -> statistic.incrementBytesRead(incBytesRead));
-    } else {
-      rowGroupReader = fileReader.readNextRowGroup();
-    }
+    PageReadStore rowGroupReader = fileReader.readNextRowGroup();
 
     if (rowGroupTimeMetric != null) {
       rowGroupTimeMetric.add(System.nanoTime() - startNs);
@@ -608,48 +531,4 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
     totalRowsLoaded += rowGroupReader.getRowCount();
     return true;
   }
-
-  // Submits a prefetch task for this reader.
-  public void submitPrefetchTask(ExecutorService threadPool) {
-    this.prefetchTask = threadPool.submit(new PrefetchTask());
-  }
-
-  // A task for prefetching parquet row groups.
-  private class PrefetchTask implements Callable<Option<Throwable>> {
-    private long getBytesRead() {
-      return FileSystem.getAllStatistics().stream()
-          .mapToLong(s -> s.getThreadStatistics().getBytesRead())
-          .sum();
-    }
-
-    @Override
-    public Option<Throwable> call() throws Exception {
-      // Gets the bytes read so far.
-      long baseline = getBytesRead();
-
-      try {
-        init();
-
-        while (true) {
-          PageReadStore rowGroupReader = fileReader.readNextRowGroup();
-
-          if (rowGroupReader == null) {
-            // Reaches the end of row groups.
-            return Option.empty();
-          } else {
-            long incBytesRead = getBytesRead() - baseline;
-
-            prefetchQueue.add(Pair.of(rowGroupReader, incBytesRead));
-          }
-        }
-      } catch (Throwable e) {
-        // Returns exception thrown from the reader. The reader will re-throw it.
-        return Option.apply(e);
-      } finally {
-        if (fileReader != null) {
-          fileReader.closeStream();
-        }
-      }
-    }
-  }
 }

common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java

Lines changed: 2 additions & 0 deletions
@@ -24,9 +24,11 @@
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
+import org.apache.comet.IcebergApi;
 import org.apache.comet.vector.CometVector;
 
 /** This class is a public interface used by Apache Iceberg to read batches using Comet */
+@IcebergApi
 public class IcebergCometBatchReader extends BatchReader {
   public IcebergCometBatchReader(int numColumns, StructType schema) {
     this.columnReaders = new AbstractColumnReader[numColumns];

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 17 deletions
@@ -651,23 +651,6 @@ object CometConf extends ShimCometConf {
       .doubleConf
       .createWithDefault(1.0)
 
-  val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
-    conf("spark.comet.scan.preFetch.enabled")
-      .category(CATEGORY_SCAN)
-      .doc("Whether to enable pre-fetching feature of CometScan.")
-      .booleanConf
-      .createWithDefault(false)
-
-  val COMET_SCAN_PREFETCH_THREAD_NUM: ConfigEntry[Int] =
-    conf("spark.comet.scan.preFetch.threadNum")
-      .category(CATEGORY_SCAN)
-      .doc(
-        "The number of threads running pre-fetching for CometScan. Effective if " +
-          s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. Note that more " +
-          "pre-fetching threads means more memory requirement to store pre-fetched row groups.")
-      .intConf
-      .createWithDefault(2)
-
   val COMET_NATIVE_LOAD_REQUIRED: ConfigEntry[Boolean] = conf("spark.comet.nativeLoadRequired")
     .category(CATEGORY_EXEC)
     .doc(
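The two removed keys were user-facing Spark configs. Before this commit they could be set as shown below; after this change they are no longer recognized and have no effect (a sketch of prior usage only, assuming an active SparkSession named `spark`):

// These keys are removed by this commit and are now ignored by Comet.
spark.conf.set("spark.comet.scan.preFetch.enabled", "true")
spark.conf.set("spark.comet.scan.preFetch.threadNum", "4")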

common/src/main/scala/org/apache/comet/parquet/CometReaderThreadPool.scala

Lines changed: 0 additions & 5 deletions
@@ -54,11 +54,6 @@ abstract class CometReaderThreadPool {
 
 }
 
-// A thread pool used for pre-fetching files.
-object CometPrefetchThreadPool extends CometReaderThreadPool {
-  override def threadNamePrefix: String = "prefetch_thread"
-}
-
 // Thread pool used by the Parquet parallel reader
 object CometFileReaderThreadPool extends CometReaderThreadPool {
   override def threadNamePrefix: String = "file_reader_thread"

spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala

Lines changed: 37 additions & 82 deletions
@@ -57,7 +57,7 @@ import org.apache.comet.vector.CometVector
  * in [[org.apache.comet.CometSparkSessionExtensions]]
  * - `buildReaderWithPartitionValues`, so Spark calls Comet's Parquet reader to read values.
  */
-class CometParquetFileFormat(session: SparkSession, scanImpl: String)
+class CometParquetFileFormat(session: SparkSession)
     extends ParquetFileFormat
     with MetricsSupport
     with ShimSQLConf {
@@ -110,8 +110,6 @@ class CometParquetFileFormat(session: SparkSession, scanImpl: String)
     // Comet specific configurations
     val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
 
-    val nativeIcebergCompat = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
-
     (file: PartitionedFile) => {
       val sharedConf = broadcastedHadoopConf.value.value
       val footer = FooterReader.readFooter(sharedConf, file)
@@ -135,85 +133,42 @@ class CometParquetFileFormat(session: SparkSession, scanImpl: String)
         isCaseSensitive,
         datetimeRebaseSpec)
 
-      val recordBatchReader =
-        if (nativeIcebergCompat) {
-          // We still need the predicate in the conf to allow us to generate row indexes based on
-          // the actual row groups read
-          val pushed = if (parquetFilterPushDown) {
-            filters
-              // Collects all converted Parquet filter predicates. Notice that not all predicates
-              // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
-              // a `flatMap` is used here.
-              .flatMap(parquetFilters.createFilter)
-              .reduceOption(FilterApi.and)
-          } else {
-            None
-          }
-          pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
-          val pushedNative = if (parquetFilterPushDown) {
-            parquetFilters.createNativeFilters(filters)
-          } else {
-            None
-          }
-          val batchReader = new NativeBatchReader(
-            sharedConf,
-            file,
-            footer,
-            pushedNative.orNull,
-            capacity,
-            requiredSchema,
-            dataSchema,
-            isCaseSensitive,
-            useFieldId,
-            ignoreMissingIds,
-            datetimeRebaseSpec.mode == CORRECTED,
-            partitionSchema,
-            file.partitionValues,
-            metrics.asJava,
-            CometMetricNode(metrics))
-          try {
-            batchReader.init()
-          } catch {
-            case e: Throwable =>
-              batchReader.close()
-              throw e
-          }
-          batchReader
-        } else {
-          val pushed = if (parquetFilterPushDown) {
-            filters
-              // Collects all converted Parquet filter predicates. Notice that not all predicates
-              // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
-              // a `flatMap` is used here.
-              .flatMap(parquetFilters.createFilter)
-              .reduceOption(FilterApi.and)
-          } else {
-            None
-          }
-          pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
-
-          val batchReader = new BatchReader(
-            sharedConf,
-            file,
-            footer,
-            capacity,
-            requiredSchema,
-            isCaseSensitive,
-            useFieldId,
-            ignoreMissingIds,
-            datetimeRebaseSpec.mode == CORRECTED,
-            partitionSchema,
-            file.partitionValues,
-            metrics.asJava)
-          try {
-            batchReader.init()
-          } catch {
-            case e: Throwable =>
-              batchReader.close()
-              throw e
-          }
-          batchReader
-        }
+      val pushed = if (parquetFilterPushDown) {
+        filters
+          .flatMap(parquetFilters.createFilter)
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
+      pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
+      val pushedNative = if (parquetFilterPushDown) {
+        parquetFilters.createNativeFilters(filters)
+      } else {
+        None
+      }
+      val recordBatchReader = new NativeBatchReader(
+        sharedConf,
+        file,
+        footer,
+        pushedNative.orNull,
+        capacity,
+        requiredSchema,
+        dataSchema,
+        isCaseSensitive,
+        useFieldId,
+        ignoreMissingIds,
+        datetimeRebaseSpec.mode == CORRECTED,
+        partitionSchema,
+        file.partitionValues,
+        metrics.asJava,
+        CometMetricNode(metrics))
+      try {
+        recordBatchReader.init()
+      } catch {
+        case e: Throwable =>
+          recordBatchReader.close()
+          throw e
+      }
       val iter = new RecordReaderIterator(recordBatchReader)
       try {
         iter.asInstanceOf[Iterator[InternalRow]]
