Skip to content

Commit 00d23a5

Browse files
Merge pull request #180 from AbsaOSS/feature/179-copy-transformer
#179: Implement copy transformer
2 parents 70f96d0 + c86207f commit 00d23a5

File tree

9 files changed

+382
-6
lines changed

9 files changed

+382
-6
lines changed

README.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,52 @@ component.transformer.class.{transformer-id} = za.co.absa.hyperdrive.ingestor.im
183183
| `transformer.{transformer-id}.columns.rename.from` | Yes | A comma-separated list of columns to rename. For example, `column1, column2`. |
184184
| `transformer.{transformer-id}.columns.rename.to` | Yes | A comma-separated list of new column names. For example, `column1_new, column2_new`. |
185185

186+
##### ColumnCopyStreamTransformer
187+
`ColumnCopyStreamTransformer` allows copying of columns specified in the configuration. Dots in column names are interpreted as nested structs,
188+
unless they are surrounded by backticks (the same convention as in Spark).
189+
190+
Note that usage of the star-operator `*` within column names is not supported and may lead to unexpected behaviour.
191+
192+
To add the transformer to the pipeline use this class name:
193+
```
194+
component.transformer.class.{transformer-id} = za.co.absa.hyperdrive.ingestor.implementation.transformer.column.copy.ColumnCopyStreamTransformer
195+
```
196+
197+
| Property Name | Required | Description |
198+
| :--- | :---: | :--- |
199+
| `transformer.{transformer-id}.columns.copy.from` | Yes | A comma-separated list of columns to copy from. For example, `column1.fieldA, column2.fieldA`. |
200+
| `transformer.{transformer-id}.columns.copy.to` | Yes | A comma-separated list of new column names. For example, `newColumn.col1_fieldA, newColumn.col2_fieldA`. |
201+
202+
**Example**
203+
204+
Given a dataframe with the following schema
205+
```
206+
|-- column1
207+
| |-- fieldA
208+
| |-- fieldB
209+
|-- column2
210+
| |-- fieldA
211+
|-- column3
212+
```
213+
214+
Then, the following column parameters
215+
- `transformer.{transformer-id}.columns.copy.from=column1.fieldA, column2.fieldA`
216+
- `transformer.{transformer-id}.columns.copy.to=newColumn.col1_fieldA, newColumn.col2_fieldA`
217+
218+
will produce the following schema
219+
```
220+
|-- column1
221+
| |-- fieldA
222+
| |-- fieldB
223+
|-- column2
224+
| |-- fieldA
225+
|-- column3
226+
|-- newColumn
227+
| |-- col1_fieldA
228+
| |-- col2_fieldA
229+
230+
```
231+
186232
See [Pipeline settings](#pipeline-settings) for details about `{transformer-id}`.
187233
##### ParquetStreamWriter
188234
| Property Name | Required | Description |

driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDockerTest.scala

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ class KafkaToKafkaDockerTest extends FlatSpec with Matchers with SparkTestBase w
8686
"component.transformer.class.[avro.decoder]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer",
8787
"component.transformer.id.1" -> "column.selector",
8888
"component.transformer.class.column.selector" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer",
89-
"component.transformer.id.2" -> "[avro.encoder]",
89+
"component.transformer.id.2" -> "[column.copy]",
90+
"component.transformer.class.[column.copy]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.column.copy.ColumnCopyStreamTransformer",
91+
"component.transformer.id.3" -> "[avro.encoder]",
9092
"component.transformer.class.[avro.encoder]" -> "za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroEncodingTransformer",
9193
"component.writer" -> "za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter",
9294

@@ -108,6 +110,10 @@ class KafkaToKafkaDockerTest extends FlatSpec with Matchers with SparkTestBase w
108110
// comma separated list of columns to select
109111
"transformer.column.selector.columns.to.select" -> "*",
110112

113+
// copy transformer settings
114+
"transformer.[column.copy].columns.copy.from" -> "some_id, value_field",
115+
"transformer.[column.copy].columns.copy.to" -> "grouped.some_id, grouped.value_field",
116+
111117
// Avro Encoder (ABRiS) settings
112118
"transformer.[avro.encoder].schema.registry.url" -> "${transformer.[avro.decoder].schema.registry.url}",
113119
"transformer.[avro.encoder].value.schema.naming.strategy" -> "topic.name",
@@ -144,9 +150,15 @@ class KafkaToKafkaDockerTest extends FlatSpec with Matchers with SparkTestBase w
144150
.asScala.map(_.getType) should contain theSameElementsAs Seq(Type.STRING, Type.NULL)
145151

146152
val valueFieldNames = records.head.value().getSchema.getFields.asScala.map(_.name())
147-
valueFieldNames should contain theSameElementsAs List("some_id", "value_field")
148-
records.map(_.value().get("some_id")) should contain theSameElementsInOrderAs List.range(0, numberOfRecords)
149-
records.map(_.value().get("value_field")).distinct should contain theSameElementsAs List(new Utf8("valueHello"))
153+
valueFieldNames should contain theSameElementsAs List("some_id", "value_field", "grouped")
154+
val allIds = records.map(_.value().get("some_id"))
155+
val allValues = records.map(_.value().get("value_field"))
156+
allIds should contain theSameElementsInOrderAs List.range(0, numberOfRecords)
157+
allValues.distinct should contain theSameElementsAs List(new Utf8("valueHello"))
158+
val idsInGrouped = records.map(_.value().get("grouped").asInstanceOf[GenericRecord].get("some_id"))
159+
val valuesInGrouped = records.map(_.value().get("grouped").asInstanceOf[GenericRecord].get("value_field"))
160+
idsInGrouped shouldBe allIds
161+
valuesInGrouped shouldBe allValues
150162
}
151163

