
Commit a0d4314

feat: Support splitting tasks based on file size when reading the cow table (#17730)
1. Support splitting tasks based on file size when reading the cow table

Signed-off-by: TheR1sing3un <[email protected]>
1 parent 1a91070 commit a0d4314

File tree

2 files changed: +58 -1 lines changed

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala

Lines changed: 11 additions & 1 deletion
@@ -188,7 +188,17 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,

   override def isSplitable(sparkSession: SparkSession,
                            options: Map[String, String],
-                           path: Path): Boolean = false
+                           path: Path): Boolean = {
+    // NOTE: When only the base file needs to be read in the normal reading mode, this format is
+    // effectively equivalent to `org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat`,
+    // so we can keep the same `isSplitable` logic as the parent format.
+    // This lets us take advantage of Spark's file splitting capability: overly large single files
+    // can be read by multiple concurrent tasks, reducing the overall job read time.
+    val superSplitable = super.isSplitable(sparkSession, options, path)
+    val splitable = !isMOR && !isIncremental && !isBootstrap && superSplitable
+    logInfo(s"isSplitable: $splitable, super.isSplitable: $superSplitable, isMOR: $isMOR, isIncremental: $isIncremental, isBootstrap: $isBootstrap")
+    splitable
+  }

   override def buildReaderWithPartitionValues(spark: SparkSession,
                                               dataStructType: StructType,
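
For context on what this change unlocks: once `isSplitable` returns true, Spark's scan planning cuts each base file into byte ranges no larger than `spark.sql.files.maxPartitionBytes`, and each range is read by its own task. Below is a minimal sketch of that slicing, not part of the commit and not a Spark or Hudi API; `SplitSketch`, `FileSlice`, and `sliceFile` are illustrative names only.

// Illustrative sketch: mirrors the effect of spark.sql.files.maxPartitionBytes on a splitable file.
object SplitSketch {
  final case class FileSlice(path: String, start: Long, length: Long)

  // A splitable file of `fileSize` bytes is cut into consecutive byte ranges of at most
  // `maxSplitBytes`; each slice becomes one read task.
  def sliceFile(path: String, fileSize: Long, maxSplitBytes: Long): Seq[FileSlice] =
    (0L until fileSize by maxSplitBytes).map { offset =>
      FileSlice(path, offset, math.min(maxSplitBytes, fileSize - offset))
    }

  def main(args: Array[String]): Unit = {
    // A 1 GiB base file with a 128 MiB cap yields 8 slices, i.e. up to 8 concurrent readers.
    val slices = sliceFile("file:///tmp/hudi/cow/base-0001.parquet", 1L << 30, 128L << 20)
    println(s"${slices.size} slices, first: ${slices.head}")
  }
}

The commit keeps MOR, incremental, and bootstrap reads unsplitable, since those paths do more per file group than a plain Parquet scan.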

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala

Lines changed: 47 additions & 0 deletions
@@ -911,5 +911,52 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     }
   }

+  test("Test Query CoW table with splitable file format") {
+    withTable(generateTableName) { tableName =>
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  dt string
+           |) using hudi
+           | tblproperties (
+           |   type = 'cow'
+           | )
+           | partitioned by (dt)
+       """.stripMargin
+      )
+
+      withSQLConf("hoodie.datasource.overwrite.mode" -> "dynamic") {
+        spark.sql(
+          s"""
+             | insert overwrite table $tableName partition(dt) values
+             | (0, 'a0', 10, 1000, '2023-12-06'),
+             | (1, 'a1', 10, 1000, '2023-12-06'),
+             | (2, 'a2', 11, 1000, '2023-12-06'),
+             | (3, 'a3', 10, 1000, '2023-12-06')
+         """.stripMargin)
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(0, "a0", 10.0, 1000, "2023-12-06"),
+          Seq(1, "a1", 10.0, 1000, "2023-12-06"),
+          Seq(2, "a2", 11.0, 1000, "2023-12-06"),
+          Seq(3, "a3", 10.0, 1000, "2023-12-06")
+        )
+      }
+
+      // Force file splitting by setting a small max partition size.
+      withSQLConf(s"${SQLConf.FILES_MAX_PARTITION_BYTES.key}" -> "10240") {
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(0, "a0", 10.0, 1000, "2023-12-06"),
+          Seq(1, "a1", 10.0, 1000, "2023-12-06"),
+          Seq(2, "a2", 11.0, 1000, "2023-12-06"),
+          Seq(3, "a3", 10.0, 1000, "2023-12-06")
+        )
+      }
+    }
+  }
+
 }
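
As a usage note, not part of the commit: the effect the test forces with `SQLConf.FILES_MAX_PARTITION_BYTES` can be reproduced from an application by lowering `spark.sql.files.maxPartitionBytes` before scanning a copy-on-write table. The session setup, split size, and table path below are illustrative assumptions.

import org.apache.spark.sql.SparkSession

object ReadCowWithSplits {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-cow-split-read")
      .master("local[4]")
      // Cap each input split at 32 MiB so large base files fan out across several tasks.
      .config("spark.sql.files.maxPartitionBytes", (32L * 1024 * 1024).toString)
      .getOrCreate()

    // Illustrative path; any Hudi copy-on-write table works.
    val df = spark.read.format("hudi").load("/tmp/hudi/cow_table")
    // With splitting enabled, the scan partition count can exceed the base file count.
    println(s"input partitions: ${df.rdd.getNumPartitions}")
    spark.stop()
  }
}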
