Skip to content

Commit ff7ea0d

Browse files
hangc0276zymap
authored andcommitted
Fix read thread blocking in sendResponseAndWait causing READ_ENTRY_REQUEST p99 latency spike (#4730)
* Fix read thread blocking in sendResponseAndWait causing READ_ENTRY_REQUEST p99 latency spike * address comments (cherry picked from commit 8664dd9)
1 parent accaa3d commit ff7ea0d

File tree

3 files changed

+220
-17
lines changed

3 files changed

+220
-17
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1075,10 +1075,16 @@ public ServerConfiguration setMaxAddsInProgressLimit(int value) {
10751075
/**
10761076
* Get max number of reads in progress. 0 == unlimited.
10771077
*
1078+
* <p>This limit bounds the memory used by read responses that have been read from storage
1079+
* but not yet flushed to the network. Since read response writes are non-blocking,
1080+
* without this limit a slow consumer could cause unbounded memory growth.
1081+
* The default value of 10000 provides a reasonable balance between throughput and memory usage.
1082+
* Tune based on your average entry size: memoryBudget / avgEntrySize.
1083+
*
10781084
* @return Max number of reads in progress.
10791085
*/
10801086
public int getMaxReadsInProgressLimit() {
1081-
return this.getInt(MAX_READS_IN_PROGRESS_LIMIT, 0);
1087+
return this.getInt(MAX_READS_IN_PROGRESS_LIMIT, 10000);
10821088
}
10831089

10841090
/**

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import io.netty.channel.Channel;
2121
import io.netty.channel.ChannelFuture;
22+
import io.netty.channel.ChannelFutureListener;
2223
import io.netty.channel.ChannelPromise;
23-
import java.util.concurrent.ExecutionException;
2424
import java.util.concurrent.TimeUnit;
2525
import org.apache.bookkeeper.proto.BookieProtocol.Request;
2626
import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -74,10 +74,12 @@ protected void sendWriteReqResponse(int rc, Object response, OpStatsLogger stats
7474
protected void sendReadReqResponse(int rc, Object response, OpStatsLogger statsLogger, boolean throttle) {
7575
if (throttle) {
7676
sendResponseAndWait(rc, response, statsLogger);
77+
// onReadRequestFinish is called asynchronously in the ChannelFutureListener
78+
// inside sendResponseAndWait to maintain throttling without blocking the thread.
7779
} else {
7880
sendResponse(rc, response, statsLogger);
81+
requestProcessor.onReadRequestFinish();
7982
}
80-
requestProcessor.onReadRequestFinish();
8183
}
8284

8385
protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) {
@@ -150,27 +152,44 @@ protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger)
150152
}
151153

152154
/**
153-
* Write on the channel and wait until the write is completed.
155+
* Write on the channel and notify completion via a listener.
154156
*
155-
* <p>That will make the thread to get blocked until we're able to
156-
* write everything on the TCP stack, providing auto-throttling
157-
* and avoiding using too much memory when handling read-requests.
157+
* <p>This provides auto-throttling by holding the read semaphore until the write completes,
158+
* without blocking the read thread pool thread. The read thread is freed immediately to
159+
* process other requests, while the semaphore prevents unbounded read concurrency.
158160
*/
159161
protected void sendResponseAndWait(int rc, Object response, OpStatsLogger statsLogger) {
162+
// Capture fields before the processor may be recycled after this method returns.
163+
final long capturedEnqueueNanos = this.enqueueNanos;
164+
final BookieRequestProcessor processor = this.requestProcessor;
160165
try {
161166
Channel channel = requestHandler.ctx().channel();
162167
ChannelFuture future = channel.writeAndFlush(response);
163-
if (!channel.eventLoop().inEventLoop()) {
164-
future.get();
168+
future.addListener((ChannelFutureListener) f -> {
169+
if (!f.isSuccess() && logger.isDebugEnabled()) {
170+
logger.debug("Netty channel write exception. ", f.cause());
171+
}
172+
if (BookieProtocol.EOK == rc) {
173+
statsLogger.registerSuccessfulEvent(
174+
MathUtils.elapsedNanos(capturedEnqueueNanos), TimeUnit.NANOSECONDS);
175+
} else {
176+
statsLogger.registerFailedEvent(
177+
MathUtils.elapsedNanos(capturedEnqueueNanos), TimeUnit.NANOSECONDS);
178+
}
179+
processor.onReadRequestFinish();
180+
});
181+
} catch (Exception e) {
182+
if (logger.isDebugEnabled()) {
183+
logger.debug("Netty channel write exception. ", e);
165184
}
166-
} catch (ExecutionException | InterruptedException e) {
167-
logger.debug("Netty channel write exception. ", e);
168-
return;
169-
}
170-
if (BookieProtocol.EOK == rc) {
171-
statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
172-
} else {
173-
statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
185+
if (BookieProtocol.EOK == rc) {
186+
statsLogger.registerSuccessfulEvent(
187+
MathUtils.elapsedNanos(capturedEnqueueNanos), TimeUnit.NANOSECONDS);
188+
} else {
189+
statsLogger.registerFailedEvent(
190+
MathUtils.elapsedNanos(capturedEnqueueNanos), TimeUnit.NANOSECONDS);
191+
}
192+
processor.onReadRequestFinish();
174193
}
175194
}
176195

bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
package org.apache.bookkeeper.proto;
2020

2121
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertFalse;
2223
import static org.junit.Assert.assertTrue;
2324
import static org.mockito.ArgumentMatchers.any;
2425
import static org.mockito.ArgumentMatchers.anyLong;
2526
import static org.mockito.Mockito.doAnswer;
2627
import static org.mockito.Mockito.mock;
28+
import static org.mockito.Mockito.never;
2729
import static org.mockito.Mockito.times;
2830
import static org.mockito.Mockito.verify;
2931
import static org.mockito.Mockito.when;
@@ -38,10 +40,12 @@
3840
import java.util.concurrent.CountDownLatch;
3941
import java.util.concurrent.ExecutorService;
4042
import java.util.concurrent.Executors;
43+
import java.util.concurrent.Semaphore;
4144
import java.util.concurrent.atomic.AtomicReference;
4245
import org.apache.bookkeeper.bookie.Bookie;
4346
import org.apache.bookkeeper.bookie.BookieException;
4447
import org.apache.bookkeeper.common.concurrent.FutureUtils;
48+
import org.apache.bookkeeper.conf.ServerConfiguration;
4549
import org.apache.bookkeeper.proto.BookieProtocol.ReadRequest;
4650
import org.apache.bookkeeper.proto.BookieProtocol.Response;
4751
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -195,4 +199,178 @@ public void testNonFenceRequest() throws Exception {
195199
assertEquals(BookieProtocol.READENTRY, response.getOpCode());
196200
assertEquals(BookieProtocol.EOK, response.getErrorCode());
197201
}
202+
203+
/**
204+
* Test that when throttleReadResponses=true and the caller is not in the Netty event loop,
205+
* the read thread is not blocked by the write. onReadRequestFinish() should only be called
206+
* after the write future completes, preserving throttling without blocking the thread.
207+
*/
208+
@Test
209+
public void testThrottledReadNonBlockingOnSuccess() throws Exception {
210+
// Setup event loop to simulate read worker thread (not event loop thread)
211+
EventLoop eventLoop = mock(EventLoop.class);
212+
when(eventLoop.inEventLoop()).thenReturn(false);
213+
doAnswer(inv -> {
214+
((Runnable) inv.getArgument(0)).run();
215+
return null;
216+
}).when(eventLoop).execute(any(Runnable.class));
217+
when(channel.eventLoop()).thenReturn(eventLoop);
218+
219+
// Use a controllable promise so we can verify deferred behavior
220+
DefaultChannelPromise writeFuture = new DefaultChannelPromise(channel);
221+
doAnswer(inv -> writeFuture).when(channel).writeAndFlush(any(Response.class));
222+
223+
long ledgerId = System.currentTimeMillis();
224+
ReadRequest request = ReadRequest.create(
225+
BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 0, new byte[]{});
226+
ReadEntryProcessor processor = ReadEntryProcessor.create(
227+
request, requestHandler, requestProcessor, null, true /* throttle */);
228+
229+
// run() should return immediately without blocking on the write
230+
processor.run();
231+
232+
// Write should have been issued
233+
verify(channel, times(1)).writeAndFlush(any(Response.class));
234+
// But onReadRequestFinish should NOT have been called yet — write not completed
235+
verify(requestProcessor, never()).onReadRequestFinish();
236+
237+
// Complete the write
238+
writeFuture.setSuccess();
239+
240+
// Now onReadRequestFinish should have been called
241+
verify(requestProcessor, times(1)).onReadRequestFinish();
242+
}
243+
244+
/**
245+
* Test that onReadRequestFinish() is still called even when the write fails,
246+
* so the read semaphore is always released.
247+
*/
248+
@Test
249+
public void testThrottledReadNonBlockingOnWriteFailure() throws Exception {
250+
EventLoop eventLoop = mock(EventLoop.class);
251+
when(eventLoop.inEventLoop()).thenReturn(false);
252+
doAnswer(inv -> {
253+
((Runnable) inv.getArgument(0)).run();
254+
return null;
255+
}).when(eventLoop).execute(any(Runnable.class));
256+
when(channel.eventLoop()).thenReturn(eventLoop);
257+
258+
DefaultChannelPromise writeFuture = new DefaultChannelPromise(channel);
259+
doAnswer(inv -> writeFuture).when(channel).writeAndFlush(any(Response.class));
260+
261+
long ledgerId = System.currentTimeMillis();
262+
ReadRequest request = ReadRequest.create(
263+
BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 0, new byte[]{});
264+
ReadEntryProcessor processor = ReadEntryProcessor.create(
265+
request, requestHandler, requestProcessor, null, true /* throttle */);
266+
267+
processor.run();
268+
269+
verify(channel, times(1)).writeAndFlush(any(Response.class));
270+
verify(requestProcessor, never()).onReadRequestFinish();
271+
272+
// Fail the write
273+
writeFuture.setFailure(new IOException("channel write failed"));
274+
275+
// onReadRequestFinish must still be called to release the read semaphore
276+
verify(requestProcessor, times(1)).onReadRequestFinish();
277+
}
278+
279+
/**
280+
* Test that when throttleReadResponses=false, onReadRequestFinish() is called
281+
* synchronously before run() returns.
282+
*/
283+
@Test
284+
public void testNonThrottledReadCallsOnFinishSynchronously() throws Exception {
285+
// sendResponse (non-throttle path) uses channel.isActive() and two-arg writeAndFlush
286+
when(channel.isActive()).thenReturn(true);
287+
when(channel.writeAndFlush(any(), any(ChannelPromise.class))).thenReturn(mock(ChannelPromise.class));
288+
289+
long ledgerId = System.currentTimeMillis();
290+
ReadRequest request = ReadRequest.create(
291+
BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 0, new byte[]{});
292+
ReadEntryProcessor processor = ReadEntryProcessor.create(
293+
request, requestHandler, requestProcessor, null, false /* no throttle */);
294+
295+
processor.run();
296+
297+
verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class));
298+
// onReadRequestFinish should have been called synchronously
299+
verify(requestProcessor, times(1)).onReadRequestFinish();
300+
}
301+
302+
/**
303+
* Verify that maxReadsInProgressLimit defaults to 10000 (enabled),
304+
* ensuring non-blocking read response writes are bounded by default.
305+
*/
306+
@Test
307+
public void testDefaultMaxReadsInProgressLimitIsEnabled() {
308+
ServerConfiguration conf = new ServerConfiguration();
309+
assertEquals("maxReadsInProgressLimit should default to 10000",
310+
10000, conf.getMaxReadsInProgressLimit());
311+
}
312+
313+
/**
314+
* Test that the read semaphore is held from request creation until the write future completes,
315+
* not released when the read thread returns. This ensures that maxReadsInProgressLimit correctly
316+
* bounds the number of read responses buffered in memory, even though the read thread is
317+
* non-blocking.
318+
*/
319+
@Test
320+
public void testThrottledReadHoldsSemaphoreUntilWriteCompletes() throws Exception {
321+
// Simulate maxReadsInProgressLimit=1 with a real semaphore
322+
Semaphore readsSemaphore = new Semaphore(1);
323+
324+
doAnswer(inv -> {
325+
readsSemaphore.acquireUninterruptibly();
326+
return null;
327+
}).when(requestProcessor).onReadRequestStart(any(Channel.class));
328+
doAnswer(inv -> {
329+
readsSemaphore.release();
330+
return null;
331+
}).when(requestProcessor).onReadRequestFinish();
332+
333+
// Setup non-event-loop thread
334+
EventLoop eventLoop = mock(EventLoop.class);
335+
when(eventLoop.inEventLoop()).thenReturn(false);
336+
doAnswer(inv -> {
337+
((Runnable) inv.getArgument(0)).run();
338+
return null;
339+
}).when(eventLoop).execute(any(Runnable.class));
340+
when(channel.eventLoop()).thenReturn(eventLoop);
341+
342+
// Controllable write future
343+
DefaultChannelPromise writeFuture = new DefaultChannelPromise(channel);
344+
doAnswer(inv -> writeFuture).when(channel).writeAndFlush(any(Response.class));
345+
346+
long ledgerId = System.currentTimeMillis();
347+
ReadRequest request = ReadRequest.create(
348+
BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 0, new byte[]{});
349+
350+
// create() calls onReadRequestStart → semaphore acquired
351+
ReadEntryProcessor processor = ReadEntryProcessor.create(
352+
request, requestHandler, requestProcessor, null, true /* throttle */);
353+
354+
// Semaphore should be acquired (1 permit used)
355+
assertEquals("semaphore should have 0 permits after read started",
356+
0, readsSemaphore.availablePermits());
357+
358+
// Run the processor — thread returns immediately (non-blocking)
359+
processor.run();
360+
361+
// Semaphore should STILL be held (write not completed)
362+
assertEquals("semaphore should still have 0 permits while write is in progress",
363+
0, readsSemaphore.availablePermits());
364+
365+
// A second read would be unable to acquire the semaphore
366+
assertFalse("second read should not be able to acquire semaphore",
367+
readsSemaphore.tryAcquire());
368+
369+
// Complete the write
370+
writeFuture.setSuccess();
371+
372+
// Now semaphore should be released — a new read can proceed
373+
assertEquals("semaphore should have 1 permit after write completes",
374+
1, readsSemaphore.availablePermits());
375+
}
198376
}

0 commit comments

Comments
 (0)