
Commit 20ada07

sydneybeal (Sydney Beal) authored and yihua committed
[HUDI-8235] Adding support for EPOCHMICROSECONDS in TimestampBasedAvroKeyGenerator (#7913)
Co-authored-by: Sydney Beal <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
1 parent f02f7b8 commit 20ada07

3 files changed (+24 -7 lines changed)


hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java

Lines changed: 5 additions & 1 deletion
@@ -41,6 +41,7 @@
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.hudi.common.config.TimestampKeyGeneratorConfig.DATE_TIME_PARSER;
@@ -54,7 +55,7 @@
  */
 public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
   public enum TimestampType implements Serializable {
-    UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR
+    UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, EPOCHMICROSECONDS, SCALAR
   }
 
   private final TimeUnit timeUnit;
@@ -93,6 +94,9 @@ public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException
       case EPOCHMILLISECONDS:
         timeUnit = MILLISECONDS;
         break;
+      case EPOCHMICROSECONDS:
+        timeUnit = MICROSECONDS;
+        break;
       case UNIX_TIMESTAMP:
         timeUnit = SECONDS;
         break;
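
To make the new type concrete, here is a minimal sketch of configuring the key generator for an epoch-microseconds field. The `hoodie.keygen.timebased.*` keys correspond to the `TimestampKeyGeneratorConfig` properties touched by this commit; the field names (`id`, `event_time_us`) and the exact write-config key strings are illustrative assumptions, not part of the commit.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;

import java.io.IOException;

public class EpochMicrosKeyGenSketch {
  public static void main(String[] args) throws IOException {
    TypedProperties props = new TypedProperties();
    // Record key and partition path fields; "id" and "event_time_us" are placeholders.
    props.setProperty("hoodie.datasource.write.recordkey.field", "id");
    props.setProperty("hoodie.datasource.write.partitionpath.field", "event_time_us");
    // The timestamp type added by this commit.
    props.setProperty("hoodie.keygen.timebased.timestamp.type", "EPOCHMICROSECONDS");
    // Render the epoch-micros value into a date-based partition path.
    props.setProperty("hoodie.keygen.timebased.output.dateformat", "yyyy/MM/dd");

    // With EPOCHMICROSECONDS, the generator resolves its TimeUnit to MICROSECONDS
    // (the new switch case above), so event_time_us values are interpreted as
    // microseconds since the epoch when building partition paths.
    TimestampBasedAvroKeyGenerator keyGen = new TimestampBasedAvroKeyGenerator(props);
  }
}

In a Spark writer, the same properties would typically be passed as write options; the key generator only changes how the timestamp field is interpreted and formatted.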

hudi-common/src/main/java/org/apache/hudi/common/config/TimestampKeyGeneratorConfig.java

Lines changed: 2 additions & 1 deletion
@@ -42,7 +42,8 @@ public class TimestampKeyGeneratorConfig extends HoodieConfig {
       .withAlternatives(OLD_TIMESTAMP_KEYGEN_CONFIG_PREFIX + "timestamp.type")
       .markAdvanced()
       .withDocumentation("Timestamp type of the field, which should be one of the timestamp types "
-          + "supported: `UNIX_TIMESTAMP`, `DATE_STRING`, `MIXED`, `EPOCHMILLISECONDS`, `SCALAR`.");
+          + "supported: `UNIX_TIMESTAMP`, `DATE_STRING`, `MIXED`, `EPOCHMILLISECONDS`,"
+          + " `EPOCHMICROSECONDS`, `SCALAR`.");
 
   public static final ConfigProperty<String> INPUT_TIME_UNIT = ConfigProperty
       .key(TIMESTAMP_KEYGEN_CONFIG_PREFIX + "timestamp.scalar.time.unit")

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala

Lines changed: 17 additions & 5 deletions
@@ -22,7 +22,7 @@ import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, extractEqualityPredicatesLiteralValues, generateFieldMap, haveProperPartitionValues, shouldListLazily, shouldUsePartitionPathPrefixAnalysis, shouldValidatePartitionColumns}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.config.{TimestampKeyGeneratorConfig, TypedProperties}
 import org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION
 import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -46,10 +46,10 @@ import org.apache.spark.sql.catalyst.{InternalRow, expressions}
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ByteType, DateType, IntegerType, LongType, ShortType, StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
 
 import java.util.Collections
 import javax.annotation.concurrent.NotThreadSafe
-
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.util.{Success, Try}
@@ -405,9 +405,21 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
   }
 
   protected def doParsePartitionColumnValues(partitionColumns: Array[String], partitionPath: String): Array[Object] = {
-    HoodieSparkUtils.parsePartitionColumnValues(partitionColumns, partitionPath, getBasePath, schema,
-      configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone),
-      sparkParsePartitionUtil, shouldValidatePartitionColumns(spark))
+    val tableConfig = metaClient.getTableConfig
+    if (null != tableConfig.getKeyGeneratorClassName
+      && tableConfig.getKeyGeneratorClassName.equals(classOf[org.apache.hudi.keygen.TimestampBasedKeyGenerator].getName)
+      && tableConfig.propsMap.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
+        .matches("SCALAR|UNIX_TIMESTAMP|EPOCHMILLISECONDS|EPOCHMICROSECONDS")) {
+      // For the TIMESTAMP key generator with type SCALAR, UNIX_TIMESTAMP,
+      // EPOCHMILLISECONDS, or EPOCHMICROSECONDS, the original partition column
+      // values usually cannot be reconstructed from the partition path, since
+      // formatting discards information. The formatted output is a string,
+      // so the partition path can be passed through as a UTF8String.
+      Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath))
+    } else {
+      HoodieSparkUtils.parsePartitionColumnValues(partitionColumns, partitionPath, getBasePath, schema,
+        configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, SQLConf.get.sessionLocalTimeZone),
+        sparkParsePartitionUtil, shouldValidatePartitionColumns(spark))
+    }
   }
 
   private def arePartitionPathsUrlEncoded: Boolean =
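
The guard above exists because formatting is lossy: once an epoch value has been rendered through an output date format, the original number generally cannot be recovered from the path, so the index returns the path string itself. A self-contained illustration of that loss, using plain JDK time APIs; the sample value and the yyyy/MM/dd pattern are assumptions for the demo:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EpochMicrosFormatLossDemo {
  public static void main(String[] args) {
    // A hypothetical epoch-microseconds value: 2023-10-01T00:00:00.123456Z.
    long epochMicros = 1_696_118_400_123_456L;
    Instant instant = Instant.ofEpochSecond(
        epochMicros / 1_000_000L, (epochMicros % 1_000_000L) * 1_000L);

    DateTimeFormatter fmt =
        DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneOffset.UTC);
    String partitionPath = fmt.format(instant);

    // Prints "2023/10/01": the sub-day precision (including the microseconds)
    // is gone, so the original long cannot be reconstructed from the path.
    // This is why doParsePartitionColumnValues falls back to returning the
    // path itself as a UTF8String for these timestamp types.
    System.out.println(partitionPath);
  }
}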
