@@ -24,6 +24,7 @@ import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.source.DataSplit

+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted, SparkListenerStageSubmitted}
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.streaming.StreamTest
@@ -648,6 +649,87 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamTest
     }
   }

+  test("Paimon Procedure: test aware-bucket compaction read parallelism") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, value STRING)
+                 |TBLPROPERTIES ('primary-key'='id', 'bucket'='3', 'write-only'='true')
+                 |""".stripMargin)
+
+    val table = loadTable("T")
+    for (i <- 1 to 10) {
+      sql(s"INSERT INTO T VALUES ($i, '$i')")
+    }
+    assertResult(10)(table.snapshotManager().snapshotCount())
+
+    val buckets = table.newSnapshotReader().bucketEntries().asScala.map(_.bucket()).distinct.size
+    assertResult(3)(buckets)
+
+    val taskBuffer = scala.collection.mutable.ListBuffer.empty[Int]
+    val listener = new SparkListener {
+      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
+        taskBuffer += stageSubmitted.stageInfo.numTasks
+      }
+    }
+
+    try {
+      spark.sparkContext.addSparkListener(listener)
+
+      // spark.default.parallelism cannot be changed within a Spark session
+      // sparkParallelism is 2, bucket is 3, so 2 is used as the read parallelism
+      spark.conf.set("spark.sql.shuffle.partitions", 2)
+      spark.sql("CALL sys.compact(table => 'T')")
+
+      // sparkParallelism is 5, bucket is 3, so 3 is used as the read parallelism
+      spark.conf.set("spark.sql.shuffle.partitions", 5)
+      spark.sql("CALL sys.compact(table => 'T')")
+
+      assertResult(Seq(2, 3))(taskBuffer)
+    } finally {
+      spark.sparkContext.removeSparkListener(listener)
+    }
+  }
+
+  test("Paimon Procedure: test unaware-bucket compaction read parallelism") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, value STRING)
+                 |TBLPROPERTIES ('bucket'='-1', 'write-only'='true')
+                 |""".stripMargin)
+
+    val table = loadTable("T")
+    for (i <- 1 to 12) {
+      sql(s"INSERT INTO T VALUES ($i, '$i')")
+    }
+    assertResult(12)(table.snapshotManager().snapshotCount())
+
+    val buckets = table.newSnapshotReader().bucketEntries().asScala.map(_.bucket()).distinct.size
+    // only has bucket-0
+    assertResult(1)(buckets)
+
+    val taskBuffer = scala.collection.mutable.ListBuffer.empty[Int]
+    val listener = new SparkListener {
+      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
+        taskBuffer += stageSubmitted.stageInfo.numTasks
+      }
+    }
+
+    try {
+      spark.sparkContext.addSparkListener(listener)
+
+      // spark.default.parallelism cannot be changed within a Spark session
+      // sparkParallelism is 2, task group count is 6, so 2 is used as the read parallelism
+      spark.conf.set("spark.sql.shuffle.partitions", 2)
+      spark.sql("CALL sys.compact(table => 'T', options => 'compaction.max.file-num=2')")
+
+      // sparkParallelism is 5, task group count is 3, so 3 is used as the read parallelism
+      spark.conf.set("spark.sql.shuffle.partitions", 5)
+      spark.sql("CALL sys.compact(table => 'T', options => 'compaction.max.file-num=2')")
+
+      assertResult(Seq(2, 3))(taskBuffer)
+    } finally {
+      spark.sparkContext.removeSparkListener(listener)
+    }
+  }
+
   def lastSnapshotCommand(table: FileStoreTable): CommitKind = {
     table.snapshotManager().latestSnapshot().commitKind()
   }
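
Both tests assert the same selection rule: the compaction read parallelism is the smaller of the number of work units (buckets for a fixed-bucket table, compaction task groups for an unaware-bucket table) and the session's parallelism. A minimal Scala sketch of that rule follows; `expectedReadParallelism`, `workUnits`, and `sparkParallelism` are illustrative names for this sketch, not Paimon's actual API.

object ReadParallelismSketch {
  // Sketch of the rule the tests above assert; illustrative only, not Paimon's implementation.
  // workUnits:        independent compaction inputs (buckets, or task groups for unaware-bucket tables)
  // sparkParallelism: the session's default parallelism
  def expectedReadParallelism(workUnits: Int, sparkParallelism: Int): Int =
    math.min(workUnits, sparkParallelism)

  // Matches the assertions above:
  // aware-bucket:   expectedReadParallelism(3, 2) == 2, expectedReadParallelism(3, 5) == 3
  // unaware-bucket: expectedReadParallelism(6, 2) == 2, expectedReadParallelism(3, 5) == 3
}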