152164
after {

ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformerFactoryProvider

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVer
1717
za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformerLoader
1818
za.co.absa.hyperdrive.ingestor.implementation.transformer.enceladus.columns.AddEnceladusColumnsTransformerLoader
1919
za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformerLoader
20+
za.co.absa.hyperdrive.ingestor.implementation.transformer.column.copy.ColumnCopyStreamTransformerLoader
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.copy
17+
18+
import org.apache.commons.configuration2.Configuration
19+
import org.apache.logging.log4j.LogManager
20+
import org.apache.spark.sql.{Column, DataFrame}
21+
import org.apache.spark.sql.functions._
22+
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
23+
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}
24+
import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils
25+
26+
/**
 * Copies existing columns into new, possibly nested, columns. The source columns are left untouched.
 *
 * Target names are parsed with Spark's attribute-name parser, so dots denote nested structs unless
 * the name is surrounded by backticks. When target paths collide, later entries take precedence
 * over earlier ones.
 *
 * @param columnsFrom list of columns to copy from. Nested structs can be specified with dots
 * @param columnsTo   list of columns to copy to. Nested structs can be specified with dots
 * @throws IllegalArgumentException if `columnsFrom` and `columnsTo` differ in size
 *
 * Example:
 * Given a dataframe with the following schema
 * |-- orig1
 * | |-- nestedOrig1
 * |-- orig2
 * | |-- nestedOrig2
 * |-- orig3
 * and the following parameters
 *
 * columnsFrom=Seq("orig1.nestedOrig1","orig2")
 * columnsTo=Seq("new1.nested1.nested11","new1.nested2")
 *
 * will produce the following schema
 * |-- orig1
 * | |-- nestedOrig1
 * |-- orig2
 * | |-- nestedOrig2
 * |-- orig3
 * |-- new1
 * | |-- nested1
 * | | |-- nested11
 * | |-- nested2
 * | | |-- nestedOrig2
 *
 */
