Skip to content

Commit 68ec207

Browse files
arunmahadevanHyukjinKwon
authored andcommitted
[SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType
## What changes were proposed in this pull request? `toAvroType` converts spark data type to avro schema. It always appends the record name to namespace so its impossible to have an Avro namespace independent of the record name. When invoked with a spark data type like, ```java val sparkSchema = StructType(Seq( StructField("name", StringType, nullable = false), StructField("address", StructType(Seq( StructField("city", StringType, nullable = false), StructField("state", StringType, nullable = false))), nullable = false))) // map it to an avro schema with record name "employee" and top level namespace "foo.bar", val avroSchema = SchemaConverters.toAvroType(sparkSchema, false, "employee", "foo.bar") // result is // avroSchema.getName = employee // avroSchema.getNamespace = foo.bar.employee // avroSchema.getFullname = foo.bar.employee.employee ``` The patch proposes to fix this so that the result is ``` avroSchema.getName = employee avroSchema.getNamespace = foo.bar avroSchema.getFullname = foo.bar.employee ``` ## How was this patch tested? New and existing unit tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Closes apache#22251 from arunmahadevan/avro-fix. Authored-by: Arun Mahadevan <[email protected]> Signed-off-by: hyukjinkwon <[email protected]>
1 parent 32c8a3d commit 68ec207

File tree

2 files changed

+48
-12
lines changed

2 files changed

+48
-12
lines changed

external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ object SchemaConverters {
123123
catalystType: DataType,
124124
nullable: Boolean = false,
125125
recordName: String = "topLevelRecord",
126-
prevNameSpace: String = "")
126+
nameSpace: String = "")
127127
: Schema = {
128128
val builder = SchemaBuilder.builder()
129129

@@ -143,29 +143,25 @@ object SchemaConverters {
143143
val avroType = LogicalTypes.decimal(d.precision, d.scale)
144144
val fixedSize = minBytesForPrecision(d.precision)
145145
// Need to avoid naming conflict for the fixed fields
146-
val name = prevNameSpace match {
146+
val name = nameSpace match {
147147
case "" => s"$recordName.fixed"
148-
case _ => s"$prevNameSpace.$recordName.fixed"
148+
case _ => s"$nameSpace.$recordName.fixed"
149149
}
150150
avroType.addToSchema(SchemaBuilder.fixed(name).size(fixedSize))
151151

152152
case BinaryType => builder.bytesType()
153153
case ArrayType(et, containsNull) =>
154154
builder.array()
155-
.items(toAvroType(et, containsNull, recordName, prevNameSpace))
155+
.items(toAvroType(et, containsNull, recordName, nameSpace))
156156
case MapType(StringType, vt, valueContainsNull) =>
157157
builder.map()
158-
.values(toAvroType(vt, valueContainsNull, recordName, prevNameSpace))
158+
.values(toAvroType(vt, valueContainsNull, recordName, nameSpace))
159159
case st: StructType =>
160-
val nameSpace = prevNameSpace match {
161-
case "" => recordName
162-
case _ => s"$prevNameSpace.$recordName"
163-
}
164-
160+
val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" else recordName
165161
val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields()
166162
st.foreach { f =>
167163
val fieldAvroType =
168-
toAvroType(f.dataType, f.nullable, f.name, nameSpace)
164+
toAvroType(f.dataType, f.nullable, f.name, childNameSpace)
169165
fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
170166
}
171167
fieldsAssembler.endRecord()

external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1082,7 +1082,6 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
10821082
val schema = getAvroSchemaStringFromFiles(dir.toString)
10831083
assert(schema.contains("\"namespace\":\"topLevelRecord\""))
10841084
assert(schema.contains("\"namespace\":\"topLevelRecord.data\""))
1085-
assert(schema.contains("\"namespace\":\"topLevelRecord.data.data\""))
10861085
}
10871086
}
10881087

@@ -1099,6 +1098,47 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
10991098
}
11001099
}
11011100

1101+
test("check namespace - toAvroType") {
1102+
val sparkSchema = StructType(Seq(
1103+
StructField("name", StringType, nullable = false),
1104+
StructField("address", StructType(Seq(
1105+
StructField("city", StringType, nullable = false),
1106+
StructField("state", StringType, nullable = false))),
1107+
nullable = false)))
1108+
val employeeType = SchemaConverters.toAvroType(sparkSchema,
1109+
recordName = "employee",
1110+
nameSpace = "foo.bar")
1111+
1112+
assert(employeeType.getFullName == "foo.bar.employee")
1113+
assert(employeeType.getName == "employee")
1114+
assert(employeeType.getNamespace == "foo.bar")
1115+
1116+
val addressType = employeeType.getField("address").schema()
1117+
assert(addressType.getFullName == "foo.bar.employee.address")
1118+
assert(addressType.getName == "address")
1119+
assert(addressType.getNamespace == "foo.bar.employee")
1120+
}
1121+
1122+
test("check empty namespace - toAvroType") {
1123+
val sparkSchema = StructType(Seq(
1124+
StructField("name", StringType, nullable = false),
1125+
StructField("address", StructType(Seq(
1126+
StructField("city", StringType, nullable = false),
1127+
StructField("state", StringType, nullable = false))),
1128+
nullable = false)))
1129+
val employeeType = SchemaConverters.toAvroType(sparkSchema,
1130+
recordName = "employee")
1131+
1132+
assert(employeeType.getFullName == "employee")
1133+
assert(employeeType.getName == "employee")
1134+
assert(employeeType.getNamespace == null)
1135+
1136+
val addressType = employeeType.getField("address").schema()
1137+
assert(addressType.getFullName == "employee.address")
1138+
assert(addressType.getName == "address")
1139+
assert(addressType.getNamespace == "employee")
1140+
}
1141+
11021142
case class NestedMiddleArray(id: Int, data: Array[NestedBottom])
11031143

11041144
case class NestedTopArray(id: Int, data: NestedMiddleArray)

0 commit comments

Comments
 (0)