Skip to content

Commit f5c5029

Browse files
committed
feat: Introduce the configuration for forcibly triggering timeline compaction
1. Introduce the configuration for forcibly triggering timeline compaction. Signed-off-by: TheR1sing3un <[email protected]>
feat: enable timeline compaction by default
1. Enable timeline compaction by default. Signed-off-by: TheR1sing3un <[email protected]>
rerun
1 parent a0d4314 commit f5c5029

File tree

4 files changed

+64
-2
lines changed

4 files changed

+64
-2
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,6 @@ public int archiveIfRequired(HoodieEngineContext context, boolean acquireLock) t
115115
this.timelineWriter.write(instantsToArchive, Option.of(action -> deleteAnyLeftOverMarkers(context, action)), Option.of(exceptionHandler));
116116
log.debug("Deleting archived instants");
117117
deleteArchivedActions(instantsToArchive, context);
118-
// triggers compaction and cleaning only after archiving action
119-
this.timelineWriter.compactAndClean(context);
120118
Supplier<List<HoodieInstant>> archivedInstants = () -> instantsToArchive.stream()
121119
.flatMap(action -> Stream.concat(action.getCompletedInstants().stream(), action.getPendingInstants().stream()))
122120
.collect(Collectors.toList());
@@ -125,6 +123,10 @@ public int archiveIfRequired(HoodieEngineContext context, boolean acquireLock) t
125123
} else {
126124
log.info("No Instants to archive");
127125
}
126+
// run compact and clean if needed even when no instants were archived
127+
if (!instantsToArchive.isEmpty() || config.isTimelineCompactionForced()) {
128+
this.timelineWriter.compactAndClean(context);
129+
}
128130
return instantsToArchive.size();
129131
} finally {
130132
if (acquireLock) {

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,14 @@ public class HoodieArchivalConfig extends HoodieConfig {
102102
.withDocumentation("If enabled, archival will proceed beyond savepoint, skipping savepoint commits."
103103
+ " If disabled, archival will stop at the earliest savepoint commit.");
104104

105+
public static final ConfigProperty<Boolean> TIMELINE_COMPACTION_FORCED = ConfigProperty
106+
.key("hoodie.timeline.compaction.forced")
107+
.defaultValue(true)
108+
.markAdvanced()
109+
.withDocumentation("If enabled, timeline compaction will be forced to run during archival of timeline."
110+
+ " This helps in reducing the number of files in the archived timeline, at the cost of"
111+
+ " additional compaction time during archival.");
112+
105113
public static final ConfigProperty<Long> TIMELINE_COMPACTION_TARGET_FILE_MAX_BYTES = ConfigProperty
106114
.key("hoodie.timeline.compaction.target.file.max.bytes")
107115
.defaultValue(1000L * 1024 * 1024)
@@ -198,6 +206,11 @@ public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) {
198206
return this;
199207
}
200208

209+
public Builder withTimelineCompactionForced(boolean timelineCompactionForced) {
210+
archivalConfig.setValue(TIMELINE_COMPACTION_FORCED, String.valueOf(timelineCompactionForced));
211+
return this;
212+
}
213+
201214
public HoodieArchivalConfig build() {
202215
archivalConfig.setDefaults(HoodieArchivalConfig.class.getName());
203216
return archivalConfig;

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1916,6 +1916,10 @@ public int getCommitArchivalBatchSize() {
19161916
return getInt(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE);
19171917
}
19181918

1919+
public boolean isTimelineCompactionForced() {
1920+
return getBoolean(HoodieArchivalConfig.TIMELINE_COMPACTION_FORCED);
1921+
}
1922+
19191923
public Boolean shouldCleanBootstrapBaseFile() {
19201924
return getBoolean(HoodieCleanConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLE);
19211925
}

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,49 @@ public void testCompactionCleaning() throws Exception {
725725
assertEquals(Arrays.asList(7, 8, 9), LSMTimeline.allSnapshotVersions(metaClient, metaClient.getArchivePath()).stream().sorted().collect(Collectors.toList()));
726726
}
727727

728+
@Test
729+
public void testCompactionForced() throws Exception {
730+
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);
731+
732+
// do ingestion and trigger archive actions here.
733+
for (int i = 1; i < 21; i++) {
734+
testTable.doWriteOperation(
735+
WriteClientTestUtils.createNewInstantTime(), WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
736+
archiveAndGetCommitsList(writeConfig);
737+
}
738+
739+
// loading archived timeline and active timeline success
740+
HoodieActiveTimeline rawActiveTimeline = TIMELINE_FACTORY.createActiveTimeline(metaClient, false);
741+
HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline();
742+
assertEquals(4 * 3 + 16, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
743+
744+
assertEquals(10, LSMTimeline.latestSnapshotVersion(metaClient, metaClient.getArchivePath()));
745+
assertEquals(Arrays.asList(8, 9, 10), LSMTimeline.allSnapshotVersions(metaClient, metaClient.getArchivePath()).stream().sorted().collect(Collectors.toList()));
746+
747+
HoodieLSMTimelineManifest latestSnapshotManifest = LSMTimeline.latestSnapshotManifest(metaClient, metaClient.getArchivePath());
748+
Map<Integer, List<Pair<Integer, HoodieLSMTimelineManifest.LSMFileEntry>>> layeredFiles =
749+
latestSnapshotManifest.getFiles().stream().map(file -> Pair.of(LSMTimeline.getFileLayer(file.getFileName()), file)).collect(Collectors.groupingBy(Pair::getKey));
750+
assertEquals(2, layeredFiles.get(0).size());
751+
assertEquals(2, layeredFiles.get(1).size());
752+
753+
// run archive again without new commits and force-compaction, there will be no new snapshot version created
754+
writeConfig.setValue(HoodieArchivalConfig.TIMELINE_COMPACTION_FORCED.key(), "false");
755+
writeConfig.setValue(HoodieArchivalConfig.TIMELINE_COMPACTION_BATCH_SIZE.key(), "2");
756+
archiveAndGetCommitsList(writeConfig, true);
757+
assertEquals(10, LSMTimeline.latestSnapshotVersion(metaClient, metaClient.getArchivePath()));
758+
759+
// run archive again without new commits but with force-compaction and there will be new snapshot version created
760+
writeConfig.setValue(HoodieArchivalConfig.TIMELINE_COMPACTION_FORCED.key(), "true");
761+
archiveAndGetCommitsList(writeConfig, true);
762+
assertEquals(12, LSMTimeline.latestSnapshotVersion(metaClient, metaClient.getArchivePath()));
763+
latestSnapshotManifest = LSMTimeline.latestSnapshotManifest(metaClient, metaClient.getArchivePath());
764+
layeredFiles = latestSnapshotManifest.getFiles().stream().map(file -> Pair.of(LSMTimeline.getFileLayer(file.getFileName()), file)).collect(Collectors.groupingBy(Pair::getKey));
765+
assertFalse(layeredFiles.containsKey(0));
766+
assertEquals(1, layeredFiles.get(1).size());
767+
assertEquals(1, layeredFiles.get(2).size());
768+
769+
}
770+
728771
@Test
729772
public void testCompactionWithLargeL0File() throws Exception {
730773
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 2, 3);

0 commit comments

Comments (0)