private[transformer] class ColumnCopyStreamTransformer(val columnsFrom: Seq[String], val columnsTo: Seq[String]) extends StreamTransformer {
  /**
   * Node of the tree of target columns.
   *
   * @param name           name of the target column or struct field
   * @param copyColumnName source column to copy; expected to be set on leaf nodes
   * @param children       nested fields; empty for leaf nodes
   */
  case class Node(name: String, copyColumnName: Option[String], children: Seq[Node])

  if (columnsFrom.size != columnsTo.size) {
    throw new IllegalArgumentException("The size of source column names doesn't match the list of target column names " +
      s"${columnsFrom.size} != ${columnsTo.size}.")
  }

  /**
   * Adds the configured target columns to the given dataframe.
   *
   * @param dataFrame the dataframe to copy columns in
   * @return the dataframe with one additional (or replaced) top-level column per distinct
   *         top-level target name
   */
  def transform(dataFrame: DataFrame): DataFrame = {
    val rootNodeName = "root"
    // Every target path is prefixed with a synthetic root so that all targets end up in one tree.
    val targetPaths = columnsTo.map(to => rootNodeName :: UnresolvedAttribute.parseAttributeName(to).toList)
    val copyTree = columnsFrom.zip(targetPaths).foldLeft(Node(rootNodeName, None, Seq())) {
      case (tree, (sourceColumn, targetPath)) => createNode(sourceColumn, targetPath, Some(tree))
    }

    // The synthetic root itself is not materialized, only its children become columns.
    copyTree.children.foldLeft(dataFrame) { (df, topNode) =>
      df.withColumn(topNode.name, createColumn(topNode))
    }
  }

  /**
   * Returns a copy of `node` in which the leaf addressed by `treePath` copies `sourceField`.
   *
   * @param sourceField name of the column to copy from
   * @param treePath    remaining path to the target; its head is the name of the node to build
   * @param node        the already existing node for the head of `treePath`, if any
   */
  private def createNode(sourceField: String, treePath: List[String], node: Option[Node]): Node =
    (treePath, node) match {
      case (last :: Nil, _) =>
        Node(last, Some(sourceField), Seq())
      case (head :: (tail @ (next :: _)), Some(existing)) if existing.name == head =>
        // Rebuild the child on the path, keep its siblings as they are.
        val untouchedSiblings = existing.children.filterNot(_.name == next)
        val updatedChild = createNode(sourceField, tail, existing.children.find(_.name == next))
        Node(head, None, untouchedSiblings :+ updatedChild)
      case (head :: tail, _) =>
        Node(head, None, Seq(createNode(sourceField, tail, None)))
      case (Nil, _) =>
        // Not reachable from transform, which always prepends the root node name.
        throw new IllegalStateException(s"Expected a non-empty target path for source column $sourceField")
    }

  /**
   * Converts a node into a column: leaves become aliased source columns, inner nodes become structs.
   */
  private def createColumn(node: Node): Column =
    if (node.children.isEmpty) {
      val originalColumn = node.copyColumnName.getOrElse(
        throw new IllegalStateException(s"Expected a copy column name at leaf node ${node}, got None"))
      col(originalColumn).as(node.name)
    } else {
      struct(node.children.map(createColumn): _*).as(node.name)
    }
}
91+
92+
/**
 * Factory that creates a [[ColumnCopyStreamTransformer]] from a configuration.
 */
object ColumnCopyStreamTransformer extends StreamTransformerFactory with ColumnCopyStreamTransformerAttributes {
  /**
   * Reads the source and target column lists from the configuration and creates the transformer.
   *
   * @param config configuration containing the `KEY_COLUMNS_FROM` and `KEY_COLUMNS_TO` properties
   * @return a transformer copying the configured columns
   * @throws IllegalArgumentException if the two lists differ in size
   */
  override def apply(config: Configuration): StreamTransformer = {
    val columnsFrom = ConfigUtils.getSeqOrThrow(KEY_COLUMNS_FROM, config)
    val columnsTo = ConfigUtils.getSeqOrThrow(KEY_COLUMNS_TO, config)
    LogManager.getLogger.info("Going to create ColumnCopyStreamTransformer using: " +
      s"columnsFrom='${columnsFrom.mkString(",")}', columnsTo='${columnsTo.mkString(",")}'")
    new ColumnCopyStreamTransformer(columnsFrom, columnsTo)
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.copy
17+
18+
import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}
19+
20+
/**
 * Configuration keys and descriptive metadata of the column copy transformer.
 */
trait ColumnCopyStreamTransformerAttributes extends HasComponentAttributes {
  /** Key of the list of columns to copy from. */
  val KEY_COLUMNS_FROM = "columns.copy.from"

  /** Key of the list of target column names. */
  val KEY_COLUMNS_TO = "columns.copy.to"

  override def getName: String = "Column Copy Transformer"

  override def getDescription: String = "This transformer copies given columns. Column expressions are not possible"

