Skip to content

Commit e70300e

Browse files
authored
Merge branch 'master' into release/3.8.1
2 parents 4e48fbe + ec84511 commit e70300e

File tree

5 files changed

+128
-8
lines changed

5 files changed

+128
-8
lines changed

model/src/main/scala/za/co/absa/atum/model/RunStatus.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,15 @@
1515

1616
package za.co.absa.atum.model
1717

18+
import com.fasterxml.jackson.core.`type`.TypeReference
19+
import com.fasterxml.jackson.module.scala.JsonScalaEnumeration
1820
import za.co.absa.atum.model.RunState.RunState
1921

22+
class RunStateType extends TypeReference[RunState.type]
23+
2024
case class RunStatus
2125
(
26+
@JsonScalaEnumeration(classOf[RunStateType])
2227
status: RunState,
2328
error: Option[RunError]
2429
)
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2018-2019 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.atum.util
17+
18+
import com.fasterxml.jackson.annotation.JsonInclude.Include
19+
import com.fasterxml.jackson.databind.ObjectMapper
20+
import com.fasterxml.jackson.module.scala.DefaultScalaModule
21+
22+
import scala.reflect.ClassTag
23+
import scala.util.Try
24+
25+
/**
26+
* Sample serializer that is expected to be used for Atum's model externally, e.g. in Enceladus
27+
*/
28+
object JacksonJsonSerializer {
29+
30+
val objectMapper: ObjectMapper = new ObjectMapper()
31+
.registerModule(DefaultScalaModule)
32+
.setSerializationInclusion(Include.NON_EMPTY) // e.g. null-values fields omitted
33+
34+
35+
def fromJson[T](json: String)
36+
(implicit ct: ClassTag[T]): T = {
37+
val clazz = ct.runtimeClass.asInstanceOf[Class[T]]
38+
if (clazz == classOf[String]) {
39+
json.asInstanceOf[T]
40+
} else {
41+
objectMapper.readValue(json, clazz)
42+
}
43+
}
44+
45+
def toJson[T](entity: T): String = {
46+
entity match {
47+
case str: String =>
48+
if (isValidJson(str)) str else objectMapper.writeValueAsString(entity)
49+
case _ =>
50+
objectMapper.writeValueAsString(entity)
51+
}
52+
}
53+
54+
def isValidJson[T](str: T with String): Boolean = {
55+
Try(objectMapper.readTree(str)).isSuccess
56+
}
57+
58+
}

