
Commit 4029684

Feature/265 deterministic plan (#266)
* Use static uuid for new columns
* Use maven 3.8.4

Signed-off-by: Kevin Wallimann <[email protected]>
1 parent 29e1f34 commit 4029684

3 files changed: +16 additions, -10 deletions

.github/workflows/build.yml

Lines changed: 5 additions & 3 deletions
@@ -28,9 +28,11 @@ jobs:
           key: ${{ runner.os }}-${{ matrix.scala }}-${{ matrix.spark }}-${{ hashFiles('**/pom.xml') }}
           restore-keys: |
             ${{ runner.os }}-${{ matrix.scala }}-${{ matrix.spark }}-
+      - name: Install Maven 3.8.4
+        run: mvn -N wrapper:wrapper -Dmaven=3.8.4
       - name: License check
-        run: mvn clean validate --no-transfer-progress -Plicense-check,spark-2,spark-3,scala-2.12
+        run: ./mvnw clean validate --no-transfer-progress -Plicense-check,spark-2,spark-3,scala-2.12
       - name: Switch scala version
-        run: mvn scala-cross-build:change-version --no-transfer-progress -Pscala-${{ matrix.scala }},spark-${{ matrix.spark }}
+        run: ./mvnw scala-cross-build:change-version --no-transfer-progress -Pscala-${{ matrix.scala }},spark-${{ matrix.spark }}
       - name: Build and run tests
-        run: mvn clean verify --no-transfer-progress -Pscala-${{ matrix.scala }},spark-${{ matrix.spark }},all-tests
+        run: ./mvnw clean verify --no-transfer-progress -Pscala-${{ matrix.scala }},spark-${{ matrix.spark }},all-tests

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroDecodingTransformer.scala

Lines changed: 8 additions & 7 deletions
@@ -15,7 +15,6 @@
 
 package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent
 
-import java.util.UUID
 import org.apache.commons.configuration2.Configuration
 import org.apache.commons.lang3.RandomStringUtils
 import org.slf4j.LoggerFactory
@@ -27,10 +26,10 @@ import za.co.absa.abris.config.FromAvroConfig
 import za.co.absa.hyperdrive.ingestor.api.context.HyperdriveContext
 import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}
 import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils
-import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.getOrThrow
 import za.co.absa.hyperdrive.ingestor.implementation.HyperdriveContextKeys
 import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader.KEY_TOPIC
-import za.co.absa.hyperdrive.ingestor.implementation.utils.{AbrisConfigKeys, AbrisConfigUtil, AbrisConsumerConfigKeys, SchemaRegistryConfigUtil}
+import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer.ColumnPrefix
+import za.co.absa.hyperdrive.ingestor.implementation.utils.{AbrisConfigUtil, AbrisConsumerConfigKeys, SchemaRegistryConfigUtil}
 
 private[transformer] class ConfluentAvroDecodingTransformer(
   val valueAvroConfig: FromAvroConfig,
@@ -48,8 +47,8 @@ private[transformer] class ConfluentAvroDecodingTransformer(
   }
 
   private def getKeyValueDataFrame(dataFrame: DataFrame, keyAvroConfig: FromAvroConfig) = {
-    val keyStructCol = UUID.randomUUID().toString
-    val valueStructCol = UUID.randomUUID().toString
+    val keyStructCol = ColumnPrefix + "-key"
+    val valueStructCol = ColumnPrefix + "-value"
     val columnsToSelect = Seq(
       from_avro(col("key"), keyAvroConfig) as Symbol(keyStructCol),
       from_avro(col("value"), valueAvroConfig) as Symbol(valueStructCol)
@@ -73,7 +72,7 @@ private[transformer] class ConfluentAvroDecodingTransformer(
   }
 
   private def getValueDataFrame(dataFrame: DataFrame) = {
-    val dataStructCol = UUID.randomUUID().toString
+    val dataStructCol = ColumnPrefix
     val columnsToSelect = Seq(
       from_avro(col("value"), valueAvroConfig) as Symbol(dataStructCol)
     ) ++ keepColumns.map(col)
@@ -85,7 +84,7 @@ private[transformer] class ConfluentAvroDecodingTransformer(
     nonNullableDf.select(dataColumns ++ keepColumns.map(col):_*)
   }
 
-  private def checkIfColumnNameConflictsExistWithKeepColumns(avroColumns: Seq[String]) = {
+  private def checkIfColumnNameConflictsExistWithKeepColumns(avroColumns: Seq[String]): Unit = {
     val nameCollisions = avroColumns.intersect(keepColumns)
     if (nameCollisions.nonEmpty) {
       throw new IllegalArgumentException(s"Names of columns to keep collided with key and value columns. Consider renaming them before. Conflicts: $nameCollisions")
@@ -107,6 +106,8 @@ private[transformer] class ConfluentAvroDecodingTransformer(
 object ConfluentAvroDecodingTransformer extends StreamTransformerFactory with ConfluentAvroDecodingTransformerAttributes {
   private val keyColumnPrefixLength = 4
 
+  val ColumnPrefix = "hyperdrive-22c9fda5-d56c-44e6-9c5f-13197e71f3fc"
+
   object AbrisConfigKeys extends AbrisConsumerConfigKeys {
     override val topic: String = KEY_TOPIC
     override val schemaId: String = KEY_SCHEMA_REGISTRY_VALUE_SCHEMA_ID
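
Why this change matters: the intermediate struct columns created while decoding were previously named with UUID.randomUUID(), so every run produced different column names and therefore a different logical plan. Naming them from the fixed ColumnPrefix constant makes the plan deterministic across runs. The sketch below illustrates the same pattern in plain Spark, outside Hyperdrive; it does not use ABRiS or a schema registry, and every name in it (the object, the demo prefix value, the sample DataFrame) is illustrative only.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}

// Minimal sketch of the deterministic-column-name pattern, assuming plain Spark.
object DeterministicStructColumnsExample {

  // Fixed prefix instead of UUID.randomUUID(): the generated column names,
  // and hence the analyzed plan, stay identical across application runs.
  val ColumnPrefix = "demo-fixed-prefix"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("deterministic-plan-sketch")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the Kafka key/value columns before decoding.
    val df = Seq(("k1", "v1"), ("k2", "v2")).toDF("key", "value")

    // Pack key and value into intermediate struct columns with deterministic
    // names, then flatten them back out, mirroring getKeyValueDataFrame above.
    val keyStructCol = ColumnPrefix + "-key"
    val valueStructCol = ColumnPrefix + "-value"
    val packed = df.select(
      struct(col("key")).as(keyStructCol),
      struct(col("value")).as(valueStructCol)
    )
    val flattened = packed.select(col(s"`$keyStructCol`.*"), col(s"`$valueStructCol`.*"))

    // No random identifiers appear in the column names, so the plan text
    // printed here is the same on every run.
    flattened.explain(extended = true)
    spark.stop()
  }
}

A prefix that embeds a one-off UUID literal, as the actual ColumnPrefix constant does, keeps the names deterministic while still making a clash with user-supplied columns extremely unlikely.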

pom.xml

Lines changed: 3 additions & 0 deletions
@@ -284,6 +284,9 @@
             <exclude>.editorconfig</exclude>
             <exclude>**/*.MockMaker</exclude>
             <exclude>**/*.json</exclude>
+            <exclude>mvnw</exclude>
+            <exclude>mvnw.cmd</exclude>
+            <exclude>.mvn/**/*</exclude>
           </excludes>
         </configuration>
       </plugin>
