Skip to content

Commit 807866c

Browse files
Forest0923 authored and chia7712 committed
KAFKA-19390: Call safeForceUnmap() in AbstractIndex.resize() on Linux to prevent stale mmap of index files (#19961)
JIRA: https://issues.apache.org/jira/browse/KAFKA-19390. The AbstractIndex.resize() method does not release the old memory map for both the index and time index files. In some cases, Mixed GC may not run for a long time, so the stale mappings are not reclaimed and the broker can crash when the vm.max_map_count limit is reached. The root cause is that safeForceUnmap() is not called on Linux within resize(), so we have changed the code to unmap the old mmap on all operating systems. The same problem was reported in [KAFKA-7442](https://issues.apache.org/jira/browse/KAFKA-7442), but the PR submitted at that time did not acquire all necessary locks around the mmap accesses and was closed without fixing the issue. Reviewers: Jun Rao <[email protected]>
1 parent f4ce123 commit 807866c

File tree

5 files changed

+268
-119
lines changed

5 files changed

+268
-119
lines changed

server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@
2626
* such as acquiring and releasing locks in a safe manner.
2727
*/
2828
public class LockUtils {
29+
@FunctionalInterface
30+
public interface ThrowingSupplier<T, E extends Exception> { // supplier of a T whose body may throw an exception of type E
31+
T get() throws E; // single abstract method: produces the result or throws E
32+
}
33+
@FunctionalInterface
34+
public interface ThrowingRunnable<E extends Exception> { // result-less action whose body may throw an exception of type E
35+
void run() throws E; // single abstract method: performs the action or throws E
36+
}
2937

3038
/**
3139
* Executes the given {@link Supplier} within the context of the specified {@link Lock}.
@@ -49,4 +57,73 @@ public static <T> T inLock(Lock lock, Supplier<T> supplier) {
4957
lock.unlock();
5058
}
5159
}
60+
61+
/**
62+
* Executes the given {@link Runnable} within the context of the specified {@link Lock}.
63+
* The lock is acquired before executing the runnable and released after the execution,
64+
* ensuring that the lock is always released, even if an exception is thrown.
65+
*
66+
* @param lock the lock to be acquired and released
67+
* @param runnable the runnable to be executed within the lock context
68+
* @throws NullPointerException if either {@code lock} or {@code runnable} is null
69+
*/
70+
public static void inLock(Lock lock, Runnable runnable) {
71+
Objects.requireNonNull(lock, "Lock must not be null"); // fail fast, before any lock is taken
72+
Objects.requireNonNull(runnable, "Runnable must not be null");
73+
74+
lock.lock(); // acquired outside the try, so the finally below only runs once the lock is held
75+
try {
76+
runnable.run();
77+
} finally {
78+
lock.unlock(); // released whether runnable.run() returns normally or throws
79+
}
80+
}
81+
82+
/**
83+
* Executes the given {@link ThrowingSupplier} within the context of the specified {@link Lock}.
84+
* The lock is acquired before executing the supplier and released after the execution,
85+
* ensuring that the lock is always released, even if an exception is thrown.
86+
*
87+
* @param <T> the type of the result returned by the supplier
88+
* @param <E> the type of exception that may be thrown by the supplier
89+
* @param lock the lock to be acquired and released
90+
* @param supplier the supplier to be executed within the lock context
91+
* @return the result of the supplier
92+
* @throws E if an exception occurs during the execution of the supplier
93+
* @throws NullPointerException if either {@code lock} or {@code supplier} is null
94+
*/
95+
public static <T, E extends Exception> T inLockThrows(Lock lock, ThrowingSupplier<T, E> supplier) throws E {
96+
Objects.requireNonNull(lock, "Lock must not be null"); // fail fast, before any lock is taken
97+
Objects.requireNonNull(supplier, "Supplier must not be null");
98+
99+
lock.lock(); // acquired outside the try, so the finally below only runs once the lock is held
100+
try {
101+
return supplier.get(); // an exception of type E thrown here propagates to the caller after the unlock
102+
} finally {
103+
lock.unlock(); // released whether supplier.get() returns normally or throws
104+
}
105+
}
106+
107+
/**
108+
* Executes the given {@link ThrowingRunnable} within the context of the specified {@link Lock}.
109+
* The lock is acquired before executing the runnable and released after the execution,
110+
* ensuring that the lock is always released, even if an exception is thrown.
111+
*
112+
* @param <E> the type of exception that may be thrown by the runnable
113+
* @param lock the lock to be acquired and released
114+
* @param runnable the runnable to be executed within the lock context
115+
* @throws E if an exception occurs during the execution of the runnable
116+
* @throws NullPointerException if either {@code lock} or {@code runnable} is null
117+
*/
118+
public static <E extends Exception> void inLockThrows(Lock lock, ThrowingRunnable<E> runnable) throws E {
119+
Objects.requireNonNull(lock, "Lock must not be null"); // fail fast, before any lock is taken
120+
Objects.requireNonNull(runnable, "Runnable must not be null");
121+
122+
lock.lock(); // acquired outside the try, so the finally below only runs once the lock is held
123+
try {
124+
runnable.run(); // an exception of type E thrown here propagates to the caller after the unlock
125+
} finally {
126+
lock.unlock(); // released whether runnable.run() returns normally or throws
127+
}
128+
}
52129
}

storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java

Lines changed: 71 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package org.apache.kafka.storage.internals.log;
1818

1919
import org.apache.kafka.common.utils.ByteBufferUnmapper;
20-
import org.apache.kafka.common.utils.OperatingSystem;
2120
import org.apache.kafka.common.utils.Utils;
21+
import org.apache.kafka.server.util.LockUtils;
2222

2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
@@ -33,8 +33,9 @@
3333
import java.nio.file.Files;
3434
import java.util.Objects;
3535
import java.util.OptionalInt;
36-
import java.util.concurrent.locks.Lock;
3736
import java.util.concurrent.locks.ReentrantLock;
37+
import java.util.concurrent.locks.ReentrantReadWriteLock;
38+
import java.util.function.Supplier;
3839

3940
/**
4041
* The abstract index class which holds entry format agnostic methods.
@@ -47,7 +48,10 @@ private enum SearchResultType {
4748

4849
private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);
4950

50-
protected final ReentrantLock lock = new ReentrantLock();
51+
// Serializes all index operations that mutate internal state
52+
private final ReentrantLock lock = new ReentrantLock();
53+
// Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed
54+
private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();
5155

5256
private final long baseOffset;
5357
private final int maxIndexSize;
@@ -187,36 +191,32 @@ public void updateParentDir(File parentDir) {
187191
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
188192
*/
189193
public boolean resize(int newSize) throws IOException {
190-
lock.lock();
191-
try {
192-
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
193-
194-
if (length == roundedNewSize) {
195-
log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize);
196-
return false;
197-
} else {
198-
RandomAccessFile raf = new RandomAccessFile(file, "rw");
199-
try {
200-
int position = mmap.position();
201-
202-
/* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */
203-
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
204-
safeForceUnmap();
205-
raf.setLength(roundedNewSize);
206-
this.length = roundedNewSize;
207-
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
208-
this.maxEntries = mmap.limit() / entrySize();
209-
mmap.position(position);
210-
log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize,
211-
mmap.position(), mmap.limit());
212-
return true;
213-
} finally {
214-
Utils.closeQuietly(raf, "index file " + file.getName());
215-
}
216-
}
217-
} finally {
218-
lock.unlock();
219-
}
194+
return inLockThrows(() ->
195+
inRemapWriteLockThrows(() -> {
196+
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
197+
198+
if (length == roundedNewSize) {
199+
log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize);
200+
return false;
201+
} else {
202+
RandomAccessFile raf = new RandomAccessFile(file, "rw");
203+
try {
204+
int position = mmap.position();
205+
206+
safeForceUnmap();
207+
raf.setLength(roundedNewSize);
208+
this.length = roundedNewSize;
209+
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
210+
this.maxEntries = mmap.limit() / entrySize();
211+
mmap.position(position);
212+
log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize,
213+
mmap.position(), mmap.limit());
214+
return true;
215+
} finally {
216+
Utils.closeQuietly(raf, "index file " + file.getName());
217+
}
218+
}
219+
}));
220220
}
221221

