Skip to content

Commit 7e27a78

Browse files
authored
KAFKA-19390 Call safeForceUnmap() in AbstractIndex.resize() on Linux to prevent stale mmap of index files (#20621)
This backports [KAFKA-19390](https://issues.apache.org/jira/browse/KAFKA-19390) to kafka v4.0. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 4b0ba42 commit 7e27a78

File tree

6 files changed

+272
-120
lines changed

6 files changed

+272
-120
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.server.util;
18+
19+
import java.util.Objects;
20+
import java.util.concurrent.locks.Lock;
21+
22+
/**
 * A utility class providing helper methods for working with {@link Lock} objects.
 * This class simplifies the usage of locks by encapsulating the common
 * acquire / run / release-in-{@code finally} pattern.
 *
 * <p>The class only exposes static helpers; it is {@code final} and has a private
 * constructor so it can be neither instantiated nor subclassed.
 */
public final class LockUtils {

    /** Not instantiable: this class is a holder for static helpers only. */
    private LockUtils() {
    }

    /**
     * A value-returning action that is allowed to throw an exception of type {@code E}.
     *
     * @param <T> the type of the result
     * @param <E> the type of exception the action may throw
     */
    @FunctionalInterface
    public interface ThrowingSupplier<T, E extends Exception> {
        T get() throws E;
    }

    /**
     * A side-effect-only action that is allowed to throw an exception of type {@code E}.
     *
     * @param <E> the type of exception the action may throw
     */
    @FunctionalInterface
    public interface ThrowingRunnable<E extends Exception> {
        void run() throws E;
    }

    /**
     * Executes the given {@link ThrowingSupplier} within the context of the specified {@link Lock}.
     * The lock is acquired before executing the supplier and released after the execution,
     * ensuring that the lock is always released, even if an exception is thrown.
     *
     * @param <T>      the type of the result returned by the supplier
     * @param <E>      the type of exception that may be thrown by the supplier
     * @param lock     the lock to be acquired and released
     * @param supplier the supplier to be executed within the lock context
     * @return the result of the supplier
     * @throws E                    if an exception occurs during the execution of the supplier
     * @throws NullPointerException if either {@code lock} or {@code supplier} is null
     */
    public static <T, E extends Exception> T inLock(Lock lock, ThrowingSupplier<T, E> supplier) throws E {
        // Validate both arguments before touching the lock so a bad call never leaves it held.
        Objects.requireNonNull(lock, "Lock must not be null");
        Objects.requireNonNull(supplier, "Supplier must not be null");

        // Acquire outside the try block: if lock() itself fails there is nothing to unlock.
        lock.lock();
        try {
            return supplier.get();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Executes the given {@link ThrowingRunnable} within the context of the specified {@link Lock}.
     * The lock is acquired before executing the runnable and released after the execution,
     * ensuring that the lock is always released, even if an exception is thrown.
     *
     * @param <E>      the type of exception that may be thrown by the runnable
     * @param lock     the lock to be acquired and released
     * @param runnable the runnable to be executed within the lock context
     * @throws E                    if an exception occurs during the execution of the runnable
     * @throws NullPointerException if either {@code lock} or {@code runnable} is null
     */
    public static <E extends Exception> void inLock(Lock lock, ThrowingRunnable<E> runnable) throws E {
        // Validate both arguments before touching the lock so a bad call never leaves it held.
        Objects.requireNonNull(lock, "Lock must not be null");
        Objects.requireNonNull(runnable, "Runnable must not be null");

        // Acquire outside the try block: if lock() itself fails there is nothing to unlock.
        lock.lock();
        try {
            runnable.run();
        } finally {
            lock.unlock();
        }
    }
}

storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java

Lines changed: 67 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package org.apache.kafka.storage.internals.log;
1818

1919
import org.apache.kafka.common.utils.ByteBufferUnmapper;
20-
import org.apache.kafka.common.utils.OperatingSystem;
2120
import org.apache.kafka.common.utils.Utils;
21+
import org.apache.kafka.server.util.LockUtils;
2222

2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
@@ -33,8 +33,8 @@
3333
import java.nio.file.Files;
3434
import java.util.Objects;
3535
import java.util.OptionalInt;
36-
import java.util.concurrent.locks.Lock;
3736
import java.util.concurrent.locks.ReentrantLock;
37+
import java.util.concurrent.locks.ReentrantReadWriteLock;
3838

3939
/**
4040
* The abstract index class which holds entry format agnostic methods.
@@ -47,7 +47,18 @@ private enum SearchResultType {
4747

4848
private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);
4949

50-
protected final ReentrantLock lock = new ReentrantLock();
50+
// Serializes all index operations that mutate internal state.
51+
// Readers do not need to acquire this lock because:
52+
// 1) MappedByteBuffer provides direct access to the OS-level buffer cache,
53+
// which allows concurrent reads in practice.
54+
// 2) Clients only read committed data and are not affected by concurrent appends/truncates.
55+
// In the rare case when the data is truncated, the follower could read inconsistent data.
56+
// The follower has the logic to ignore the inconsistent data through crc and leader epoch.
57+
// 3) Read and remap operations are coordinated via remapLock to ensure visibility of the
58+
// underlying mmap.
59+
private final ReentrantLock lock = new ReentrantLock();
60+
// Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed
61+
private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();
5162

5263
private final long baseOffset;
5364
private final int maxIndexSize;
@@ -187,36 +198,32 @@ public void updateParentDir(File parentDir) {
187198
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
188199
*/
189200
public boolean resize(int newSize) throws IOException {
190-
lock.lock();
191-
try {
192-
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
193-
194-
if (length == roundedNewSize) {
195-
log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize);
196-
return false;
197-
} else {
198-
RandomAccessFile raf = new RandomAccessFile(file, "rw");
199-
try {
200-
int position = mmap.position();
201-
202-
/* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */
203-
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
204-
safeForceUnmap();
205-
raf.setLength(roundedNewSize);
206-
this.length = roundedNewSize;
207-
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
208-
this.maxEntries = mmap.limit() / entrySize();
209-
mmap.position(position);
210-
log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize,
211-
mmap.position(), mmap.limit());
212-
return true;
213-
} finally {
214-
Utils.closeQuietly(raf, "index file " + file.getName());
215-
}
216-
}
217-
} finally {
218-
lock.unlock();
219-
}
201+
return inLock(() ->
202+
inRemapWriteLock(() -> {
203+
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
204+
205+
if (length == roundedNewSize) {
206+
log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize);
207+
return false;
208+
} else {
209+
RandomAccessFile raf = new RandomAccessFile(file, "rw");
210+
try {
211+
int position = mmap.position();
212+
213+
safeForceUnmap();
214+
raf.setLength(roundedNewSize);
215+
this.length = roundedNewSize;
216+
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
217+
this.maxEntries = mmap.limit() / entrySize();
218+
mmap.position(position);
219+
log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize,
220+
mmap.position(), mmap.limit());
221+
return true;
222+
} finally {
223+
Utils.closeQuietly(raf, "index file " + file.getName());
224+
}
225+
}
226+
}));
220227
}
221228

