Skip to content

Commit 138849a

Browse files
danpihoubonanzymap
committed
Fix SST files not being cleaned up in the locations folder (#4555)
* fix entry location compaction * replace entryLocationCompactionEnable with entryLocationCompactionInterval * Add randomCompactionDelay to avoid all the bookies triggering compaction simultaneously * Fix the style issue * Fix the style issue * Fix test --------- Co-authored-by: houbonan <houbonan@didiglobal.com> Co-authored-by: zymap <zhangyong1025.zy@gmail.com> (cherry picked from commit ede1ba9)
1 parent a753ace commit 138849a

File tree

8 files changed

+104
-1
lines changed

8 files changed

+104
-1
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ public interface BookKeeperServerStats {
152152
String THREAD_RUNTIME = "THREAD_RUNTIME";
153153
String MAJOR_COMPACTION_COUNT = "MAJOR_COMPACTION_TOTAL";
154154
String MINOR_COMPACTION_COUNT = "MINOR_COMPACTION_TOTAL";
155+
String ENTRY_LOCATION_COMPACTION_COUNT = "ENTRY_LOCATION_COMPACTION_TOTAL";
155156
String ACTIVE_LEDGER_COUNT = "ACTIVE_LEDGER_TOTAL";
156157
String DELETED_LEDGER_COUNT = "DELETED_LEDGER_TOTAL";
157158

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public class GarbageCollectionStatus {
4242

4343
private long lastMajorCompactionTime;
4444
private long lastMinorCompactionTime;
45+
private long lastEntryLocationCompactionTime;
4546
private long majorCompactionCounter;
4647
private long minorCompactionCounter;
48+
private long entryLocationCompactionCounter;
4749
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.Executors;
3434
import java.util.concurrent.Future;
3535
import java.util.concurrent.ScheduledExecutorService;
36+
import java.util.concurrent.ThreadLocalRandom;
3637
import java.util.concurrent.TimeUnit;
3738
import java.util.concurrent.atomic.AtomicBoolean;
3839
import java.util.concurrent.atomic.AtomicInteger;
@@ -85,6 +86,10 @@ public class GarbageCollectorThread implements Runnable {
8586
long majorCompactionMaxTimeMillis;
8687
long lastMajorCompactionTime;
8788

89+
final long entryLocationCompactionInterval;
90+
long randomCompactionDelay;
91+
long lastEntryLocationCompactionTime;
92+
8893
@Getter
8994
final boolean isForceGCAllowWhenNoSpace;
9095

@@ -200,6 +205,10 @@ public GarbageCollectorThread(ServerConfiguration conf,
200205
isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace();
201206
majorCompactionMaxTimeMillis = conf.getMajorCompactionMaxTimeMillis();
202207
minorCompactionMaxTimeMillis = conf.getMinorCompactionMaxTimeMillis();
208+
entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval() * SECOND;
209+
if (entryLocationCompactionInterval > 0) {
210+
randomCompactionDelay = ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval);
211+
}
203212

204213
boolean isForceAllowCompaction = conf.isForceAllowCompaction();
205214

@@ -266,12 +275,22 @@ public void removeEntryLog(long logToRemove) {
266275
}
267276
}
268277

278+
if (entryLocationCompactionInterval > 0) {
279+
if (entryLocationCompactionInterval < gcWaitTime) {
280+
throw new IOException(
281+
"Too short entry location compaction interval : " + entryLocationCompactionInterval);
282+
}
283+
}
284+
269285
LOG.info("Minor Compaction : enabled=" + enableMinorCompaction + ", threshold="
270286
+ minorCompactionThreshold + ", interval=" + minorCompactionInterval);
271287
LOG.info("Major Compaction : enabled=" + enableMajorCompaction + ", threshold="
272288
+ majorCompactionThreshold + ", interval=" + majorCompactionInterval);
289+
LOG.info("Entry Location Compaction : interval=" + entryLocationCompactionInterval + ", randomCompactionDelay="
290+
+ randomCompactionDelay);
273291

274-
lastMinorCompactionTime = lastMajorCompactionTime = System.currentTimeMillis();
292+
lastMinorCompactionTime = lastMajorCompactionTime =
293+
lastEntryLocationCompactionTime = System.currentTimeMillis();
275294
}
276295

277296
private EntryLogMetadataMap createEntryLogMetadataMap() throws IOException {
@@ -449,6 +468,7 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
449468
gcStats.getMajorCompactionCounter().inc();
450469
majorCompacting.set(false);
451470
}
471+
452472
} else if (((isForceMinorCompactionAllow && force) || (enableMinorCompaction
453473
&& (force || curTime - lastMinorCompactionTime > minorCompactionInterval)))
454474
&& (!suspendMinor)) {
@@ -463,6 +483,20 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
463483
minorCompacting.set(false);
464484
}
465485
}
486+
if (entryLocationCompactionInterval > 0 && (curTime - lastEntryLocationCompactionTime > (
487+
entryLocationCompactionInterval + randomCompactionDelay))) {
488+
// enter entry location compaction
489+
LOG.info(
490+
"Enter entry location compaction, entryLocationCompactionInterval {}, randomCompactionDelay "
491+
+ "{}, lastEntryLocationCompactionTime {}",
492+
entryLocationCompactionInterval, randomCompactionDelay, lastEntryLocationCompactionTime);
493+
ledgerStorage.entryLocationCompact();
494+
lastEntryLocationCompactionTime = System.currentTimeMillis();
495+
randomCompactionDelay = ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval);
496+
LOG.info("Next entry location compaction interval {}",
497+
entryLocationCompactionInterval + randomCompactionDelay);
498+
gcStats.getEntryLocationCompactionCounter().inc();
499+
}
466500
gcStats.getGcThreadRuntime().registerSuccessfulEvent(
467501
MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS);
468502
} catch (EntryLogMetadataMapException e) {
@@ -809,8 +843,10 @@ public GarbageCollectionStatus getGarbageCollectionStatus() {
809843
.minorCompacting(minorCompacting.get())
810844
.lastMajorCompactionTime(lastMajorCompactionTime)
811845
.lastMinorCompactionTime(lastMinorCompactionTime)
846+
.lastEntryLocationCompactionTime(lastEntryLocationCompactionTime)
812847
.majorCompactionCounter(gcStats.getMajorCompactionCounter().get())
813848
.minorCompactionCounter(gcStats.getMinorCompactionCounter().get())
849+
.entryLocationCompactionCounter(gcStats.getEntryLocationCompactionCounter().get())
814850
.build();
815851
}
816852
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
2626
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
2727
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.DELETED_LEDGER_COUNT;
28+
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOCATION_COMPACTION_COUNT;
2829
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.MAJOR_COMPACTION_COUNT;
2930
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.MINOR_COMPACTION_COUNT;
3031
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.RECLAIMED_COMPACTION_SPACE_BYTES;
@@ -62,6 +63,11 @@ public class GarbageCollectorStats {
6263
help = "Number of major compactions"
6364
)
6465
private final Counter majorCompactionCounter;
66+
@StatsDoc(
67+
name = ENTRY_LOCATION_COMPACTION_COUNT,
68+
help = "Number of entry location compactions"
69+
)
70+
private final Counter entryLocationCompactionCounter;
6571
@StatsDoc(
6672
name = RECLAIMED_DELETION_SPACE_BYTES,
6773
help = "Number of disk space bytes reclaimed via deleting entry log files"
@@ -111,6 +117,7 @@ public GarbageCollectorStats(StatsLogger statsLogger,
111117

112118
this.minorCompactionCounter = statsLogger.getCounter(MINOR_COMPACTION_COUNT);
113119
this.majorCompactionCounter = statsLogger.getCounter(MAJOR_COMPACTION_COUNT);
120+
this.entryLocationCompactionCounter = statsLogger.getCounter(ENTRY_LOCATION_COMPACTION_COUNT);
114121
this.reclaimedSpaceViaCompaction = statsLogger.getCounter(RECLAIMED_COMPACTION_SPACE_BYTES);
115122
this.reclaimedSpaceViaDeletes = statsLogger.getCounter(RECLAIMED_DELETION_SPACE_BYTES);
116123
this.reclaimFailedToDelete = statsLogger.getCounter(RECLAIM_FAILED_TO_DELETE);

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,8 @@ public boolean isMinorGcSuspended() {
313313
public void entryLocationCompact() {
314314
if (entryLocationIndex.isCompacting()) {
315315
// RocksDB already running compact.
316+
log.info("Compacting directory {}, skipping this entryLocationCompaction this time.",
317+
entryLocationIndex.getEntryLocationDBPath());
316318
return;
317319
}
318320
cleanupExecutor.execute(() -> {

bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
105105
protected static final String COMPACTION_RATE = "compactionRate";
106106
protected static final String COMPACTION_RATE_BY_ENTRIES = "compactionRateByEntries";
107107
protected static final String COMPACTION_RATE_BY_BYTES = "compactionRateByBytes";
108+
protected static final String ENTRY_LOCATION_COMPACTION_INTERVAL = "entryLocationCompactionInterval";
108109

109110
// Gc Parameters
110111
protected static final String GC_WAIT_TIME = "gcWaitTime";
@@ -2973,6 +2974,31 @@ public ServerConfiguration setCompactionRateByBytes(int rate) {
29732974
return this;
29742975
}
29752976

2977+
/**
2978+
* Get interval to run entry location compaction, in seconds.
2979+
*
2980+
* <p>If it is set to less than zero, the entry location compaction is disabled.
2981+
*
2982+
* @return high water mark.
2983+
*/
2984+
public long getEntryLocationCompactionInterval() {
2985+
return getLong(ENTRY_LOCATION_COMPACTION_INTERVAL, -1);
2986+
}
2987+
2988+
/**
2989+
* Set interval to run entry location compaction.
2990+
*
2991+
* @see #getMajorCompactionInterval()
2992+
*
2993+
* @param interval
2994+
* Interval to run entry location compaction
2995+
* @return server configuration
2996+
*/
2997+
public ServerConfiguration setEntryLocationCompactionInterval(long interval) {
2998+
setProperty(ENTRY_LOCATION_COMPACTION_INTERVAL, interval);
2999+
return this;
3000+
}
3001+
29763002
/**
29773003
* Should we remove pages from page cache after force write.
29783004
*
@@ -3213,6 +3239,10 @@ public void validate() throws ConfigurationException {
32133239
if (getMajorCompactionInterval() > 0 && getMajorCompactionInterval() * SECOND < getGcWaitTime()) {
32143240
throw new ConfigurationException("majorCompactionInterval should be >= gcWaitTime.");
32153241
}
3242+
if (getEntryLocationCompactionInterval() > 0
3243+
&& getEntryLocationCompactionInterval() * SECOND < getGcWaitTime()) {
3244+
throw new ConfigurationException("entryLocationCompactionInterval should be >= gcWaitTime.");
3245+
}
32163246
}
32173247

32183248
/**

bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ public void testEntryLogSizeLimit() throws ConfigurationException {
158158
public void testCompactionSettings() throws ConfigurationException {
159159
ServerConfiguration conf = new ServerConfiguration();
160160
long major, minor;
161+
long entryLocationCompactionInterval;
161162

162163
// Default Values
163164
major = conf.getMajorCompactionMaxTimeMillis();
@@ -239,5 +240,24 @@ public void testCompactionSettings() throws ConfigurationException {
239240
minorThreshold = conf.getMinorCompactionThreshold();
240241
Assert.assertEquals(0.6, majorThreshold, 0.00001);
241242
Assert.assertEquals(0.3, minorThreshold, 0.00001);
243+
244+
// Default Values
245+
entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval();
246+
Assert.assertEquals(-1, entryLocationCompactionInterval);
247+
248+
// Set entry location compaction
249+
conf.setEntryLocationCompactionInterval(3600);
250+
entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval();
251+
Assert.assertEquals(3600, entryLocationCompactionInterval);
252+
253+
conf.setEntryLocationCompactionInterval(550);
254+
try {
255+
conf.validate();
256+
fail();
257+
} catch (ConfigurationException ignore) {
258+
}
259+
260+
conf.setEntryLocationCompactionInterval(650);
261+
conf.validate();
242262
}
243263
}

conf/bk_server.conf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,11 @@ ledgerDirectories=/tmp/bk-data
581581
# Set the rate at which compaction will readd entries. The unit is bytes added per second.
582582
# compactionRateByBytes=1000000
583583

584+
# Interval to run entry location compaction, in seconds
585+
# If it is set to less than zero, the entry location compaction is disabled.
586+
# Note: should be greater than gcWaitTime.
587+
# entryLocationCompactionInterval=-1
588+
584589
# Flag to enable/disable transactional compaction. If it is set to true, it will use transactional compaction,
585590
# which it will use new entry log files to store compacted entries during compaction; if it is set to false,
586591
# it will use normal compaction, which it shares same entry log file with normal add operations.

0 commit comments

Comments
 (0)