Skip to content

Commit 3de037f

Browse files
committed
fix: throw reject when SingleThreadExecutor drainTo in progress and queue is empty
1 parent 7c41204 commit 3de037f

File tree

2 files changed

+93
-5
lines changed

2 files changed

+93
-5
lines changed

bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.bookkeeper.common.util;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2223
import java.util.ArrayList;
2324
import java.util.List;
@@ -29,6 +30,7 @@
2930
import java.util.concurrent.RejectedExecutionException;
3031
import java.util.concurrent.ThreadFactory;
3132
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3234
import java.util.concurrent.atomic.LongAdder;
3335
import lombok.SneakyThrows;
3436
import lombok.extern.slf4j.Slf4j;
@@ -54,6 +56,11 @@ public class SingleThreadExecutor extends AbstractExecutorService implements Exe
5456
private final LongAdder tasksRejected = new LongAdder();
5557
private final LongAdder tasksFailed = new LongAdder();
5658

59+
private final int maxQueueCapacity;
60+
private static final AtomicIntegerFieldUpdater<SingleThreadExecutor> waiterCountUpdater =
61+
AtomicIntegerFieldUpdater.newUpdater(SingleThreadExecutor.class, "waiterCount");
62+
private volatile int waiterCount = 0;
63+
5764
enum State {
5865
Running,
5966
Shutdown,
@@ -80,6 +87,8 @@ public SingleThreadExecutor(ThreadFactory tf, int maxQueueCapacity, boolean reje
8087
} else {
8188
this.queue = new GrowableMpScArrayConsumerBlockingQueue<>();
8289
}
90+
this.maxQueueCapacity = maxQueueCapacity;
91+
8392
this.runner = tf.newThread(this);
8493
this.state = State.Running;
8594
this.rejectExecution = rejectExecution;
@@ -134,6 +143,9 @@ public void run() {
134143

135144
private boolean safeRunTask(Runnable r) {
136145
try {
146+
if (maxQueueCapacity > 0) {
147+
waiterCountUpdater.decrementAndGet(this);
148+
}
137149
r.run();
138150
tasksCompleted.increment();
139151
} catch (Throwable t) {
@@ -162,7 +174,10 @@ public List<Runnable> shutdownNow() {
162174
this.state = State.Shutdown;
163175
this.runner.interrupt();
164176
List<Runnable> remainingTasks = new ArrayList<>();
165-
queue.drainTo(remainingTasks);
177+
int n = queue.drainTo(remainingTasks);
178+
if (maxQueueCapacity > 0) {
179+
waiterCountUpdater.addAndGet(this, -n);
180+
}
166181
return remainingTasks;
167182
}
168183

@@ -204,6 +219,11 @@ public long getFailedTasksCount() {
204219

205220
@Override
206221
public void execute(Runnable r) {
222+
execute(r, null);
223+
}
224+
225+
@VisibleForTesting
226+
void execute(Runnable r, List<Runnable> runnableList) {
207227
if (state != State.Running) {
208228
throw new RejectedExecutionException("Executor is shutting down");
209229
}
@@ -213,18 +233,30 @@ public void execute(Runnable r) {
213233
queue.put(r);
214234
tasksCount.increment();
215235
} else {
216-
if (queue.offer(r)) {
217-
tasksCount.increment();
236+
int delta = r != null ? 1 : runnableList.size();
237+
validateQueueCapacity(delta);
238+
if (r != null ? queue.offer(r) : queue.addAll(runnableList)) {
239+
tasksCount.add(delta);
218240
} else {
219-
tasksRejected.increment();
220-
throw new ExecutorRejectedException("Executor queue is full");
241+
reject();
221242
}
222243
}
223244
} catch (InterruptedException e) {
224245
throw new RejectedExecutionException("Executor thread was interrupted", e);
225246
}
226247
}
227248

249+
private void validateQueueCapacity(int delta) {
250+
if (maxQueueCapacity > 0 && waiterCountUpdater.addAndGet(this, delta) > maxQueueCapacity) {
251+
reject();
252+
}
253+
}
254+
255+
private void reject() {
256+
tasksRejected.increment();
257+
throw new ExecutorRejectedException("Executor queue is full");
258+
}
259+
228260
public void registerMetrics(StatsLogger statsLogger) {
229261
// Register gauges
230262
statsLogger.scopeLabel("thread", runner.getName())

bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.junit.Assert.fail;
2525

2626
import io.netty.util.concurrent.DefaultThreadFactory;
27+
import java.util.ArrayList;
2728
import java.util.List;
2829
import java.util.concurrent.BrokenBarrierException;
2930
import java.util.concurrent.CountDownLatch;
@@ -116,6 +117,61 @@ public void testRejectWhenQueueIsFull() throws Exception {
116117
assertEquals(0, ste.getFailedTasksCount());
117118
}
118119

120+
@Test
121+
public void testRejectWhenDrainToInProgressAndQueueIsEmpty() throws Exception {
122+
@Cleanup("shutdownNow")
123+
SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY, 10, true);
124+
125+
CyclicBarrier barrier = new CyclicBarrier(10);
126+
CountDownLatch startedLatch = new CountDownLatch(1);
127+
List<Runnable> tasks = new ArrayList<>();
128+
129+
for (int i = 0; i < 10; i++) {
130+
int n = i;
131+
tasks.add(() -> {
132+
if (n == 0) {
133+
startedLatch.countDown();
134+
} else {
135+
try {
136+
barrier.await();
137+
} catch (InterruptedException | BrokenBarrierException e) {
138+
// ignore
139+
}
140+
}
141+
});
142+
}
143+
ste.execute(null, tasks);
144+
145+
// Wait until the first task is done.
146+
try {
147+
startedLatch.await();
148+
} catch (InterruptedException e) {
149+
throw new RuntimeException(e);
150+
}
151+
152+
// Next task should go through, because the runner thread has already pulled out the first and second items
153+
// from the queue.
154+
List<Runnable> nextTasks = new ArrayList<>();
155+
nextTasks.add(() -> {
156+
});
157+
nextTasks.add(() -> {
158+
});
159+
ste.execute(null, nextTasks);
160+
161+
// Now the queue is really full and should reject tasks
162+
try {
163+
ste.execute(() -> {
164+
});
165+
fail("should have rejected the task");
166+
} catch (RejectedExecutionException e) {
167+
// Expected
168+
}
169+
170+
assertEquals(12, ste.getSubmittedTasksCount());
171+
assertEquals(1, ste.getRejectedTasksCount());
172+
assertEquals(0, ste.getFailedTasksCount());
173+
}
174+
119175
@Test
120176
public void testBlockWhenQueueIsFull() throws Exception {
121177
@Cleanup("shutdown")

0 commit comments

Comments
 (0)