Skip to content

Commit a5c9111

Browse files
authored
[core] Fix the timezone conversion for timestamp_ltz data_type in OrcFileFormat (#5082)
1 parent d33b037 commit a5c9111

File tree

11 files changed

+286
-20
lines changed

11 files changed

+286
-20
lines changed

docs/layouts/shortcodes/generated/orc_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,11 @@
3838
<td>Double</td>
3939
<td>If the number of distinct keys in a dictionary is greater than this fraction of the total number of non-null rows, turn off dictionary encoding in orc. Use 0 to always disable dictionary encoding. Use 1 to always use dictionary encoding.</td>
4040
</tr>
41+
<tr>
42+
<td><h5>orc.timestamp-ltz.legacy.type</h5></td>
43+
<td style="word-wrap: break-word;">false</td>
44+
<td>Boolean</td>
45+
<td>This option is used to be compatible with the paimon-orc's old behavior for the `timestamp_ltz` data type.</td>
46+
</tr>
4147
</tbody>
4248
</table>

paimon-format/src/main/java/org/apache/paimon/format/OrcOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,11 @@ public class OrcOptions {
4141
+ "fraction of the total number of non-null rows, turn off "
4242
+ "dictionary encoding in orc. Use 0 to always disable dictionary encoding. "
4343
+ "Use 1 to always use dictionary encoding.");
44+
45+
public static final ConfigOption<Boolean> ORC_TIMESTAMP_LTZ_LEGACY_TYPE =
46+
key("orc.timestamp-ltz.legacy.type")
47+
.booleanType()
48+
.defaultValue(false)
49+
.withDescription(
50+
"This option is used to be compatible with the paimon-orc‘s old behavior for the `timestamp_ltz` data type.");
4451
}

paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.stream.Collectors;
5757

5858
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
59+
import static org.apache.paimon.format.OrcOptions.ORC_TIMESTAMP_LTZ_LEGACY_TYPE;
5960
import static org.apache.paimon.types.DataTypeChecks.getFieldTypes;
6061

