Skip to content

Commit 320ce55

Browse files
authored
fix: clean up [iceberg] integration APIs (#2032)
1 parent ba3c82c commit 320ce55

File tree

3 files changed

+75
-9
lines changed

3 files changed

+75
-9
lines changed

common/src/main/java/org/apache/comet/parquet/BatchReader.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,20 +99,20 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
9999
private StructType partitionSchema;
100100
private InternalRow partitionValues;
101101
private PartitionedFile file;
102-
private final Map<String, SQLMetric> metrics;
102+
protected Map<String, SQLMetric> metrics;
103103

104104
private long rowsRead;
105-
private StructType sparkSchema;
105+
protected StructType sparkSchema;
106106
private MessageType requestedSchema;
107-
private CometVector[] vectors;
108-
private AbstractColumnReader[] columnReaders;
107+
protected CometVector[] vectors;
108+
protected AbstractColumnReader[] columnReaders;
109109
private CometSchemaImporter importer;
110-
private ColumnarBatch currentBatch;
110+
protected ColumnarBatch currentBatch;
111111
private Future<Option<Throwable>> prefetchTask;
112112
private LinkedBlockingQueue<Pair<PageReadStore, Long>> prefetchQueue;
113113
private FileReader fileReader;
114114
private boolean[] missingColumns;
115-
private boolean isInitialized;
115+
protected boolean isInitialized;
116116
private ParquetMetadata footer;
117117

118118
/** The total number of rows across all row groups of the input split. */
@@ -143,7 +143,9 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
143143
private boolean useLegacyDateTimestamp;
144144

145145
/** The TaskContext object for executing this task. */
146-
private final TaskContext taskContext;
146+
private TaskContext taskContext;
147+
148+
public BatchReader() {}
147149

148150
// Only for testing
149151
public BatchReader(String file, int capacity) {
@@ -183,6 +185,9 @@ public BatchReader(
183185
this.taskContext = TaskContext$.MODULE$.get();
184186
}
185187

188+
/**
189+
* @deprecated since 0.9.1, will be removed in 0.10.0.
190+
*/
186191
public BatchReader(AbstractColumnReader[] columnReaders) {
187192
// Todo: set useDecimal128 and useLazyMaterialization
188193
int numColumns = columnReaders.length;
@@ -377,10 +382,16 @@ public void init() throws URISyntaxException, IOException {
377382
}
378383
}
379384

385+
/**
386+
* @deprecated since 0.9.1, will be removed in 0.10.0.
387+
*/
380388
public void setSparkSchema(StructType schema) {
381389
this.sparkSchema = schema;
382390
}
383391

392+
/**
393+
* @deprecated since 0.9.1, will be removed in 0.10.0.
394+
*/
384395
public AbstractColumnReader[] getColumnReaders() {
385396
return columnReaders;
386397
}
common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.parquet;
21+
22+
import java.util.HashMap;
23+
24+
import org.apache.spark.sql.types.StructType;
25+
import org.apache.spark.sql.vectorized.ColumnarBatch;
26+
27+
import org.apache.comet.vector.CometVector;
28+
29+
public class IcebergCometBatchReader extends BatchReader {
30+
public IcebergCometBatchReader(int numColumns, StructType schema) {
31+
this.columnReaders = new AbstractColumnReader[numColumns];
32+
this.vectors = new CometVector[numColumns];
33+
this.currentBatch = new ColumnarBatch(vectors);
34+
this.metrics = new HashMap<>();
35+
this.sparkSchema = schema;
36+
}
37+
38+
public void init(AbstractColumnReader[] columnReaders) {
39+
this.columnReaders = columnReaders;
40+
this.isInitialized = true;
41+
}
42+
}

common/src/main/java/org/apache/comet/parquet/Utils.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,26 @@ public static ColumnReader getColumnReader(
3737
CometSchemaImporter importer,
3838
int batchSize,
3939
boolean useDecimal128,
40-
boolean useLazyMaterialization) {
40+
boolean useLazyMaterialization,
41+
boolean useLegacyTimestamp) {
4142

4243
ColumnDescriptor descriptor = buildColumnDescriptor(columnSpec);
4344
return getColumnReader(
44-
type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true);
45+
type,
46+
descriptor,
47+
importer,
48+
batchSize,
49+
useDecimal128,
50+
useLazyMaterialization,
51+
useLegacyTimestamp);
4552
}
4653

54+
/**
55+
* This method is called from Apache Iceberg.
56+
*
57+
* @deprecated since 0.9.1, will be removed in 0.10.0; use getColumnReader with ParquetColumnSpec
58+
* instead.
59+
*/
4760
public static ColumnReader getColumnReader(
4861
DataType type,
4962
ColumnDescriptor descriptor,

0 commit comments

Comments
 (0)