Commit ab4d1b3
#40: Make UDFs be registered exactly once per SparkSession (#41)
* upgraded to spark-commons 0.4.0
* using `OncePerSparkSession` as a parent class for `UDFLibrary` to prevent multiple registrations
* added the `doc` task to the `build.yml` script
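
For background, the `OncePerSparkSession` pattern from spark-commons acts as a per-session guard: a subclass supplies a `register` method, and the base class invokes it only the first time that subclass is instantiated for a given `SparkSession`. Below is a minimal sketch of such a guard under simplified assumptions (`OncePerSparkSessionSketch` is a hypothetical name; the real `za.co.absa.spark.commons.OncePerSparkSession` may be implemented differently):

import org.apache.spark.sql.SparkSession

import scala.collection.mutable

// Hypothetical sketch of the once-per-session guard; the real
// za.co.absa.spark.commons.OncePerSparkSession may differ in detail.
abstract class OncePerSparkSessionSketch(implicit spark: SparkSession) {

  // Subclasses place their spark.udf.register(...) calls here.
  protected def register(implicit spark: SparkSession): Unit

  // Constructor body: run the registration only once per
  // (SparkSession, concrete subclass) pair, across all instantiations.
  OncePerSparkSessionSketch.synchronized {
    val key = (spark, getClass.getName)
    if (!OncePerSparkSessionSketch.alreadyRegistered.contains(key)) {
      register(spark)
      OncePerSparkSessionSketch.alreadyRegistered += key
    }
  }
}

object OncePerSparkSessionSketch {
  // Note: a production implementation would likely avoid holding strong
  // references to sessions (e.g. key on a session identifier instead).
  private val alreadyRegistered = mutable.Set.empty[(SparkSession, String)]
}

The synchronized block keeps concurrent instantiations safe, and keying on the concrete class lets several independent UDF libraries coexist on one session.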
1 parent: eefe87a

File tree: 2 files changed (+33 / -29 lines)

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion

@@ -40,4 +40,4 @@ jobs:
       with:
         java-version: "[email protected]"
     - name: Build and run tests
-      run: sbt ++${{matrix.scala}} test
+      run: sbt ++${{matrix.scala}} test doc

src/main/scala/za/co/absa/standardization/udf/UDFLibrary.scala

Lines changed: 32 additions & 28 deletions

@@ -22,48 +22,52 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Row, SparkSession}
 import za.co.absa.standardization.config.{ErrorCodesConfig, StandardizationConfig}
 import za.co.absa.standardization.udf.UDFNames._
-import za.co.absa.standardization.{ErrorMessage, Mapping}
+import za.co.absa.standardization.ErrorMessage
+import za.co.absa.spark.commons.OncePerSparkSession
 
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
 
-class UDFLibrary(stdConfig: StandardizationConfig)(implicit val spark: SparkSession) extends Serializable {
+class UDFLibrary(stdConfig: StandardizationConfig)(implicit spark: SparkSession) extends OncePerSparkSession with Serializable {
 
   private implicit val errorCodes: ErrorCodesConfig = stdConfig.errorCodes
 
-  spark.udf.register(stdCastErr, { (errCol: String, rawValue: String) =>
-    ErrorMessage.stdCastErr(errCol, rawValue)
-  })
+  override protected def register(implicit spark: SparkSession): Unit = {
 
-  spark.udf.register(stdNullErr, { errCol: String => ErrorMessage.stdNullErr(errCol) })
+    spark.udf.register(stdCastErr, { (errCol: String, rawValue: String) =>
+      ErrorMessage.stdCastErr(errCol, rawValue)
+    })
 
-  spark.udf.register(stdSchemaErr, { errRow: String => ErrorMessage.stdSchemaError(errRow) })
+    spark.udf.register(stdNullErr, { errCol: String => ErrorMessage.stdNullErr(errCol) })
 
-  spark.udf.register(arrayDistinctErrors, // this UDF is registered for _spark-hats_ library sake
-    (arr: mutable.WrappedArray[ErrorMessage]) =>
-      if (arr != null) {
-        arr.distinct.filter((a: AnyRef) => a != null)
-      } else {
-        Seq[ErrorMessage]()
-      }
-  )
+    spark.udf.register(stdSchemaErr, { errRow: String => ErrorMessage.stdSchemaError(errRow) })
 
-  spark.udf.register(cleanErrCol,
-    UDFLibrary.cleanErrCol,
-    ArrayType.apply(ErrorMessage.errorColSchema, containsNull = false))
+    spark.udf.register(arrayDistinctErrors, // this UDF is registered for _spark-hats_ library sake
+      (arr: mutable.WrappedArray[ErrorMessage]) =>
+        if (arr != null) {
+          arr.distinct.filter((a: AnyRef) => a != null)
+        } else {
+          Seq[ErrorMessage]()
+        }
+    )
+
+    spark.udf.register(cleanErrCol,
+      UDFLibrary.cleanErrCol,
+      ArrayType.apply(ErrorMessage.errorColSchema, containsNull = false))
 
-  spark.udf.register(errorColumnAppend,
-    UDFLibrary.errorColumnAppend,
-    ArrayType.apply(ErrorMessage.errorColSchema, containsNull = false))
+    spark.udf.register(errorColumnAppend,
+      UDFLibrary.errorColumnAppend,
+      ArrayType.apply(ErrorMessage.errorColSchema, containsNull = false))
 
 
-  spark.udf.register(binaryUnbase64,
-    { stringVal: String => Try {
-      Base64.getDecoder.decode(stringVal)
-    } match {
-      case Success(decoded) => decoded
-      case Failure(_) => null //scalastyle:ignore null
-    }})
+    spark.udf.register(binaryUnbase64,
+      { stringVal: String => Try {
+        Base64.getDecoder.decode(stringVal)
+      } match {
+        case Success(decoded) => decoded
+        case Failure(_) => null //scalastyle:ignore null
+      }})
+  }
 }
 
 object UDFLibrary {
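
With `UDFLibrary` now extending `OncePerSparkSession`, constructing it repeatedly against the same session is harmless: the `register` body runs only on the first instantiation per `SparkSession`. A hedged usage sketch follows (how the `StandardizationConfig` is built is application-specific and elided, and the `local[*]` master is purely illustrative):

import org.apache.spark.sql.SparkSession
import za.co.absa.standardization.config.StandardizationConfig
import za.co.absa.standardization.udf.UDFLibrary

object UdfRegistrationExample {
  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder()
      .appName("udf-registration-example")
      .master("local[*]") // illustrative only
      .getOrCreate()

    // Obtaining a StandardizationConfig is application-specific; elided here.
    val stdConfig: StandardizationConfig = ???

    new UDFLibrary(stdConfig) // first instantiation: UDFs get registered
    new UDFLibrary(stdConfig) // second instantiation: registration is skipped

    spark.stop()
  }
}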