6162
/** Orc {@link FileFormat}. */
@@ -70,6 +71,7 @@ public class OrcFileFormat extends FileFormat {
7071
private final int readBatchSize;
7172
private final int writeBatchSize;
7273
private final boolean deletionVectorsEnabled;
74+
private final boolean legacyTimestampLtzType;
7375

7476
public OrcFileFormat(FormatContext formatContext) {
7577
super(IDENTIFIER);
@@ -81,6 +83,7 @@ public OrcFileFormat(FormatContext formatContext) {
8183
this.readBatchSize = formatContext.readBatchSize();
8284
this.writeBatchSize = formatContext.writeBatchSize();
8385
this.deletionVectorsEnabled = formatContext.options().get(DELETION_VECTORS_ENABLED);
86+
this.legacyTimestampLtzType = formatContext.options().get(ORC_TIMESTAMP_LTZ_LEGACY_TYPE);
8487
}
8588

8689
@VisibleForTesting
@@ -96,7 +99,8 @@ public int readBatchSize() {
9699
@Override
97100
public Optional<SimpleStatsExtractor> createStatsExtractor(
98101
RowType type, SimpleColStatsCollector.Factory[] statsCollectors) {
99-
return Optional.of(new OrcSimpleStatsExtractor(type, statsCollectors));
102+
return Optional.of(
103+
new OrcSimpleStatsExtractor(type, statsCollectors, legacyTimestampLtzType));
100104
}
101105

102106
@Override
@@ -116,7 +120,8 @@ public FormatReaderFactory createReaderFactory(
116120
(RowType) refineDataType(projectedRowType),
117121
orcPredicates,
118122
readBatchSize,
119-
deletionVectorsEnabled);
123+
deletionVectorsEnabled,
124+
legacyTimestampLtzType);
120125
}
121126

122127
@Override
@@ -141,7 +146,8 @@ public FormatWriterFactory createWriterFactory(RowType type) {
141146
DataType[] orcTypes = getFieldTypes(refinedType).toArray(new DataType[0]);
142147

143148
TypeDescription typeDescription = OrcTypeUtil.convertToOrcSchema((RowType) refinedType);
144-
Vectorizer<InternalRow> vectorizer = new RowDataVectorizer(typeDescription, orcTypes);
149+
Vectorizer<InternalRow> vectorizer =
150+
new RowDataVectorizer(typeDescription, orcTypes, legacyTimestampLtzType);
145151

146152
return new OrcWriterFactory(vectorizer, orcProperties, writerConf, writeBatchSize);
147153
}

paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public class OrcReaderFactory implements FormatReaderFactory {
6969
protected final List<OrcFilters.Predicate> conjunctPredicates;
7070
protected final int batchSize;
7171
protected final boolean deletionVectorsEnabled;
72+
protected final boolean legacyTimestampLtzType;
7273

7374
/**
7475
* @param hadoopConfig the hadoop config for orc reader.
@@ -80,13 +81,15 @@ public OrcReaderFactory(
8081
final RowType readType,
8182
final List<OrcFilters.Predicate> conjunctPredicates,
8283
final int batchSize,
83-
final boolean deletionVectorsEnabled) {
84+
final boolean deletionVectorsEnabled,
85+
final boolean legacyTimestampLtzType) {
8486
this.hadoopConfig = checkNotNull(hadoopConfig);
8587
this.schema = convertToOrcSchema(readType);
8688
this.tableType = readType;
8789
this.conjunctPredicates = checkNotNull(conjunctPredicates);
8890
this.batchSize = batchSize;
8991
this.deletionVectorsEnabled = deletionVectorsEnabled;
92+
this.legacyTimestampLtzType = legacyTimestampLtzType;
9093
}
9194

9295
// ------------------------------------------------------------------------
@@ -131,7 +134,10 @@ public OrcReaderBatch createReaderBatch(
131134
DataType type = tableFieldTypes.get(i);
132135
vectors[i] =
133136
createPaimonVector(
134-
orcBatch.cols[tableFieldNames.indexOf(name)], orcBatch, type);
137+
orcBatch.cols[tableFieldNames.indexOf(name)],
138+
orcBatch,
139+
type,
140+
legacyTimestampLtzType);
135141
}
136142
return new OrcReaderBatch(filePath, orcBatch, new VectorizedColumnBatch(vectors), recycler);
137143
}

paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,15 @@ public class OrcSimpleStatsExtractor implements SimpleStatsExtractor {
5656

5757
private final RowType rowType;
5858
private final SimpleColStatsCollector.Factory[] statsCollectors;
59+
private final boolean legacyTimestampLtzType;
5960

6061
public OrcSimpleStatsExtractor(
61-
RowType rowType, SimpleColStatsCollector.Factory[] statsCollectors) {
62+
RowType rowType,
63+
SimpleColStatsCollector.Factory[] statsCollectors,
64+
boolean legacyTimestampLtzType) {
6265
this.rowType = rowType;
6366
this.statsCollectors = statsCollectors;
67+
this.legacyTimestampLtzType = legacyTimestampLtzType;
6468
Preconditions.checkArgument(
6569
rowType.getFieldCount() == statsCollectors.length,
6670
"The stats collector is not aligned to write schema.");
@@ -228,7 +232,6 @@ private SimpleColStats toFieldStats(
228232
nullCount);
229233
break;
230234
case TIMESTAMP_WITHOUT_TIME_ZONE:
231-
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
232235
assertStatsClass(field, stats, TimestampColumnStatistics.class);
233236
TimestampColumnStatistics timestampStats = (TimestampColumnStatistics) stats;
234237
fieldStats =
@@ -237,6 +240,22 @@ private SimpleColStats toFieldStats(
237240
Timestamp.fromSQLTimestamp(timestampStats.getMaximum()),
238241
nullCount);
239242
break;
243+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
244+
assertStatsClass(field, stats, TimestampColumnStatistics.class);
245+
TimestampColumnStatistics timestampLtzStats = (TimestampColumnStatistics) stats;
246+
fieldStats =
247+
legacyTimestampLtzType
248+
? new SimpleColStats(
249+
Timestamp.fromSQLTimestamp(timestampLtzStats.getMinimum()),
250+
Timestamp.fromSQLTimestamp(timestampLtzStats.getMaximum()),
251+
nullCount)
252+
: new SimpleColStats(
253+
Timestamp.fromInstant(
254+
timestampLtzStats.getMinimum().toInstant()),
255+
Timestamp.fromInstant(
256+
timestampLtzStats.getMaximum().toInstant()),
257+
nullCount);
258+
break;
240259
default:
241260
fieldStats = new SimpleColStats(null, null, nullCount);
242261
}

paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,14 @@ public boolean isNullAt(int i) {
6262

6363
public static org.apache.paimon.data.columnar.ColumnVector createPaimonVector(
6464
ColumnVector vector, VectorizedRowBatch orcBatch, DataType dataType) {
65+
return createPaimonVector(vector, orcBatch, dataType, false);
66+
}
67+
68+
public static org.apache.paimon.data.columnar.ColumnVector createPaimonVector(
69+
ColumnVector vector,
70+
VectorizedRowBatch orcBatch,
71+
DataType dataType,
72+
boolean legacyTimestampLtzType) {
6573
if (vector instanceof LongColumnVector) {
6674
if (dataType.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
6775
return new OrcLegacyTimestampColumnVector((LongColumnVector) vector, orcBatch);
@@ -75,7 +83,7 @@ public static org.apache.paimon.data.columnar.ColumnVector createPaimonVector(
7583
} else if (vector instanceof DecimalColumnVector) {
7684
return new OrcDecimalColumnVector((DecimalColumnVector) vector, orcBatch);
7785
} else if (vector instanceof TimestampColumnVector) {
78-
return new OrcTimestampColumnVector(vector, orcBatch);
86+
return new OrcTimestampColumnVector(vector, orcBatch, dataType, legacyTimestampLtzType);
7987
} else if (vector instanceof ListColumnVector) {
8088
return new OrcArrayColumnVector(
8189
(ListColumnVector) vector, orcBatch, (ArrayType) dataType);

paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.paimon.format.orc.reader;
2020

2121
import org.apache.paimon.data.Timestamp;
22+
import org.apache.paimon.types.DataType;
23+
import org.apache.paimon.types.TimestampType;
2224
import org.apache.paimon.utils.DateTimeUtils;
2325

2426
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -31,17 +33,28 @@
3133
*/
3234
public class OrcTimestampColumnVector extends AbstractOrcColumnVector
3335
implements org.apache.paimon.data.columnar.TimestampColumnVector {
34-
36+
private final Boolean legacyTimestampLtzType;
37+
private final DataType dataType;
3538
private final TimestampColumnVector vector;
3639

37-
public OrcTimestampColumnVector(ColumnVector vector, VectorizedRowBatch orcBatch) {
40+
public OrcTimestampColumnVector(
41+
ColumnVector vector,
42+
VectorizedRowBatch orcBatch,
43+
DataType dataType,
44+
boolean legacyTimestampLtzType) {
3845
super(vector, orcBatch);
3946
this.vector = (TimestampColumnVector) vector;
47+
this.dataType = dataType;
48+
this.legacyTimestampLtzType = legacyTimestampLtzType;
4049
}
4150

4251
@Override
4352
public Timestamp getTimestamp(int i, int precision) {
4453
i = rowMapper(i);
45-
return DateTimeUtils.toInternal(vector.time[i], vector.nanos[i] % 1_000_000);
54+
if (dataType instanceof TimestampType || legacyTimestampLtzType) {
55+
return DateTimeUtils.toInternal(vector.time[i], vector.nanos[i] % 1_000_000);
56+
} else {
57+
return Timestamp.fromEpochMillis(vector.time[i], vector.nanos[i] % 1_000_000);
58+
}
4659
}
4760
}

paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.paimon.data.InternalArray;
2323
import org.apache.paimon.data.InternalMap;
2424
import org.apache.paimon.data.InternalRow;
25+
import org.apache.paimon.data.LocalZoneTimestamp;
2526
import org.apache.paimon.types.ArrayType;
2627
import org.apache.paimon.types.BigIntType;
2728
import org.apache.paimon.types.BinaryType;
@@ -63,8 +64,6 @@
6364
/** Factory to create {@link FieldWriter}. */
6465
public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
6566

66-
public static final FieldWriterFactory WRITER_FACTORY = new FieldWriterFactory();
67-
6867
private static final FieldWriter STRING_WRITER =
6968
(rowId, column, getters, columnId) -> {
7069
BytesColumnVector vector = (BytesColumnVector) column;
@@ -108,6 +107,12 @@ public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
108107
(rowId, column, getters, columnId) ->
109108
((DoubleColumnVector) column).vector[rowId] = getters.getDouble(columnId);
110109

110+
private final boolean legacyTimestampLtzType;
111+
112+
public FieldWriterFactory(boolean legacyTimestampLtzType) {
113+
this.legacyTimestampLtzType = legacyTimestampLtzType;
114+
}
115+
111116
@Override
112117
public FieldWriter visit(CharType charType) {
113118
return STRING_WRITER;
@@ -186,9 +191,20 @@ public FieldWriter visit(TimestampType timestampType) {
186191
@Override
187192
public FieldWriter visit(LocalZonedTimestampType localZonedTimestampType) {
188193
return (rowId, column, getters, columnId) -> {
189-
Timestamp timestamp =
190-
getters.getTimestamp(columnId, localZonedTimestampType.getPrecision())
191-
.toSQLTimestamp();
194+
org.apache.paimon.data.Timestamp localTimestamp =
195+
getters.getTimestamp(columnId, localZonedTimestampType.getPrecision());
196+
Timestamp timestamp;
197+
198+
if (legacyTimestampLtzType) {
199+
timestamp = localTimestamp.toSQLTimestamp();
200+
} else {
201+
LocalZoneTimestamp localZoneTimestamp =
202+
LocalZoneTimestamp.fromEpochMillis(
203+
localTimestamp.getMillisecond(),
204+
localTimestamp.getNanoOfMillisecond());
205+
timestamp = java.sql.Timestamp.from(localZoneTimestamp.toInstant());
206+
}
207+
192208
TimestampColumnVector vector = (TimestampColumnVector) column;
193209
vector.set(rowId, timestamp);
194210
};

paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,22 @@
2929
import java.util.List;
3030
import java.util.stream.Collectors;
3131

32-
import static org.apache.paimon.format.orc.writer.FieldWriterFactory.WRITER_FACTORY;
33-
3432
/** A {@link Vectorizer} of {@link InternalRow} type element. */
3533
public class RowDataVectorizer extends Vectorizer<InternalRow> {
3634

3735
private final List<FieldWriter> fieldWriters;
3836

3937
public RowDataVectorizer(TypeDescription schema, DataType[] fieldTypes) {
38+
this(schema, fieldTypes, false);
39+
}
40+
41+
public RowDataVectorizer(
42+
TypeDescription schema, DataType[] fieldTypes, boolean legacyTimestampLtzType) {
4043
super(schema);
44+
FieldWriterFactory fieldWriterFactory = new FieldWriterFactory(legacyTimestampLtzType);
4145
this.fieldWriters =
4246
Arrays.stream(fieldTypes)
43-
.map(t -> t.accept(WRITER_FACTORY))
47+
.map(t -> t.accept(fieldWriterFactory))
4448
.collect(Collectors.toList());
4549
}
4650

0 commit comments

Comments
 (0)