Skip to content

Commit e37f31e

Browse files
authored
KAFKA-19390: backport to 3.9 (apache#20571)
Backport KAFKA-19390 to v3.9. This backport includes the changes from PRs apache#19961 and apache#20131.
Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 4dd3512 commit e37f31e

File tree

6 files changed

+274
-121
lines changed

6 files changed

+274
-121
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.server.util;
18+
19+
import java.util.Objects;
20+
import java.util.concurrent.locks.Lock;
21+
22+
/**
23+
* A utility class providing helper methods for working with {@link Lock} objects.
24+
* This class simplifies the usage of locks by encapsulating common patterns,
25+
* such as acquiring and releasing locks in a safe manner.
26+
*/
27+
public final class LockUtils {

    /**
     * A value-returning action that may throw a checked exception of type {@code E}.
     *
     * @param <T> the type of the value produced
     * @param <E> the type of exception the action may throw
     */
    @FunctionalInterface
    public interface ThrowingSupplier<T, E extends Exception> {
        /**
         * Runs the action and returns its result.
         *
         * @return the computed value
         * @throws E if the action fails
         */
        T get() throws E;
    }

    /**
     * An action without a result that may throw a checked exception of type {@code E}.
     *
     * @param <E> the type of exception the action may throw
     */
    @FunctionalInterface
    public interface ThrowingRunnable<E extends Exception> {
        /**
         * Runs the action.
         *
         * @throws E if the action fails
         */
        void run() throws E;
    }

    // Static helpers only: instances of this class carry no state and are never needed.
    private LockUtils() {
    }

    /**
     * Executes the given {@link ThrowingSupplier} within the context of the specified {@link Lock}.
     * The lock is acquired before executing the supplier and released after the execution,
     * ensuring that the lock is always released, even if an exception is thrown.
     *
     * @param <T>      the type of the result returned by the supplier
     * @param <E>      the type of exception that may be thrown by the supplier
     * @param lock     the lock to be acquired and released
     * @param supplier the supplier to be executed within the lock context
     * @return the result of the supplier
     * @throws E if an exception occurs during the execution of the supplier
     * @throws NullPointerException if either {@code lock} or {@code supplier} is null
     */
    public static <T, E extends Exception> T inLock(Lock lock, ThrowingSupplier<T, E> supplier) throws E {
        // Validate both arguments before touching the lock so a bad call never leaves it held.
        Objects.requireNonNull(lock, "Lock must not be null");
        Objects.requireNonNull(supplier, "Supplier must not be null");

        // Acquire outside the try block: if lock() itself fails, unlock() must not be attempted.
        lock.lock();
        try {
            return supplier.get();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Executes the given {@link ThrowingRunnable} within the context of the specified {@link Lock}.
     * The lock is acquired before executing the runnable and released after the execution,
     * ensuring that the lock is always released, even if an exception is thrown.
     *
     * @param <E>      the type of exception that may be thrown by the runnable
     * @param lock     the lock to be acquired and released
     * @param runnable the runnable to be executed within the lock context
     * @throws E if an exception occurs during the execution of the runnable
     * @throws NullPointerException if either {@code lock} or {@code runnable} is null
     */
    public static <E extends Exception> void inLock(Lock lock, ThrowingRunnable<E> runnable) throws E {
        // Validate both arguments before touching the lock so a bad call never leaves it held.
        Objects.requireNonNull(lock, "Lock must not be null");
        Objects.requireNonNull(runnable, "Runnable must not be null");

        // Acquire outside the try block: if lock() itself fails, unlock() must not be attempted.
        lock.lock();
        try {
            runnable.run();
        } finally {
            lock.unlock();
        }
    }
}

storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java

Lines changed: 70 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package org.apache.kafka.storage.internals.log;
1818

1919
import org.apache.kafka.common.utils.ByteBufferUnmapper;
20-
import org.apache.kafka.common.utils.OperatingSystem;
2120
import org.apache.kafka.common.utils.Utils;
21+
import org.apache.kafka.server.util.LockUtils;
2222

2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
@@ -33,8 +33,8 @@
3333
import java.nio.file.Files;
3434
import java.util.Objects;
3535
import java.util.OptionalInt;
36-
import java.util.concurrent.locks.Lock;
3736
import java.util.concurrent.locks.ReentrantLock;
37+
import java.util.concurrent.locks.ReentrantReadWriteLock;
3838

3939
/**
4040
* The abstract index class which holds entry format agnostic methods.
@@ -47,7 +47,18 @@ private enum SearchResultType {
4747

4848
private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);
4949

50-
protected final ReentrantLock lock = new ReentrantLock();
50+
// Serializes all index operations that mutate internal state.
51+
// Readers do not need to acquire this lock because:
52+
// 1) MappedByteBuffer provides direct access to the OS-level buffer cache,
53+
// which allows concurrent reads in practice.
54+
// 2) Clients only read committed data and are not affected by concurrent appends/truncates.
55+
// In the rare case when the data is truncated, the follower could read inconsistent data.
56+
// The follower has the logic to ignore the inconsistent data through crc and leader epoch.
57+
// 3) Read and remap operations are coordinated via remapLock to ensure visibility of the
58+
// underlying mmap.
59+
private final ReentrantLock lock = new ReentrantLock();
60+
// Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed
61+
private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();
5162

5263
private final long baseOffset;
5364
private final int maxIndexSize;
@@ -187,36 +198,32 @@ public void updateParentDir(File parentDir) {
187198
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
188199
*/
189200
public boolean resize(int newSize) throws IOException {
190-
lock.lock();
191-
try {
192-
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
193-
194-
if (length == roundedNewSize) {
195-
log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize);
196-
return false;
197-
} else {
198-
RandomAccessFile raf = new RandomAccessFile(file, "rw");
199-
try {
200-
int position = mmap.position();
201-
202-
/* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */
203-
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
204-
safeForceUnmap();
205-
raf.setLength(roundedNewSize);
206-
this.length = roundedNewSize;
207-
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
208-
this.maxEntries = mmap.limit() / entrySize();
209-
mmap.position(position);
210-
log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize,
211-
mmap.position(), mmap.limit());
212-
return true;
213-
} finally {
214-
Utils.closeQuietly(raf, "index file " + file.getName());
215-
}
216-
}
217-
} finally {
218-
lock.unlock();
219-
}
201+
return inLock(() ->
202+
inRemapWriteLock(() -> {
203+
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
204+
205+
if (length == roundedNewSize) {
206+
log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize);
207+
return false;
208+
} else {
209+
RandomAccessFile raf = new RandomAccessFile(file, "rw");
210+
try {
211+
int position = mmap.position();
212+
213+
safeForceUnmap();
214+
raf.setLength(roundedNewSize);
215+
this.length = roundedNewSize;
216+
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
217+
this.maxEntries = mmap.limit() / entrySize();
218+
mmap.position(position);
219+
log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize,
220+
mmap.position(), mmap.limit());
221+
return true;
222+
} finally {
223+
Utils.closeQuietly(raf, "index file " + file.getName());
224+
}
225+
}
226+
}));
220227
}
221228

