Skip to content

Commit cc8ae31

Browse files
committed
change for spark
1 parent acbf764 commit cc8ae31

File tree

1 file changed

+71
-44
lines changed

1 file changed

+71
-44
lines changed

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java

Lines changed: 71 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -541,11 +541,15 @@ private void clusterIncrementalUnAwareBucketTable(
541541
DataSourceV2Relation relation) {
542542
IncrementalClusterManager incrementalClusterManager =
543543
new IncrementalClusterManager(table, partitionPredicate);
544-
Map<BinaryRow, CompactUnit> compactUnits =
544+
checkArgument(
545+
table.bucketMode() != BucketMode.HASH_FIXED,
546+
"Clustering for bucketed table is not supported for spark currently.");
547+
Map<BinaryRow, Map<Integer, CompactUnit>> compactUnits =
545548
incrementalClusterManager.createCompactUnits(fullCompaction);
546549

547-
Map<BinaryRow, Pair<List<DataSplit>, CommitMessage>> partitionSplits =
548-
incrementalClusterManager.toSplitsAndRewriteDvFiles(compactUnits);
550+
Map<BinaryRow, Map<Integer, Pair<List<DataSplit>, CommitMessage>>> partitionSplits =
551+
incrementalClusterManager.toSplitsAndRewriteDvFiles(
552+
compactUnits, table.bucketMode());
549553

550554
// sort in partition
551555
TableSorter sorter =
@@ -560,7 +564,13 @@ private void clusterIncrementalUnAwareBucketTable(
560564

561565
Dataset<Row> datasetForWrite =
562566
partitionSplits.values().stream()
563-
.map(Pair::getKey)
567+
.map(
568+
bucketEntry -> {
569+
checkArgument(
570+
bucketEntry.size() == 1,
571+
"Unaware-bucket table should only have one bucket.");
572+
return bucketEntry.values().iterator().next().getLeft();
573+
})
564574
.map(
565575
splits -> {
566576
Dataset<Row> dataset =
@@ -583,53 +593,70 @@ private void clusterIncrementalUnAwareBucketTable(
583593
}
584594

585595
// re-organize the commit messages to generate the compact messages
586-
Map<BinaryRow, List<DataFileMeta>> partitionClustered = new HashMap<>();
596+
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionClustered = new HashMap<>();
587597
for (CommitMessage commitMessage : JavaConverters.seqAsJavaList(commitMessages)) {
588-
checkArgument(commitMessage.bucket() == 0);
589-
partitionClustered
590-
.computeIfAbsent(commitMessage.partition(), k -> new ArrayList<>())
598+
BinaryRow partition = commitMessage.partition();
599+
int bucket = commitMessage.bucket();
600+
checkArgument(bucket == 0);
601+
602+
Map<Integer, List<DataFileMeta>> bucketFiles = partitionClustered.get(partition);
603+
if (bucketFiles == null) {
604+
bucketFiles = new HashMap<>();
605+
partitionClustered.put(partition.copy(), bucketFiles);
606+
}
607+
bucketFiles
608+
.computeIfAbsent(bucket, file -> new ArrayList<>())
591609
.addAll(((CommitMessageImpl) commitMessage).newFilesIncrement().newFiles());
610+
partitionClustered.put(partition, bucketFiles);
592611
}
593612

594613
List<CommitMessage> clusterMessages = new ArrayList<>();
595-
for (Map.Entry<BinaryRow, List<DataFileMeta>> entry : partitionClustered.entrySet()) {
614+
for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
615+
partitionClustered.entrySet()) {
596616
BinaryRow partition = entry.getKey();
597-
CommitMessageImpl dvCommitMessage =
598-
(CommitMessageImpl) partitionSplits.get(partition).getValue();
599-
List<DataFileMeta> clusterBefore = compactUnits.get(partition).files();
600-
// upgrade the clustered file to outputLevel
601-
List<DataFileMeta> clusterAfter =
602-
IncrementalClusterManager.upgrade(
603-
entry.getValue(), compactUnits.get(partition).outputLevel());
604-
LOG.info(
605-
"Partition {}: upgrade file level to {}",
606-
partition,
607-
compactUnits.get(partition).outputLevel());
608-
609-
List<IndexFileMeta> newIndexFiles = new ArrayList<>();
610-
List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
611-
if (dvCommitMessage != null) {
612-
newIndexFiles = dvCommitMessage.compactIncrement().newIndexFiles();
613-
deletedIndexFiles = dvCommitMessage.compactIncrement().deletedIndexFiles();
614-
}
617+
Map<Integer, List<DataFileMeta>> bucketEntries = entry.getValue();
618+
Map<Integer, Pair<List<DataSplit>, CommitMessage>> bucketSplits =
619+
partitionSplits.get(partition);
620+
Map<Integer, CompactUnit> bucketUnits = compactUnits.get(partition);
621+
for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
622+
bucketEntries.entrySet()) {
623+
int bucket = bucketEntry.getKey();
624+
CommitMessageImpl dvCommitMessage =
625+
(CommitMessageImpl) bucketSplits.get(bucket).getValue();
626+
List<DataFileMeta> clusterBefore = bucketUnits.get(bucket).files();
627+
// upgrade the clustered file to outputLevel
628+
List<DataFileMeta> clusterAfter =
629+
IncrementalClusterManager.upgrade(
630+
bucketEntry.getValue(), bucketUnits.get(bucket).outputLevel());
631+
LOG.info(
632+
"Partition {}, bucket {}: upgrade file level to {}",
633+
partition,
634+
bucket,
635+
bucketUnits.get(bucket).outputLevel());
636+
637+
List<IndexFileMeta> newIndexFiles = new ArrayList<>();
638+
List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
639+
if (dvCommitMessage != null) {
640+
newIndexFiles = dvCommitMessage.compactIncrement().newIndexFiles();
641+
deletedIndexFiles = dvCommitMessage.compactIncrement().deletedIndexFiles();
642+
}
615643

616-
// get the dv index messages
617-
CompactIncrement compactIncrement =
618-
new CompactIncrement(
619-
clusterBefore,
620-
clusterAfter,
621-
Collections.emptyList(),
622-
newIndexFiles,
623-
deletedIndexFiles);
624-
clusterMessages.add(
625-
new CommitMessageImpl(
626-
partition,
627-
// bucket 0 is bucket for unaware-bucket table
628-
// for compatibility with the old design
629-
0,
630-
table.coreOptions().bucket(),
631-
DataIncrement.emptyIncrement(),
632-
compactIncrement));
644+
// get the dv index messages
645+
CompactIncrement compactIncrement =
646+
new CompactIncrement(
647+
clusterBefore,
648+
clusterAfter,
649+
Collections.emptyList(),
650+
newIndexFiles,
651+
deletedIndexFiles);
652+
clusterMessages.add(
653+
new CommitMessageImpl(
654+
partition,
655+
bucket,
656+
table.coreOptions().bucket(),
657+
DataIncrement.emptyIncrement(),
658+
compactIncrement));
659+
}
633660
}
634661
if (LOG.isDebugEnabled()) {
635662
LOG.debug("Commit messages after reorganizing:{}", clusterMessages);

0 commit comments

Comments
 (0)