Skip to content

Commit e6c2ad8

Browse files
Use MetadataLogFileIndex to get latest version in AddDateVersionTransformer (#308)
* Use MetadataLogFileIndex to get latest version in AddDateVersionTransformer
* Fix
* Fix build
* Increase test coverage
* Improve formatting

Co-authored-by: Kevin Wallimann <[email protected]>
1 parent 7f32b89 commit e6c2ad8

File tree

9 files changed

+222
-17
lines changed

9 files changed

+222
-17
lines changed

compatibility-api/src/main/scala/za/co/absa/hyperdrive/compatibility/api/CompatibleSparkUtil.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@ package za.co.absa.hyperdrive.compatibility.api
1717

1818
import org.apache.spark.sql.SparkSession
1919
import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex
20+
import org.apache.spark.sql.types.StructType
2021

2122
trait CompatibleSparkUtil {
22-
def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex
23+
def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex
2324
def hasMetadata(spark: SparkSession, destination: String): Boolean
2425
def jsonStringToObject(jsonString: String): Object
2526
def objectToJsonString(obj: Object): Option[String]

compatibility-provider/src/main/scala/za/co/absa/hyperdrive/compatibility/provider/CompatibleSparkUtilProvider.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@ package za.co.absa.hyperdrive.compatibility.provider
1717

1818
import org.apache.spark.sql.SparkSession
1919
import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex
20+
import org.apache.spark.sql.types.StructType
2021
import za.co.absa.hyperdrive.compatibility.api.CompatibleSparkUtil
2122
import za.co.absa.hyperdrive.compatibility.impl.SparkUtil
2223

2324
object CompatibleSparkUtilProvider extends CompatibleSparkUtil {
24-
def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex =
25-
SparkUtil.createMetadataLogFileIndex(spark, destination)
25+
def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex =
26+
SparkUtil.createMetadataLogFileIndex(spark, destination, userSpecifiedSchema)
2627

2728
def hasMetadata(spark: SparkSession, destination: String): Boolean =
2829
SparkUtil.hasMetadata(spark, destination)
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.compatibility.provider
17+
18+
import org.scalatest.flatspec.AnyFlatSpec
19+
import org.scalatest.matchers.should.Matchers
20+
import za.co.absa.commons.io.{TempDirectory, TempFile}
21+
import za.co.absa.spark.commons.test.SparkTestBase
22+
23+
/**
 * Unit tests for [[CompatibleSparkUtilProvider]], which delegates to the
 * version-specific SparkUtil implementation selected at build time.
 */
class TestCompatibleSparkUtilProvider extends AnyFlatSpec with Matchers with SparkTestBase {

  "createMetadataLogFileIndex" should "return a metadata log file index" in {
    val result = CompatibleSparkUtilProvider.createMetadataLogFileIndex(
      spark,
      TempDirectory("createMetadataLogFileIndex").path.toAbsolutePath.toString,
      None
    )

    result shouldBe a[org.apache.spark.sql.execution.streaming.MetadataLogFileIndex]
  }

  // The original description claimed "return true if metadata exists", but the
  // test supplies a fresh temp file with no _spark_metadata and asserts false.
  // The description now matches the asserted behavior.
  "hasMetadata" should "return false if no metadata exists" in {
    val result = CompatibleSparkUtilProvider.hasMetadata(
      spark,
      TempFile("hasMetadata").path.toAbsolutePath.toString
    )

    result shouldBe false
  }

  "jsonStringToObject" should "return an object from a JSON string" in {
    val jsonString = """{"key": "value"}"""

    val result = CompatibleSparkUtilProvider.jsonStringToObject(jsonString)

    import scala.collection.JavaConverters._
    result.asInstanceOf[java.util.Map[String, String]].asScala should contain("key" -> "value")
  }

  "objectToJsonString" should "return a JSON string from an object" in {
    import scala.collection.JavaConverters._
    val obj = Map("key" -> "value").asJava

    val result = CompatibleSparkUtilProvider.objectToJsonString(obj)

    result shouldBe defined
    result.get shouldBe """{"key":"value"}"""
  }
}

compatibility_spark-2/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import org.apache.avro.util.internal.JacksonUtils
1919
import org.apache.hadoop.fs.Path
2020
import org.apache.spark.sql.SparkSession
2121
import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
22+
import org.apache.spark.sql.types.StructType
2223
import org.codehaus.jackson.map.ObjectMapper
2324
import za.co.absa.hyperdrive.compatibility.api.CompatibleSparkUtil
2425

@@ -27,8 +28,8 @@ import java.io.ByteArrayOutputStream
2728
object SparkUtil extends CompatibleSparkUtil {
2829
private lazy val objectMapper = new ObjectMapper()
2930

30-
override def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex =
31-
new MetadataLogFileIndex(spark, new Path(destination), None)
31+
override def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex =
32+
new MetadataLogFileIndex(spark, new Path(destination), userSpecifiedSchema)
3233

3334
override def hasMetadata(spark: SparkSession, destination: String): Boolean =
3435
FileStreamSink.hasMetadata(Seq(destination), spark.sparkContext.hadoopConfiguration)
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.compatibility.impl
17+
18+
import org.scalatest.flatspec.AnyFlatSpec
19+
import org.scalatest.matchers.should.Matchers
20+
import za.co.absa.spark.commons.test.SparkTestBase
21+
22+
import za.co.absa.commons.io.{TempDirectory, TempFile}
23+
24+
/**
 * Unit tests for the Spark 2 [[SparkUtil]] compatibility implementation.
 */
class TestSparkUtil extends AnyFlatSpec with Matchers with SparkTestBase {

  "createMetadataLogFileIndex" should "return a metadata log file index" in {
    val result = SparkUtil.createMetadataLogFileIndex(
      spark,
      TempDirectory("createMetadataLogFileIndex").path.toAbsolutePath.toString,
      None
    )

    result shouldBe a[org.apache.spark.sql.execution.streaming.MetadataLogFileIndex]
  }

  // The original description claimed "return true if metadata exists", but the
  // test supplies a fresh temp file with no _spark_metadata and asserts false.
  // The description now matches the asserted behavior.
  "hasMetadata" should "return false if no metadata exists" in {
    val result = SparkUtil.hasMetadata(
      spark,
      TempFile("hasMetadata").path.toAbsolutePath.toString
    )

    result shouldBe false
  }

  "jsonStringToObject" should "return an object from a JSON string" in {
    val jsonString = """{"key": "value"}"""

    val result = SparkUtil.jsonStringToObject(jsonString)

    import scala.collection.JavaConverters._
    result.asInstanceOf[java.util.Map[String, String]].asScala should contain("key" -> "value")
  }

  "objectToJsonString" should "return a JSON string from an object" in {
    import scala.collection.JavaConverters._
    val obj = Map("key" -> "value").asJava

    val result = SparkUtil.objectToJsonString(obj)

    result shouldBe defined
    result.get shouldBe """{"key":"value"}"""
  }
}

compatibility_spark-3/src/main/scala/za/co/absa/hyperdrive/compatibility/impl/SparkUtil.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,16 @@ import org.apache.avro.util.internal.JacksonUtils
2020
import org.apache.hadoop.fs.Path
2121
import org.apache.spark.sql.SparkSession
2222
import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
23+
import org.apache.spark.sql.types.StructType
2324
import za.co.absa.hyperdrive.compatibility.api.CompatibleSparkUtil
2425

2526
import java.io.ByteArrayOutputStream
2627

2728
object SparkUtil extends CompatibleSparkUtil {
2829
private lazy val objectMapper = new ObjectMapper()
2930

30-
override def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex =
31-
new MetadataLogFileIndex(spark, new Path(destination), Map.empty, None)
31+
override def createMetadataLogFileIndex(spark: SparkSession, destination: String, userSpecifiedSchema: Option[StructType]): MetadataLogFileIndex =
32+
new MetadataLogFileIndex(spark, new Path(destination), Map.empty, userSpecifiedSchema)
3233

3334
override def hasMetadata(spark: SparkSession, destination: String): Boolean =
3435
FileStreamSink.hasMetadata(Seq(destination), spark.sparkContext.hadoopConfiguration, spark.sessionState.conf)
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.compatibility.impl
17+
18+
import org.scalatest.flatspec.AnyFlatSpec
19+
import org.scalatest.matchers.should.Matchers
20+
import za.co.absa.spark.commons.test.SparkTestBase
21+
22+
import za.co.absa.commons.io.{TempDirectory, TempFile}
23+
24+
/**
 * Unit tests for the Spark 3 [[SparkUtil]] compatibility implementation.
 */
class TestSparkUtil extends AnyFlatSpec with Matchers with SparkTestBase {

  "createMetadataLogFileIndex" should "return a metadata log file index" in {
    val result = SparkUtil.createMetadataLogFileIndex(
      spark,
      TempDirectory("createMetadataLogFileIndex").path.toAbsolutePath.toString,
      None
    )

    result shouldBe a[org.apache.spark.sql.execution.streaming.MetadataLogFileIndex]
  }

  // The original description claimed "return true if metadata exists", but the
  // test supplies a fresh temp file with no _spark_metadata and asserts false.
  // The description now matches the asserted behavior.
  "hasMetadata" should "return false if no metadata exists" in {
    val result = SparkUtil.hasMetadata(
      spark,
      TempFile("hasMetadata").path.toAbsolutePath.toString
    )

    result shouldBe false
  }

  "jsonStringToObject" should "return an object from a JSON string" in {
    val jsonString = """{"key": "value"}"""

    val result = SparkUtil.jsonStringToObject(jsonString)

    import scala.collection.JavaConverters._
    result.asInstanceOf[java.util.Map[String, String]].asScala should contain("key" -> "value")
  }

  "objectToJsonString" should "return a JSON string from an object" in {
    import scala.collection.JavaConverters._
    val obj = Map("key" -> "value").asJava

    val result = SparkUtil.objectToJsonString(obj)

    result shouldBe defined
    result.get shouldBe """{"key":"value"}"""
  }
}

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/dateversion/AddDateVersionTransformer.scala

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,21 @@
1616
package za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion
1717

1818
import org.apache.commons.configuration2.Configuration
19+
import org.apache.spark.sql.catalyst.InternalRow
20+
import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex
1921
import org.slf4j.LoggerFactory
2022
import org.apache.spark.sql.functions.{lit, to_date}
23+
import org.apache.spark.sql.types.StructType
2124
import org.apache.spark.sql.{DataFrame, SparkSession}
2225
import za.co.absa.hyperdrive.compatibility.provider.CompatibleSparkUtilProvider
2326
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}
2427
import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.getOrThrow
2528
import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter
29+
import org.apache.spark.sql.types.{DateType, StructField}
2630

2731
import java.time.LocalDate
2832
import java.time.format.DateTimeFormatter
33+
import org.apache.spark.sql.types._
2934

3035
private[transformer] class AddDateVersionTransformer(val reportDate: String, val destination: String) extends StreamTransformer {
3136

@@ -44,22 +49,30 @@ private[transformer] class AddDateVersionTransformer(val reportDate: String, val
4449
if (noCommittedParquetFilesExist(spark)) {
4550
initialVersion
4651
} else {
47-
import spark.implicits._
48-
val df = spark.read.parquet(destination)
49-
val versions = df.select(df(ColumnVersion))
50-
.filter(df(ColumnDate) === lit(reportDate))
51-
.distinct()
52-
.as[Int]
53-
.collect().toList
54-
52+
val versions = getVersions(spark, ColumnDate, ColumnVersion, reportDate)
5553
if (versions.nonEmpty) versions.max + 1 else initialVersion
5654
}
5755
}
5856

5957
private def noCommittedParquetFilesExist(spark: SparkSession): Boolean = {
60-
val fileCatalog = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination)
58+
val fileCatalog = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination, None)
6159
!CompatibleSparkUtilProvider.hasMetadata(spark, destination) || fileCatalog.allFiles().isEmpty
6260
}
61+
62+
private def getVersions(spark: SparkSession, ColumnDate: String, ColumnVersion: String, reportDate: String): Seq[Int] = {
63+
val fileCatalog: MetadataLogFileIndex = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination, Some(StructType(Seq(
64+
StructField(ColumnDate, StringType, nullable = true),
65+
StructField(ColumnVersion, IntegerType, nullable = true)
66+
))))
67+
68+
fileCatalog.partitionSpec().partitions.map { partition =>
69+
val row: InternalRow = partition.values
70+
(row.getString(0), row.getInt(1))
71+
}
72+
.filter { case (date, _) => date == reportDate }
73+
.map { case (_, version) => version }
74+
.toList
75+
}
6376
}
6477

6578
object AddDateVersionTransformer extends StreamTransformerFactory with AddDateVersionTransformerAttributes {

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/MetadataLogUtil.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ object MetadataLogUtil {
6060
}
6161

6262
private def getMetadataLogFiles(spark: SparkSession, rootPath: String): Try[Set[String]] = {
63-
val metadataLogFileIndex = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, rootPath)
63+
val metadataLogFileIndex = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, rootPath, None)
6464
val parquetFilesArr = metadataLogFileIndex.inputFiles
6565
val parquetFiles = parquetFilesArr.toSet
6666
if (parquetFiles.size != parquetFilesArr.length) {

0 commit comments

Comments (0)