Commit 02b6717

#170 Add column renaming transformer (#173)
* Add column renaming transformer.
* Extend the e2e test suite with the renaming transformer.
* Add renaming transformer usage description to README.md.
1 parent 726b1a1 commit 02b6717

9 files changed: +336 −5 lines changed


README.md

Lines changed: 13 additions & 0 deletions

@@ -170,6 +170,19 @@ Caution: This transformer requires a writer which defines `writer.parquet.destination.directory`
 | :--- | :---: | :--- |
 | `transformer.{transformer-id}.report.date` | No | User-defined date for `hyperdrive_date` in format `yyyy-MM-dd`. Default date is the date of the ingestion |

+##### ColumnRenamingStreamTransformer
+`ColumnRenamingStreamTransformer` allows renaming of columns specified in the configuration.
+
+To add the transformer to the pipeline use this class name:
+```
+component.transformer.class.{transformer-id} = za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer
+```
+
+| Property Name | Required | Description |
+| :--- | :---: | :--- |
+| `transformer.{transformer-id}.columns.rename.from` | Yes | A comma-separated list of columns to rename. For example, `column1, column2`. |
+| `transformer.{transformer-id}.columns.rename.to` | Yes | A comma-separated list of new column names. For example, `column1_new, column2_new`. |
+
 See [Pipeline settings](#pipeline-settings) for details about `{transformer-id}`.
 ##### ParquetStreamWriter
 | Property Name | Required | Description |
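For illustration, a complete pipeline fragment using these properties could look as follows, mirroring the e2e test updated in this commit (`[column.renamer]` and `field1`…`field4` are arbitrary placeholder names, not fixed values):

```
component.transformer.id.2 = [column.renamer]
component.transformer.class.[column.renamer] = za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer
transformer.[column.renamer].columns.rename.from = field1, field2
transformer.[column.renamer].columns.rename.to = field3, field4
```

The two lists are matched by position: `field1` becomes `field3` and `field2` becomes `field4`.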

driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToParquetIncrementingVersionDockerTest.scala

Lines changed: 10 additions & 4 deletions

@@ -64,6 +64,8 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers
     "component.transformer.class.[avro.decoder]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer",
     "component.transformer.id.1" -> "[version.incrementer]",
     "component.transformer.class.[version.incrementer]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformer",
+    "component.transformer.id.2" -> "[column.renamer]",
+    "component.transformer.class.[column.renamer]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer",
     "component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter",

     // Spark settings
@@ -82,6 +84,10 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers
     // comma separated list of columns to select
     "transformer.[version.incrementer].report.date" -> "2020-03-31",

+    // Rename columns transformer
+    "transformer.[column.renamer].columns.rename.from" -> "field1,field2",
+    "transformer.[column.renamer].columns.rename.to" -> "field3,field4",
+
     // Sink(Parquet) settings
     "writer.common.checkpoint.location" -> (checkpointDir + "/${reader.kafka.topic}"),
     "writer.parquet.destination.directory" -> destinationDir,
@@ -102,7 +108,7 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers
     val df = spark.read.parquet(destinationDir)
     df.count shouldBe ingestionSize
     import spark.implicits._
-    df.columns should contain theSameElementsAs List("field1", "field2", "hyperdrive_date", "hyperdrive_version")
+    df.columns should contain theSameElementsAs List("field3", "field4", "hyperdrive_date", "hyperdrive_version")
     df.select("hyperdrive_version").distinct().as[Int].collect() should contain theSameElementsAs List(1)

     // when (2)
@@ -115,10 +121,10 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers
     val df2 = spark.read.parquet(destinationDir)
     df2.count shouldBe 2 * ingestionSize
     import spark.implicits._
-    df2.columns should contain theSameElementsAs List("field1", "field2", "hyperdrive_date", "hyperdrive_version")
+    df2.columns should contain theSameElementsAs List("field3", "field4", "hyperdrive_date", "hyperdrive_version")
     df2.select("hyperdrive_version").distinct().as[Int].collect() should contain theSameElementsAs List(1, 2)
-    df2.select("field1").distinct().as[String].collect() should contain theSameElementsAs List("hello")
-    df2.select("field2").as[Int].collect() should contain theSameElementsAs (0 until ingestionSize) ++ (0 until ingestionSize)
+    df2.select("field3").distinct().as[String].collect() should contain theSameElementsAs List("hello")
+    df2.select("field4").as[Int].collect() should contain theSameElementsAs (0 until ingestionSize) ++ (0 until ingestionSize)
     df2.select("hyperdrive_date").distinct()
       .as[java.sql.Date].collect() should contain theSameElementsAs List(java.sql.Date.valueOf("2020-03-31"))
   }

ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider

Lines changed: 1 addition & 0 deletions

@@ -16,3 +16,4 @@ za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformerLoader
 za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformerLoader
 za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformerLoader
 za.co.absa.hyperdrive.ingestor.implementation.transformer.enceladus.columns.AddEnceladusColumnsTransformerLoader
+za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformerLoader
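The file above is a standard `java.util.ServiceLoader` registration; it is what lets Hyperdrive discover the new transformer factory at runtime (the `TestServiceProviderConfiguration` change further down exercises exactly this). A minimal sketch of such a lookup, assuming the `ingestor-default` artifact is on the classpath (the object name is hypothetical):

```scala
import java.util.ServiceLoader

import scala.collection.JavaConverters._

import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider

object ServiceLoaderSketch {
  def main(args: Array[String]): Unit = {
    // Instantiates every provider listed under META-INF/services for this interface,
    // which after this commit includes ColumnRenamingStreamTransformerLoader.
    val providers = ServiceLoader.load(classOf[StreamTransformerFactoryProvider]).asScala
    providers.foreach(p => println(p.getComponentFactory.getClass.getName))
  }
}
```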
ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/renaming/ColumnRenamingStreamTransformer.scala

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming
+
+import org.apache.commons.configuration2.Configuration
+import org.apache.logging.log4j.LogManager
+import org.apache.spark.sql.DataFrame
+import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}
+
+private[transformer] class ColumnRenamingStreamTransformer(val columnsFrom: Seq[String], val columnsTo: Seq[String]) extends StreamTransformer {
+  if (columnsFrom.isEmpty || columnsTo.isEmpty) {
+    throw new IllegalArgumentException("Empty list of columns to rename.")
+  }
+
+  if (columnsFrom.size != columnsTo.size) {
+    throw new IllegalArgumentException("The size of source column names doesn't match the list of target column names " +
+      s"${columnsFrom.size} != ${columnsTo.size}.")
+  }
+
+  override def transform(streamData: DataFrame): DataFrame = {
+    val renamings = columnsFrom.zip(columnsTo)
+
+    renamings.foldLeft(streamData) { case (df, (from, to)) =>
+      df.withColumnRenamed(from, to)
+    }
+  }
+}
+
+object ColumnRenamingStreamTransformer extends StreamTransformerFactory with ColumnRenamingStreamTransformerAttributes {
+  override def apply(config: Configuration): StreamTransformer = {
+    val columnsFrom = config.getStringArray(KEY_COLUMNS_FROM)
+    val columnsTo = config.getStringArray(KEY_COLUMNS_TO)
+    LogManager.getLogger.info(s"Going to create ColumnRenamingStreamTransformer using: " +
+      s"columnsFrom='${columnsFrom.mkString(",")}', columnsTo='${columnsTo.mkString(",")}'")
+    new ColumnRenamingStreamTransformer(columnsFrom, columnsTo)
+  }
+}
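To make the `transform` semantics concrete: the fold applies each `(from, to)` pair left to right via Spark's `withColumnRenamed`, which is a no-op when the source column is absent. A small self-contained batch sketch of the same fold (hypothetical column names, local SparkSession assumed; not part of the commit):

```scala
import org.apache.spark.sql.SparkSession

object RenamingFoldSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("renaming-fold-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("hello", 0), ("hello", 1)).toDF("field1", "field2")

    // Same logic as ColumnRenamingStreamTransformer.transform: zip the source and
    // target column lists, then fold withColumnRenamed over the DataFrame.
    val renamings = Seq("field1", "field2").zip(Seq("field3", "field4"))
    val renamed = renamings.foldLeft(df) { case (acc, (from, to)) =>
      acc.withColumnRenamed(from, to)
    }

    renamed.columns.foreach(println) // prints field3, field4
    spark.stop()
  }
}
```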
ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/renaming/ColumnRenamingStreamTransformerAttributes.scala

Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming
+
+import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}
+
+trait ColumnRenamingStreamTransformerAttributes extends HasComponentAttributes {
+  val KEY_COLUMNS_FROM = "columns.rename.from"
+  val KEY_COLUMNS_TO = "columns.rename.to"
+
+  override def getName: String = "Column Renaming Transformer"
+
+  override def getDescription: String = "This transformer renames the given columns. Column expressions are not possible."
+
+  override def getProperties: Map[String, PropertyMetadata] = Map(
+    KEY_COLUMNS_FROM -> PropertyMetadata("Source column names", Some("Comma-separated list of columns to be renamed."), required = true),
+    KEY_COLUMNS_TO -> PropertyMetadata("Target column names", Some("Comma-separated list of new names of the columns. The number of columns should match the list of source columns."), required = true)
+  )
+}
ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/renaming/ColumnRenamingStreamTransformerLoader.scala

Lines changed: 22 additions & 0 deletions

@@ -0,0 +1,22 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming
+
+import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformerFactory, StreamTransformerFactoryProvider}
+
+class ColumnRenamingStreamTransformerLoader extends StreamTransformerFactoryProvider {
+  override def getComponentFactory: StreamTransformerFactory = ColumnRenamingStreamTransformer
+}

ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/TestServiceProviderConfiguration.scala

Lines changed: 2 additions & 1 deletion

@@ -24,6 +24,7 @@ import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriterFactory, StreamWriterFactoryProvider}
 import za.co.absa.hyperdrive.ingestor.api.{ComponentFactory, ComponentFactoryProvider}
 import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader
 import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer
+import za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer
 import za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer
 import za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformer
 import za.co.absa.hyperdrive.ingestor.implementation.transformer.enceladus.columns.AddEnceladusColumnsTransformer
@@ -44,7 +45,7 @@ class TestServiceProviderConfiguration extends FlatSpec with Matchers {
   it should "load StreamTransformers" in {
     val factoryProviders = loadServices[StreamTransformerFactoryProvider, StreamTransformerFactory]()
     factoryProviders should contain theSameElementsAs Seq(AddDateVersionTransformer,
-      ColumnSelectorStreamTransformer, ConfluentAvroDecodingTransformer, AddEnceladusColumnsTransformer)
+      ColumnSelectorStreamTransformer, ConfluentAvroDecodingTransformer, AddEnceladusColumnsTransformer, ColumnRenamingStreamTransformer)
   }

   it should "load StreamWriters" in {
ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/renaming/TestColumnRenamingStreamTransformer.scala

Lines changed: 126 additions & 0 deletions

@@ -0,0 +1,126 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming
+
+import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
+import org.apache.commons.configuration2.{BaseConfiguration, DynamicCombinedConfiguration}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.{OutputMode, Trigger}
+import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
+import za.co.absa.commons.io.TempDirectory
+import za.co.absa.commons.spark.SparkTestBase
+import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterProperties
+import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter
+
+class TestColumnRenamingStreamTransformer extends FlatSpec with SparkTestBase with Matchers with BeforeAndAfter {
+  import spark.implicits._
+
+  private var baseDir: TempDirectory = _
+
+  private def baseDirPath = baseDir.path.toUri.toString
+
+  private def destinationDir = s"$baseDirPath/destination"
+
+  private def checkpointDir = s"$baseDirPath/checkpoint"
+
+  private val random = scala.util.Random
+
+  behavior of "ColumnRenamingStreamTransformer"
+
+  before {
+    baseDir = TempDirectory("testColumnRenamingStreamTransformer").deleteOnExit()
+  }
+
+  after {
+    baseDir.delete()
+  }
+
+  it should "rename an input column" in {
+    val config = new BaseConfiguration()
+    config.addProperty(ParquetStreamWriter.KEY_DESTINATION_DIRECTORY, destinationDir)
+    config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_FROM, "value")
+    config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_TO, "v")
+    val underTest = ColumnRenamingStreamTransformer(config)
+    val df = getDummyReadStream().toDF()
+
+    executeQuery(underTest.transform(df))
+
+    val actualDf = spark.read.parquet(destinationDir)
+
+    assert(df.schema.exists(f => f.name == "value"))
+    assert(!df.schema.exists(f => f.name == "v"))
+    assert(actualDf.schema.exists(f => f.name == "v"))
+    assert(!actualDf.schema.exists(f => f.name == "value"))
+  }
+
+  it should "rename multiple columns while leaving existing columns intact" in {
+    val config = new DynamicCombinedConfiguration()
+    config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
+
+    config.addProperty(ParquetStreamWriter.KEY_DESTINATION_DIRECTORY, destinationDir)
+    config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_FROM, "value, value2")
+    config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_TO, "v1, v2")
+    val underTest = ColumnRenamingStreamTransformer(config)
+    val df = getDummyReadStream().toDF()
+      .withColumn("value2", col("value"))
+      .withColumn("value3", col("value"))
+
+    executeQuery(underTest.transform(df))
+
+    val actualDf = spark.read.parquet(destinationDir)
+
+    actualDf.printSchema()
+    assert(actualDf.schema.exists(f => f.name == "v1"))
+    assert(actualDf.schema.exists(f => f.name == "v2"))
+    assert(actualDf.schema.exists(f => f.name == "value3"))
+
+    assert(!actualDf.schema.exists(f => f.name == "value"))
+    assert(!actualDf.schema.exists(f => f.name == "value2"))
+  }
+
+  it should "throw an exception if columns from do not match columns to" in {
+    val config = new DynamicCombinedConfiguration()
+    config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
+
+    config.addProperty(ParquetStreamWriter.KEY_DESTINATION_DIRECTORY, destinationDir)
+    config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_FROM, "value, value2")
+    config.addProperty(ColumnRenamingStreamTransformer.KEY_COLUMNS_TO, "v1")
+
+    val ex = intercept[IllegalArgumentException] {
+      ColumnRenamingStreamTransformer(config)
+    }
+
+    assert(ex.getMessage.contains("The size of source column names doesn't match"))
+  }
+
+  private def executeQuery(df: DataFrame): Unit = {
+    val query = df
+      .writeStream
+      .option(StreamWriterProperties.CheckpointLocation, checkpointDir)
+      .outputMode(OutputMode.Append)
+      .trigger(Trigger.Once)
+      .start(destinationDir)
+    query.awaitTermination()
+  }
+
+  private def getDummyReadStream(): MemoryStream[Int] = {
+    val input = MemoryStream[Int](random.nextInt(), spark.sqlContext)
+    input.addData(List.range(0, 100))
+    input
+  }
+}
ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/column/renaming/TestColumnRenamingStreamTransformerObject.scala

Lines changed: 80 additions & 0 deletions

@@ -0,0 +1,80 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming
+
+import org.apache.commons.configuration2.{BaseConfiguration, DynamicCombinedConfiguration}
+import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
+import org.scalatest.FlatSpec
+
+import ColumnRenamingStreamTransformer._
+
+class TestColumnRenamingStreamTransformerObject extends FlatSpec {
+  behavior of ColumnRenamingStreamTransformer.getClass.getSimpleName
+
+  it should "create ColumnRenamingStreamTransformer for column pairs specified in configurations" in {
+    val columnsFrom = Seq("a", "b", "c")
+    val columnsTo = Seq("A", "b2", "C3")
+    val config = new DynamicCombinedConfiguration()
+    config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
+    config.addProperty(KEY_COLUMNS_FROM, columnsFrom.mkString(","))
+    config.addProperty(KEY_COLUMNS_TO, columnsTo.mkString(","))
+
+    val transformer = ColumnRenamingStreamTransformer(config).asInstanceOf[ColumnRenamingStreamTransformer]
+    assert(columnsFrom == transformer.columnsFrom)
+    assert(columnsTo == transformer.columnsTo)
+  }
+
+  it should "throw an exception if source columns are not specified" in {
+    val columnsTo = Seq("A", "b2", "C3")
+    val config = new DynamicCombinedConfiguration()
+    config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
+    config.addProperty(KEY_COLUMNS_TO, columnsTo.mkString(","))
+
+    val ex = intercept[IllegalArgumentException] {
+      ColumnRenamingStreamTransformer(config).asInstanceOf[ColumnRenamingStreamTransformer]
+    }
+
+    assert(ex.getMessage.contains("Empty list of columns to rename."))
+  }
+
+  it should "throw an exception if target columns are not specified" in {
+    val columnsFrom = Seq("a", "b", "c")
+    val config = new DynamicCombinedConfiguration()
+    config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
+    config.addProperty(KEY_COLUMNS_FROM, columnsFrom.mkString(","))
+
+    val ex = intercept[IllegalArgumentException] {
+      ColumnRenamingStreamTransformer(config).asInstanceOf[ColumnRenamingStreamTransformer]
+    }
+
+    assert(ex.getMessage.contains("Empty list of columns to rename."))
+  }
+
+  it should "throw an exception if two lists do not match" in {
+    val columnsFrom = Seq("a", "b", "c")
+    val columnsTo = Seq("A", "b2")
+    val config = new DynamicCombinedConfiguration()
+    config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
+    config.addProperty(KEY_COLUMNS_FROM, columnsFrom.mkString(","))
+    config.addProperty(KEY_COLUMNS_TO, columnsTo.mkString(","))
+
+    val ex = intercept[IllegalArgumentException] {
+      ColumnRenamingStreamTransformer(config).asInstanceOf[ColumnRenamingStreamTransformer]
+    }
+
+    assert(ex.getMessage.contains("The size of source column names doesn't match"))
+  }
+}
