Skip to content

Commit fbd33b5

Browse files
danpihoubonanzymap
committed
Fix SST files not being cleaned up in the locations folder (#4555)
* fix entry location compaction * replace entryLocationCompactionEnable with entryLocationCompactionInterval * Add randomCompactionDelay to avoid all the bookies triggering compaction simultaneously * Fix the style issue * Fix the style issue * Fix test --------- Co-authored-by: houbonan <[email protected]> Co-authored-by: zymap <[email protected]> (cherry picked from commit ede1ba9)
1 parent 0ce47aa commit fbd33b5

File tree

8 files changed

+104
-1
lines changed

8 files changed

+104
-1
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ public interface BookKeeperServerStats {
150150
String THREAD_RUNTIME = "THREAD_RUNTIME";
151151
String MAJOR_COMPACTION_COUNT = "MAJOR_COMPACTION_TOTAL";
152152
String MINOR_COMPACTION_COUNT = "MINOR_COMPACTION_TOTAL";
153+
String ENTRY_LOCATION_COMPACTION_COUNT = "ENTRY_LOCATION_COMPACTION_TOTAL";
153154
String ACTIVE_LEDGER_COUNT = "ACTIVE_LEDGER_TOTAL";
154155
String DELETED_LEDGER_COUNT = "DELETED_LEDGER_TOTAL";
155156

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectionStatus.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public class GarbageCollectionStatus {
4242

4343
private long lastMajorCompactionTime;
4444
private long lastMinorCompactionTime;
45+
private long lastEntryLocationCompactionTime;
4546
private long majorCompactionCounter;
4647
private long minorCompactionCounter;
48+
private long entryLocationCompactionCounter;
4749
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.Executors;
3434
import java.util.concurrent.Future;
3535
import java.util.concurrent.ScheduledExecutorService;
36+
import java.util.concurrent.ThreadLocalRandom;
3637
import java.util.concurrent.TimeUnit;
3738
import java.util.concurrent.atomic.AtomicBoolean;
3839
import java.util.concurrent.atomic.AtomicInteger;
@@ -85,6 +86,10 @@ public class GarbageCollectorThread implements Runnable {
8586
long majorCompactionMaxTimeMillis;
8687
long lastMajorCompactionTime;
8788

89+
final long entryLocationCompactionInterval;
90+
long randomCompactionDelay;
91+
long lastEntryLocationCompactionTime;
92+
8893
@Getter
8994
final boolean isForceGCAllowWhenNoSpace;
9095

@@ -196,6 +201,10 @@ public GarbageCollectorThread(ServerConfiguration conf,
196201
isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace();
197202
majorCompactionMaxTimeMillis = conf.getMajorCompactionMaxTimeMillis();
198203
minorCompactionMaxTimeMillis = conf.getMinorCompactionMaxTimeMillis();
204+
entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval() * SECOND;
205+
if (entryLocationCompactionInterval > 0) {
206+
randomCompactionDelay = ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval);
207+
}
199208

200209
boolean isForceAllowCompaction = conf.isForceAllowCompaction();
201210

@@ -262,12 +271,22 @@ public void removeEntryLog(long logToRemove) {
262271
}
263272
}
264273

274+
if (entryLocationCompactionInterval > 0) {
275+
if (entryLocationCompactionInterval < gcWaitTime) {
276+
throw new IOException(
277+
"Too short entry location compaction interval : " + entryLocationCompactionInterval);
278+
}
279+
}
280+
265281
LOG.info("Minor Compaction : enabled=" + enableMinorCompaction + ", threshold="
266282
+ minorCompactionThreshold + ", interval=" + minorCompactionInterval);
267283
LOG.info("Major Compaction : enabled=" + enableMajorCompaction + ", threshold="
268284
+ majorCompactionThreshold + ", interval=" + majorCompactionInterval);
285+
LOG.info("Entry Location Compaction : interval=" + entryLocationCompactionInterval + ", randomCompactionDelay="
286+
+ randomCompactionDelay);
269287

270-
lastMinorCompactionTime = lastMajorCompactionTime = System.currentTimeMillis();
288+
lastMinorCompactionTime = lastMajorCompactionTime =
289+
lastEntryLocationCompactionTime = System.currentTimeMillis();
271290
}
272291

273292
private EntryLogMetadataMap createEntryLogMetadataMap() throws IOException {
@@ -445,6 +464,7 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
445464
gcStats.getMajorCompactionCounter().inc();
446465
majorCompacting.set(false);
447466
}
467+
448468
} else if (((isForceMinorCompactionAllow && force) || (enableMinorCompaction
449469
&& (force || curTime - lastMinorCompactionTime > minorCompactionInterval)))
450470
&& (!suspendMinor)) {
@@ -459,6 +479,20 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
459479
minorCompacting.set(false);
460480
}
461481
}
482+
if (entryLocationCompactionInterval > 0 && (curTime - lastEntryLocationCompactionTime > (
483+
entryLocationCompactionInterval + randomCompactionDelay))) {
484+
// enter entry location compaction
485+
LOG.info(
486+
"Enter entry location compaction, entryLocationCompactionInterval {}, randomCompactionDelay "
487+
+ "{}, lastEntryLocationCompactionTime {}",
488+
entryLocationCompactionInterval, randomCompactionDelay, lastEntryLocationCompactionTime);
489+
ledgerStorage.entryLocationCompact();
490+
lastEntryLocationCompactionTime = System.currentTimeMillis();
491+
randomCompactionDelay = ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval);
492+
LOG.info("Next entry location compaction interval {}",
493+
entryLocationCompactionInterval + randomCompactionDelay);
494+
gcStats.getEntryLocationCompactionCounter().inc();
495+
}
462496
gcStats.getGcThreadRuntime().registerSuccessfulEvent(
463497
MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS);
464498
} catch (EntryLogMetadataMapException e) {
@@ -796,8 +830,10 @@ public GarbageCollectionStatus getGarbageCollectionStatus() {
796830
.minorCompacting(minorCompacting.get())
797831
.lastMajorCompactionTime(lastMajorCompactionTime)
798832
.lastMinorCompactionTime(lastMinorCompactionTime)
833+
.lastEntryLocationCompactionTime(lastEntryLocationCompactionTime)
799834
.majorCompactionCounter(gcStats.getMajorCompactionCounter().get())
800835
.minorCompactionCounter(gcStats.getMinorCompactionCounter().get())
836+
.entryLocationCompactionCounter(gcStats.getEntryLocationCompactionCounter().get())
801837
.build();
802838
}
803839
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/stats/GarbageCollectorStats.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
2626
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
2727
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.DELETED_LEDGER_COUNT;
28+
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOCATION_COMPACTION_COUNT;
2829
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.MAJOR_COMPACTION_COUNT;
2930
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.MINOR_COMPACTION_COUNT;
3031
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.RECLAIMED_COMPACTION_SPACE_BYTES;
@@ -61,6 +62,11 @@ public class GarbageCollectorStats {
6162
help = "Number of major compactions"
6263
)
6364
private final Counter majorCompactionCounter;
65+
@StatsDoc(
66+
name = ENTRY_LOCATION_COMPACTION_COUNT,
67+
help = "Number of entry location compactions"
68+
)
69+
private final Counter entryLocationCompactionCounter;
6470
@StatsDoc(
6571
name = RECLAIMED_DELETION_SPACE_BYTES,
6672
help = "Number of disk space bytes reclaimed via deleting entry log files"
@@ -105,6 +111,7 @@ public GarbageCollectorStats(StatsLogger statsLogger,
105111

106112
this.minorCompactionCounter = statsLogger.getCounter(MINOR_COMPACTION_COUNT);
107113
this.majorCompactionCounter = statsLogger.getCounter(MAJOR_COMPACTION_COUNT);
114+
this.entryLocationCompactionCounter = statsLogger.getCounter(ENTRY_LOCATION_COMPACTION_COUNT);
108115
this.reclaimedSpaceViaCompaction = statsLogger.getCounter(RECLAIMED_COMPACTION_SPACE_BYTES);
109116
this.reclaimedSpaceViaDeletes = statsLogger.getCounter(RECLAIMED_DELETION_SPACE_BYTES);
110117
this.gcThreadRuntime = statsLogger.getOpStatsLogger(THREAD_RUNTIME);

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,8 @@ public boolean isMinorGcSuspended() {
313313
public void entryLocationCompact() {
314314
if (entryLocationIndex.isCompacting()) {
315315
// RocksDB already running compact.
316+
log.info("Compacting directory {}, skipping this entryLocationCompaction this time.",
317+
entryLocationIndex.getEntryLocationDBPath());
316318
return;
317319
}
318320
cleanupExecutor.execute(() -> {

bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
105105
protected static final String COMPACTION_RATE = "compactionRate";
106106
protected static final String COMPACTION_RATE_BY_ENTRIES = "compactionRateByEntries";
107107
protected static final String COMPACTION_RATE_BY_BYTES = "compactionRateByBytes";
108+
protected static final String ENTRY_LOCATION_COMPACTION_INTERVAL = "entryLocationCompactionInterval";
108109

109110
// Gc Parameters
110111
protected static final String GC_WAIT_TIME = "gcWaitTime";
@@ -2969,6 +2970,31 @@ public ServerConfiguration setCompactionRateByBytes(int rate) {
29692970
return this;
29702971
}
29712972

2973+
/**
2974+
* Get interval to run entry location compaction, in seconds.
2975+
*
2976+
* <p>If it is set to less than zero, the entry location compaction is disabled.
2977+
*
2978+
* @return high water mark.
2979+
*/
2980+
public long getEntryLocationCompactionInterval() {
2981+
return getLong(ENTRY_LOCATION_COMPACTION_INTERVAL, -1);
2982+
}
2983+
2984+
/**
2985+
* Set interval to run entry location compaction.
2986+
*
2987+
* @see #getMajorCompactionInterval()
2988+
*
2989+
* @param interval
2990+
* Interval to run entry location compaction
2991+
* @return server configuration
2992+
*/
2993+
public ServerConfiguration setEntryLocationCompactionInterval(long interval) {
2994+
setProperty(ENTRY_LOCATION_COMPACTION_INTERVAL, interval);
2995+
return this;
2996+
}
2997+
29722998
/**
29732999
* Should we remove pages from page cache after force write.
29743000
*
@@ -3187,6 +3213,10 @@ public void validate() throws ConfigurationException {
31873213
if (getMajorCompactionInterval() > 0 && getMajorCompactionInterval() * SECOND < getGcWaitTime()) {
31883214
throw new ConfigurationException("majorCompactionInterval should be >= gcWaitTime.");
31893215
}
3216+
if (getEntryLocationCompactionInterval() > 0
3217+
&& getEntryLocationCompactionInterval() * SECOND < getGcWaitTime()) {
3218+
throw new ConfigurationException("entryLocationCompactionInterval should be >= gcWaitTime.");
3219+
}
31903220
}
31913221

31923222
/**

bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestServerConfiguration.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ public void testEntryLogSizeLimit() throws ConfigurationException {
158158
public void testCompactionSettings() throws ConfigurationException {
159159
ServerConfiguration conf = new ServerConfiguration();
160160
long major, minor;
161+
long entryLocationCompactionInterval;
161162

162163
// Default Values
163164
major = conf.getMajorCompactionMaxTimeMillis();
@@ -239,5 +240,24 @@ public void testCompactionSettings() throws ConfigurationException {
239240
minorThreshold = conf.getMinorCompactionThreshold();
240241
Assert.assertEquals(0.6, majorThreshold, 0.00001);
241242
Assert.assertEquals(0.3, minorThreshold, 0.00001);
243+
244+
// Default Values
245+
entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval();
246+
Assert.assertEquals(-1, entryLocationCompactionInterval);
247+
248+
// Set entry location compaction
249+
conf.setEntryLocationCompactionInterval(3600);
250+
entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval();
251+
Assert.assertEquals(3600, entryLocationCompactionInterval);
252+
253+
conf.setEntryLocationCompactionInterval(550);
254+
try {
255+
conf.validate();
256+
fail();
257+
} catch (ConfigurationException ignore) {
258+
}
259+
260+
conf.setEntryLocationCompactionInterval(650);
261+
conf.validate();
242262
}
243263
}

conf/bk_server.conf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,11 @@ ledgerDirectories=/tmp/bk-data
581581
# Set the rate at which compaction will readd entries. The unit is bytes added per second.
582582
# compactionRateByBytes=1000000
583583

584+
# Interval to run entry location compaction, in seconds
585+
# If it is set to less than zero, the entry location compaction is disabled.
586+
# Note: should be greater than gcWaitTime.
587+
# entryLocationCompactionInterval=-1
588+
584589
# Flag to enable/disable transactional compaction. If it is set to true, it will use transactional compaction,
585590
# which it will use new entry log files to store compacted entries during compaction; if it is set to false,
586591
# it will use normal compaction, which it shares same entry log file with normal add operations.

0 commit comments

Comments
 (0)