Skip to content

Commit f0c9bb2

Browse files
committed
fix: resolve potential exception and blocking issue in queue capacity handling
1 parent e69be64 commit f0c9bb2

File tree

2 files changed

+88
-49
lines changed

2 files changed

+88
-49
lines changed

bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,6 @@ public void run() {
143143

144144
private boolean safeRunTask(Runnable r) {
145145
try {
146-
if (maxQueueCapacity > 0) {
147-
waiterCountUpdater.decrementAndGet(this);
148-
}
149146
r.run();
150147
tasksCompleted.increment();
151148
} catch (Throwable t) {
@@ -156,6 +153,8 @@ private boolean safeRunTask(Runnable r) {
156153
tasksFailed.increment();
157154
log.error("Error while running task: {}", t.getMessage(), t);
158155
}
156+
} finally {
157+
releaseQueuePermits(1);
159158
}
160159

161160
return true;
@@ -219,25 +218,44 @@ public long getFailedTasksCount() {
219218

220219
@Override
221220
public void execute(Runnable r) {
222-
execute(r, null);
221+
executeRunnableOrList(r, null);
223222
}
224223

225224
@VisibleForTesting
226-
void execute(Runnable r, List<Runnable> runnableList) {
225+
void executeRunnableOrList(Runnable runnable, List<Runnable> runnableList) {
227226
if (state != State.Running) {
228227
throw new RejectedExecutionException("Executor is shutting down");
229228
}
230229

230+
boolean hasSingle = runnable != null;
231+
boolean hasList = runnableList != null && !runnableList.isEmpty();
232+
233+
if (hasSingle == hasList) {
234+
// Both are provided or both are missing
235+
throw new IllegalArgumentException("Provide either 'runnable' or a non-empty 'runnableList', not both.");
236+
}
237+
231238
try {
232239
if (!rejectExecution) {
233-
queue.put(r);
234-
tasksCount.increment();
240+
if (hasSingle) {
241+
queue.put(runnable);
242+
tasksCount.increment();
243+
} else {
244+
for (Runnable task : runnableList) {
245+
queue.put(task);
246+
tasksCount.increment();
247+
}
248+
}
235249
} else {
236-
int delta = r != null ? 1 : runnableList.size();
237-
validateQueueCapacity(delta);
238-
if (r != null ? queue.offer(r) : queue.addAll(runnableList)) {
239-
tasksCount.add(delta);
250+
int permits = runnable != null ? 1 : runnableList.size();
251+
reserveQueuePermits(permits);
252+
boolean success = hasSingle
253+
? queue.offer(runnable)
254+
: queue.addAll(runnableList);
255+
if (success) {
256+
tasksCount.add(permits);
240257
} else {
258+
releaseQueuePermits(permits);
241259
reject();
242260
}
243261
}
@@ -246,12 +264,36 @@ void execute(Runnable r, List<Runnable> runnableList) {
246264
}
247265
}
248266

249-
private void validateQueueCapacity(int delta) {
250-
if (maxQueueCapacity > 0 && waiterCountUpdater.addAndGet(this, delta) > maxQueueCapacity) {
267+
private void reserveQueuePermits(int permits) {
268+
if (maxQueueCapacity <= 0) {
269+
return; // // Unlimited capacity, nothing to release
270+
}
271+
272+
if (permits < 0) {
273+
throw new IllegalArgumentException("Number of permits must be non-negative");
274+
}
275+
276+
if (waiterCountUpdater.addAndGet(this, permits) > maxQueueCapacity) {
277+
releaseQueuePermits(permits);
251278
reject();
252279
}
253280
}
254281

282+
private void releaseQueuePermits(int permits) {
283+
if (maxQueueCapacity <= 0) {
284+
return; // // Unlimited capacity, nothing to release
285+
}
286+
287+
if (permits < 0) {
288+
throw new IllegalArgumentException("Number of permits must be non-negative");
289+
}
290+
291+
int waiterCount = waiterCountUpdater.addAndGet(this, -permits);
292+
if (log.isDebugEnabled()) {
293+
log.debug("Released {} permits, current waiter count: {}", permits, waiterCount);
294+
}
295+
}
296+
255297
private void reject() {
256298
tasksRejected.increment();
257299
throw new ExecutorRejectedException("Executor queue is full");
@@ -321,6 +363,11 @@ public Number getSample() {
321363
});
322364
}
323365

366+
@VisibleForTesting
367+
int getWaiterCount() {
368+
return waiterCountUpdater.get(this);
369+
}
370+
324371
private static class ExecutorRejectedException extends RejectedExecutionException {
325372

326373
private ExecutorRejectedException(String msg) {

bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020

2121
import static org.junit.Assert.assertEquals;
2222
import static org.junit.Assert.assertFalse;
23+
import static org.junit.Assert.assertThrows;
2324
import static org.junit.Assert.assertTrue;
2425
import static org.junit.Assert.fail;
2526

2627
import io.netty.util.concurrent.DefaultThreadFactory;
2728
import java.util.ArrayList;
29+
import java.util.Collections;
2830
import java.util.List;
2931
import java.util.concurrent.BrokenBarrierException;
3032
import java.util.concurrent.CountDownLatch;
@@ -36,6 +38,7 @@
3638
import java.util.concurrent.TimeoutException;
3739
import java.util.concurrent.atomic.AtomicInteger;
3840
import lombok.Cleanup;
41+
import org.assertj.core.util.Lists;
3942
import org.awaitility.Awaitility;
4043
import org.junit.Test;
4144

@@ -122,54 +125,43 @@ public void testRejectWhenDrainToInProgressAndQueueIsEmpty() throws Exception {
122125
@Cleanup("shutdownNow")
123126
SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY, 10, true);
124127

125-
CyclicBarrier barrier = new CyclicBarrier(10);
126-
CountDownLatch startedLatch = new CountDownLatch(1);
128+
CountDownLatch waitedLatch = new CountDownLatch(1);
127129
List<Runnable> tasks = new ArrayList<>();
128130

129131
for (int i = 0; i < 10; i++) {
130-
int n = i;
131132
tasks.add(() -> {
132-
if (n == 0) {
133-
startedLatch.countDown();
134-
} else {
135-
try {
136-
barrier.await();
137-
} catch (InterruptedException | BrokenBarrierException e) {
138-
// ignore
139-
}
133+
try {
134+
// Block remaining tasks to simulate a stuck queue.
135+
waitedLatch.await();
136+
} catch (Exception e) {
137+
// ignored
140138
}
141139
});
142140
}
143-
ste.execute(null, tasks);
144-
145-
// Wait until the first task is done.
146-
try {
147-
startedLatch.await();
148-
} catch (InterruptedException e) {
149-
throw new RuntimeException(e);
150-
}
141+
ste.executeRunnableOrList(null, tasks);
151142

152-
// Next task should go through, because the runner thread has already pulled out the first and second items
153-
// from the queue.
154-
List<Runnable> nextTasks = new ArrayList<>();
155-
nextTasks.add(() -> {
156-
});
157-
nextTasks.add(() -> {
158-
});
159-
ste.execute(null, nextTasks);
143+
Awaitility.await().pollDelay(1, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(10, ste.getWaiterCount()));
160144

161-
// Now the queue is really full and should reject tasks
162-
try {
163-
ste.execute(() -> {
164-
});
165-
fail("should have rejected the task");
166-
} catch (RejectedExecutionException e) {
167-
// Expected
168-
}
145+
// Now the queue is really full and should reject tasks.
146+
assertThrows(RejectedExecutionException.class, () -> ste.execute(() -> {
147+
}));
169148

170-
assertEquals(12, ste.getSubmittedTasksCount());
149+
assertEquals(10, ste.getWaiterCount());
171150
assertEquals(1, ste.getRejectedTasksCount());
172151
assertEquals(0, ste.getFailedTasksCount());
152+
153+
// Now we can unblock the waited tasks.
154+
waitedLatch.countDown();
155+
156+
// Check the tasks are completed.
157+
Awaitility.await().pollDelay(1, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(0, ste.getWaiterCount()));
158+
159+
// Invalid cases - should throw IllegalArgumentException.
160+
assertThrows(IllegalArgumentException.class, () -> ste.executeRunnableOrList(null, null));
161+
assertThrows(IllegalArgumentException.class, () -> ste.executeRunnableOrList(null, Collections.emptyList()));
162+
assertThrows(IllegalArgumentException.class, () -> ste.executeRunnableOrList(() -> {
163+
}, Lists.newArrayList(() -> {
164+
})));
173165
}
174166

175167
@Test

0 commit comments

Comments
 (0)