222229
/**
@@ -236,12 +243,9 @@ public void renameTo(File f) throws IOException {
236243
* Flush the data in the index to disk
237244
*/
238245
public void flush() {
239-
lock.lock();
240-
try {
246+
inLock(() -> {
241247
mmap.force();
242-
} finally {
243-
lock.unlock();
244-
}
248+
});
245249
}
246250

247251
/**
@@ -261,12 +265,11 @@ public boolean deleteIfExists() throws IOException {
261265
* the file.
262266
*/
263267
public void trimToValidSize() throws IOException {
264-
lock.lock();
265-
try {
266-
resize(entrySize() * entries);
267-
} finally {
268-
lock.unlock();
269-
}
268+
inLock(() -> {
269+
if (mmap != null) {
270+
resize(entrySize() * entries);
271+
}
272+
});
270273
}
271274

272275
/**
@@ -286,12 +289,7 @@ public void closeHandler() {
286289
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
287290
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
288291
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
289-
lock.lock();
290-
try {
291-
safeForceUnmap();
292-
} finally {
293-
lock.unlock();
294-
}
292+
inLock(() -> inRemapWriteLock(this::safeForceUnmap));
295293
}
296294

297295
/**
@@ -418,20 +416,28 @@ protected void truncateToEntries0(int entries) {
418416
mmap.position(entries * entrySize());
419417
}
420418

421-
/**
422-
* Execute the given function in a lock only if we are running on windows or z/OS. We do this
423-
* because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it
424-
* and this requires synchronizing reads.
425-
*/
426-
protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E {
427-
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
428-
lock.lock();
429-
try {
430-
return action.execute();
431-
} finally {
432-
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
433-
lock.unlock();
434-
}
419+
protected final <T, E extends Exception> T inLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
420+
return LockUtils.inLock(lock, action);
421+
}
422+
423+
protected final <E extends Exception> void inLock(LockUtils.ThrowingRunnable<E> action) throws E {
424+
LockUtils.inLock(lock, action);
425+
}
426+
427+
protected final <T, E extends Exception> T inRemapReadLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
428+
return LockUtils.inLock(remapLock.readLock(), action);
429+
}
430+
431+
protected final <E extends Exception> void inRemapReadLock(LockUtils.ThrowingRunnable<E> action) throws E {
432+
LockUtils.inLock(remapLock.readLock(), action);
433+
}
434+
435+
protected final <T, E extends Exception> T inRemapWriteLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
436+
return LockUtils.inLock(remapLock.writeLock(), action);
437+
}
438+
439+
protected final <E extends Exception> void inRemapWriteLock(LockUtils.ThrowingRunnable<E> action) throws E {
440+
LockUtils.inLock(remapLock.writeLock(), action);
435441
}
436442

