Skip to content

Commit 52743c4

Browse files
committed
#710 Tidy up the code for Spark schema comparison.
1 parent 6fac7e5 commit 52743c4

File tree

1 file changed

+12
-12
lines changed

1 file changed

+12
-12
lines changed

pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,12 @@ object SparkUtils {
155155
}
156156

157157
/**
158-
* Compares 2 schemas.
158+
* Compares two schemas represented as `StructType` and identifies the differences
159+
* between them, such as newly added fields, deleted fields, or fields with changed types.
160+
*
161+
* @param schemaA the first schema to compare
162+
* @param schemaB the second schema to compare
163+
* @return a list of `FieldChange` that represents the differences between the two schemas
159164
*/
160165
def compareSchemas(schemaA: StructType, schemaB: StructType): List[FieldChange] = {
161166
val newFields = new ListBuffer[FieldChange]
@@ -181,37 +186,32 @@ object SparkUtils {
181186
val newColumns: Array[FieldChange] = schema2.fields
182187
.filter(f => !fields1.contains(f.name))
183188
.map(f => FieldChange.NewField(s"$path${f.name}", dataTypeToString(f.dataType, f.metadata)))
189+
newFields ++= newColumns
184190

185191
val deletedColumns: Array[FieldChange] = schema1.fields
186192
.filter(f => !fields2.contains(f.name))
187193
.map(f => FieldChange.DeletedField(s"$path${f.name}", dataTypeToString(f.dataType, f.metadata)))
194+
deletedFields ++= deletedColumns
188195

189-
val changedType: Array[FieldChange] = schema1.fields
196+
schema1.fields
190197
.filter(f => fields2.contains(f.name))
191-
.flatMap(f1 => {
198+
.foreach(f1 => {
192199
val f2 = fields2(f1.name)
193200

194201
(f1.dataType, f2.dataType) match {
195202
case (st1: StructType, st2: StructType) =>
196203
processStruct(st1, st2, s"$path${f1.name}.")
197-
Seq.empty
198204
case (ar1: ArrayType, ar2: ArrayType) =>
199205
processArray(ar1, ar2, f1.metadata, f2.metadata, s"$path${f1.name}")
200-
Seq.empty
201206
case _ =>
202207
val dt1 = dataTypeToString(f1.dataType, f1.metadata)
203208
val dt2 = dataTypeToString(f2.dataType, f2.metadata)
204209

205-
if (dt1 == dt2) {
206-
Seq.empty[FieldChange]
207-
} else {
208-
Seq(FieldChange.ChangedType(s"$path${f1.name}", dt1, dt2))
210+
if (dt1 != dt2) {
211+
changedFields += FieldChange.ChangedType(s"$path${f1.name}", dt1, dt2)
209212
}
210213
}
211214
})
212-
newFields ++= newColumns
213-
deletedFields ++= deletedColumns
214-
changedFields ++= changedType
215215
}
216216

217217
def processArray(array1: ArrayType, array2: ArrayType, metadata1: Metadata, metadata2: Metadata, path: String = ""): Unit = {

0 commit comments

Comments
 (0)