222229
/**
@@ -236,12 +243,9 @@ public void renameTo(File f) throws IOException {
236243
* Flush the data in the index to disk
237244
*/
238245
public void flush() {
239-
lock.lock();
240-
try {
246+
inLock(() -> {
241247
mmap.force();
242-
} finally {
243-
lock.unlock();
244-
}
248+
});
245249
}
246250

247251
/**
@@ -261,14 +265,11 @@ public boolean deleteIfExists() throws IOException {
261265
* the file.
262266
*/
263267
public void trimToValidSize() throws IOException {
264-
lock.lock();
265-
try {
268+
inLock(() -> {
266269
if (mmap != null) {
267270
resize(entrySize() * entries);
268271
}
269-
} finally {
270-
lock.unlock();
271-
}
272+
});
272273
}
273274

274275
/**
@@ -288,12 +289,7 @@ public void closeHandler() {
288289
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
289290
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
290291
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
291-
lock.lock();
292-
try {
293-
safeForceUnmap();
294-
} finally {
295-
lock.unlock();
296-
}
292+
inLock(() -> inRemapWriteLock(this::safeForceUnmap));
297293
}
298294

299295
/**
@@ -420,20 +416,28 @@ protected void truncateToEntries0(int entries) {
420416
mmap.position(entries * entrySize());
421417
}
422418

423-
/**
424-
* Execute the given function in a lock only if we are running on windows or z/OS. We do this
425-
* because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it
426-
* and this requires synchronizing reads.
427-
*/
428-
protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E {
429-
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
430-
lock.lock();
431-
try {
432-
return action.execute();
433-
} finally {
434-
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
435-
lock.unlock();
436-
}
419+
protected final <T, E extends Exception> T inLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
420+
return LockUtils.inLock(lock, action);
421+
}
422+
423+
protected final <E extends Exception> void inLock(LockUtils.ThrowingRunnable<E> action) throws E {
424+
LockUtils.inLock(lock, action);
425+
}
426+
427+
protected final <T, E extends Exception> T inRemapReadLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
428+
return LockUtils.inLock(remapLock.readLock(), action);
429+
}
430+
431+
protected final <E extends Exception> void inRemapReadLock(LockUtils.ThrowingRunnable<E> action) throws E {
432+
LockUtils.inLock(remapLock.readLock(), action);
433+
}
434+
435+
protected final <T, E extends Exception> T inRemapWriteLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
436+
return LockUtils.inLock(remapLock.writeLock(), action);
437+
}
438+
439+
protected final <E extends Exception> void inRemapWriteLock(LockUtils.ThrowingRunnable<E> action) throws E {
440+
LockUtils.inLock(remapLock.writeLock(), action);
437441
}
438442

