diff --git a/common/pom.xml b/common/pom.xml
index 8ab0fe50ff..9c0a7169af 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -51,6 +51,10 @@ under the License.
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-format-structures</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
diff --git a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
index b9f1797cb3..3768bff56b 100644
--- a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
@@ -89,6 +89,10 @@ ColumnDescriptor getDescriptor() {
return descriptor;
}
+ String getPath() {
+ return String.join(".", this.descriptor.getPath());
+ }
+
/**
* Set the batch size of this reader to be 'batchSize'. Also initializes the native column reader.
*/
diff --git a/common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java
new file mode 100644
index 0000000000..7748fbbe29
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/parquet/IcebergCometNativeBatchReader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.metric.SQLMetric;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A specialized NativeBatchReader for Iceberg that accepts ParquetMetadata as a Thrift-encoded
+ * byte array. This allows Iceberg to pass the metadata in serialized form and use a two-step
+ * initialization pattern.
+ */
+public class IcebergCometNativeBatchReader extends NativeBatchReader {
+
+ public IcebergCometNativeBatchReader(StructType requiredSchema) {
+ super();
+ this.sparkSchema = requiredSchema;
+ }
+
+ /** Initialize the reader using FileInfo instead of PartitionedFile. */
+ public void init(
+ Configuration conf,
+ FileInfo fileInfo,
+ byte[] parquetMetadataBytes,
+ byte[] nativeFilter,
+ int capacity,
+ StructType dataSchema,
+ boolean isCaseSensitive,
+ boolean useFieldId,
+ boolean ignoreMissingIds,
+ boolean useLegacyDateTimestamp,
+ StructType partitionSchema,
+ InternalRow partitionValues,
+ AbstractColumnReader[] preInitializedReaders,
+ Map<String, SQLMetric> metrics)
+ throws Throwable {
+
+ // Set parent fields
+ this.conf = conf;
+ this.fileInfo = fileInfo;
+ this.footer = new ParquetMetadataSerializer().deserialize(parquetMetadataBytes);
+ this.nativeFilter = nativeFilter;
+ this.capacity = capacity;
+ this.dataSchema = dataSchema;
+ this.isCaseSensitive = isCaseSensitive;
+ this.useFieldId = useFieldId;
+ this.ignoreMissingIds = ignoreMissingIds;
+ this.useLegacyDateTimestamp = useLegacyDateTimestamp;
+ this.partitionSchema = partitionSchema;
+ this.partitionValues = partitionValues;
+ this.preInitializedReaders = preInitializedReaders;
+ this.metrics.clear();
+ if (metrics != null) {
+ this.metrics.putAll(metrics);
+ }
+
+ // Call parent init method
+ super.init();
+ }
+
+ public StructType getSparkSchema() {
+ return this.sparkSchema;
+ }
+}
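
Usage note: a minimal caller-side sketch of the two-step initialization described above, assuming the caller (for example Iceberg's Comet integration) already has the file coordinates, the Thrift-encoded footer, and the projection schemas. The wrapper class, method name, and flag values below are hypothetical; only the constructor and the init(...) signature come from IcebergCometNativeBatchReader itself.

import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

import org.apache.comet.parquet.AbstractColumnReader;
import org.apache.comet.parquet.IcebergCometNativeBatchReader;
import org.apache.comet.parquet.NativeBatchReader;

public class IcebergCometReadSketch {
  // Step 1: construct with the projected Spark schema.
  // Step 2: init() with the serialized footer and the file coordinates.
  static IcebergCometNativeBatchReader open(
      Configuration conf,
      NativeBatchReader.FileInfo fileInfo,
      byte[] serializedFooter,                 // produced by ParquetMetadataSerializer.serialize()
      StructType requiredSchema,
      StructType dataSchema,
      StructType partitionSchema,
      InternalRow partitionValues,
      AbstractColumnReader[] preInitializedReaders,
      int batchSize) throws Throwable {
    IcebergCometNativeBatchReader reader = new IcebergCometNativeBatchReader(requiredSchema);
    reader.init(
        conf,
        fileInfo,
        serializedFooter,
        /* nativeFilter */ null,
        batchSize,
        dataSchema,
        /* isCaseSensitive */ false,
        /* useFieldId */ true,
        /* ignoreMissingIds */ false,
        /* useLegacyDateTimestamp */ false,
        partitionSchema,
        partitionValues,
        preInitializedReaders,
        Collections.emptyMap());               // no SQL metrics in this sketch
    return reader;
  }
}

After init() returns, the reader is driven like any Hadoop RecordReader (nextKeyValue() / getCurrentValue()) and must be closed by the caller.
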
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 698dc53c12..a94c1ce27f 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -25,6 +25,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.channels.Channels;
import java.util.*;
import java.util.stream.Collectors;
@@ -101,36 +102,87 @@
*
*/
public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
+
+ /**
+ * Holds the file information needed to read a Parquet file, providing an abstraction over the
+ * properties of PartitionedFile.
+ */
+ public static class FileInfo {
+ private final long start;
+ private final long length;
+ private final String filePath;
+ private final long fileSize;
+
+ public FileInfo(long start, long length, String filePath, long fileSize)
+ throws URISyntaxException {
+ this.start = start;
+ this.length = length;
+ URI uri = new Path(filePath).toUri();
+ if (uri.getScheme() == null) {
+ uri = new Path("file://" + filePath).toUri();
+ }
+ this.filePath = uri.toString();
+ this.fileSize = fileSize;
+ }
+
+ public static FileInfo fromPartitionedFile(PartitionedFile file) throws URISyntaxException {
+ return new FileInfo(file.start(), file.length(), file.filePath().toString(), file.fileSize());
+ }
+
+ public long start() {
+ return start;
+ }
+
+ public long length() {
+ return length;
+ }
+
+ public String filePath() {
+ return filePath;
+ }
+
+ public long fileSize() {
+ return fileSize;
+ }
+
+ public URI pathUri() throws URISyntaxException {
+ return new URI(filePath);
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(NativeBatchReader.class);
protected static final BufferAllocator ALLOCATOR = new RootAllocator();
private NativeUtil nativeUtil = new NativeUtil();
- private Configuration conf;
- private int capacity;
- private boolean isCaseSensitive;
- private boolean useFieldId;
- private boolean ignoreMissingIds;
- private StructType partitionSchema;
- private InternalRow partitionValues;
- private PartitionedFile file;
- private final Map<String, SQLMetric> metrics;
+ protected Configuration conf;
+ protected int capacity;
+ protected boolean isCaseSensitive;
+ protected boolean useFieldId;
+ protected boolean ignoreMissingIds;
+ protected StructType partitionSchema;
+ protected InternalRow partitionValues;
+ protected PartitionedFile file;
+ protected FileInfo fileInfo;
+ protected final Map<String, SQLMetric> metrics;
// Unfortunately CometMetricNode is from the "spark" package and cannot be used directly here
// TODO: Move it to common package?
- private Object metricsNode = null;
+ protected Object metricsNode = null;
- private StructType sparkSchema;
- private StructType dataSchema;
+ protected StructType sparkSchema;
+ protected StructType dataSchema;
MessageType fileSchema;
- private MessageType requestedSchema;
- private CometVector[] vectors;
- private AbstractColumnReader[] columnReaders;
- private CometSchemaImporter importer;
- private ColumnarBatch currentBatch;
+ protected MessageType requestedSchema;
+ protected CometVector[] vectors;
+ protected AbstractColumnReader[] columnReaders;
+ protected CometSchemaImporter importer;
+ protected ColumnarBatch currentBatch;
// private FileReader fileReader;
- private boolean[] missingColumns;
- private boolean isInitialized;
- private ParquetMetadata footer;
- private byte[] nativeFilter;
+ protected boolean[] missingColumns;
+ protected boolean isInitialized;
+ protected ParquetMetadata footer;
+ protected byte[] nativeFilter;
+ protected AbstractColumnReader[] preInitializedReaders;
private ParquetColumn parquetColumn;
@@ -149,7 +201,7 @@ public class NativeBatchReader extends RecordReader impleme
* seeing these dates/timestamps.
*/
// TODO: (ARROW NATIVE)
- private boolean useLegacyDateTimestamp;
+ protected boolean useLegacyDateTimestamp;
/** The TaskContext object for executing this task. */
private final TaskContext taskContext;
@@ -157,6 +209,12 @@ public class NativeBatchReader extends RecordReader impleme
private long totalRowCount = 0;
private long handle;
+ // Protected no-arg constructor for subclasses
+ protected NativeBatchReader() {
+ this.taskContext = TaskContext$.MODULE$.get();
+ this.metrics = new HashMap<>();
+ }
+
// Only for testing
public NativeBatchReader(String file, int capacity) {
this(file, capacity, null, null);
@@ -237,6 +295,41 @@ private NativeBatchReader(AbstractColumnReader[] columnReaders) {
this.taskContext = TaskContext$.MODULE$.get();
}
+ /** Alternate constructor that accepts FileInfo instead of PartitionedFile. */
+ NativeBatchReader(
+ Configuration conf,
+ FileInfo fileInfo,
+ ParquetMetadata footer,
+ byte[] nativeFilter,
+ int capacity,
+ StructType sparkSchema,
+ StructType dataSchema,
+ boolean isCaseSensitive,
+ boolean useFieldId,
+ boolean ignoreMissingIds,
+ boolean useLegacyDateTimestamp,
+ StructType partitionSchema,
+ InternalRow partitionValues,
+ Map<String, SQLMetric> metrics,
+ Object metricsNode) {
+ this.conf = conf;
+ this.capacity = capacity;
+ this.sparkSchema = sparkSchema;
+ this.dataSchema = dataSchema;
+ this.isCaseSensitive = isCaseSensitive;
+ this.useFieldId = useFieldId;
+ this.ignoreMissingIds = ignoreMissingIds;
+ this.useLegacyDateTimestamp = useLegacyDateTimestamp;
+ this.partitionSchema = partitionSchema;
+ this.partitionValues = partitionValues;
+ this.fileInfo = fileInfo;
+ this.footer = footer;
+ this.nativeFilter = nativeFilter;
+ this.metrics = metrics;
+ this.metricsNode = metricsNode;
+ this.taskContext = TaskContext$.MODULE$.get();
+ }
+
/**
* Initialize this reader. The reason we don't do it in the constructor is that we want to close
* any resources held by this reader if an error occurs during initialization.
@@ -248,10 +341,12 @@ public void init() throws Throwable {
CometConf.COMET_USE_DECIMAL_128().key(),
(Boolean) CometConf.COMET_USE_DECIMAL_128().defaultValue().get());
- long start = file.start();
- long length = file.length();
- String filePath = file.filePath().toString();
- long fileSize = file.fileSize();
+ // Use fileInfo if available, otherwise fall back to file
+ long start = fileInfo != null ? fileInfo.start() : file.start();
+ long length = fileInfo != null ? fileInfo.length() : file.length();
+ String filePath = fileInfo != null ? fileInfo.filePath() : file.filePath().toString();
+ long fileSize = fileInfo != null ? fileInfo.fileSize() : file.fileSize();
+ URI pathUri = fileInfo != null ? fileInfo.pathUri() : file.pathUri();
ParquetReadOptions.Builder builder = HadoopReadOptions.builder(conf, new Path(filePath));
@@ -261,7 +356,7 @@ public void init() throws Throwable {
ParquetReadOptions readOptions = builder.build();
Map<String, String> objectStoreOptions =
- asJava(NativeConfig.extractObjectStoreOptions(conf, file.pathUri()));
+ asJava(NativeConfig.extractObjectStoreOptions(conf, pathUri));
// TODO: enable off-heap buffer when they are ready
ReadOptions cometReadOptions = ReadOptions.builder(conf).build();
@@ -269,7 +364,7 @@ public void init() throws Throwable {
Path path = new Path(new URI(filePath));
try (FileReader fileReader =
new FileReader(
- CometInputFile.fromPath(path, conf), footer, readOptions, cometReadOptions, metrics)) {
+ CometInputFile.fromPath(path, conf), readOptions, cometReadOptions, metrics)) {
requestedSchema = footer.getFileMetaData().getSchema();
fileSchema = requestedSchema;
@@ -299,14 +394,8 @@ public void init() throws Throwable {
sparkSchema =
getSparkSchemaByFieldId(sparkSchema, requestedSchema.asGroupType(), caseSensitive);
}
- this.parquetColumn = getParquetColumn(requestedSchema, this.sparkSchema);
- String timeZoneId = conf.get("spark.sql.session.timeZone");
- // Native code uses "UTC" always as the timeZoneId when converting from spark to arrow schema.
- Schema arrowSchema = Utils$.MODULE$.toArrowSchema(sparkSchema, "UTC");
- byte[] serializedRequestedArrowSchema = serializeArrowSchema(arrowSchema);
- Schema dataArrowSchema = Utils$.MODULE$.toArrowSchema(dataSchema, "UTC");
- byte[] serializedDataArrowSchema = serializeArrowSchema(dataArrowSchema);
+ this.parquetColumn = getParquetColumn(requestedSchema, this.sparkSchema);
// Create Column readers
List<Type> fields = requestedSchema.getFields();
@@ -364,23 +453,28 @@ public void init() throws Throwable {
checkColumn(parquetFields[i]);
missingColumns[i] = false;
} else {
- if (field.getRepetition() == Type.Repetition.REQUIRED) {
- throw new IOException(
- "Required column '"
- + field.getName()
- + "' is missing"
- + " in data file "
- + filePath);
- }
- if (field.isPrimitive()) {
- ConstantColumnReader reader =
- new ConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128);
- columnReaders[i] = reader;
+ if (preInitializedReaders != null && preInitializedReaders[i] != null) {
+ columnReaders[i] = preInitializedReaders[i];
missingColumns[i] = true;
} else {
- // the column requested is not in the file, but the native reader can handle that
- // and will return nulls for all rows requested
- missingColumns[i] = false;
+ if (field.getRepetition() == Type.Repetition.REQUIRED) {
+ throw new IOException(
+ "Required column '"
+ + field.getName()
+ + "' is missing"
+ + " in data file "
+ + filePath);
+ }
+ if (field.isPrimitive()) {
+ ConstantColumnReader reader =
+ new ConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128);
+ columnReaders[i] = reader;
+ missingColumns[i] = true;
+ } else {
+ // the column requested is not in the file, but the native reader can handle that
+ // and will return nulls for all rows requested
+ missingColumns[i] = false;
+ }
}
}
}
@@ -421,9 +515,40 @@ public void init() throws Throwable {
CometFileKeyUnwrapper keyUnwrapper = null;
if (encryptionEnabled) {
keyUnwrapper = new CometFileKeyUnwrapper();
- keyUnwrapper.storeDecryptionKeyRetriever(file.filePath().toString(), conf);
+ keyUnwrapper.storeDecryptionKeyRetriever(filePath, conf);
}
+ // Filter out columns with preinitialized readers from sparkSchema before making the
+ // call to native
+ if (preInitializedReaders != null) {
+ StructType filteredSchema = new StructType();
+ StructField[] sparkFields = sparkSchema.fields();
+ List<Type> fileFields = fileSchema.getFields();
+ for (int i = 0; i < sparkFields.length; i++) {
+ // Keep the column if:
+ // 1. It doesn't have a preinitialized reader, OR
+ // 2. It has a preinitialized reader but exists in fileSchema
+ boolean hasPreInitializedReader =
+ i < preInitializedReaders.length && preInitializedReaders[i] != null;
+ int finalI = i;
+ boolean existsInFileSchema =
+ fileFields.stream().anyMatch(f -> f.getName().equals(sparkFields[finalI].name()));
+
+ if (!hasPreInitializedReader || existsInFileSchema) {
+ filteredSchema = filteredSchema.add(sparkFields[i]);
+ }
+ }
+ sparkSchema = filteredSchema;
+ }
+
+ // String timeZoneId = conf.get("spark.sql.session.timeZone");
+ String timeZoneId = "UTC";
+ // Native code uses "UTC" always as the timeZoneId when converting from spark to arrow schema.
+ Schema arrowSchema = Utils$.MODULE$.toArrowSchema(sparkSchema, "UTC");
+ byte[] serializedRequestedArrowSchema = serializeArrowSchema(arrowSchema);
+ Schema dataArrowSchema = Utils$.MODULE$.toArrowSchema(dataSchema, "UTC");
+ byte[] serializedDataArrowSchema = serializeArrowSchema(dataArrowSchema);
+
int batchSize =
conf.getInt(
CometConf.COMET_BATCH_SIZE().key(),
@@ -648,13 +773,41 @@ private void checkParquetType(ParquetColumn column) throws IOException {
}
} else { // A missing column which is either primitive or complex
if (column.required()) {
- // Column is missing in data but the required data is non-nullable. This file is invalid.
- throw new IOException(
- "Required column is missing in data file. Col: " + Arrays.toString(path));
+ // check if we have a preinitialized column reader for this column.
+ int columnIndex = getColumnIndexFromParquetColumn(column);
+ if (columnIndex == -1
+ || preInitializedReaders == null
+ || preInitializedReaders[columnIndex] == null) {
+ // Column is missing in data but the required data is non-nullable. This file is invalid.
+ throw new IOException(
+ "Required column is missing in data file. Col: " + Arrays.toString(path));
+ }
}
}
}
+ /**
+ * Get the column index in the requested schema for a given ParquetColumn. Returns -1 if not
+ * found.
+ */
+ private int getColumnIndexFromParquetColumn(ParquetColumn column) {
+ String[] targetPath = asJava(column.path()).toArray(new String[0]);
+ if (targetPath.length == 0) {
+ return -1;
+ }
+
+ // For top-level columns, match by name
+ String columnName = targetPath[0];
+ ParquetColumn[] parquetFields = asJava(parquetColumn.children()).toArray(new ParquetColumn[0]);
+ for (int i = 0; i < parquetFields.length; i++) {
+ String[] fieldPath = asJava(parquetFields[i].path()).toArray(new String[0]);
+ if (fieldPath.length > 0 && fieldPath[0].equals(columnName)) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
/**
* Checks whether the given 'path' exists in 'parquetType'. The difference between this and {@link
* MessageType#containsPath(String[])} is that the latter only support paths to leaf From Spark:
@@ -834,16 +987,34 @@ private int loadNextBatch() throws Throwable {
importer = new CometSchemaImporter(ALLOCATOR);
List<Type> fields = requestedSchema.getFields();
+ StructField[] sparkFields = sparkSchema.fields();
+
for (int i = 0; i < fields.size(); i++) {
if (!missingColumns[i]) {
if (columnReaders[i] != null) columnReaders[i].close();
// TODO: (ARROW NATIVE) handle tz, datetime & int96 rebase
- DataType dataType = sparkSchema.fields()[i].dataType();
Type field = fields.get(i);
+
+ // Find the corresponding spark field by matching field names
+ DataType dataType = null;
+ int sparkSchemaIndex = -1;
+ for (int j = 0; j < sparkFields.length; j++) {
+ if (sparkFields[j].name().equals(field.getName())) {
+ dataType = sparkFields[j].dataType();
+ sparkSchemaIndex = j;
+ break;
+ }
+ }
+
+ if (dataType == null) {
+ throw new IOException(
+ "Could not find matching Spark field for Parquet field: " + field.getName());
+ }
+
NativeColumnReader reader =
new NativeColumnReader(
this.handle,
- i,
+ sparkSchemaIndex,
dataType,
field,
null,
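
A small illustration of the new FileInfo abstraction introduced above, using made-up local and object-store paths; the printed values in the comments follow from the scheme-defaulting logic in the constructor and are expectations, not output captured from the patch.

import java.net.URISyntaxException;

import org.apache.comet.parquet.NativeBatchReader.FileInfo;

public class FileInfoSketch {
  public static void main(String[] args) throws URISyntaxException {
    // A schemeless local path gets a "file://" scheme added by the constructor,
    // so the native reader always receives a fully qualified URI.
    FileInfo local = new FileInfo(0L, 1024L, "/tmp/data/part-00000.parquet", 1024L);
    System.out.println(local.filePath());   // e.g. file:///tmp/data/part-00000.parquet

    // A path that already carries a scheme is kept as-is.
    FileInfo remote = new FileInfo(0L, 1024L, "s3a://bucket/warehouse/t/part-00000.parquet", 1024L);
    System.out.println(remote.filePath());  // s3a://bucket/warehouse/t/part-00000.parquet
  }
}

Spark callers can keep passing a PartitionedFile and convert it with FileInfo.fromPartitionedFile(file); Iceberg callers construct a FileInfo directly and never need a PartitionedFile on the classpath.
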
diff --git a/common/src/main/java/org/apache/comet/parquet/ParquetMetadataSerializer.java b/common/src/main/java/org/apache/comet/parquet/ParquetMetadataSerializer.java
new file mode 100644
index 0000000000..32b40940a6
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/parquet/ParquetMetadataSerializer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.parquet.format.FileMetaData;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Utility class for serializing and deserializing ParquetMetadata instances to/from byte arrays.
+ * This uses the Parquet format's FileMetaData structure and the underlying Thrift compact protocol
+ * for serialization.
+ */
+public class ParquetMetadataSerializer {
+
+ private final ParquetMetadataConverter converter;
+
+ public ParquetMetadataSerializer() {
+ this.converter = new ParquetMetadataConverter();
+ }
+
+ public ParquetMetadataSerializer(ParquetMetadataConverter converter) {
+ this.converter = converter;
+ }
+
+ /**
+ * Serializes a ParquetMetadata instance to a byte array.
+ *
+ * @param metadata the ParquetMetadata to serialize
+ * @return the serialized byte array
+ * @throws IOException if an error occurs during serialization
+ */
+ public byte[] serialize(ParquetMetadata metadata) throws IOException {
+ FileMetaData fileMetaData = converter.toParquetMetadata(1, metadata);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ Util.writeFileMetaData(fileMetaData, outputStream);
+ return outputStream.toByteArray();
+ }
+
+ /**
+ * Deserializes a byte array back into a ParquetMetadata instance.
+ *
+ * @param bytes the serialized byte array
+ * @return the deserialized ParquetMetadata
+ * @throws IOException if an error occurs during deserialization
+ */
+ public ParquetMetadata deserialize(byte[] bytes) throws IOException {
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
+ FileMetaData fileMetaData = Util.readFileMetaData(inputStream);
+ return converter.fromParquetMetadata(fileMetaData);
+ }
+}
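
A hedged round-trip sketch for the serializer above. Obtaining the footer via ParquetFileReader/HadoopInputFile is only one assumed way a caller might get a ParquetMetadata instance; the serialize()/deserialize() calls are the part defined by the new class.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import org.apache.comet.parquet.ParquetMetadataSerializer;

public class FooterRoundTripSketch {
  // Serialize a file's footer to Thrift bytes and read it back.
  static ParquetMetadata roundTrip(Configuration conf, String file) throws IOException {
    ParquetMetadata footer;
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(new Path(file), conf))) {
      footer = reader.getFooter();
    }
    ParquetMetadataSerializer serializer = new ParquetMetadataSerializer();
    byte[] bytes = serializer.serialize(footer);  // compact Thrift FileMetaData
    return serializer.deserialize(bytes);         // equivalent ParquetMetadata
  }
}

The resulting byte[] is what IcebergCometNativeBatchReader.init() expects as its parquetMetadataBytes argument, so the footer can cross the Iceberg/Comet boundary without re-reading the file.
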
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 92af3e2388..c9f95a1d81 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -77,7 +77,7 @@ reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-
object_store_opendal = {version = "0.54.0", optional = true}
hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]}
opendal = { version ="0.54.1", optional = true, features = ["services-hdfs"] }
-uuid = "1.0"
+uuid = "1.18.1"
[target.'cfg(target_os = "linux")'.dependencies]
procfs = "0.18.0"
diff --git a/pom.xml b/pom.xml
index 6a6254dfcf..d15154fe61 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,6 +137,10 @@ under the License.
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-format-structures</artifactId>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -218,6 +222,12 @@ under the License.
<version>${parquet.version}</version>
<scope>${parquet.maven.scope}</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-format-structures</artifactId>
+ <version>${parquet.version}</version>
+ <scope>${parquet.maven.scope}</scope>
+ </dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>