Skip to content

Commit 7f8c2a5

Browse files
Forest0923chia7712
authored andcommitted
MINOR: Refactor LockUtils and improve comments (follow up to KAFKA-19390) (#20131)
This PR performs a refactoring of LockUtils and improves inline comments, as a follow-up to #19961. Reviewers: Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>
1 parent 807866c commit 7f8c2a5

File tree

3 files changed

+24
-73
lines changed

3 files changed

+24
-73
lines changed

server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java

Lines changed: 2 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import java.util.Objects;
2020
import java.util.concurrent.locks.Lock;
21-
import java.util.function.Supplier;
2221

2322
/**
2423
* A utility class providing helper methods for working with {@link Lock} objects.
@@ -35,50 +34,6 @@ public interface ThrowingRunnable<E extends Exception> {
3534
void run() throws E;
3635
}
3736

38-
/**
39-
* Executes the given {@link Supplier} within the context of the specified {@link Lock}.
40-
* The lock is acquired before executing the supplier and released after the execution,
41-
* ensuring that the lock is always released, even if an exception is thrown.
42-
*
43-
* @param <T> the type of the result returned by the supplier
44-
* @param lock the lock to be acquired and released
45-
* @param supplier the supplier to be executed within the lock context
46-
* @return the result of the supplier
47-
* @throws NullPointerException if either {@code lock} or {@code supplier} is null
48-
*/
49-
public static <T> T inLock(Lock lock, Supplier<T> supplier) {
50-
Objects.requireNonNull(lock, "Lock must not be null");
51-
Objects.requireNonNull(supplier, "Supplier must not be null");
52-
53-
lock.lock();
54-
try {
55-
return supplier.get();
56-
} finally {
57-
lock.unlock();
58-
}
59-
}
60-
61-
/**
62-
* Executes the given {@link Runnable} within the context of the specified {@link Lock}.
63-
* The lock is acquired before executing the runnable and released after the execution,
64-
* ensuring that the lock is always released, even if an exception is thrown.
65-
*
66-
* @param lock the lock to be acquired and released
67-
* @param runnable the runnable to be executed within the lock context
68-
* @throws NullPointerException if either {@code lock} or {@code runnable} is null
69-
*/
70-
public static void inLock(Lock lock, Runnable runnable) {
71-
Objects.requireNonNull(lock, "Lock must not be null");
72-
Objects.requireNonNull(runnable, "Runnable must not be null");
73-
74-
lock.lock();
75-
try {
76-
runnable.run();
77-
} finally {
78-
lock.unlock();
79-
}
80-
}
81-
8237
/**
8338
* Executes the given {@link ThrowingSupplier} within the context of the specified {@link Lock}.
8439
* The lock is acquired before executing the supplier and released after the execution,
@@ -92,7 +47,7 @@ public static void inLock(Lock lock, Runnable runnable) {
9247
* @throws E if an exception occurs during the execution of the supplier
9348
* @throws NullPointerException if either {@code lock} or {@code supplier} is null
9449
*/
95-
public static <T, E extends Exception> T inLockThrows(Lock lock, ThrowingSupplier<T, E> supplier) throws E {
50+
public static <T, E extends Exception> T inLock(Lock lock, ThrowingSupplier<T, E> supplier) throws E {
9651
Objects.requireNonNull(lock, "Lock must not be null");
9752
Objects.requireNonNull(supplier, "Supplier must not be null");
9853

@@ -115,7 +70,7 @@ public static <T, E extends Exception> T inLockThrows(Lock lock, ThrowingSupplie
11570
* @throws E if an exception occurs during the execution of the runnable
11671
* @throws NullPointerException if either {@code lock} or {@code runnable} is null
11772
*/
118-
public static <E extends Exception> void inLockThrows(Lock lock, ThrowingRunnable<E> runnable) throws E {
73+
public static <E extends Exception> void inLock(Lock lock, ThrowingRunnable<E> runnable) throws E {
11974
Objects.requireNonNull(lock, "Lock must not be null");
12075
Objects.requireNonNull(runnable, "Runnable must not be null");
12176

storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import java.util.OptionalInt;
3636
import java.util.concurrent.locks.ReentrantLock;
3737
import java.util.concurrent.locks.ReentrantReadWriteLock;
38-
import java.util.function.Supplier;
3938

4039
/**
4140
* The abstract index class which holds entry format agnostic methods.
@@ -48,7 +47,15 @@ private enum SearchResultType {
4847

4948
private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);
5049

51-
// Serializes all index operations that mutate internal state
50+
// Serializes all index operations that mutate internal state.
51+
// Readers do not need to acquire this lock because:
52+
// 1) MappedByteBuffer provides direct access to the OS-level buffer cache,
53+
// which allows concurrent reads in practice.
54+
// 2) Clients only read committed data and are not affected by concurrent appends/truncates.
55+
// In the rare case when the data is truncated, the follower could read inconsistent data.
56+
// The follower has the logic to ignore the inconsistent data through crc and leader epoch.
57+
// 3) Read and remap operations are coordinated via remapLock to ensure visibility of the
58+
// underlying mmap.
5259
private final ReentrantLock lock = new ReentrantLock();
5360
// Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed
5461
private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();
@@ -191,8 +198,8 @@ public void updateParentDir(File parentDir) {
191198
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
192199
*/
193200
public boolean resize(int newSize) throws IOException {
194-
return inLockThrows(() ->
195-
inRemapWriteLockThrows(() -> {
201+
return inLock(() ->
202+
inRemapWriteLock(() -> {
196203
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
197204

198205
if (length == roundedNewSize) {
@@ -258,7 +265,7 @@ public boolean deleteIfExists() throws IOException {
258265
* the file.
259266
*/
260267
public void trimToValidSize() throws IOException {
261-
inLockThrows(() -> {
268+
inLock(() -> {
262269
if (mmap != null) {
263270
resize(entrySize() * entries);
264271
}
@@ -282,10 +289,7 @@ public void closeHandler() {
282289
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
283290
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
284291
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
285-
inLockThrows(() ->
286-
inRemapWriteLockThrows(() -> {
287-
safeForceUnmap();
288-
}));
292+
inLock(() -> inRemapWriteLock(this::safeForceUnmap));
289293
}
290294

291295
/**
@@ -412,36 +416,28 @@ protected void truncateToEntries0(int entries) {
412416
mmap.position(entries * entrySize());
413417
}
414418

415-
protected final <T> T inLock(Supplier<T> action) {
419+
protected final <T, E extends Exception> T inLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
416420
return LockUtils.inLock(lock, action);
417421
}
418422

419-
protected final void inLock(Runnable action) {
423+
protected final <E extends Exception> void inLock(LockUtils.ThrowingRunnable<E> action) throws E {
420424
LockUtils.inLock(lock, action);
421425
}
422426

423-
protected final <T, E extends Exception> T inLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E {
424-
return LockUtils.inLockThrows(lock, action);
425-
}
426-
427-
protected final <E extends Exception> void inLockThrows(LockUtils.ThrowingRunnable<E> action) throws E {
428-
LockUtils.inLockThrows(lock, action);
429-
}
430-
431-
protected final <T> T inRemapReadLock(Supplier<T> action) {
427+
protected final <T, E extends Exception> T inRemapReadLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
432428
return LockUtils.inLock(remapLock.readLock(), action);
433429
}
434430

435-
protected final void inRemapReadLock(Runnable action) {
431+
protected final <E extends Exception> void inRemapReadLock(LockUtils.ThrowingRunnable<E> action) throws E {
436432
LockUtils.inLock(remapLock.readLock(), action);
437433
}
438434

439-
protected final <T, E extends Exception> T inRemapWriteLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E {
440-
return LockUtils.inLockThrows(remapLock.writeLock(), action);
435+
protected final <T, E extends Exception> T inRemapWriteLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
436+
return LockUtils.inLock(remapLock.writeLock(), action);
441437
}
442438

443-
protected final <E extends Exception> void inRemapWriteLockThrows(LockUtils.ThrowingRunnable<E> action) throws E {
444-
LockUtils.inLockThrows(remapLock.writeLock(), action);
439+
protected final <E extends Exception> void inRemapWriteLock(LockUtils.ThrowingRunnable<E> action) throws E {
440+
LockUtils.inLock(remapLock.writeLock(), action);
445441
}
446442

447443
/**

storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {
215215

216216
@Override
217217
public boolean resize(int newSize) throws IOException {
218-
return inLockThrows(() -> {
218+
return inLock(() -> {
219219
if (super.resize(newSize)) {
220220
this.lastEntry = lastEntryFromIndexFile();
221221
return true;

0 commit comments

Comments
 (0)