Skip to content

Commit 73f742d

Browse files
author
Catalin Toda
committed
Fix Long type in theschema and int32 parquet type
1 parent 1c1eb99 commit 73f742d

File tree

5 files changed

+26
-8
lines changed

5 files changed

+26
-8
lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,8 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
7676
case INT32:
7777
if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
7878
return new IntegerUpdater();
79-
} else if (sparkType == DataTypes.LongType && isUnsignedIntTypeMatched(32)) {
80-
// In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType.
81-
// For unsigned int32, it stores as plain signed int32 in Parquet when dictionary
82-
// fallbacks. We read them as long values.
83-
return new UnsignedIntegerUpdater();
79+
} else if (sparkType == DataTypes.LongType) {
80+
return new LongIntegerUpdater();
8481
} else if (sparkType == DataTypes.ByteType) {
8582
return new ByteUpdater();
8683
} else if (sparkType == DataTypes.ShortType) {
@@ -279,14 +276,14 @@ public void decodeSingleDictionaryId(
279276
}
280277
}
281278

282-
private static class UnsignedIntegerUpdater implements ParquetVectorUpdater {
279+
private static class LongIntegerUpdater implements ParquetVectorUpdater {
283280
@Override
284281
public void readValues(
285282
int total,
286283
int offset,
287284
WritableColumnVector values,
288285
VectorizedValuesReader valuesReader) {
289-
valuesReader.readUnsignedIntegers(total, values, offset);
286+
valuesReader.readIntegersAsLongs(total, values, offset);
290287
}
291288

292289
@Override
@@ -299,7 +296,7 @@ public void readValue(
299296
int offset,
300297
WritableColumnVector values,
301298
VectorizedValuesReader valuesReader) {
302-
values.putLong(offset, Integer.toUnsignedLong(valuesReader.readInteger()));
299+
values.putLong(offset, valuesReader.readInteger());
303300
}
304301

305302
@Override

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,16 @@ public void skipIntegers(int total) {
130130
in.skip(total * 4L);
131131
}
132132

133+
@Override
134+
public final void readIntegersAsLongs(int total, WritableColumnVector c, int rowId) {
135+
int requiredBytes = total * 4;
136+
ByteBuffer buffer = getBuffer(requiredBytes);
137+
for (int i = 0; i < total; i += 1) {
138+
c.putLong(rowId + i, buffer.getInt());
139+
}
140+
}
141+
142+
133143
@Override
134144
public final void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) {
135145
int requiredBytes = total * 4;

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) {
6666
throw new UnsupportedOperationException();
6767
}
6868

69+
@Override
70+
public void readIntegersAsLongs(int total, WritableColumnVector c, int rowId) {
71+
throw new UnsupportedOperationException();
72+
}
73+
6974
@Override
7075
public void readIntegersWithRebase(int total, WritableColumnVector c, int rowId,
7176
boolean failIfRebase) {

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,11 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) {
710710
}
711711
}
712712

713+
@Override
714+
public void readIntegersAsLongs(int total, WritableColumnVector c, int rowId) {
715+
throw new UnsupportedOperationException("only readInts is valid.");
716+
}
717+
713718
@Override
714719
public void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) {
715720
throw new UnsupportedOperationException("only readInts is valid.");

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public interface VectorizedValuesReader {
4545
void readShorts(int total, WritableColumnVector c, int rowId);
4646
void readIntegers(int total, WritableColumnVector c, int rowId);
4747
void readIntegersWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase);
48+
void readIntegersAsLongs(int total, WritableColumnVector c, int rowId);
4849
void readUnsignedIntegers(int total, WritableColumnVector c, int rowId);
4950
void readUnsignedLongs(int total, WritableColumnVector c, int rowId);
5051
void readLongs(int total, WritableColumnVector c, int rowId);

0 commit comments

Comments
 (0)