437443
/**

storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class OffsetIndex extends AbstractIndex {
5656
private static final int ENTRY_SIZE = 8;
5757

5858
/* the last offset in the index */
59-
private long lastOffset;
59+
private volatile long lastOffset;
6060

6161
public OffsetIndex(File file, long baseOffset) throws IOException {
6262
this(file, baseOffset, -1);
@@ -96,7 +96,7 @@ public void sanityCheck() {
9696
* the pair (baseOffset, 0) is returned.
9797
*/
9898
public OffsetPosition lookup(long targetOffset) {
99-
return maybeLock(lock, () -> {
99+
return inRemapReadLock(() -> {
100100
ByteBuffer idx = mmap().duplicate();
101101
int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY);
102102
if (slot == -1)
@@ -112,7 +112,7 @@ public OffsetPosition lookup(long targetOffset) {
112112
* @return The offset/position pair at that entry
113113
*/
114114
public OffsetPosition entry(int n) {
115-
return maybeLock(lock, () -> {
115+
return inRemapReadLock(() -> {
116116
if (n >= entries())
117117
throw new IllegalArgumentException("Attempt to fetch the " + n + "th entry from index " +
118118
file().getAbsolutePath() + ", which has size " + entries());
@@ -126,7 +126,7 @@ public OffsetPosition entry(int n) {
126126
* such offset.
127127
*/
128128
public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition fetchOffset, int fetchSize) {
129-
return maybeLock(lock, () -> {
129+
return inRemapReadLock(() -> {
130130
ByteBuffer idx = mmap().duplicate();
131131
int slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE);
132132
if (slot == -1)
@@ -142,8 +142,7 @@ public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition fetchOffset
142142
* @throws InvalidOffsetException if provided offset is not larger than the last offset
143143
*/
144144
public void append(long offset, int position) {
145-
lock.lock();
146-
try {
145+
inLock(() -> {
147146
if (isFull())
148147
throw new IllegalArgumentException("Attempt to append to a full index (size = " + entries() + ").");
149148

@@ -158,15 +157,12 @@ public void append(long offset, int position) {
158157
} else
159158
throw new InvalidOffsetException("Attempt to append an offset " + offset + " to position " + entries() +
160159
" no larger than the last offset appended (" + lastOffset + ") to " + file().getAbsolutePath());
161-
} finally {
162-
lock.unlock();
163-
}
160+
});
164161
}
165162

166163
@Override
167164
public void truncateTo(long offset) {
168-
lock.lock();
169-
try {
165+
inLock(() -> {
170166
ByteBuffer idx = mmap().duplicate();
171167
int slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY);
172168

@@ -183,9 +179,7 @@ else if (relativeOffset(idx, slot) == offset - baseOffset())
183179
else
184180
newEntries = slot + 1;
185181
truncateToEntries(newEntries);
186-
} finally {
187-
lock.unlock();
188-
}
182+
});
189183
}
190184

191185
public long lastOffset() {
@@ -219,30 +213,24 @@ private int physical(ByteBuffer buffer, int n) {
219213
* Truncates index to a known number of entries.
220214
*/
221215
private void truncateToEntries(int entries) {
222-
lock.lock();
223-
try {
216+
inLock(() -> {
224217
super.truncateToEntries0(entries);
225218
this.lastOffset = lastEntry().offset;
226219
log.debug("Truncated index {} to {} entries; position is now {} and last offset is now {}",
227-
file().getAbsolutePath(), entries, mmap().position(), lastOffset);
228-
} finally {
229-
lock.unlock();
230-
}
220+
file().getAbsolutePath(), entries, mmap().position(), lastOffset);
221+
});
231222
}
232223

233224
/**
234225
* The last entry in the index
235226
*/
236227
private OffsetPosition lastEntry() {
237-
lock.lock();
238-
try {
228+
return inRemapReadLock(() -> {
239229
int entries = entries();
240230
if (entries == 0)
241231
return new OffsetPosition(baseOffset(), 0);
242232
else
243233
return parseEntry(mmap(), entries - 1);
244-
} finally {
245-
lock.unlock();
246-
}
234+
});
247235
}
248236
}

0 commit comments

Comments
 (0)