
Commit d53e74e

baibaichen and Copilot committed
[GLUTEN-11550][VL][UT] Enable Variant test suites
Enable GlutenVariantEndToEndSuite, GlutenVariantShreddingSuite, and GlutenParquetVariantShreddingSuite for both spark40 and spark41.

Fixes:

1. VeloxValidatorApi: Detect variant shredded structs (produced by Spark's PushVariantIntoScan) by checking for __VARIANT_METADATA_KEY metadata, and trigger fallback to Spark's native Parquet reader.

2. Spark41Shims: Detect Parquet variant logical type annotations and fall back to vanilla Spark when PARQUET_IGNORE_VARIANT_ANNOTATION is not set, since the Velox native reader does not check variant annotations.

3. pom.xml: Add -Dfile.encoding=UTF-8 to the test JVM args. On JDK 17 and earlier, java.nio.charset.Charset.defaultCharset() is determined by the OS locale; on CI containers (centos-8/9) where LANG=C, the default charset is US-ASCII (ANSI_X3.4-1968). JDK 18+ changed this via JEP 400 (https://openjdk.org/jeps/400) to always default to UTF-8 regardless of locale.

   Spark's VariantUtil.getString() calls new String(byte[], offset, length) without specifying a charset, so it decodes using the JVM default charset. With JDK 17 and LANG=C, UTF-8-encoded multi-byte characters (e.g. Chinese) are decoded as ASCII, producing garbled output.

   Call chain:

   VariantEndToEndSuite.check("\"你好,世界...\"")
     -> to_json(parse_json(col("v")))
     -> StructsToJsonEvaluator.evaluate()
     -> JacksonGenerator.write(VariantVal)
     -> VariantVal.toJson()
     -> Variant.toJsonImpl()
     -> VariantUtil.getString(byte[], pos)
     -> new String(value, start, length) // no charset specified

   https://github.com/apache/spark/blob/v4.0.1/common/variant/src/main/java/org/apache/spark/types/variant/VariantUtil.java#L508
   https://github.com/apache/spark/blob/v4.1.0/common/variant/src/main/java/org/apache/spark/types/variant/VariantUtil.java#L509

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
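As a quick illustration of item 3, a hypothetical repro sketch (not part of the commit), assuming a Spark 4.x SparkSession named spark running on JDK 17 with LANG=C:

// Sketch only: parse_json produces a VARIANT value; to_json re-serializes it
// through VariantUtil.getString, which decodes with the JVM default charset.
val df = spark.sql("""SELECT to_json(parse_json('"你好,世界"')) AS j""")
// Without -Dfile.encoding=UTF-8 the default charset here is US-ASCII, so this
// prints replacement characters instead of 你好,世界.
df.show(truncate = false)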
1 parent ff89faf · commit d53e74e

8 files changed: +84 −6 lines

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala

Lines changed: 8 additions & 0 deletions
@@ -208,6 +208,14 @@ object VeloxBackendSettings extends BackendSettingsApi {
       return None
     }
     val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit
+    // Variant annotation check is always performed (not gated by
+    // parquetMetadataValidationEnabled) because it is a correctness issue.
+    val variantAnnotationResult =
+      ParquetMetadataUtils.validateVariantAnnotation(rootPaths, hadoopConf, fileLimit)
+    if (variantAnnotationResult.isDefined) {
+      return variantAnnotationResult.map(
+        reason => s"Detected unsupported metadata in parquet files: $reason")
+    }
     val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
     val parquetMetadataValidationResult =
       ParquetMetadataUtils.validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit)

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala

Lines changed: 9 additions & 0 deletions
@@ -121,6 +121,15 @@ object VeloxValidatorApi {
       case map: MapType =>
         validateSchema(map.keyType).orElse(validateSchema(map.valueType))
       case struct: StructType =>
+        // Detect variant shredded struct produced by Spark's PushVariantIntoScan.
+        // These structs have all fields annotated with __VARIANT_METADATA_KEY metadata.
+        // Velox cannot read the variant shredding encoding in Parquet files.
+        if (
+          struct.fields.nonEmpty &&
+          struct.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY"))
+        ) {
+          return Some(s"Variant shredded struct is not supported: $struct")
+        }
         struct.foreach {
           field =>
             val reason = validateSchema(field.dataType)
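For illustration, a minimal sketch (not part of the commit) of the struct shape this check matches. The field names and the metadata value "v" are hypothetical; only the presence of the __VARIANT_METADATA_KEY key matters:

import org.apache.spark.sql.types._

// Build a struct like the one PushVariantIntoScan produces, where every field
// carries __VARIANT_METADATA_KEY in its metadata.
val variantMeta = new MetadataBuilder()
  .putString("__VARIANT_METADATA_KEY", "v") // hypothetical value; only the key is checked
  .build()
val shredded = StructType(Seq(
  StructField("metadata", BinaryType, nullable = false, variantMeta),
  StructField("value", BinaryType, nullable = false, variantMeta)))

// The same predicate used in validateSchema above:
val isShredded = shredded.fields.nonEmpty &&
  shredded.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY"))
// isShredded == true, so validateSchema reports the struct and the scan falls
// back to Spark's native Parquet reader.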

backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala

Lines changed: 42 additions & 0 deletions
@@ -162,6 +162,48 @@ object ParquetMetadataUtils extends Logging {
     None
   }

+  /**
+   * Checks whether Parquet files contain variant logical type annotations that require fallback
+   * to vanilla Spark. This check is always performed (not gated by
+   * parquetMetadataValidationEnabled) because it is a correctness issue: the Velox native reader
+   * does not check variant annotations.
+   */
+  def validateVariantAnnotation(
+      rootPaths: Seq[String],
+      hadoopConf: Configuration,
+      fileLimit: Int
+  ): Option[String] = {
+    rootPaths.foreach {
+      rootPath =>
+        val fs = new Path(rootPath).getFileSystem(hadoopConf)
+        try {
+          val filesIterator = fs.listFiles(new Path(rootPath), true)
+          var checkedFileCount = 0
+          while (filesIterator.hasNext && checkedFileCount < fileLimit) {
+            val fileStatus = filesIterator.next()
+            checkedFileCount += 1
+            try {
+              val footer = ParquetFooterReaderShim.readFooter(
+                hadoopConf,
+                fileStatus,
+                ParquetMetadataConverter.NO_FILTER)
+              if (
+                SparkShimLoader.getSparkShims
+                  .shouldFallbackForParquetVariantAnnotation(footer)
+              ) {
+                return Some("Variant annotation detected in Parquet file.")
+              }
+            } catch {
+              case _: Exception => // ignore files whose footer cannot be read
+            }
+          }
+        } catch {
+          case e: Exception =>
+            logWarning("Caught exception when checking variant annotation", e)
+        }
+    }
+    None
+  }
+
   private def isTimezoneFoundInMetadata(
       footer: ParquetMetadata,
       parquetOptions: ParquetOptions): Option[String] = {

gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala

Lines changed: 2 additions & 2 deletions
@@ -853,8 +853,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenUDTRegistrationSuite]
   enableSuite[GlutenUnsafeRowSuite]
   enableSuite[GlutenUserDefinedTypeSuite]
-  // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures
-  // TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures
+  enableSuite[GlutenVariantEndToEndSuite]
+  enableSuite[GlutenVariantShreddingSuite]
   enableSuite[GlutenVariantSuite]
   enableSuite[GlutenVariantWriteShreddingSuite]
   // TODO: 4.x enableSuite[GlutenXmlFunctionsSuite] // 10 failures

gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala

Lines changed: 3 additions & 3 deletions
@@ -397,7 +397,7 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("parquet widening conversion ShortType -> DecimalType(20,0)")
     .exclude("parquet widening conversion ShortType -> DecimalType(38,0)")
     .exclude("parquet widening conversion ShortType -> DoubleType")
-  // TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure
+  enableSuite[GlutenParquetVariantShreddingSuite]
   // Generated suites for org.apache.spark.sql.execution.datasources.text
   // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure
   // TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure
@@ -818,8 +818,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenUDTRegistrationSuite]
   enableSuite[GlutenUnsafeRowSuite]
   enableSuite[GlutenUserDefinedTypeSuite]
-  // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures
-  // TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures
+  enableSuite[GlutenVariantEndToEndSuite]
+  enableSuite[GlutenVariantShreddingSuite]
   enableSuite[GlutenVariantSuite]
   enableSuite[GlutenVariantWriteShreddingSuite]
   // TODO: 4.x enableSuite[GlutenXmlFunctionsSuite] // 10 failures

pom.xml

Lines changed: 1 addition & 0 deletions
@@ -167,6 +167,7 @@
       --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
       -Djdk.reflect.useDirectMethodHandle=false
       -Dio.netty.tryReflectionSetAccessible=true
+      -Dfile.encoding=UTF-8
     </extraJavaTestArgs>
     <log4j.conf>file:src/test/resources/log4j2.properties</log4j.conf>
   </properties>
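A quick way to see what this flag changes (a sketch, not part of the commit): print the charset the test JVM resolved at startup and decode some UTF-8 bytes with it.

import java.nio.charset.{Charset, StandardCharsets}

object CharsetCheck {
  def main(args: Array[String]): Unit = {
    // US-ASCII on JDK 17 with LANG=C unless -Dfile.encoding=UTF-8 is passed;
    // always UTF-8 on JDK 18+ per JEP 400.
    println(Charset.defaultCharset())
    val bytes = "你好".getBytes(StandardCharsets.UTF_8)
    println(new String(bytes))                         // garbled if the default is US-ASCII
    println(new String(bytes, StandardCharsets.UTF_8)) // correct regardless of the default
  }
}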

shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala

Lines changed: 2 additions & 0 deletions
@@ -237,6 +237,8 @@ trait SparkShims {

   def isParquetFileEncrypted(footer: ParquetMetadata): Boolean

+  def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = false
+
   def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
     Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]]
shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala

Lines changed: 17 additions & 1 deletion
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types._
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.parquet.hadoop.metadata.{CompressionCodecName, ParquetMetadata}
 import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType}

 import java.time.ZoneOffset
 import java.util.{Map => JMap}
@@ -571,6 +571,22 @@ class Spark41Shims extends SparkShims {
     }
   }

+  override def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): Boolean = {
+    if (SQLConf.get.getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION)) {
+      return false
+    }
+    containsVariantAnnotation(footer.getFileMetaData.getSchema)
+  }
+
+  private def containsVariantAnnotation(groupType: GroupType): Boolean = {
+    groupType.getFields.asScala.exists {
+      field =>
+        Option(field.getLogicalTypeAnnotation)
+          .exists(_.isInstanceOf[LogicalTypeAnnotation.VariantLogicalTypeAnnotation]) ||
+        (!field.isPrimitive && containsVariantAnnotation(field.asGroupType()))
+    }
+  }
+
   override def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
     file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String, Object]]

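To illustrate what containsVariantAnnotation walks over, a sketch (not part of the commit) that runs the same recursion against a hand-built Parquet schema. Field names are illustrative; a real variant column written with the Parquet VARIANT logical type would carry LogicalTypeAnnotation.VariantLogicalTypeAnnotation and flip the result to true.

import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType, PrimitiveType}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition

import scala.jdk.CollectionConverters._

// Same recursion as the shim above, restated standalone.
def containsVariantAnnotation(groupType: GroupType): Boolean =
  groupType.getFields.asScala.exists {
    field =>
      Option(field.getLogicalTypeAnnotation)
        .exists(_.isInstanceOf[LogicalTypeAnnotation.VariantLogicalTypeAnnotation]) ||
      (!field.isPrimitive && containsVariantAnnotation(field.asGroupType()))
  }

// A schema with no logical type annotations: the check returns false and Velox
// keeps the scan.
val plain: MessageType = new MessageType(
  "spark_schema",
  new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "id"),
  new GroupType(
    Repetition.OPTIONAL,
    "v",
    new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "metadata"),
    new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "value")))
assert(!containsVariantAnnotation(plain))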