model/src/test/scala/za/co/absa/atum/util/SerializationUtilsJsonSpec.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import za.co.absa.atum.model.{Checkpoint, ControlMeasure, ControlMeasureMetadata
2121
import za.co.absa.atum.utils.SerializationUtils
2222

2323
/**
24-
* Unit tests for ControlMeasure SerializationUtils-based object serialization
24+
* Unit tests for ControlMeasure and RunStatus SerializationUtils-based object serialization
2525
*/
2626
class SerializationUtilsJsonSpec extends AnyFlatSpec with Matchers {
2727

@@ -136,4 +136,13 @@ class SerializationUtilsJsonSpec extends AnyFlatSpec with Matchers {
136136
SerializationUtils.fromJson[Seq[RunStatus]](SerializationUtils.asJsonPretty(runStatuses)) shouldEqual runStatuses
137137
}
138138

139+
// jackson serialization support (notice the `runStatusesJson` being reused):
140+
it should "serialize via Jackson's toJson" in {
141+
JacksonJsonSerializer.toJson(runStatuses) shouldBe runStatusesJson
142+
}
143+
144+
it should "deserialize via Jackson's fromJson" in {
145+
JacksonJsonSerializer.fromJson[Array[RunStatus]](runStatusesJson) shouldBe runStatuses // Array to overcome runtime erasure
146+
}
147+
139148
}

pom.xml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120

121121
<!-- Frameworks and libraries -->
122122
<json4s.version>3.5.3</json4s.version> <!-- This version is set to be compatible with Spark 2.4.5 -->
123+
<jackson.version>2.10.4</jackson.version> <!-- Spark 2.4.x default is 2.8.4 -->
123124
<scalatest.maven.version>2.0.2</scalatest.maven.version>
124125
<scalatest.version>3.2.9</scalatest.version>
125126
<slf4j.version>1.7.25</slf4j.version>
@@ -154,6 +155,16 @@
154155
<artifactId>spark-core_${scala.binary.version}</artifactId>
155156
<version>${spark.version}</version>
156157
<scope>provided</scope>
158+
<exclusions>
159+
<exclusion>
160+
<artifactId>com.fasterxml.jackson.core</artifactId>
161+
<groupId>jackson-databind</groupId>
162+
</exclusion>
163+
<exclusion>
164+
<artifactId>com.fasterxml.jackson.module</artifactId>
165+
<groupId>jackson-module-scala_${scala.binary.version}</groupId>
166+
</exclusion>
167+
</exclusions>
157168
</dependency>
158169
<dependency>
159170
<groupId>org.apache.spark</groupId>
@@ -169,6 +180,18 @@
169180
<version>${json4s.version}</version>
170181
</dependency>
171182

183+
<dependency>
184+
<groupId>com.fasterxml.jackson.core</groupId>
185+
<artifactId>jackson-databind</artifactId>
186+
<version>${jackson.version}</version>
187+
</dependency>
188+
<dependency>
189+
<groupId>com.fasterxml.jackson.module</groupId>
190+
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
191+
<version>${jackson.version}</version>
192+
</dependency>
193+
194+
172195
<!-- scalatest -->
173196
<dependency>
174197
<groupId>org.specs2</groupId>

project/Dependencies.scala

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ object Dependencies {
3030
val scalatest = "3.2.9"
3131
val specs2 = "2.5"
3232
val aws = "2.17.85"
33+
val jacksonModuleScala = "2.10.4"
3334

3435
val apacheCommonsLang3 = "3.12.0"
3536
val commonsConfiguration = "1.6"
@@ -57,21 +58,44 @@ object Dependencies {
5758
}
5859
}
5960

60-
val sparkCore = moduleByScala("org.apache.spark" %% "spark-core" % _ % Provided)(Versions.spark2, Versions.spark3) _
61-
val sparkSql = moduleByScala("org.apache.spark" %% "spark-sql" % _ % Provided)(Versions.spark2, Versions.spark3) _
61+
62+
// extended version where to moduleId Fn takes 2 params: module version and scala version (to pass along)
63+
def moduleByScalaUsingScalaVersion(moduleIdWithoutVersionNeedsScalaVersion: (String, String) => ModuleID)
64+
(scala211Version: String, scala212Version: String)
65+
(actualScalaVersion: String): ModuleID = {
66+
actualScalaVersion match {
67+
case _ if actualScalaVersion.startsWith("2.11") => moduleIdWithoutVersionNeedsScalaVersion.apply(scala211Version, actualScalaVersion)
68+
case _ if actualScalaVersion.startsWith("2.12") => moduleIdWithoutVersionNeedsScalaVersion.apply(scala212Version, actualScalaVersion)
69+
case _ => throw new IllegalArgumentException("Only Scala 2.11 and 2.12 are currently supported.")
70+
}
71+
}
72+
73+
74+
lazy val sparkCore = {
75+
def coreWithExcludes(version: String, scalaVersion: String): ModuleID = "org.apache.spark" %% "spark-core" % version % Provided exclude(
76+
"com.fasterxml.jackson.core", "jackson-databind"
77+
) exclude(
78+
"com.fasterxml.jackson.module", "jackson-module-scala_" + scalaVersion.substring(0, 4) // e.g. 2.11
79+
)
80+
moduleByScalaUsingScalaVersion(coreWithExcludes)(Versions.spark2, Versions.spark3) _
81+
}
82+
83+
lazy val sparkSql = moduleByScala("org.apache.spark" %% "spark-sql" % _ % Provided)(Versions.spark2, Versions.spark3) _
6284

6385
lazy val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalatest % Test
6486

65-
val json4sExt = moduleByScala("org.json4s" %% "json4s-ext" % _)(Versions.json4s_spark2, Versions.json4s_spark3) _
66-
val json4sCore = moduleByScala("org.json4s" %% "json4s-core" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _
67-
val json4sJackson = moduleByScala("org.json4s" %% "json4s-jackson" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _
68-
val json4sNative = moduleByScala("org.json4s" %% "json4s-native" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3)_
87+
lazy val json4sExt = moduleByScala("org.json4s" %% "json4s-ext" % _)(Versions.json4s_spark2, Versions.json4s_spark3) _
88+
lazy val json4sCore = moduleByScala("org.json4s" %% "json4s-core" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _
89+
lazy val json4sJackson = moduleByScala("org.json4s" %% "json4s-jackson" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _
90+
lazy val json4sNative = moduleByScala("org.json4s" %% "json4s-native" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _
6991

7092
lazy val absaCommons = "za.co.absa.commons" %% "commons" % Versions.absaCommons
7193
lazy val commonsConfiguration = "commons-configuration" % "commons-configuration" % Versions.commonsConfiguration
7294
lazy val apacheCommons = "org.apache.commons" % "commons-lang3" % Versions.apacheCommonsLang3
7395
lazy val typeSafeConfig = "com.typesafe" % "config" % Versions.typesafeConfig
7496

97+
lazy val jacksonModuleScala = "com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.jacksonModuleScala
98+
7599
lazy val mockitoScala = "org.mockito" %% "mockito-scala" % Versions.mockitoScala % Test
76100
lazy val mockitoScalaScalatest = "org.mockito" %% "mockito-scala-scalatest" % Versions.mockitoScala % Test
77101

@@ -90,7 +114,8 @@ object Dependencies {
90114
def modelDependencies(scalaVersion: String): Seq[ModuleID] = Seq(
91115
json4sCore(scalaVersion),
92116
json4sJackson(scalaVersion),
93-
json4sNative(scalaVersion)
117+
json4sNative(scalaVersion),
118+
jacksonModuleScala
94119
)
95120

96121
def coreDependencies(scalaVersion: String): Seq[ModuleID] = Seq(

0 commit comments

Comments
 (0)