Skip to content

Commit b8ecdfc

Browse files
Merge pull request #175 from AbsaOSS/feature/174-fix-unevaluable-name-placeholder
#174: Evaluate column names before creating schema
2 parents 02b6717 + c3e0edf commit b8ecdfc

File tree

4 files changed

+54
-9
lines changed

4 files changed

+54
-9
lines changed

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroEncodingTransformer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ private[transformer] class ConfluentAvroEncodingTransformer(
6666
}
6767

6868
private def getValueDataFrame(dataFrame: DataFrame): DataFrame = {
69-
val allColumns = struct(dataFrame.columns.head, dataFrame.columns.tail: _*)
69+
val allColumns = struct(dataFrame.columns.map(c => dataFrame(c)): _*)
7070
val toAvroConfig = getValueAvroConfig(config, allColumns.expr)
7171
logger.info(s"ToAvro settings: $toAvroConfig")
7272
dataFrame.select(to_avro(allColumns, toAvroConfig) as 'value)

ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/AbrisMockSchemaRegistryClient.scala renamed to ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/testutils/HyperdriveMockSchemaRegistryClient.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@
1313
* limitations under the License.
1414
*/
1515

16-
package za.co.absa.hyperdrive.ingestor.implementation.utils
16+
package za.co.absa.hyperdrive.ingestor.implementation.testutils
1717

1818
import java.io.IOException
1919

20-
import io.confluent.kafka.schemaregistry.client.{MockSchemaRegistryClient, SchemaMetadata}
2120
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
21+
import io.confluent.kafka.schemaregistry.client.{MockSchemaRegistryClient, SchemaMetadata}
2222

23-
private class AbrisMockSchemaRegistryClient extends MockSchemaRegistryClient {
23+
class HyperdriveMockSchemaRegistryClient extends MockSchemaRegistryClient {
2424

2525
/**
2626
* MockSchemaRegistryClient is throwing different Exception than the mocked client, this is a workaround

ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/TestConfluentAvroEncodingTransformer.scala

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,71 @@
1616
package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent
1717

1818
import org.apache.commons.configuration2.BaseConfiguration
19-
import org.scalatest.{FlatSpec, Matchers}
19+
import org.apache.spark.sql.execution.streaming.MemoryStream
20+
import org.apache.spark.sql.streaming.Trigger
21+
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
22+
import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
23+
import za.co.absa.abris.config.AbrisConfig
24+
import za.co.absa.commons.spark.SparkTestBase
25+
import za.co.absa.hyperdrive.ingestor.implementation.testutils.HyperdriveMockSchemaRegistryClient
2026
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroEncodingTransformer._
2127
import za.co.absa.hyperdrive.ingestor.implementation.utils.AbrisConfigUtil
2228
import za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter
2329

24-
class TestConfluentAvroEncodingTransformer extends FlatSpec with Matchers {
30+
class TestConfluentAvroEncodingTransformer extends FlatSpec with Matchers with BeforeAndAfter with SparkTestBase {
2531

2632
private val topic = "topic"
27-
private val schemaRegistryURL = "http://localhost:8081"
33+
private val SchemaRegistryURL = "http://localhost:8081"
2834

2935
behavior of ConfluentAvroEncodingTransformer.getClass.getSimpleName
3036

37+
before {
38+
val mockSchemaRegistryClient = new HyperdriveMockSchemaRegistryClient()
39+
SchemaManagerFactory.resetSRClientInstance()
40+
SchemaManagerFactory.addSRClientInstance(Map(AbrisConfig.SCHEMA_REGISTRY_URL -> SchemaRegistryURL), mockSchemaRegistryClient)
41+
}
42+
3143
it should "create avro stream encoder" in {
3244
val config = new BaseConfiguration
3345
config.addProperty(KafkaStreamWriter.KEY_TOPIC, topic)
34-
config.addProperty(KEY_SCHEMA_REGISTRY_URL, schemaRegistryURL)
46+
config.addProperty(KEY_SCHEMA_REGISTRY_URL, SchemaRegistryURL)
3547
config.addProperty(KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY, AbrisConfigUtil.TopicNameStrategy)
3648

3749
val encoder = ConfluentAvroEncodingTransformer(config).asInstanceOf[ConfluentAvroEncodingTransformer]
3850

3951
encoder.config shouldBe config
4052
encoder.withKey shouldBe false
4153
}
54+
55+
it should "encode the values" in {
56+
// given
57+
import spark.implicits._
58+
val queryName = "dummyQuery"
59+
val input = MemoryStream[Int](1, spark.sqlContext)
60+
input.addData(1 to 100)
61+
val df = input.toDF()
62+
63+
// when
64+
val config = new BaseConfiguration()
65+
config.addProperty(KafkaStreamWriter.KEY_TOPIC, topic)
66+
config.addProperty(KEY_SCHEMA_REGISTRY_URL, SchemaRegistryURL)
67+
config.addProperty(KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY, AbrisConfigUtil.TopicNameStrategy)
68+
69+
val encoder = ConfluentAvroEncodingTransformer(config)
70+
val transformedDf = encoder.transform(df)
71+
val query = transformedDf
72+
.writeStream
73+
.trigger(Trigger.Once)
74+
.queryName(queryName)
75+
.format("memory")
76+
.start()
77+
query.awaitTermination()
78+
79+
// then
80+
import spark.implicits._
81+
val outputDf = spark.sql(s"select * from $queryName")
82+
outputDf.count() shouldBe 100
83+
val byteArrays = outputDf.select("value").map(_ (0).asInstanceOf[Array[Byte]]).collect()
84+
byteArrays.distinct.length shouldBe byteArrays.length
85+
}
4286
}

ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestAbrisConfigUtil.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
2525
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
2626
import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
2727
import za.co.absa.abris.config.AbrisConfig
28+
import za.co.absa.hyperdrive.ingestor.implementation.testutils.HyperdriveMockSchemaRegistryClient
2829

2930
class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
3031

@@ -78,7 +79,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
7879
behavior of AbrisConfigUtil.getClass.getName
7980

8081
before {
81-
mockSchemaRegistryClient = new AbrisMockSchemaRegistryClient()
82+
mockSchemaRegistryClient = new HyperdriveMockSchemaRegistryClient()
8283
SchemaManagerFactory.resetSRClientInstance()
8384
SchemaManagerFactory.addSRClientInstance(Map(AbrisConfig.SCHEMA_REGISTRY_URL -> dummySchemaRegistryUrl), mockSchemaRegistryClient)
8485
}

0 commit comments

Comments
 (0)