Skip to content

Commit 0ba03a1

Browse files
authored
fix: fix spark/sql test failures in native_iceberg_compat (apache#1593)
1 parent 602ab33 commit 0ba03a1

File tree

10 files changed

+553
-164
lines changed

10 files changed

+553
-164
lines changed

common/src/main/java/org/apache/comet/parquet/BatchReader.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@
2323
import java.io.IOException;
2424
import java.net.URI;
2525
import java.net.URISyntaxException;
26-
import java.util.Arrays;
27-
import java.util.HashMap;
28-
import java.util.List;
29-
import java.util.Map;
26+
import java.util.*;
3027
import java.util.concurrent.Callable;
3128
import java.util.concurrent.ExecutorService;
3229
import java.util.concurrent.Future;
@@ -285,8 +282,8 @@ public void init() throws URISyntaxException, IOException {
285282
// Initialize missing columns and use null vectors for them
286283
missingColumns = new boolean[columns.size()];
287284
List<String[]> paths = requestedSchema.getPaths();
288-
StructField[] nonPartitionFields = sparkSchema.fields();
289285
ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema);
286+
StructField[] nonPartitionFields = sparkSchema.fields();
290287
for (int i = 0; i < requestedSchema.getFieldCount(); i++) {
291288
Type t = requestedSchema.getFields().get(i);
292289
Preconditions.checkState(

common/src/main/java/org/apache/comet/parquet/FileReader.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.apache.parquet.io.InputFile;
8181
import org.apache.parquet.io.ParquetDecodingException;
8282
import org.apache.parquet.io.SeekableInputStream;
83+
import org.apache.parquet.schema.MessageType;
8384
import org.apache.parquet.schema.PrimitiveType;
8485
import org.apache.spark.sql.execution.metric.SQLMetric;
8586

@@ -578,6 +579,10 @@ private boolean advanceToNextBlock() {
578579
}
579580

580581
public long[] getRowIndices() {
582+
return getRowIndices(blocks);
583+
}
584+
585+
public static long[] getRowIndices(List<BlockMetaData> blocks) {
581586
long[] rowIndices = new long[blocks.size() * 2];
582587
for (int i = 0, n = blocks.size(); i < n; i++) {
583588
BlockMetaData block = blocks.get(i);
@@ -591,7 +596,7 @@ public long[] getRowIndices() {
591596
//
592597
// The reason reflection is used here is that some Spark versions still depend on a
593598
// Parquet version where the method `getRowIndexOffset` is not public.
594-
private long getRowIndexOffset(BlockMetaData metaData) {
599+
public static long getRowIndexOffset(BlockMetaData metaData) {
595600
try {
596601
Method method = BlockMetaData.class.getMethod("getRowIndexOffset");
597602
method.setAccessible(true);
@@ -699,6 +704,35 @@ private static ParquetMetadata readFooter(
699704
}
700705

701706
private List<BlockMetaData> filterRowGroups(List<BlockMetaData> blocks) {
707+
return filterRowGroups(options, blocks, this);
708+
}
709+
710+
public static List<BlockMetaData> filterRowGroups(
711+
ParquetReadOptions options, List<BlockMetaData> blocks, FileReader fileReader) {
712+
FilterCompat.Filter recordFilter = options.getRecordFilter();
713+
if (FilterCompat.isFilteringRequired(recordFilter)) {
714+
// set up data filters based on configured levels
715+
List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();
716+
717+
if (options.useStatsFilter()) {
718+
levels.add(STATISTICS);
719+
}
720+
721+
if (options.useDictionaryFilter()) {
722+
levels.add(DICTIONARY);
723+
}
724+
725+
if (options.useBloomFilter()) {
726+
levels.add(BLOOMFILTER);
727+
}
728+
return RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, fileReader);
729+
}
730+
731+
return blocks;
732+
}
733+
734+
public static List<BlockMetaData> filterRowGroups(
735+
ParquetReadOptions options, List<BlockMetaData> blocks, MessageType schema) {
702736
FilterCompat.Filter recordFilter = options.getRecordFilter();
703737
if (FilterCompat.isFilteringRequired(recordFilter)) {
704738
// set up data filters based on configured levels
@@ -715,7 +749,7 @@ private List<BlockMetaData> filterRowGroups(List<BlockMetaData> blocks) {
715749
if (options.useBloomFilter()) {
716750
levels.add(BLOOMFILTER);
717751
}
718-
return RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this);
752+
return RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, schema);
719753
}
720754

721755
return blocks;

common/src/main/java/org/apache/comet/parquet/Native.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -244,15 +244,15 @@ public static native void setPageV2(
244244
* Initialize a record batch reader for a PartitionedFile
245245
*
246246
* @param filePath
247-
* @param start
248-
* @param length
247+
* @param starts
248+
* @param lengths
249249
* @return a handle to the record batch reader, used in subsequent calls.
250250
*/
251251
public static native long initRecordBatchReader(
252252
String filePath,
253253
long fileSize,
254-
long start,
255-
long length,
254+
long[] starts,
255+
long[] lengths,
256256
byte[] filter,
257257
byte[] requiredSchema,
258258
byte[] dataSchema,

0 commit comments

Comments (0)