
Commit acf7ef3

squito authored and HyukjinKwon committed
[SPARK-12297][SQL] Adjust timezone for int96 data from impala
## What changes were proposed in this pull request?

INT96 data written by Impala is stored with a different timezone offset than data written by Hive and Spark. This adds an option "spark.sql.parquet.int96TimestampConversion" (false by default) to adjust timestamps if and only if the writer is Impala (or more precisely, if the Parquet file's "createdBy" metadata does not start with "parquet-mr"). This matches the existing behavior in Hive from HIVE-9482.

## How was this patch tested?

Unit test added, existing tests run via Jenkins.

Author: Imran Rashid <[email protected]>
Author: Henry Robinson <[email protected]>

Closes #19769 from squito/SPARK-12297_skip_conversion.
1 parent e4639fa commit acf7ef3
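A minimal usage sketch of the new flag follows; the session setup and input path are hypothetical and not part of this commit:

```scala
import org.apache.spark.sql.SparkSession

object ReadImpalaInt96 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("int96-conversion-demo").getOrCreate()

    // Off by default. When enabled, INT96 timestamps in files whose "createdBy"
    // metadata does not start with "parquet-mr" (e.g. Impala output) are adjusted
    // at read time instead of being taken as-is.
    spark.conf.set("spark.sql.parquet.int96TimestampConversion", "true")

    // Hypothetical path to an Impala-written Parquet table.
    spark.read.parquet("/warehouse/impala_events").show()
  }
}
```

Since the option is a regular SQLConf entry, it can equally be set with `SET spark.sql.parquet.int96TimestampConversion=true` in SQL or in spark-defaults.conf.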

File tree

10 files changed: +237 -27 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala

Lines changed: 1 addition & 0 deletions
@@ -61,6 +61,7 @@ object DateTimeUtils {
   final val YearZero = -17999
   final val toYearZero = to2001 + 7304850
   final val TimeZoneGMT = TimeZone.getTimeZone("GMT")
+  final val TimeZoneUTC = TimeZone.getTimeZone("UTC")
   final val MonthOf31Days = Set(1, 3, 5, 7, 8, 10, 12)

   val TIMEZONE_OPTION = "timeZone"

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
@@ -291,6 +291,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val PARQUET_INT96_TIMESTAMP_CONVERSION = buildConf("spark.sql.parquet.int96TimestampConversion")
+    .doc("This controls whether timestamp adjustments should be applied to INT96 data when " +
+      "converting to timestamps, for data written by Impala. This is necessary because Impala " +
+      "stores INT96 data with a different timezone offset than Hive & Spark.")
+    .booleanConf
+    .createWithDefault(false)
+
   object ParquetOutputTimestampType extends Enumeration {
     val INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value
   }
@@ -1206,6 +1213,8 @@ class SQLConf extends Serializable with Logging {

   def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)

+  def isParquetINT96TimestampConversion: Boolean = getConf(PARQUET_INT96_TIMESTAMP_CONVERSION)
+
   def isParquetINT64AsTimestampMillis: Boolean = getConf(PARQUET_INT64_AS_TIMESTAMP_MILLIS)

   def parquetOutputTimestampType: ParquetOutputTimestampType.Value = {

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

Lines changed: 46 additions & 13 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet;

 import java.io.IOException;
+import java.util.TimeZone;

 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -93,13 +94,18 @@ public class VectorizedColumnReader {
   private final PageReader pageReader;
   private final ColumnDescriptor descriptor;
   private final OriginalType originalType;
+  // The timezone conversion to apply to int96 timestamps. Null if no conversion.
+  private final TimeZone convertTz;
+  private final static TimeZone UTC = DateTimeUtils.TimeZoneUTC();

   public VectorizedColumnReader(
       ColumnDescriptor descriptor,
       OriginalType originalType,
-      PageReader pageReader) throws IOException {
+      PageReader pageReader,
+      TimeZone convertTz) throws IOException {
     this.descriptor = descriptor;
     this.pageReader = pageReader;
+    this.convertTz = convertTz;
     this.originalType = originalType;
     this.maxDefLevel = descriptor.getMaxDefinitionLevel();

@@ -222,6 +228,10 @@ void readBatch(int total, WritableColumnVector column) throws IOException {
     }
   }

+  private boolean shouldConvertTimestamps() {
+    return convertTz != null && !convertTz.equals(UTC);
+  }
+
   /**
    * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
    */
@@ -294,11 +304,21 @@ private void decodeDictionaryIds(
         break;
       case INT96:
         if (column.dataType() == DataTypes.TimestampType) {
-          for (int i = rowId; i < rowId + num; ++i) {
-            // TODO: Convert dictionary of Binaries to dictionary of Longs
-            if (!column.isNullAt(i)) {
-              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
-              column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
+          if (!shouldConvertTimestamps()) {
+            for (int i = rowId; i < rowId + num; ++i) {
+              if (!column.isNullAt(i)) {
+                Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+                column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
+              }
+            }
+          } else {
+            for (int i = rowId; i < rowId + num; ++i) {
+              if (!column.isNullAt(i)) {
+                Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+                long rawTime = ParquetRowConverter.binaryToSQLTimestamp(v);
+                long adjTime = DateTimeUtils.convertTz(rawTime, convertTz, UTC);
+                column.putLong(i, adjTime);
+              }
             }
           }
         } else {
@@ -428,13 +448,26 @@ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) {
     if (column.dataType() == DataTypes.StringType || column.dataType() == DataTypes.BinaryType) {
       defColumn.readBinarys(num, column, rowId, maxDefLevel, data);
     } else if (column.dataType() == DataTypes.TimestampType) {
-      for (int i = 0; i < num; i++) {
-        if (defColumn.readInteger() == maxDefLevel) {
-          column.putLong(rowId + i,
-            // Read 12 bytes for INT96
-            ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
-        } else {
-          column.putNull(rowId + i);
+      if (!shouldConvertTimestamps()) {
+        for (int i = 0; i < num; i++) {
+          if (defColumn.readInteger() == maxDefLevel) {
+            // Read 12 bytes for INT96
+            long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
+            column.putLong(rowId + i, rawTime);
+          } else {
+            column.putNull(rowId + i);
+          }
+        }
+      } else {
+        for (int i = 0; i < num; i++) {
+          if (defColumn.readInteger() == maxDefLevel) {
+            // Read 12 bytes for INT96
+            long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
+            long adjTime = DateTimeUtils.convertTz(rawTime, convertTz, UTC);
+            column.putLong(rowId + i, adjTime);
+          } else {
+            column.putNull(rowId + i);
+          }
         }
       }
     } else {
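To make the `convertTz`/`UTC` adjustment above concrete, here is a rough, self-contained sketch of the kind of offset shift applied to the decoded microsecond values. It is an illustration only, not Spark's `DateTimeUtils.convertTz` implementation, it ignores the DST edge cases the real helper handles, and all names in it are made up:

```scala
import java.util.TimeZone

object Int96ShiftSketch {
  // Shift a micros-since-epoch value by the offset difference between two zones
  // at that instant (approximation of the reader-side INT96 adjustment).
  def shiftMicros(micros: Long, from: TimeZone, to: TimeZone): Long = {
    val millis = micros / 1000L
    val deltaMillis = to.getOffset(millis) - from.getOffset(millis)
    micros + deltaMillis * 1000L
  }

  def main(args: Array[String]): Unit = {
    val raw = 1512345678000000L // micros decoded from an INT96 value
    val adjusted = shiftMicros(raw,
      TimeZone.getTimeZone("America/Los_Angeles"), TimeZone.getTimeZone("UTC"))
    println(s"raw=$raw adjusted=$adjusted (delta=${adjusted - raw} micros)")
  }
}
```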

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java

Lines changed: 15 additions & 3 deletions
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.TimeZone;

 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -77,6 +78,12 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    */
   private boolean[] missingColumns;

+  /**
+   * The timezone that timestamp INT96 values should be converted to. Null if no conversion. Here to workaround
+   * incompatibilities between different engines when writing timestamp values.
+   */
+  private TimeZone convertTz = null;
+
   /**
    * columnBatch object that is used for batch decoding. This is created on first use and triggers
    * batched decoding. It is not valid to interleave calls to the batched interface with the row
@@ -105,10 +112,15 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    */
   private final MemoryMode MEMORY_MODE;

-  public VectorizedParquetRecordReader(boolean useOffHeap) {
+  public VectorizedParquetRecordReader(TimeZone convertTz, boolean useOffHeap) {
+    this.convertTz = convertTz;
     MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
   }

+  public VectorizedParquetRecordReader(boolean useOffHeap) {
+    this(null, useOffHeap);
+  }
+
   /**
    * Implementation of RecordReader API.
    */
@@ -291,8 +303,8 @@ private void checkEndOfRowGroup() throws IOException {
     columnReaders = new VectorizedColumnReader[columns.size()];
     for (int i = 0; i < columns.size(); ++i) {
       if (missingColumns[i]) continue;
-      columnReaders[i] = new VectorizedColumnReader(
-        columns.get(i), types.get(i).getOriginalType(), pages.getPageReader(columns.get(i)));
+      columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(),
+        pages.getPageReader(columns.get(i)), convertTz);
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 26 additions & 4 deletions
@@ -45,6 +45,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
 import org.apache.spark.sql.internal.SQLConf
@@ -307,6 +308,9 @@ class ParquetFileFormat
     hadoopConf.set(
       ParquetWriteSupport.SPARK_ROW_SCHEMA,
       requiredSchema.json)
+    hadoopConf.set(
+      SQLConf.SESSION_LOCAL_TIMEZONE.key,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)

     ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)

@@ -345,6 +349,8 @@ class ParquetFileFormat
       resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
     val enableRecordFilter: Boolean =
       sparkSession.sessionState.conf.parquetRecordFilterEnabled
+    val timestampConversion: Boolean =
+      sparkSession.sessionState.conf.isParquetINT96TimestampConversion
     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
     val returningBatch = supportBatch(sparkSession, resultSchema)

@@ -363,6 +369,22 @@ class ParquetFileFormat
           fileSplit.getLocations,
           null)

+      val sharedConf = broadcastedHadoopConf.value.value
+      // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
+      // *only* if the file was created by something other than "parquet-mr", so check the actual
+      // writer here for this file. We have to do this per-file, as each file in the table may
+      // have different writers.
+      def isCreatedByParquetMr(): Boolean = {
+        val footer = ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, SKIP_ROW_GROUPS)
+        footer.getFileMetaData().getCreatedBy().startsWith("parquet-mr")
+      }
+      val convertTz =
+        if (timestampConversion && !isCreatedByParquetMr()) {
+          Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+        } else {
+          None
+        }
+
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
@@ -374,8 +396,8 @@ class ParquetFileFormat
       }
       val taskContext = Option(TaskContext.get())
       val parquetReader = if (enableVectorizedReader) {
-        val vectorizedReader =
-          new VectorizedParquetRecordReader(enableOffHeapColumnVector && taskContext.isDefined)
+        val vectorizedReader = new VectorizedParquetRecordReader(
+          convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined)
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
         vectorizedReader.initBatch(partitionSchema, file.partitionValues)
@@ -388,9 +410,9 @@ class ParquetFileFormat
       // ParquetRecordReader returns UnsafeRow
       val reader = if (pushed.isDefined && enableRecordFilter) {
         val parquetFilter = FilterCompat.get(pushed.get, null)
-        new ParquetRecordReader[UnsafeRow](new ParquetReadSupport, parquetFilter)
+        new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
       } else {
-        new ParquetRecordReader[UnsafeRow](new ParquetReadSupport)
+        new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
       }
       reader.initialize(split, hadoopAttemptContext)
       reader
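The per-file `isCreatedByParquetMr` check above is the gating logic for the whole feature. Pulled out of the reader path, it amounts to roughly the following sketch, which assumes only a Hadoop `Configuration` and a `Path` to a single Parquet file:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.ParquetFileReader

object CreatedByCheck {
  // True if the file's footer metadata says it was written by parquet-mr
  // (i.e. Hive/Spark); Impala-written files report a different createdBy string.
  def isCreatedByParquetMr(conf: Configuration, file: Path): Boolean = {
    // Only the footer metadata is needed, so row groups are skipped.
    val footer = ParquetFileReader.readFooter(conf, file, SKIP_ROW_GROUPS)
    footer.getFileMetaData.getCreatedBy.startsWith("parquet-mr")
  }
}
```

Because each file in a table may have a different writer, the commit performs this check per file rather than once per scan.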

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala

Lines changed: 12 additions & 3 deletions
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution.datasources.parquet

-import java.util.{Map => JMap}
+import java.util.{Map => JMap, TimeZone}

 import scala.collection.JavaConverters._

@@ -48,9 +48,17 @@ import org.apache.spark.sql.types._
  * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
  * to [[prepareForRead()]], but use a private `var` for simplicity.
  */
-private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Logging {
+private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
+    extends ReadSupport[UnsafeRow] with Logging {
   private var catalystRequestedSchema: StructType = _

+  def this() {
+    // We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only
+    // used in the vectorized reader, where we get the convertTz value directly, and the value here
+    // is ignored.
+    this(None)
+  }
+
   /**
    * Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record
    * readers. Responsible for figuring out Parquet requested schema used for column pruning.
@@ -95,7 +103,8 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
     new ParquetRecordMaterializer(
       parquetRequestedSchema,
       ParquetReadSupport.expandUDT(catalystRequestedSchema),
-      new ParquetToSparkSchemaConverter(conf))
+      new ParquetToSparkSchemaConverter(conf),
+      convertTz)
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala

Lines changed: 5 additions & 2 deletions
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.execution.datasources.parquet

+import java.util.TimeZone
+
 import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
 import org.apache.parquet.schema.MessageType

@@ -33,11 +35,12 @@ import org.apache.spark.sql.types.StructType
 private[parquet] class ParquetRecordMaterializer(
     parquetSchema: MessageType,
     catalystSchema: StructType,
-    schemaConverter: ParquetToSparkSchemaConverter)
+    schemaConverter: ParquetToSparkSchemaConverter,
+    convertTz: Option[TimeZone])
   extends RecordMaterializer[UnsafeRow] {

   private val rootConverter =
-    new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater)
+    new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater)

   override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala

Lines changed: 9 additions & 2 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet

 import java.math.{BigDecimal, BigInteger}
 import java.nio.ByteOrder
+import java.util.TimeZone

 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -117,12 +118,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
  *        types should have been expanded.
+ * @param convertTz the optional time zone to convert to for int96 data
  * @param updater An updater which propagates converted field values to the parent container
  */
 private[parquet] class ParquetRowConverter(
     schemaConverter: ParquetToSparkSchemaConverter,
     parquetType: GroupType,
     catalystType: StructType,
+    convertTz: Option[TimeZone],
     updater: ParentContainerUpdater)
   extends ParquetGroupConverter(updater) with Logging {

@@ -151,6 +154,8 @@ private[parquet] class ParquetRowConverter(
        |${catalystType.prettyJson}
      """.stripMargin)

+  private val UTC = DateTimeUtils.TimeZoneUTC
+
   /**
    * Updater used together with field converters within a [[ParquetRowConverter]]. It propagates
    * converted filed values to the `ordinal`-th cell in `currentRow`.
@@ -279,7 +284,9 @@ private[parquet] class ParquetRowConverter(
             val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
             val timeOfDayNanos = buf.getLong
             val julianDay = buf.getInt
-            updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos))
+            val rawTime = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
+            val adjTime = convertTz.map(DateTimeUtils.convertTz(rawTime, _, UTC)).getOrElse(rawTime)
+            updater.setLong(adjTime)
           }
         }

@@ -309,7 +316,7 @@ private[parquet] class ParquetRowConverter(

       case t: StructType =>
         new ParquetRowConverter(
-          schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater {
+          schemaConverter, parquetType.asGroupType(), t, convertTz, new ParentContainerUpdater {
           override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
         })

Binary file not shown.
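For reference, the INT96 layout decoded in ParquetRowConverter above (and in the vectorized path via binaryToSQLTimestamp) is 8 little-endian bytes of nanos-within-day followed by 4 bytes of Julian day. A self-contained sketch of that decoding, with the epoch constant written out; names here are illustrative, not Spark's:

```scala
import java.nio.{ByteBuffer, ByteOrder}

object Int96Decode {
  // Julian day number of the Unix epoch, 1970-01-01.
  private val JulianDayOfEpoch = 2440588L
  private val MicrosPerDay = 24L * 60 * 60 * 1000 * 1000

  // Decode a 12-byte INT96 value into microseconds since the Unix epoch,
  // before any timezone adjustment is applied.
  def toMicros(int96: Array[Byte]): Long = {
    require(int96.length == 12, "INT96 timestamps are 12 bytes")
    val buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN)
    val timeOfDayNanos = buf.getLong
    val julianDay = buf.getInt
    (julianDay - JulianDayOfEpoch) * MicrosPerDay + timeOfDayNanos / 1000
  }
}
```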
