Skip to content

Commit 442e3bb

Browse files
authored
Fix the infinite waiting for shutdown due to throttler limit (#2942)
Descriptions of the changes in this PR: ### Motivation If the compactor is being throttled, shutdown should take priority over waiting in `RateLimiter.acquire`. ### Changes Following @hangc0276's suggestion, when handling the shutdown logic of `GarbageCollectorThread`, we check the status of the `newScanner.process` method. If the status is false, we throw an `IOException` and stop compaction immediately. Master Issue: #2941
1 parent eef3447 commit 442e3bb

File tree

3 files changed

+88
-2
lines changed

3 files changed

+88
-2
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323

2424
import com.google.common.util.concurrent.RateLimiter;
2525

26+
import java.io.IOException;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2629
import org.apache.bookkeeper.conf.ServerConfiguration;
2730

2831
/**
@@ -66,6 +69,7 @@ public void cleanUpAndRecover() {}
6669
public static class Throttler {
6770
private final RateLimiter rateLimiter;
6871
private final boolean isThrottleByBytes;
72+
private final AtomicBoolean cancelled = new AtomicBoolean(false);
6973

7074
Throttler(ServerConfiguration conf) {
7175
this.isThrottleByBytes = conf.getIsThrottleByBytes();
@@ -74,8 +78,32 @@ public static class Throttler {
7478
}
7579

7680
// acquire. if bybytes: bytes of this entry; if byentries: 1.
77-
public void acquire(int permits) {
78-
rateLimiter.acquire(this.isThrottleByBytes ? permits : 1);
81+
boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
82+
return rateLimiter.tryAcquire(this.isThrottleByBytes ? permits : 1, timeout, unit);
83+
}
84+
85+
// GC thread will check the status for the rate limiter
86+
// If the compactor is being stopped by other threads,
87+
// and the GC thread is still limited, the compact task will be stopped.
88+
public void acquire(int permits) throws IOException {
89+
long timeout = 100;
90+
long start = System.currentTimeMillis();
91+
while (!tryAcquire(permits, timeout, TimeUnit.MILLISECONDS)) {
92+
if (cancelled.get()) {
93+
throw new IOException("Failed to get permits takes "
94+
+ (System.currentTimeMillis() - start)
95+
+ " ms may be compactor has been shutting down");
96+
}
97+
try {
98+
TimeUnit.MILLISECONDS.sleep(timeout);
99+
} catch (InterruptedException e) {
100+
// ignore
101+
}
102+
}
103+
}
104+
105+
public void cancelledAcquire() {
106+
cancelled.set(true);
79107
}
80108
}
81109

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,8 @@ public synchronized void shutdown() throws InterruptedException {
653653
}
654654
LOG.info("Shutting down GarbageCollectorThread");
655655

656+
throttler.cancelledAcquire();
657+
compactor.throttler.cancelledAcquire();
656658
while (!compacting.compareAndSet(false, true)) {
657659
// Wait till the thread stops compacting
658660
Thread.sleep(100);

bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.concurrent.TimeUnit;
5454
import java.util.concurrent.atomic.AtomicBoolean;
5555
import java.util.concurrent.atomic.AtomicInteger;
56+
import java.util.function.Supplier;
5657

5758
import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
5859
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -1390,6 +1391,61 @@ public void checkpointComplete(CheckpointSource.Checkpoint checkpoint, boolean c
13901391
storage.getEntry(1, 1); // entry should exist
13911392
}
13921393

1394+
@Test
1395+
public void testCancelledCompactionWhenShuttingDown() throws Exception {
1396+
// prepare data
1397+
LedgerHandle[] lhs = prepareData(3, false);
1398+
1399+
// change compaction in low throughput
1400+
// restart bookies
1401+
restartBookies(c -> {
1402+
c.setIsThrottleByBytes(true);
1403+
c.setCompactionRateByBytes(ENTRY_SIZE / 1000);
1404+
c.setMinorCompactionThreshold(0.2f);
1405+
c.setMajorCompactionThreshold(0.5f);
1406+
return c;
1407+
});
1408+
1409+
// remove ledger2 and ledger3
1410+
// so entry log 1 and 2 would have ledger1 entries left
1411+
bkc.deleteLedger(lhs[1].getId());
1412+
bkc.deleteLedger(lhs[2].getId());
1413+
LOG.info("Finished deleting the ledgers contains most entries.");
1414+
1415+
getGCThread().triggerGC(true, false, false);
1416+
getGCThread().throttler.cancelledAcquire();
1417+
waitUntilTrue(() -> {
1418+
try {
1419+
return getGCThread().compacting.get();
1420+
} catch (Exception e) {
1421+
fail("Get GC thread failed");
1422+
}
1423+
return null;
1424+
}, () -> "Not attempting to complete", 10000, 200);
1425+
1426+
getGCThread().shutdown();
1427+
// after garbage collection shutdown, compaction should be cancelled when acquire permits
1428+
// and GC running flag should be false.
1429+
assertFalse(getGCThread().running);
1430+
1431+
}
1432+
1433+
private void waitUntilTrue(Supplier<Boolean> condition,
1434+
Supplier<String> msg,
1435+
long waitTime,
1436+
long pause) throws InterruptedException {
1437+
long startTime = System.currentTimeMillis();
1438+
while (true) {
1439+
if (condition.get()) {
1440+
return;
1441+
}
1442+
if (System.currentTimeMillis() > startTime + waitTime) {
1443+
fail(msg.get());
1444+
}
1445+
Thread.sleep(Math.min(waitTime, pause));
1446+
}
1447+
}
1448+
13931449
private LedgerManager getLedgerManager(final Set<Long> ledgers) {
13941450
LedgerManager manager = new LedgerManager() {
13951451
@Override

0 commit comments

Comments
 (0)