Skip to content

Commit 7717a17

Browse files
committed
feat(schema): Add converter for Spark StructType to HoodieSchema
1 parent e2ffe0e commit 7717a17

File tree

3 files changed

+1142
-0
lines changed

3 files changed

+1142
-0
lines changed
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi
20+
21+
import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaField, HoodieSchemaType}
22+
import org.apache.hudi.internal.schema.HoodieSchemaException
23+
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
24+
25+
import scala.collection.JavaConverters._
26+
27+
/**
28+
* Utilities for converting between HoodieSchema and Spark SQL schemas.
29+
*
30+
* This object provides high-level conversion methods with utilities for
31+
* handling defaults and nullability alignment.
32+
*/
33+
object HoodieSchemaConversionUtils {
34+
35+
/**
36+
* Converts HoodieSchema to Catalyst's StructType.
37+
*
38+
* @param hoodieSchema HoodieSchema to convert
39+
* @return Spark StructType corresponding to the HoodieSchema
40+
* @throws HoodieSchemaException if conversion fails
41+
*/
42+
def convertHoodieSchemaToStructType(hoodieSchema: HoodieSchema): StructType = {
43+
try {
44+
HoodieSchemaConverters.toSqlType(hoodieSchema).dataType.asInstanceOf[StructType]
45+
} catch {
46+
case e: Exception => throw new HoodieSchemaException(
47+
s"Failed to convert HoodieSchema to StructType: $hoodieSchema", e)
48+
}
49+
}
50+
51+
/**
52+
* Converts HoodieSchema to Catalyst's DataType (general purpose, not just StructType).
53+
*
54+
* @param hoodieSchema HoodieSchema to convert
55+
* @return Spark DataType corresponding to the HoodieSchema
56+
* @throws HoodieSchemaException if conversion fails
57+
*/
58+
def convertHoodieSchemaToDataType(hoodieSchema: HoodieSchema): DataType = {
59+
try {
60+
HoodieSchemaConverters.toSqlType(hoodieSchema).dataType
61+
} catch {
62+
case e: Exception => throw new HoodieSchemaException(
63+
s"Failed to convert HoodieSchema to DataType: $hoodieSchema", e)
64+
}
65+
}
66+
67+
/**
68+
* Converts StructType to HoodieSchema.
69+
*
70+
* @param structType Catalyst's StructType or DataType
71+
* @param qualifiedName HoodieSchema qualified name (namespace.name format)
72+
* @return HoodieSchema corresponding to the Spark DataType
73+
* @throws HoodieSchemaException if conversion fails
74+
*/
75+
def convertStructTypeToHoodieSchema(structType: DataType, qualifiedName: String): HoodieSchema = {
76+
val (namespace, name) = {
77+
val parts = qualifiedName.split('.')
78+
if (parts.length > 1) {
79+
(parts.init.mkString("."), parts.last)
80+
} else {
81+
("", parts.head)
82+
}
83+
}
84+
convertStructTypeToHoodieSchema(structType, name, namespace)
85+
}
86+
87+
/**
88+
* Converts StructType to HoodieSchema.
89+
*
90+
* @param structType Catalyst's StructType or DataType
91+
* @param structName Schema record name
92+
* @param recordNamespace Schema record namespace
93+
* @return HoodieSchema corresponding to the Spark DataType
94+
* @throws HoodieSchemaException if conversion fails
95+
*/
96+
def convertStructTypeToHoodieSchema(structType: DataType,
97+
structName: String,
98+
recordNamespace: String): HoodieSchema = {
99+
try {
100+
HoodieSchemaConverters.toHoodieType(structType, nullable = false, structName, recordNamespace)
101+
} catch {
102+
case e: Exception => throw new HoodieSchemaException(
103+
s"Failed to convert struct type to HoodieSchema: $structType", e)
104+
}
105+
}
106+
107+
/**
108+
* Recursively aligns the nullable property of Spark schema fields with HoodieSchema.
109+
*
110+
* @param sourceSchema Source Spark StructType to align
111+
* @param hoodieSchema HoodieSchema to use as source of truth
112+
* @return StructType with aligned nullability
113+
*/
114+
def alignFieldsNullability(sourceSchema: StructType, hoodieSchema: HoodieSchema): StructType = {
115+
val hoodieFieldsMap = hoodieSchema.getFields.asScala.map(f => (f.name(), f)).toMap
116+
117+
val alignedFields = sourceSchema.fields.map { field =>
118+
hoodieFieldsMap.get(field.name) match {
119+
case Some(hoodieField) =>
120+
val alignedField = field.copy(nullable = hoodieField.isNullable)
121+
122+
field.dataType match {
123+
case structType: StructType =>
124+
val nestedSchema = unwrapNullableSchema(hoodieField.schema())
125+
if (nestedSchema.getType == HoodieSchemaType.RECORD) {
126+
alignedField.copy(dataType = alignFieldsNullability(structType, nestedSchema))
127+
} else {
128+
alignedField
129+
}
130+
131+
case ArrayType(elementType, _) =>
132+
val arraySchema = unwrapNullableSchema(hoodieField.schema())
133+
if (arraySchema.getType == HoodieSchemaType.ARRAY) {
134+
val elemSchema = arraySchema.getElementType
135+
val newElementType = updateElementType(elementType, elemSchema)
136+
alignedField.copy(dataType = ArrayType(newElementType, elemSchema.isNullable))
137+
} else {
138+
alignedField
139+
}
140+
141+
case MapType(keyType, valueType, _) =>
142+
val mapSchema = unwrapNullableSchema(hoodieField.schema())
143+
if (mapSchema.getType == HoodieSchemaType.MAP) {
144+
val valueSchema = mapSchema.getValueType
145+
val newValueType = updateElementType(valueType, valueSchema)
146+
alignedField.copy(dataType = MapType(keyType, newValueType, valueSchema.isNullable))
147+
} else {
148+
alignedField
149+
}
150+
151+
case _ => alignedField
152+
}
153+
154+
case None => field.copy()
155+
}
156+
}
157+
158+
StructType(alignedFields)
159+
}
160+
161+
/**
162+
* Unwraps nullable schema (union with null) to get the non-null type.
163+
*/
164+
private def unwrapNullableSchema(schema: HoodieSchema): HoodieSchema = {
165+
if (schema.isNullable) schema.getNonNullType else schema
166+
}
167+
168+
/**
169+
* Recursively updates element types for complex types (arrays, maps, structs).
170+
*/
171+
private def updateElementType(dataType: DataType, hoodieSchema: HoodieSchema): DataType = {
172+
dataType match {
173+
case structType: StructType =>
174+
if (hoodieSchema.getType == HoodieSchemaType.RECORD) {
175+
alignFieldsNullability(structType, hoodieSchema)
176+
} else {
177+
structType
178+
}
179+
180+
case ArrayType(elemType, _) =>
181+
if (hoodieSchema.getType == HoodieSchemaType.ARRAY) {
182+
val elemSchema = hoodieSchema.getElementType
183+
ArrayType(updateElementType(elemType, elemSchema), elemSchema.isNullable)
184+
} else {
185+
dataType
186+
}
187+
188+
case MapType(keyType, valueType, _) =>
189+
if (hoodieSchema.getType == HoodieSchemaType.MAP) {
190+
val valueSchema = hoodieSchema.getValueType
191+
MapType(keyType, updateElementType(valueType, valueSchema), valueSchema.isNullable)
192+
} else {
193+
dataType
194+
}
195+
196+
case _ => dataType
197+
}
198+
}
199+
}

0 commit comments

Comments
 (0)