439443
/**

storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public final class OffsetIndex extends AbstractIndex {
5656
private static final int ENTRY_SIZE = 8;
5757

5858
/* the last offset in the index */
59-
private long lastOffset;
59+
private volatile long lastOffset;
6060

6161
public OffsetIndex(File file, long baseOffset) throws IOException {
6262
this(file, baseOffset, -1);
@@ -95,7 +95,7 @@ public void sanityCheck() {
9595
* the pair (baseOffset, 0) is returned.
9696
*/
9797
public OffsetPosition lookup(long targetOffset) {
98-
return maybeLock(lock, () -> {
98+
return inRemapReadLock(() -> {
9999
ByteBuffer idx = mmap().duplicate();
100100
int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY);
101101
if (slot == -1)
@@ -111,7 +111,7 @@ public OffsetPosition lookup(long targetOffset) {
111111
* @return The offset/position pair at that entry
112112
*/
113113
public OffsetPosition entry(int n) {
114-
return maybeLock(lock, () -> {
114+
return inRemapReadLock(() -> {
115115
if (n >= entries())
116116
throw new IllegalArgumentException("Attempt to fetch the " + n + "th entry from index " +
117117
file().getAbsolutePath() + ", which has size " + entries());
@@ -125,7 +125,7 @@ public OffsetPosition entry(int n) {
125125
* such offset.
126126
*/
127127
public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition fetchOffset, int fetchSize) {
128-
return maybeLock(lock, () -> {
128+
return inRemapReadLock(() -> {
129129
ByteBuffer idx = mmap().duplicate();
130130
int slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE);
131131
if (slot == -1)
@@ -141,8 +141,7 @@ public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition fetchOffset
141141
* @throws InvalidOffsetException if provided offset is not larger than the last offset
142142
*/
143143
public void append(long offset, int position) {
144-
lock.lock();
145-
try {
144+
inLock(() -> {
146145
if (isFull())
147146
throw new IllegalArgumentException("Attempt to append to a full index (size = " + entries() + ").");
148147

@@ -157,15 +156,12 @@ public void append(long offset, int position) {
157156
} else
158157
throw new InvalidOffsetException("Attempt to append an offset " + offset + " to position " + entries() +
159158
" no larger than the last offset appended (" + lastOffset + ") to " + file().getAbsolutePath());
160-
} finally {
161-
lock.unlock();
162-
}
159+
});
163160
}
164161

165162
@Override
166163
public void truncateTo(long offset) {
167-
lock.lock();
168-
try {
164+
inLock(() -> {
169165
ByteBuffer idx = mmap().duplicate();
170166
int slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY);
171167

@@ -182,9 +178,7 @@ else if (relativeOffset(idx, slot) == offset - baseOffset())
182178
else
183179
newEntries = slot + 1;
184180
truncateToEntries(newEntries);
185-
} finally {
186-
lock.unlock();
187-
}
181+
});
188182
}
189183

190184
public long lastOffset() {
@@ -218,30 +212,24 @@ private int physical(ByteBuffer buffer, int n) {
218212
* Truncates index to a known number of entries.
219213
*/
220214
private void truncateToEntries(int entries) {
221-
lock.lock();
222-
try {
215+
inLock(() -> {
223216
super.truncateToEntries0(entries);
224217
this.lastOffset = lastEntry().offset;
225218
log.debug("Truncated index {} to {} entries; position is now {} and last offset is now {}",
226-
file().getAbsolutePath(), entries, mmap().position(), lastOffset);
227-
} finally {
228-
lock.unlock();
229-
}
219+
file().getAbsolutePath(), entries, mmap().position(), lastOffset);
220+
});
230221
}
231222

232223
/**
233224
* The last entry in the index
234225
*/
235226
private OffsetPosition lastEntry() {
236-
lock.lock();
237-
try {
227+
return inRemapReadLock(() -> {
238228
int entries = entries();
239229
if (entries == 0)
240230
return new OffsetPosition(baseOffset(), 0);
241231
else
242232
return parseEntry(mmap(), entries - 1);
243-
} finally {
244-
lock.unlock();
245-
}
233+
});
246234
}
247235
}

0 commit comments

Comments
 (0)