
Commit a8bb8b0

liviazhu authored and HeartSaVioR committed
[SPARK-53625][SS] Propagate metadata columns through projections to address ApplyCharTypePadding incompatibility
### What changes were proposed in this pull request?

Modify the streaming MicroBatchExecution to propagate metadata columns through projections. This resolves an incompatibility with the ApplyCharTypePadding rule, which is applied by default in serverless environments and previously resulted in an error such as:

`assertion failed: Invalid batch: ACTV_IND#130290,_metadata#130291 != ACTV_IND#130307`

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52375 from liviazhu/liviazhu-db/col-metadata.

Authored-by: Livia Zhu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
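For orientation before the diffs: with read-side char padding enabled, the analyzer's ApplyCharTypePadding rule wraps CHAR(n) scans in a Project that pads values on read, roughly rpad(char_col, n, ' '). Below is a hedged sketch of the query shape that could hit the assertion; it assumes a SparkSession named `spark`, and all paths, formats, and column names are illustrative, not taken from this PR.

// Hypothetical repro shape (illustrative only). Assumes a SparkSession
// `spark` with read-side char padding enabled (the default).
val streamDf = spark.readStream
  .format("json")
  .schema("char_col CHAR(1)") // CHAR(n) makes ApplyCharTypePadding insert a Project
  .load("/tmp/source")
  .select("*", "_metadata") // the hidden file-metadata column must survive that Project

// Before this fix, planning a microbatch of such a query could fail with
// "assertion failed: Invalid batch: ... != ..." because the padding
// Project's output no longer carried _metadata.
val query = streamDf.writeStream
  .format("json")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start("/tmp/target")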
1 parent: 552effc

File tree

2 files changed: +105 −1 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala

Lines changed: 10 additions & 0 deletions
@@ -801,6 +801,16 @@ class MicroBatchExecution(
       case _ => false
     }
     val finalDataPlan = dataPlan transformUp {
+      // SPARK-53625: Propagate metadata columns through Projects
+      case p: Project if hasFileMetadata =>
+        // Check if there are any metadata fields not in the output list
+        val newMetadata = p.metadataOutput.filterNot(p.outputSet.contains)
+        if (newMetadata.nonEmpty) {
+          // If so, add them to the projection
+          p.copy(projectList = p.projectList ++ newMetadata)
+        } else {
+          p
+        }
       case l: LogicalRelation =>
         var newRelation = l
         if (hasFileMetadata) {
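To make the transform above concrete, here is a minimal, self-contained sketch of the same idea. `Attr`, `SimpleProject`, and `propagateMetadata` are simplified stand-ins for Catalyst's Attribute and Project, not the real Spark API.

// Minimal sketch (script-style Scala) of the propagation rule above.
// These types are illustrative stand-ins, not Catalyst's.
case class Attr(name: String, isMetadataCol: Boolean = false)

case class SimpleProject(projectList: Seq[Attr], metadataOutput: Seq[Attr]) {
  def outputSet: Set[Attr] = projectList.toSet
}

// Mirrors the new rule: append any metadata attributes that the projection
// does not already expose, so downstream operators can still resolve them.
def propagateMetadata(p: SimpleProject): SimpleProject = {
  val missing = p.metadataOutput.filterNot(p.outputSet.contains)
  if (missing.nonEmpty) p.copy(projectList = p.projectList ++ missing) else p
}

// Example: a padding-style Project that only lists the data column.
val meta = Attr("_metadata", isMetadataCol = true)
val padded = SimpleProject(Seq(Attr("ACTV_IND")), metadataOutput = Seq(meta))
assert(propagateMetadata(padded).projectList.contains(meta)) // _metadata now survives

The design choice worth noting is that the rule only widens the projection when something is actually missing; a Project that already carries its metadata output is returned unchanged, so the transform is idempotent across repeated analysis passes.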

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala

Lines changed: 95 additions & 1 deletion
@@ -31,7 +31,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder, StringType, StructField, StructType}

 class FileMetadataStructSuite extends QueryTest with SharedSparkSession {

@@ -1133,4 +1133,98 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
     assert(selectSingleRowDf.count() === 1)
   }
 }
+
+  Seq("true", "false").foreach { sideCharPadding =>
+    test(s"SPARK-53625: file metadata in streaming with char type, " +
+      s"sideCharPadding=$sideCharPadding") {
+      withSQLConf(SQLConf.READ_SIDE_CHAR_PADDING.key -> sideCharPadding) {
+        withTempDir { dir =>
+          import scala.jdk.CollectionConverters._
+
+          val metadata = new MetadataBuilder()
+            .putString("__CHAR_VARCHAR_TYPE_STRING", "char(1)")
+            .build()
+          val charSchemaStruct = new StructType()
+            .add(StructField("char_col", StringType, metadata = metadata))
+
+          val data = Seq(Row("A"), Row("B"))
+          val df = spark.createDataFrame(data.asJava, charSchemaStruct)
+          df.coalesce(1).write.format("json")
+            .save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+          val streamDf = spark.readStream.format("json")
+            .schema(charSchemaStruct)
+            .load(dir.getCanonicalPath + "/source/new-streaming-data")
+            .select("*", "_metadata")
+
+          val streamQuery0 = streamDf
+            .writeStream.format("json")
+            .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+            .trigger(Trigger.AvailableNow())
+            .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+          streamQuery0.awaitTermination()
+          assert(streamQuery0.lastProgress.numInputRows == 2L)
+
+          val newDF = spark.read.format("json")
+            .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+          val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+            .filter(_.getName.endsWith(".json")).head
+          val sourceFileMetadata = Map(
+            METADATA_FILE_PATH -> sourceFile.toURI.toString,
+            METADATA_FILE_NAME -> sourceFile.getName,
+            METADATA_FILE_SIZE -> sourceFile.length(),
+            METADATA_FILE_BLOCK_START -> 0,
+            METADATA_FILE_BLOCK_LENGTH -> sourceFile.length(),
+            METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+          )
+
+          // SELECT * will have: char_col, _metadata of /source/new-streaming-data
+          assert(newDF.select("*").columns.toSet == Set("char_col", "_metadata"))
+          // Verify the data is expected
+          checkAnswer(
+            newDF.select(col("char_col"),
+              col(METADATA_FILE_PATH), col(METADATA_FILE_NAME),
+              col(METADATA_FILE_SIZE), col(METADATA_FILE_BLOCK_START),
+              col(METADATA_FILE_BLOCK_LENGTH),
+              // since we are writing _metadata to a json file,
+              // we should explicitly cast the column to timestamp type
+              to_timestamp(col(METADATA_FILE_MODIFICATION_TIME))),
+            Seq(
+              Row(
+                "A",
+                sourceFileMetadata(METADATA_FILE_PATH),
+                sourceFileMetadata(METADATA_FILE_NAME),
+                sourceFileMetadata(METADATA_FILE_SIZE),
+                sourceFileMetadata(METADATA_FILE_BLOCK_START),
+                sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
+                sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME)),
+              Row(
+                "B",
+                sourceFileMetadata(METADATA_FILE_PATH),
+                sourceFileMetadata(METADATA_FILE_NAME),
+                sourceFileMetadata(METADATA_FILE_SIZE),
+                sourceFileMetadata(METADATA_FILE_BLOCK_START),
+                sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
+                sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME))
+            )
+          )
+
+          checkAnswer(
+            newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_SIZE),
+            Seq(
+              Row(sourceFileMetadata(METADATA_FILE_SIZE)),
+              Row(sourceFileMetadata(METADATA_FILE_SIZE)))
+          )
+          checkAnswer(
+            newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_PATH),
+            Seq(
+              Row(sourceFileMetadata(METADATA_FILE_PATH)),
+              Row(sourceFileMetadata(METADATA_FILE_PATH)))
+          )
+        }
+      }
+    }
+  }
 }
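A note on the flag the test parameterizes: SQLConf.READ_SIDE_CHAR_PADDING corresponds to the SQL conf key spark.sql.readSideCharPadding (available in Spark 3.4+, to the best of my knowledge; verify against your version), so the same toggle the suite exercises can be set in an ordinary session. A one-line sketch, assuming a SparkSession `spark`:

// Toggle read-side CHAR padding; the suite above runs with both "true" and "false".
spark.conf.set("spark.sql.readSideCharPadding", "true")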
