Skip to content

Commit 35a99e0

Browse files
authored
chore: various refactoring changes for iceberg [iceberg] (#2680)
* chore: various refactoring changes for iceberg
1 parent 5fa091e commit 35a99e0

File tree

7 files changed

+434
-56
lines changed

7 files changed

+434
-56
lines changed

common/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ under the License.
5151
<groupId>org.apache.parquet</groupId>
5252
<artifactId>parquet-hadoop</artifactId>
5353
</dependency>
54+
<dependency>
55+
<groupId>org.apache.parquet</groupId>
56+
<artifactId>parquet-format-structures</artifactId>
57+
</dependency>
5458
<dependency>
5559
<groupId>org.apache.arrow</groupId>
5660
<artifactId>arrow-vector</artifactId>

common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ ColumnDescriptor getDescriptor() {
8989
return descriptor;
9090
}
9191

92+
String getPath() {
93+
return String.join(".", this.descriptor.getPath());
94+
}
95+
9296
/**
9397
* Set the batch size of this reader to be 'batchSize'. Also initializes the native column reader.
9498
*/
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.parquet;
21+
22+
import java.util.Map;
23+
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.spark.sql.catalyst.InternalRow;
26+
import org.apache.spark.sql.execution.metric.SQLMetric;
27+
import org.apache.spark.sql.types.StructType;
28+
29+
/**
30+
* A specialized NativeBatchReader for Iceberg that accepts ParquetMetadata as a Thrift encoded byte
31+
* array. This allows Iceberg to pass metadata in serialized form with a two-step initialization
32+
* pattern.
33+
*/
34+
public class IcebergCometNativeBatchReader extends NativeBatchReader {
35+
36+
public IcebergCometNativeBatchReader(StructType requiredSchema) {
37+
super();
38+
this.sparkSchema = requiredSchema;
39+
}
40+
41+
/** Initialize the reader using FileInfo instead of PartitionedFile. */
42+
public void init(
43+
Configuration conf,
44+
FileInfo fileInfo,
45+
byte[] parquetMetadataBytes,
46+
byte[] nativeFilter,
47+
int capacity,
48+
StructType dataSchema,
49+
boolean isCaseSensitive,
50+
boolean useFieldId,
51+
boolean ignoreMissingIds,
52+
boolean useLegacyDateTimestamp,
53+
StructType partitionSchema,
54+
InternalRow partitionValues,
55+
AbstractColumnReader[] preInitializedReaders,
56+
Map<String, SQLMetric> metrics)
57+
throws Throwable {
58+
59+
// Set parent fields
60+
this.conf = conf;
61+
this.fileInfo = fileInfo;
62+
this.footer = new ParquetMetadataSerializer().deserialize(parquetMetadataBytes);
63+
this.nativeFilter = nativeFilter;
64+
this.capacity = capacity;
65+
this.dataSchema = dataSchema;
66+
this.isCaseSensitive = isCaseSensitive;
67+
this.useFieldId = useFieldId;
68+
this.ignoreMissingIds = ignoreMissingIds;
69+
this.useLegacyDateTimestamp = useLegacyDateTimestamp;
70+
this.partitionSchema = partitionSchema;
71+
this.partitionValues = partitionValues;
72+
this.preInitializedReaders = preInitializedReaders;
73+
this.metrics.clear();
74+
if (metrics != null) {
75+
this.metrics.putAll(metrics);
76+
}
77+
78+
// Call parent init method
79+
super.init();
80+
}
81+
82+
public StructType getSparkSchema() {
83+
return this.sparkSchema;
84+
}
85+
}

0 commit comments

Comments
 (0)