222222
/**
@@ -236,12 +236,9 @@ public void renameTo(File f) throws IOException {
236236
* Flush the data in the index to disk
237237
*/
238238
public void flush() {
239-
lock.lock();
240-
try {
239+
inLock(() -> {
241240
mmap.force();
242-
} finally {
243-
lock.unlock();
244-
}
241+
});
245242
}
246243

247244
/**
@@ -261,14 +258,11 @@ public boolean deleteIfExists() throws IOException {
261258
* the file.
262259
*/
263260
public void trimToValidSize() throws IOException {
264-
lock.lock();
265-
try {
261+
inLockThrows(() -> {
266262
if (mmap != null) {
267263
resize(entrySize() * entries);
268264
}
269-
} finally {
270-
lock.unlock();
271-
}
265+
});
272266
}
273267

274268
/**
@@ -288,12 +282,10 @@ public void closeHandler() {
288282
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
289283
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
290284
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
291-
lock.lock();
292-
try {
293-
safeForceUnmap();
294-
} finally {
295-
lock.unlock();
296-
}
285+
inLockThrows(() ->
286+
inRemapWriteLockThrows(() -> {
287+
safeForceUnmap();
288+
}));
297289
}
298290

299291
/**
@@ -420,20 +412,36 @@ protected void truncateToEntries0(int entries) {
420412
mmap.position(entries * entrySize());
421413
}
422414

423-
/**
424-
* Execute the given function in a lock only if we are running on windows or z/OS. We do this
425-
* because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it
426-
* and this requires synchronizing reads.
427-
*/
428-
protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E {
429-
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
430-
lock.lock();
431-
try {
432-
return action.execute();
433-
} finally {
434-
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
435-
lock.unlock();
436-
}
415+
protected final <T> T inLock(Supplier<T> action) {
416+
return LockUtils.inLock(lock, action); // runs action while holding this index's "lock" field and returns its result
417+
}
418+
419+
protected final void inLock(Runnable action) {
420+
LockUtils.inLock(lock, action); // runs action while holding this index's "lock" field
421+
}
422+
423+
protected final <T, E extends Exception> T inLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E {
424+
return LockUtils.inLockThrows(lock, action); // same as inLock(Supplier), but action may throw the checked exception E
425+
}
426+
427+
protected final <E extends Exception> void inLockThrows(LockUtils.ThrowingRunnable<E> action) throws E {
428+
LockUtils.inLockThrows(lock, action); // same as inLock(Runnable), but action may throw the checked exception E
429+
}
430+
431+
protected final <T> T inRemapReadLock(Supplier<T> action) {
432+
return LockUtils.inLock(remapLock.readLock(), action); // read side of remapLock: readers may run concurrently with each other, but not with a write-lock holder
433+
}
434+
435+
protected final void inRemapReadLock(Runnable action) {
436+
LockUtils.inLock(remapLock.readLock(), action); // read side of remapLock: readers may run concurrently with each other, but not with a write-lock holder
437+
}
438+
439+
protected final <T, E extends Exception> T inRemapWriteLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E {
440+
return LockUtils.inLockThrows(remapLock.writeLock(), action); // write side of remapLock: exclusive, so no read-lock holder runs while action executes
441+
}
442+
443+
protected final <E extends Exception> void inRemapWriteLockThrows(LockUtils.ThrowingRunnable<E> action) throws E {
444+
LockUtils.inLockThrows(remapLock.writeLock(), action); // write side of remapLock: exclusive, so no read-lock holder runs while action executes
437445
}
438446

439447
/**

storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public final class OffsetIndex extends AbstractIndex {
5656
private static final int ENTRY_SIZE = 8;
5757

5858
/* the last offset in the index */
59-
private long lastOffset;
59+
private volatile long lastOffset;
6060

6161
public OffsetIndex(File file, long baseOffset) throws IOException {
6262
this(file, baseOffset, -1);
@@ -95,7 +95,7 @@ public void sanityCheck() {
9595
* the pair (baseOffset, 0) is returned.
9696
*/
9797
public OffsetPosition lookup(long targetOffset) {
98-
return maybeLock(lock, () -> {
98+
return inRemapReadLock(() -> {
9999
ByteBuffer idx = mmap().duplicate();
100100
int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY);
101101
if (slot == -1)
@@ -111,7 +111,7 @@ public OffsetPosition lookup(long targetOffset) {
111111
* @return The offset/position pair at that entry
112112
*/
113113
public OffsetPosition entry(int n) {
114-
return maybeLock(lock, () -> {
114+
return inRemapReadLock(() -> {
115115
if (n >= entries())
116116
throw new IllegalArgumentException("Attempt to fetch the " + n + "th entry from index " +
117117
file().getAbsolutePath() + ", which has size " + entries());
@@ -125,7 +125,7 @@ public OffsetPosition entry(int n) {
125125
* such offset.
126126
*/
127127
public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition fetchOffset, int fetchSize) {
128-
return maybeLock(lock, () -> {
128+
return inRemapReadLock(() -> {
129129
ByteBuffer idx = mmap().duplicate();
130130
int slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE);
131131
if (slot == -1)
@@ -141,8 +141,7 @@ public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition fetchOffset
141141
* @throws InvalidOffsetException if provided offset is not larger than the last offset
142142
*/
143143
public void append(long offset, int position) {
144-
lock.lock();
145-
try {
144+
inLock(() -> {
146145
if (isFull())
147146
throw new IllegalArgumentException("Attempt to append to a full index (size = " + entries() + ").");
148147

@@ -157,15 +156,12 @@ public void append(long offset, int position) {
157156
} else
158157
throw new InvalidOffsetException("Attempt to append an offset " + offset + " to position " + entries() +
159158
" no larger than the last offset appended (" + lastOffset + ") to " + file().getAbsolutePath());
160-
} finally {
161-
lock.unlock();
162-
}
159+
});
163160
}
164161

165162
@Override
166163
public void truncateTo(long offset) {
167-
lock.lock();
168-
try {
164+
inLock(() -> {
169165
ByteBuffer idx = mmap().duplicate();
170166
int slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY);
171167

@@ -182,9 +178,7 @@ else if (relativeOffset(idx, slot) == offset - baseOffset())
182178
else
183179
newEntries = slot + 1;
184180
truncateToEntries(newEntries);
185-
} finally {
186-
lock.unlock();
187-
}
181+
});
188182
}
189183

190184
public long lastOffset() {
@@ -218,30 +212,24 @@ private int physical(ByteBuffer buffer, int n) {
218212
* Truncates index to a known number of entries.
219213
*/
220214
private void truncateToEntries(int entries) {
221-
lock.lock();
222-
try {
215+
inLock(() -> {
223216
super.truncateToEntries0(entries);
224217
this.lastOffset = lastEntry().offset;
225218
log.debug("Truncated index {} to {} entries; position is now {} and last offset is now {}",
226-
file().getAbsolutePath(), entries, mmap().position(), lastOffset);
227-
} finally {
228-
lock.unlock();
229-
}
219+
file().getAbsolutePath(), entries, mmap().position(), lastOffset);
220+
});
230221
}
231222

232223
/**
233224
* The last entry in the index
234225
*/
235226
private OffsetPosition lastEntry() {
236-
lock.lock();
237-
try {
227+
return inRemapReadLock(() -> {
238228
int entries = entries();
239229
if (entries == 0)
240230
return new OffsetPosition(baseOffset(), 0);
241231
else
242232
return parseEntry(mmap(), entries - 1);
243-
} finally {
244-
lock.unlock();
245-
}
233+
});
246234
}
247235
}

0 commit comments

Comments
 (0)