  /** Describes the two required properties, keyed by their configuration keys. */
  override def getProperties: Map[String, PropertyMetadata] = {
    val sourceColumns = PropertyMetadata(
      "Source column names",
      Some("Comma separated list of columns to be copied."),
      required = true)
    val targetColumns = PropertyMetadata(
      "Target column names",
      Some("Comma separated list of new names of the columns. The number of columns should match the list of source columns."),
      required = true)

    Map(
      KEY_COLUMNS_FROM -> sourceColumns,
      KEY_COLUMNS_TO -> targetColumns
    )
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.copy
17+
18+
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformerFactory, StreamTransformerFactoryProvider}
19+
20+
/**
 * Provider exposing the [[ColumnCopyStreamTransformer]] factory. This class is the entry listed in
 * the `StreamTransformerFactoryProvider` services file.
 */
class ColumnCopyStreamTransformerLoader extends StreamTransformerFactoryProvider {
  /** @return the companion object of the column copy transformer, which acts as its factory */
  override def getComponentFactory: StreamTransformerFactory = {
    ColumnCopyStreamTransformer
  }
}

ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/TestServiceProviderConfiguration.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriterFactory, StreamWri
2424
import za.co.absa.hyperdrive.ingestor.api.{ComponentFactory, ComponentFactoryProvider}
2525
import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader
2626
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer
27+
import za.co.absa.hyperdrive.ingestor.implementation.transformer.column.copy.ColumnCopyStreamTransformer
2728
import za.co.absa.hyperdrive.ingestor.implementation.transformer.column.renaming.ColumnRenamingStreamTransformer
2829
import za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer
2930
import za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformer
@@ -44,8 +45,14 @@ class TestServiceProviderConfiguration extends FlatSpec with Matchers {
4445

4546
it should "load StreamTransformers" in {
4647
val factoryProviders = loadServices[StreamTransformerFactoryProvider, StreamTransformerFactory]()
47-
factoryProviders should contain theSameElementsAs Seq(AddDateVersionTransformer,
48-
ColumnSelectorStreamTransformer, ConfluentAvroDecodingTransformer, AddEnceladusColumnsTransformer, ColumnRenamingStreamTransformer)
48+
factoryProviders should contain theSameElementsAs Seq(
49+
AddDateVersionTransformer,
50+
ColumnSelectorStreamTransformer,
51+
ConfluentAvroDecodingTransformer,
52+
AddEnceladusColumnsTransformer,
53+
ColumnRenamingStreamTransformer,
54+
ColumnCopyStreamTransformer
55+
)
4956
}
5057

5158
it should "load StreamWriters" in {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.ingestor.implementation.transformer.column.copy
17+
18+
import org.apache.commons.configuration2.DynamicCombinedConfiguration
19+
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
20+
import org.apache.spark.sql.Row
21+
import org.apache.spark.sql.catalyst.encoders.RowEncoder
22+
import org.apache.spark.sql.execution.streaming.MemoryStream
23+
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType}
24+
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
25+
import za.co.absa.commons.spark.SparkTestBase
26+
27+
/**
 * Tests of the column copy transformer: schema of the transformed dataframe and validation
 * of the configured column lists.
 */
class TestColumnCopyStreamTransformer extends FlatSpec with SparkTestBase with Matchers with BeforeAndAfter {

  /** Creates a comma-delimited configuration holding the given source and target column lists. */
  private def createConfig(columnsFrom: String, columnsTo: String): DynamicCombinedConfiguration = {
    val config = new DynamicCombinedConfiguration()
    config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
    config.addProperty(ColumnCopyStreamTransformer.KEY_COLUMNS_FROM, columnsFrom)
    config.addProperty(ColumnCopyStreamTransformer.KEY_COLUMNS_TO, columnsTo)
    config
  }

  it should "copy columns while leaving existing columns intact" in {
    // given
    val col1Struct = new StructType()
      .add("Field.1", StringType, nullable = false)
    val col2Struct = new StructType()
      .add("Field2", StringType)
      .add("Field3", new ArrayType(IntegerType, containsNull = true))
    val inputSchema = new StructType()
      .add("col1Top", col1Struct, nullable = false)
      .add("col2Top", col2Struct)
      .add("col3Top", StringType)
    val inputStream = new MemoryStream[Row](1, spark.sqlContext)(RowEncoder(inputSchema))
    val inputDf = inputStream.toDF()

    val config = createConfig(
      "col1Top.`Field.1`, col2Top, col2Top, col2Top.Field2, col2Top.Field3",
      "copy1.`cp.col1Top`.cpField1, copy1.cpCol2Top, copy2, copy2.l1.l2.l3.Field2, copy2.l1.l2.Field3")

    // when
    val transformedDf = ColumnCopyStreamTransformer(config).transform(inputDf)

    // then
    val copy1Struct = new StructType()
      .add("cp.col1Top", new StructType().add("cpField1", StringType, nullable = false), nullable = false)
      .add("cpCol2Top", col2Struct)

    // copy2 is built from the innermost struct outwards
    val l3Struct = new StructType()
      .add("Field2", StringType)
    val l2Struct = new StructType()
      .add("l3", l3Struct, nullable = false)
      .add("Field3", new ArrayType(IntegerType, containsNull = true))
    val l1Struct = new StructType()
      .add("l2", l2Struct, nullable = false)
    val copy2Struct = new StructType()
      .add("l1", l1Struct, nullable = false)

    // the original columns are expected unchanged, followed by the two copies
    val expectedSchema = inputSchema
      .add("copy1", copy1Struct, nullable = false)
      .add("copy2", copy2Struct, nullable = false)

    transformedDf.schema shouldBe expectedSchema
  }

  it should "throw an exception if columns from do not match columns to" in {
    val config = createConfig("col1,col2", "col1")

    val ex = intercept[IllegalArgumentException] {
      ColumnCopyStreamTransformer(config)
    }

    ex.getMessage should include("The size of source column names doesn't match")
  }
}

0 commit comments

Comments
 (0)