From d6748f915d4801e90f001bc09d65918df85b305f Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Mon, 6 Mar 2023 19:55:41 -0800 Subject: [PATCH 01/16] Group and flush add-responses after journal sync (#3837) ### Motivation Note: this is stacked on top of #3830 & #3835 This change improves the way AddRequest responses are sent to the client. The current flow is: * The journal-force-thread issues the fsync on the journal file * We iterate over all the entries that were just synced and for each of them: 1. Trigger channel.writeAndFlush() 2. This will jump on the connection IO thread (Netty will use a `write()` to `eventfd` to post the task and wake the epoll) 3. Write the object in the connection and trigger the serialization logic 4. Grab a `ByteBuf` from the pool and write ~20 bytes with the response 5. Write and flush the buffer on the channel 6. With the flush consolidator we try to group multiple buffers into a single `writev()` syscall, though each call will have a long list of buffers, making the memcpy inefficient. 7. Release all the buffers and return them to the pool All these steps are quite expensive when the bookie is receiving a lot of small requests. This PR changes the flow into: 1. journal fsync 2. go through each request and prepare the response into a per-connection `ByteBuf` which is not yet written to the channel 3. after preparing all the responses, we flush them at once: Trigger an event on all the connections that will write the accumulated buffers. The advantages are: 1. 1 ByteBuf allocated per connection instead of 1 per request 1. Fewer allocations and less stress on the buffer pool 2. More efficient socket write() operations 3. 1 task per connection posted on the Netty IO threads, instead of 1 per request. 
--- .../org/apache/bookkeeper/bookie/Bookie.java | 3 ++ .../apache/bookkeeper/bookie/BookieImpl.java | 8 +++ .../org/apache/bookkeeper/bookie/Journal.java | 15 ++++++ .../processor/RequestProcessor.java | 5 ++ .../bookkeeper/proto/BookieProtoEncoding.java | 13 ++++- .../proto/BookieRequestHandler.java | 49 ++++++++++++++++--- .../proto/BookieRequestProcessor.java | 14 +++++- .../apache/bookkeeper/proto/BookieServer.java | 4 +- .../bookkeeper/proto/WriteEntryProcessor.java | 7 +-- .../proto/TestBookieRequestProcessor.java | 13 +++-- .../proto/WriteEntryProcessorTest.java | 29 +++++------ 11 files changed, 126 insertions(+), 34 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 90c8acf5af4..ac9df53cd22 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -23,6 +23,7 @@ import java.util.PrimitiveIterator; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.common.util.Watcher; +import org.apache.bookkeeper.processor.RequestProcessor; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; /** @@ -86,6 +87,8 @@ void cancelWaitForLastAddConfirmedUpdate(long ledgerId, // TODO: Should be constructed and passed in as a parameter LedgerStorage getLedgerStorage(); + void setRequestProcessor(RequestProcessor requestProcessor); + // TODO: Move this exceptions somewhere else /** * Exception is thrown when no such a ledger is found in this bookie. 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 0db230d9d3d..2b76488cbe9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -69,6 +69,7 @@ import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNS; +import org.apache.bookkeeper.processor.RequestProcessor; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; @@ -1281,4 +1282,11 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedg } } } + + @Override + public void setRequestProcessor(RequestProcessor requestProcessor) { + for (Journal journal : journals) { + journal.setRequestProcessor(requestProcessor); + } + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index eff1673edb0..5f6b60f799b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -51,6 +51,7 @@ import org.apache.bookkeeper.common.util.MemoryLimitController; import org.apache.bookkeeper.common.util.affinity.CpuAffinity; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.processor.RequestProcessor; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.NullStatsLogger; @@ -444,6 +445,8 @@ private class ForceWriteThread extends BookieCriticalThread { // This holds the queue entries that should be notified after a // 
successful force write Thread threadToNotifyOnEx; + + RequestProcessor requestProcessor; // should we group force writes private final boolean enableGroupForceWrites; private final Counter forceWriteThreadTime; @@ -499,6 +502,10 @@ public void run() { journalStats.getForceWriteGroupingCountStats() .registerSuccessfulValue(numReqInLastForceWrite); + if (requestProcessor != null) { + requestProcessor.flushPendingResponses(); + } + } catch (IOException ioe) { LOG.error("I/O exception in ForceWrite thread", ioe); running = false; @@ -1093,6 +1100,10 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(), numEntriesToFlush--; entry.run(); } + + if (forceWriteThread.requestProcessor != null) { + forceWriteThread.requestProcessor.flushPendingResponses(); + } } lastFlushPosition = bc.position(); @@ -1211,6 +1222,10 @@ public BufferedChannelBuilder getBufferedChannelBuilder() { return (FileChannel fc, int capacity) -> new BufferedChannel(allocator, fc, capacity); } + public void setRequestProcessor(RequestProcessor requestProcessor) { + forceWriteThread.requestProcessor = requestProcessor; + } + /** * Shuts down the journal. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java index 5a4238e64d6..9f9a0daf682 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java @@ -42,4 +42,9 @@ public interface RequestProcessor extends AutoCloseable { * channel received the given request r */ void processRequest(Object r, BookieRequestHandler channel); + + /** + * Flush any pending response staged on all the client connections. 
+ */ + void flushPendingResponses(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 3e41a3f5ea7..edbffa5f431 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -334,6 +334,14 @@ public Object decode(ByteBuf buffer) throw new IllegalStateException("Received unknown response : op code = " + opCode); } } + + public static void serializeAddResponseInto(int rc, BookieProtocol.ParsedAddRequest req, ByteBuf buf) { + buf.writeInt(RESPONSE_HEADERS_SIZE); // Frame size + buf.writeInt(PacketHeader.toInt(req.getProtocolVersion(), req.getOpCode(), (short) 0)); + buf.writeInt(rc); // rc-code + buf.writeLong(req.getLedgerId()); + buf.writeLong(req.getEntryId()); + } } /** @@ -504,7 +512,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (LOG.isTraceEnabled()) { LOG.trace("Encode response {} to channel {}.", msg, ctx.channel()); } - if (msg instanceof BookkeeperProtocol.Response) { + + if (msg instanceof ByteBuf) { + ctx.write(msg, promise); + } else if (msg instanceof BookkeeperProtocol.Response) { ctx.write(repV3.encode(msg, ctx.alloc()), promise); } else if (msg instanceof BookieProtocol.Response) { ctx.write(repPreV3.encode(msg, ctx.alloc()), promise); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index c9d65a73174..50b7969023e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -20,26 +20,31 @@ */ package org.apache.bookkeeper.proto; +import io.netty.buffer.ByteBuf; import 
io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.group.ChannelGroup; import java.nio.channels.ClosedChannelException; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.processor.RequestProcessor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Serverside handler for bookkeeper requests. */ +@Slf4j public class BookieRequestHandler extends ChannelInboundHandlerAdapter { - private static final Logger LOG = LoggerFactory.getLogger(BookieRequestHandler.class); + static final Object EVENT_FLUSH_ALL_PENDING_RESPONSES = new Object(); + private final RequestProcessor requestProcessor; private final ChannelGroup allChannels; private ChannelHandlerContext ctx; + private ByteBuf pendingSendResponses = null; + private int maxPendingResponsesSize; + BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) { this.requestProcessor = processor; this.allChannels = allChannels; @@ -51,7 +56,7 @@ public ChannelHandlerContext ctx() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - LOG.info("Channel connected {}", ctx.channel()); + log.info("Channel connected {}", ctx.channel()); this.ctx = ctx; super.channelActive(ctx); } @@ -63,16 +68,16 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - LOG.info("Channels disconnected: {}", ctx.channel()); + log.info("Channels disconnected: {}", ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof ClosedChannelException) { - LOG.info("Client died before request could be completed on {}", ctx.channel(), cause); + log.info("Client died before request could be completed on {}", ctx.channel(), cause); return; } - 
LOG.error("Unhandled exception occurred in I/O thread or handler on {}", ctx.channel(), cause); + log.error("Unhandled exception occurred in I/O thread or handler on {}", ctx.channel(), cause); ctx.close(); } @@ -84,4 +89,34 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } requestProcessor.processRequest(msg, this); } + + public synchronized void prepareSendResponseV2(int rc, BookieProtocol.ParsedAddRequest req) { + if (pendingSendResponses == null) { + pendingSendResponses = ctx.alloc().directBuffer(maxPendingResponsesSize != 0 + ? maxPendingResponsesSize : 256); + } + + BookieProtoEncoding.ResponseEnDeCoderPreV3.serializeAddResponseInto(rc, req, pendingSendResponses); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt == EVENT_FLUSH_ALL_PENDING_RESPONSES) { + synchronized (this) { + if (pendingSendResponses != null) { + maxPendingResponsesSize = Math.max(maxPendingResponsesSize, + pendingSendResponses.readableBytes()); + if (ctx.channel().isActive()) { + ctx.writeAndFlush(pendingSendResponses, ctx.voidPromise()); + } else { + pendingSendResponses.release(); + } + + pendingSendResponses = null; + } + } + } else { + super.userEventTriggered(ctx, evt); + } + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 9237c451ed6..d07aa9cffa0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -30,6 +30,7 @@ import com.google.protobuf.ByteString; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; +import io.netty.channel.group.ChannelGroup; import io.netty.handler.ssl.SslHandler; import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.Future; @@ 
-118,6 +119,8 @@ public class BookieRequestProcessor implements RequestProcessor { final Semaphore addsSemaphore; final Semaphore readsSemaphore; + final ChannelGroup allChannels; + // to temporary blacklist channels final Optional> blacklistedChannels; final Consumer onResponseTimeout; @@ -127,9 +130,11 @@ public class BookieRequestProcessor implements RequestProcessor { private final boolean throttleReadResponses; public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger, - SecurityHandlerFactory shFactory, ByteBufAllocator allocator) throws SecurityException { + SecurityHandlerFactory shFactory, ByteBufAllocator allocator, + ChannelGroup allChannels) throws SecurityException { this.serverCfg = serverCfg; this.allocator = allocator; + this.allChannels = allChannels; this.waitTimeoutOnBackpressureMillis = serverCfg.getWaitTimeoutOnResponseBackpressureMillis(); this.preserveMdcForTaskExecution = serverCfg.getPreserveMdcForTaskExecution(); this.bookie = bookie; @@ -694,6 +699,13 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Bookie } } + @Override + public void flushPendingResponses() { + for (Channel c : allChannels) { + c.pipeline().fireUserEventTriggered(BookieRequestHandler.EVENT_FLUSH_ALL_PENDING_RESPONSES); + } + } + public long getWaitTimeoutOnBackpressureMillis() { return waitTimeoutOnBackpressureMillis; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java index 64f439b2134..caff467db36 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java @@ -102,9 +102,11 @@ public BookieServer(ServerConfiguration conf, shFactory = SecurityProviderFactoryFactory .getSecurityProviderFactory(conf.getTLSProviderFactoryClass()); + this.requestProcessor = new 
BookieRequestProcessor(conf, bookie, - statsLogger.scope(SERVER_SCOPE), shFactory, allocator); + statsLogger.scope(SERVER_SCOPE), shFactory, allocator, nettyServer.allChannels); this.nettyServer.setRequestProcessor(this.requestProcessor); + this.bookie.setRequestProcessor(this.requestProcessor); } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index 7e8f9fa768d..29b3a5abb70 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -122,9 +122,10 @@ public void writeComplete(int rc, long ledgerId, long entryId, requestProcessor.getRequestStats().getAddEntryStats() .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } - sendWriteReqResponse(rc, - ResponseBuilder.buildAddResponse(request), - requestProcessor.getRequestStats().getAddRequestStats()); + + requestHandler.prepareSendResponseV2(rc, request); + requestProcessor.onAddRequestFinish(); + request.recycle(); recycle(); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java index d5ee8f52750..46304023433 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java @@ -33,6 +33,8 @@ import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.conf.ServerConfiguration; import 
org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest; @@ -53,12 +55,15 @@ public class TestBookieRequestProcessor { final BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class); + private final ChannelGroup channelGroup = new DefaultChannelGroup(null); + @Test public void testConstructLongPollThreads() throws Exception { // long poll threads == read threads ServerConfiguration conf = new ServerConfiguration(); try (BookieRequestProcessor processor = new BookieRequestProcessor( - conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) { + conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT, + channelGroup)) { assertSame(processor.getReadThreadPool(), processor.getLongPollThreadPool()); } @@ -66,7 +71,8 @@ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocat conf = new ServerConfiguration(); conf.setNumReadWorkerThreads(0); try (BookieRequestProcessor processor = new BookieRequestProcessor( - conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) { + conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT, + channelGroup)) { assertNull(processor.getReadThreadPool()); assertNotNull(processor.getLongPollThreadPool()); } @@ -76,7 +82,8 @@ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocat conf.setNumReadWorkerThreads(2); conf.setNumLongPollWorkerThreads(2); try (BookieRequestProcessor processor = new BookieRequestProcessor( - conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) { + conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT, + channelGroup)) { assertNotNull(processor.getReadThreadPool()); assertNotNull(processor.getLongPollThreadPool()); assertNotSame(processor.getReadThreadPool(), processor.getLongPollThreadPool()); diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java index 8fc3a89f004..a02cde4ab99 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doAnswer; @@ -181,28 +182,25 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed( return null; }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0])); - AtomicReference writtenObject = new AtomicReference<>(); + AtomicReference writtenObject = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); doAnswer(invocationOnMock -> { writtenObject.set(invocationOnMock.getArgument(0)); latch.countDown(); return null; - }).when(channel).writeAndFlush(any(), any()); + }).when(requestHandler).prepareSendResponseV2(anyInt(), any()); processor.run(); verify(bookie, times(1)) .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0])); - verify(channel, times(1)).writeAndFlush(any(), any()); + verify(requestHandler, times(1)).prepareSendResponseV2(anyInt(), any()); +// verify(channel, times(1)).writeAndFlush(any(), any()); latch.await(); - assertTrue(writtenObject.get() instanceof Response); - Response response = (Response) writtenObject.get(); - assertEquals(BookieProtocol.EOK, response.getErrorCode()); - - response.release(); - response.recycle(); + assertTrue(writtenObject.get() instanceof Integer); + 
assertEquals(BookieProtocol.EOK, (int) writtenObject.get()); } @Test @@ -216,28 +214,23 @@ public void testNormalWritesOnWritableBookie() throws Exception { return null; }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0])); - AtomicReference writtenObject = new AtomicReference<>(); + AtomicReference writtenObject = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); doAnswer(invocationOnMock -> { writtenObject.set(invocationOnMock.getArgument(0)); latch.countDown(); return null; - }).when(channel).writeAndFlush(any(), any()); + }).when(requestHandler).prepareSendResponseV2(anyInt(), any()); processor.run(); verify(bookie, times(1)) .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0])); - verify(channel, times(1)).writeAndFlush(any(), any()); + verify(requestHandler, times(1)).prepareSendResponseV2(anyInt(), any()); latch.await(); - assertTrue(writtenObject.get() instanceof Response); - Response response = (Response) writtenObject.get(); - assertEquals(BookieProtocol.EOK, response.getErrorCode()); - - response.release(); - response.recycle(); + assertEquals(BookieProtocol.EOK, (int) writtenObject.get()); } @Test From beaf230b5d30bbf7a76f728b18ae919fc116cddf Mon Sep 17 00:00:00 2001 From: chenhang Date: Tue, 7 Mar 2023 10:41:16 +0800 Subject: [PATCH 02/16] tmp for rebase master --- .../GrowableArrayBlockingQueue.java | 441 ++++++++++++++++++ .../org/apache/bookkeeper/bookie/Bookie.java | 4 + .../apache/bookkeeper/bookie/BookieImpl.java | 62 +++ .../org/apache/bookkeeper/bookie/Journal.java | 18 + .../processor/RequestProcessor.java | 3 + .../bookkeeper/proto/BookieProtocol.java | 8 +- .../proto/BookieRequestHandler.java | 42 +- .../proto/BookieRequestProcessor.java | 28 +- .../bookkeeper/proto/PacketProcessorBase.java | 10 + .../bookkeeper/proto/WriteEntryProcessor.java | 56 ++- .../proto/WriteEntryProcessorV3.java | 2 +- 11 files changed, 664 
insertions(+), 10 deletions(-) create mode 100644 bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/GrowableArrayBlockingQueue.java diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/GrowableArrayBlockingQueue.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/GrowableArrayBlockingQueue.java new file mode 100644 index 00000000000..35705fae9ae --- /dev/null +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/GrowableArrayBlockingQueue.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.common.collections; + +import java.util.AbstractQueue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.StampedLock; +import java.util.function.Consumer; + +/** + * This implements a {@link BlockingQueue} backed by an array with no fixed capacity. + * + *

When the capacity is reached, data will be moved to a bigger array. + */ +public class GrowableArrayBlockingQueue extends AbstractQueue implements BlockingQueue { + + private final ReentrantLock headLock = new ReentrantLock(); + private final PaddedInt headIndex = new PaddedInt(); + private final PaddedInt tailIndex = new PaddedInt(); + private final StampedLock tailLock = new StampedLock(); + private final Condition isNotEmpty = headLock.newCondition(); + + private T[] data; + + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater SIZE_UPDATER = AtomicIntegerFieldUpdater + .newUpdater(GrowableArrayBlockingQueue.class, "size"); + private volatile int size = 0; + + public GrowableArrayBlockingQueue() { + this(64); + } + + @SuppressWarnings("unchecked") + public GrowableArrayBlockingQueue(int initialCapacity) { + headIndex.value = 0; + tailIndex.value = 0; + + int capacity = io.netty.util.internal.MathUtil.findNextPositivePowerOfTwo(initialCapacity); + data = (T[]) new Object[capacity]; + } + + @Override + public T remove() { + T item = poll(); + if (item == null) { + throw new NoSuchElementException(); + } + + return item; + } + + @Override + public T poll() { + headLock.lock(); + try { + if (SIZE_UPDATER.get(this) > 0) { + T item = data[headIndex.value]; + data[headIndex.value] = null; + headIndex.value = (headIndex.value + 1) & (data.length - 1); + SIZE_UPDATER.decrementAndGet(this); + return item; + } else { + return null; + } + } finally { + headLock.unlock(); + } + } + + @Override + public T element() { + T item = peek(); + if (item == null) { + throw new NoSuchElementException(); + } + + return item; + } + + @Override + public T peek() { + headLock.lock(); + try { + if (SIZE_UPDATER.get(this) > 0) { + return data[headIndex.value]; + } else { + return null; + } + } finally { + headLock.unlock(); + } + } + + @Override + public boolean offer(T e) { + // Queue is unbounded and it will never reject new items + put(e); + return true; + } + 
+ @Override + public void put(T e) { + long stamp = tailLock.writeLock(); + + boolean wasEmpty = false; + + try { + if (SIZE_UPDATER.get(this) == data.length) { + expandArray(); + } + + data[tailIndex.value] = e; + tailIndex.value = (tailIndex.value + 1) & (data.length - 1); + if (SIZE_UPDATER.getAndIncrement(this) == 0) { + wasEmpty = true; + } + } finally { + tailLock.unlockWrite(stamp); + } + + if (wasEmpty) { + headLock.lock(); + try { + isNotEmpty.signal(); + } finally { + headLock.unlock(); + } + } + } + + @Override + public boolean add(T e) { + put(e); + return true; + } + + @Override + public boolean offer(T e, long timeout, TimeUnit unit) { + // Queue is unbounded and it will never reject new items + put(e); + return true; + } + + @Override + public T take() throws InterruptedException { + headLock.lockInterruptibly(); + + try { + while (SIZE_UPDATER.get(this) == 0) { + isNotEmpty.await(); + } + + T item = data[headIndex.value]; + data[headIndex.value] = null; + headIndex.value = (headIndex.value + 1) & (data.length - 1); + if (SIZE_UPDATER.decrementAndGet(this) > 0) { + // There are still entries to consume + isNotEmpty.signal(); + } + return item; + } finally { + headLock.unlock(); + } + } + + @Override + public T poll(long timeout, TimeUnit unit) throws InterruptedException { + headLock.lockInterruptibly(); + + try { + long timeoutNanos = unit.toNanos(timeout); + while (SIZE_UPDATER.get(this) == 0) { + if (timeoutNanos <= 0) { + return null; + } + + timeoutNanos = isNotEmpty.awaitNanos(timeoutNanos); + } + + T item = data[headIndex.value]; + data[headIndex.value] = null; + headIndex.value = (headIndex.value + 1) & (data.length - 1); + if (SIZE_UPDATER.decrementAndGet(this) > 0) { + // There are still entries to consume + isNotEmpty.signal(); + } + return item; + } finally { + headLock.unlock(); + } + } + + @Override + public int remainingCapacity() { + return Integer.MAX_VALUE; + } + + @Override + public int drainTo(Collection c) { + return drainTo(c, 
Integer.MAX_VALUE); + } + + @Override + public int drainTo(Collection c, int maxElements) { + headLock.lock(); + + try { + int drainedItems = 0; + int size = SIZE_UPDATER.get(this); + + while (size > 0 && drainedItems < maxElements) { + T item = data[headIndex.value]; + data[headIndex.value] = null; + c.add(item); + + headIndex.value = (headIndex.value + 1) & (data.length - 1); + --size; + ++drainedItems; + } + + if (SIZE_UPDATER.addAndGet(this, -drainedItems) > 0) { + // There are still entries to consume + isNotEmpty.signal(); + } + + return drainedItems; + } finally { + headLock.unlock(); + } + } + + @Override + public void clear() { + headLock.lock(); + + try { + int size = SIZE_UPDATER.get(this); + + for (int i = 0; i < size; i++) { + data[headIndex.value] = null; + headIndex.value = (headIndex.value + 1) & (data.length - 1); + } + + if (SIZE_UPDATER.addAndGet(this, -size) > 0) { + // There are still entries to consume + isNotEmpty.signal(); + } + } finally { + headLock.unlock(); + } + } + + @Override + public boolean remove(Object o) { + long stamp = tailLock.writeLock(); + headLock.lock(); + + try { + int index = this.headIndex.value; + int size = this.size; + + for (int i = 0; i < size; i++) { + T item = data[index]; + + if (Objects.equals(item, o)) { + remove(index); + return true; + } + + index = (index + 1) & (data.length - 1); + } + } finally { + headLock.unlock(); + tailLock.unlockWrite(stamp); + } + + return false; + } + + private void remove(int index) { + int tailIndex = this.tailIndex.value; + + if (index < tailIndex) { + System.arraycopy(data, index + 1, data, index, tailIndex - index - 1); + this.tailIndex.value--; + } else { + System.arraycopy(data, index + 1, data, index, data.length - index - 1); + data[data.length - 1] = data[0]; + if (tailIndex > 0) { + System.arraycopy(data, 1, data, 0, tailIndex); + this.tailIndex.value--; + } else { + this.tailIndex.value = data.length - 1; + } + } + + if (tailIndex > 0) { + data[tailIndex - 1] = null; + 
} else { + data[data.length - 1] = null; + } + + SIZE_UPDATER.decrementAndGet(this); + } + + @Override + public int size() { + return SIZE_UPDATER.get(this); + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + + public List toList() { + List list = new ArrayList<>(size()); + forEach(list::add); + return list; + } + + @Override + public void forEach(Consumer action) { + long stamp = tailLock.writeLock(); + headLock.lock(); + + try { + int headIndex = this.headIndex.value; + int size = this.size; + + for (int i = 0; i < size; i++) { + T item = data[headIndex]; + + action.accept(item); + + headIndex = (headIndex + 1) & (data.length - 1); + } + + } finally { + headLock.unlock(); + tailLock.unlockWrite(stamp); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + long stamp = tailLock.writeLock(); + headLock.lock(); + + try { + int headIndex = this.headIndex.value; + int size = SIZE_UPDATER.get(this); + + sb.append('['); + + for (int i = 0; i < size; i++) { + T item = data[headIndex]; + if (i > 0) { + sb.append(", "); + } + + sb.append(item); + + headIndex = (headIndex + 1) & (data.length - 1); + } + + sb.append(']'); + } finally { + headLock.unlock(); + tailLock.unlockWrite(stamp); + } + return sb.toString(); + } + + @SuppressWarnings("unchecked") + private void expandArray() { + // We already hold the tailLock + headLock.lock(); + + try { + int size = SIZE_UPDATER.get(this); + int newCapacity = data.length * 2; + T[] newData = (T[]) new Object[newCapacity]; + + int oldHeadIndex = headIndex.value; + int newTailIndex = 0; + + for (int i = 0; i < size; i++) { + newData[newTailIndex++] = data[oldHeadIndex]; + oldHeadIndex = (oldHeadIndex + 1) & (data.length - 1); + } + + data = newData; + headIndex.value = 0; + tailIndex.value = size; + } finally { + headLock.unlock(); + } + } + + static final class PaddedInt { + private int value; + + // Padding to avoid false sharing + public 
volatile int pi1 = 1; + public volatile long p1 = 1L, p2 = 2L, p3 = 3L, p4 = 4L, p5 = 5L, p6 = 6L; + + public long exposeToAvoidOptimization() { + return pi1 + p1 + p2 + p3 + p4 + p5 + p6; + } + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index ac9df53cd22..449d35cc9b5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -20,10 +20,12 @@ import io.netty.buffer.ByteBuf; import java.io.IOException; +import java.util.List; import java.util.PrimitiveIterator; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.processor.RequestProcessor; +import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; /** @@ -44,6 +46,8 @@ public interface Bookie { // TODO: replace ackBeforeSync with flags void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException, InterruptedException; + void addEntry(List requests, boolean ackBeforeSync, WriteCallback cb, Object ctx) + throws IOException, BookieException, InterruptedException; void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException, InterruptedException; void forceLedger(long ledgerId, WriteCallback cb, Object ctx); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 2b76488cbe9..daa97ab261b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -70,6 +70,7 @@ import 
org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNS; import org.apache.bookkeeper.processor.RequestProcessor; +import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; @@ -1087,6 +1088,67 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Obj } } + public void addEntry(List requests, boolean ackBeforeSync, + WriteCallback cb, Object ctx) throws IOException, BookieException, InterruptedException { + long requestNans = MathUtils.nowInNano(); + boolean success = false; + List failedRequests = new ArrayList<>(); + requests.forEach(request -> { + try { + LedgerDescriptor handle = getLedgerForEntry(request.getData(), request.getMasterKey()); + synchronized (handle) { + if (handle.isFenced()) { + throw BookieException.create(BookieException.Code.LedgerFencedException); + } + + addEntryInternalWithoutJournal(handle, request.getData(), ackBeforeSync, cb, ctx, request.getMasterKey()); + } + } catch (BookieException.LedgerFencedException e) { + + } catch (IOException e) { + + } + }); + + List entries = requests.stream().filter(failedRequests::contains).map(ParsedAddRequest::getData).collect(Collectors.toList()); + getJournal(requests.get(0).getLedgerId()).logAddEntry(entries, ackBeforeSync, cb, ctx); + + } + + private void addEntryInternalWithoutJournal(LedgerDescriptor handle, ByteBuf entry, + boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey) + throws IOException, BookieException, InterruptedException { + long ledgerId = handle.getLedgerId(); + long entryId = handle.addEntry(entry); + + bookieStats.getWriteBytes().addCount(entry.readableBytes()); + + // journal `addEntry` should happen after the entry is added to ledger storage. 
+ // otherwise the journal entry can potentially be rolled before the ledger is created in ledger storage. + if (masterKeyCache.get(ledgerId) == null) { + // Force the load into masterKey cache + byte[] oldValue = masterKeyCache.putIfAbsent(ledgerId, masterKey); + if (oldValue == null) { + ByteBuf masterKeyEntry = createMasterKeyEntry(ledgerId, masterKey); + try { + getJournal(ledgerId).logAddEntry( + masterKeyEntry, false /* ackBeforeSync */, new NopWriteCallback(), null); + } finally { + ReferenceCountUtil.safeRelease(masterKeyEntry); + } + } + } + + if (!writeDataToJournal) { + cb.writeComplete(0, ledgerId, entryId, null, ctx); + return; + } + + if (LOG.isTraceEnabled()) { + LOG.trace("Adding {}@{}", entryId, ledgerId); + } + } + /** * Fences a ledger. From this point on, clients will be unable to * write to this ledger. Only recoveryAddEntry will be diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 5f6b60f799b..c68c546e484 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -44,6 +44,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.bookie.stats.JournalStats; import org.apache.bookkeeper.common.collections.BlockingMpscQueue; @@ -885,6 +887,22 @@ public void logAddEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx); } + public void logAddEntry(List entries, boolean ackBeforeSync, WriteCallback cb, Object ctx) + throws InterruptedException { + AtomicLong reserveMemory = new AtomicLong(); 
+ List queueEntries = entries.stream().map(entry -> { + long ledgerId = entry.getLong(entry.readerIndex() + 0); + long entryId = entry.getLong(entry.readerIndex() + 8); + entry.retain(); + reserveMemory.addAndGet(entry.readableBytes()); + return QueueEntry.create(entry, ackBeforeSync, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(), journalStats.getJournalAddEntryStats(), callbackTime); + }).collect(Collectors.toList()); + + memoryLimitController.releaseMemory(reserveMemory.get()); + journalStats.getJournalQueueSize().addCount(entries.size()); + queue.putAll(); + } + @VisibleForTesting public void logAddEntry(long ledgerId, long entryId, ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java index 9f9a0daf682..d6888c477dd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java @@ -20,6 +20,8 @@ */ package org.apache.bookkeeper.processor; +import java.util.List; +import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookieRequestHandler; /** @@ -47,4 +49,5 @@ public interface RequestProcessor extends AutoCloseable { * Flush any pending response staged on all the client connections. 
*/ void flushPendingResponses(); + void processAddRequest(List r, BookieRequestHandler channel); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 89654449aad..5f4f93fa735 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -219,7 +219,7 @@ byte getOpCode() { return opCode; } - long getLedgerId() { + public long getLedgerId() { return ledgerId; } @@ -235,7 +235,7 @@ boolean hasMasterKey() { return masterKey != null; } - byte[] getMasterKey() { + public byte[] getMasterKey() { assert hasMasterKey(); return masterKey; } @@ -323,7 +323,7 @@ static ParsedAddRequest create(byte protocolVersion, long ledgerId, long entryId return add; } - ByteBuf getData() { + public ByteBuf getData() { // We need to have different ByteBufList instances for each bookie write return data; } @@ -332,7 +332,7 @@ boolean isRecoveryAdd() { return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD; } - void release() { + public void release() { ReferenceCountUtil.release(data); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index 50b7969023e..dca802c6094 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -20,6 +20,8 @@ */ package org.apache.bookkeeper.proto; +import static org.apache.bookkeeper.proto.BookieProtocol.ADDENTRY; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -28,6 +30,10 @@ import lombok.extern.slf4j.Slf4j; import 
org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.processor.RequestProcessor; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import org.apache.bookkeeper.common.collections.GrowableArrayBlockingQueue; /** * Serverside handler for bookkeeper requests. @@ -41,6 +47,7 @@ public class BookieRequestHandler extends ChannelInboundHandlerAdapter { private final ChannelGroup allChannels; private ChannelHandlerContext ctx; + private BlockingQueue msgs; private ByteBuf pendingSendResponses = null; private int maxPendingResponsesSize; @@ -48,6 +55,7 @@ public class BookieRequestHandler extends ChannelInboundHandlerAdapter { BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) { this.requestProcessor = processor; this.allChannels = allChannels; + this.msgs = new GrowableArrayBlockingQueue<>(); } public ChannelHandlerContext ctx() { @@ -87,7 +95,39 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception ctx.fireChannelRead(msg); return; } - requestProcessor.processRequest(msg, this); + + if (msg instanceof BookieProtocol.ParsedAddRequest + && ADDENTRY == ((BookieProtocol.ParsedAddRequest) msg).getOpCode() + && !((BookieProtocol.ParsedAddRequest) msg).isHighPriority() + && isVersionCompatible((BookieProtocol.ParsedAddRequest) msg) + && !((BookieProtocol.ParsedAddRequest) msg).isRecoveryAdd()) { + msgs.put((BookieProtocol.ParsedAddRequest) msg); + } else { + requestProcessor.processRequest(msg, this); + } + } + + private boolean isVersionCompatible(BookieProtocol.ParsedAddRequest r) { + byte version = r.getProtocolVersion(); + if (version < BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION + || version > BookieProtocol.CURRENT_PROTOCOL_VERSION) { + log.error("Invalid protocol version, expected something between " + + BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION + + " & " + BookieProtocol.CURRENT_PROTOCOL_VERSION + + ". 
got " + r.getProtocolVersion()); + return false; + } else { + return true; + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + if (!msgs.isEmpty()) { + List c = new ArrayList<>(); + msgs.drainTo(c); + requestProcessor.processAddRequest(c, this); + } } public synchronized void prepareSendResponseV2(int rc, BookieProtocol.ParsedAddRequest req) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index d07aa9cffa0..3ffb5d201b0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -35,6 +35,7 @@ import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; @@ -205,14 +206,14 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, Stat readsSemaphore = maxReads > 0 ? 
new Semaphore(maxReads, true) : null; } - protected void onAddRequestStart(Channel channel) { + protected void onAddRequestStart(Channel channel, int permits) { if (addsSemaphore != null) { - if (!addsSemaphore.tryAcquire()) { + if (!addsSemaphore.tryAcquire(permits)) { final long throttlingStartTimeNanos = MathUtils.nowInNano(); channel.config().setAutoRead(false); LOG.info("Too many add requests in progress, disabling autoread on channel {}", channel); requestStats.blockAddRequest(); - addsSemaphore.acquireUninterruptibly(); + addsSemaphore.acquireUninterruptibly(permits); channel.config().setAutoRead(true); final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos); LOG.info("Re-enabled autoread on channel {} after AddRequest delay of {} nanos", channel, delayNanos); @@ -729,4 +730,25 @@ public boolean isBlacklisted(Channel channel) { public void handleNonWritableChannel(Channel channel) { onResponseTimeout.accept(channel); } + + @Override + public void processAddRequest(List msgs, BookieRequestHandler requestHandler) { + WriteEntryProcessor write = WriteEntryProcessor.create(msgs, requestHandler, this); + if (writeThreadPool == null) { + write.run(); + } else { + try { + writeThreadPool.execute(write); + } catch (RejectedExecutionException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to process request to add entry. 
Too many pending requests ", e); + } + getRequestStats().getAddEntryRejectedCounter().addCount(msgs.size()); + + write.sendWriteReqResponse(BookieProtocol.ETOOMANYREQUESTS, + ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, msgs), + requestStats.getAddRequestStats()); + } + } + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java index c9798156c25..c3c478dc057 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java @@ -20,6 +20,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.proto.BookieProtocol.Request; @@ -38,6 +39,7 @@ abstract class PacketProcessorBase implements Runnable { BookieRequestHandler requestHandler; BookieRequestProcessor requestProcessor; long enqueueNanos; + List requests; protected void init(T request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) { this.request = request; @@ -46,6 +48,14 @@ protected void init(T request, BookieRequestHandler requestHandler, BookieReques this.enqueueNanos = MathUtils.nowInNano(); } + protected void init(List requests, BookieRequestHandler requestHandler, + BookieRequestProcessor requestProcessor) { + this.requests = requests; + this.requestHandler = requestHandler; + this.requestProcessor = requestProcessor; + this.enqueueNanos = MathUtils.nowInNano(); + } + protected void reset() { request = null; requestHandler = null; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index 29b3a5abb70..29cfb649252 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -21,7 +21,9 @@ import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; import java.io.IOException; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException; import org.apache.bookkeeper.net.BookieId; @@ -50,7 +52,15 @@ public static WriteEntryProcessor create(ParsedAddRequest request, BookieRequest BookieRequestProcessor requestProcessor) { WriteEntryProcessor wep = RECYCLER.get(); wep.init(request, requestHandler, requestProcessor); - requestProcessor.onAddRequestStart(requestHandler.ctx().channel()); + requestProcessor.onAddRequestStart(requestHandler.ctx().channel(), 1); + return wep; + } + + public static WriteEntryProcessor create(List requests, BookieRequestHandler requestHandler, + BookieRequestProcessor requestProcessor) { + WriteEntryProcessor wep = RECYCLER.get(); + wep.init(requests, requestHandler, requestProcessor); + requestProcessor.onAddRequestStart(requestHandler.ctx().channel(), requests.size()); return wep; } @@ -154,4 +164,48 @@ protected WriteEntryProcessor newObject(Recycler.Handle han return new WriteEntryProcessor(handle); } }; + + @Override + public void run() { + if (request != null) { + super.run(); + } else if (requests != null && !requests.isEmpty()){ + if (requestProcessor.getBookie().isReadOnly()) { + LOG.warn("BookieServer is running in readOnly mode, so rejecting the request from the client!"); + sendWriteReqResponse(BookieProtocol.EREADONLY, + ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, requests), + 
requestProcessor.getRequestStats().getAddRequestStats()); + requests.forEach(ParsedAddRequest::release); + requests.forEach(ParsedAddRequest::recycle); + return; + } + + startTimeNanos = MathUtils.nowInNano(); + int rc = BookieProtocol.EOK; + try { + requestProcessor.getBookie().addEntry(requests, false, this, requestHandler); + } catch (OperationRejectedException e) { + requestProcessor.getRequestStats().getAddEntryRejectedCounter().addCount(requests.size()); + if (LOG.isDebugEnabled()) { + LOG.debug("Operation rejected while writing ", e); + } + rc = BookieProtocol.ETOOMANYREQUESTS; + } catch (IOException e) { + LOG.error("Error writing request ", e); + rc = BookieProtocol.EIO; + } catch ( Throwable t) { + LOG.error("Unexpected exception while writing requests {}", t.getMessage(), t); + rc = BookieProtocol.EBADREQ; + } + + if (rc != BookieProtocol.EOK) { + requestProcessor.getRequestStats().getAddRequestStats() + .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + sendWriteReqResponse(rc, + ResponseBuilder.buildErrorResponse(rc, requests), + requestProcessor.getRequestStats().getAddRequestStats()); + requests.forEach(ParsedAddRequest::recycle); + } + } + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java index 36aff7ad924..f02cec90c43 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java @@ -45,7 +45,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 { public WriteEntryProcessorV3(Request request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) { super(request, requestHandler, requestProcessor); - requestProcessor.onAddRequestStart(requestHandler.ctx().channel()); + 
requestProcessor.onAddRequestStart(requestHandler.ctx().channel(), 1); } // Returns null if there is no exception thrown From 9bc115cbaf23b954f4d4a879dc3ba82a1c39f44e Mon Sep 17 00:00:00 2001 From: chenhang Date: Tue, 7 Mar 2023 11:47:10 +0800 Subject: [PATCH 03/16] batch add request for test --- .../org/apache/bookkeeper/bookie/Bookie.java | 5 +- .../apache/bookkeeper/bookie/BookieImpl.java | 47 +++++++++++++++---- .../org/apache/bookkeeper/bookie/Journal.java | 10 ++-- .../bookkeeper/proto/BookieProtocol.java | 2 +- .../proto/BookieRequestProcessor.java | 9 ++-- .../bookkeeper/proto/WriteEntryProcessor.java | 43 ++++++++--------- 6 files changed, 73 insertions(+), 43 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 449d35cc9b5..a9bcc267427 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -27,6 +27,7 @@ import org.apache.bookkeeper.processor.RequestProcessor; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; +import org.apache.bookkeeper.proto.RequestStats; /** * Interface for the bookie. 
@@ -46,8 +47,8 @@ public interface Bookie { // TODO: replace ackBeforeSync with flags void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException, InterruptedException; - void addEntry(List requests, boolean ackBeforeSync, WriteCallback cb, Object ctx) - throws IOException, BookieException, InterruptedException; + void addEntry(List requests, boolean ackBeforeSync, WriteCallback cb, + Object ctx, RequestStats requestStats) throws InterruptedException; void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException, InterruptedException; void forceLedger(long ledgerId, WriteCallback cb, Object ctx); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index daa97ab261b..d30676e2b34 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -45,6 +45,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Optional; import java.util.PrimitiveIterator.OfLong; @@ -72,6 +73,7 @@ import org.apache.bookkeeper.processor.RequestProcessor; import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; +import org.apache.bookkeeper.proto.RequestStats; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.ThreadRegistry; @@ -1089,11 +1091,13 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Obj } public void addEntry(List requests, boolean ackBeforeSync, - WriteCallback cb, Object ctx) throws IOException, BookieException, 
InterruptedException { + WriteCallback cb, Object ctx, RequestStats requestStats) + throws InterruptedException { long requestNans = MathUtils.nowInNano(); - boolean success = false; - List failedRequests = new ArrayList<>(); - requests.forEach(request -> { + ListIterator iter = requests.listIterator(); + while (iter.hasNext()) { + ParsedAddRequest request = iter.next(); + int rc = BookieProtocol.EOK; try { LedgerDescriptor handle = getLedgerForEntry(request.getData(), request.getMasterKey()); synchronized (handle) { @@ -1103,15 +1107,40 @@ public void addEntry(List requests, boolean ackBeforeSync, addEntryInternalWithoutJournal(handle, request.getData(), ackBeforeSync, cb, ctx, request.getMasterKey()); } - } catch (BookieException.LedgerFencedException e) { - + } catch (BookieException.OperationRejectedException e) { + requestStats.getAddEntryRejectedCounter().inc(); + if (LOG.isDebugEnabled()) { + LOG.debug("Operation rejected while writing {} ", request, e); + } + rc = BookieProtocol.ETOOMANYREQUESTS; } catch (IOException e) { + LOG.error("Error writing {}", request, e); + rc = BookieProtocol.EIO; + } catch (BookieException.LedgerFencedException lfe) { + LOG.error("Attempt to write to fenced ledger ", lfe); + } catch (BookieException e) { + LOG.error("Unauthorized access to ledger {}", request.getLedgerId(), e); + rc = BookieProtocol.EUA; + } catch (Throwable t) { + LOG.error("Unexpected exception while writing {}@{} : {} ", + request.getLedgerId(), request.getEntryId(), t.getMessage(), t); + rc = BookieProtocol.EBADREQ; + } + if (rc != BookieProtocol.EOK) { + requestStats.getAddEntryStats() + .registerFailedEvent(MathUtils.elapsedNanos(requestNans), TimeUnit.NANOSECONDS); + cb.writeComplete(rc, request.getLedgerId(), request.getEntryId(), null, ctx); + iter.remove(); + request.recycle(); } - }); + } - List entries = requests.stream().filter(failedRequests::contains).map(ParsedAddRequest::getData).collect(Collectors.toList()); - 
getJournal(requests.get(0).getLedgerId()).logAddEntry(entries, ackBeforeSync, cb, ctx); + if (writeDataToJournal) { + List entries = requests.stream() + .map(ParsedAddRequest::getData).collect(Collectors.toList()); + getJournal(requests.get(0).getLedgerId()).logAddEntry(entries, ackBeforeSync, cb, ctx); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index c68c546e484..6291fc4350d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -48,6 +48,8 @@ import java.util.stream.Collectors; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.bookie.stats.JournalStats; +import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue; +import org.apache.bookkeeper.common.collections.BatchedBlockingQueue; import org.apache.bookkeeper.common.collections.BlockingMpscQueue; import org.apache.bookkeeper.common.collections.RecyclableArrayList; import org.apache.bookkeeper.common.util.MemoryLimitController; @@ -638,7 +640,7 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour private static final String journalThreadName = "BookieJournal"; // journal entry queue to commit - final BlockingQueue queue; + final BatchedBlockingQueue queue; final BlockingQueue forceWriteRequests; volatile boolean running = true; @@ -669,7 +671,7 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf queue = new BlockingMpscQueue<>(conf.getJournalQueueSize()); forceWriteRequests = new BlockingMpscQueue<>(conf.getJournalQueueSize()); } else { - queue = new ArrayBlockingQueue<>(conf.getJournalQueueSize()); + queue = new BatchedArrayBlockingQueue<>(conf.getJournalQueueSize()); forceWriteRequests = new 
ArrayBlockingQueue<>(conf.getJournalQueueSize()); } @@ -891,7 +893,7 @@ public void logAddEntry(List entries, boolean ackBeforeSync, WriteCallb throws InterruptedException { AtomicLong reserveMemory = new AtomicLong(); List queueEntries = entries.stream().map(entry -> { - long ledgerId = entry.getLong(entry.readerIndex() + 0); + long ledgerId = entry.getLong(entry.readerIndex()); long entryId = entry.getLong(entry.readerIndex() + 8); entry.retain(); reserveMemory.addAndGet(entry.readableBytes()); @@ -900,7 +902,7 @@ public void logAddEntry(List entries, boolean ackBeforeSync, WriteCallb memoryLimitController.releaseMemory(reserveMemory.get()); journalStats.getJournalQueueSize().addCount(entries.size()); - queue.putAll(); + queue.putAll(queueEntries.toArray(new QueueEntry[0]), 0, queueEntries.size()); } @VisibleForTesting diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 5f4f93fa735..e53ef9e5b15 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -223,7 +223,7 @@ public long getLedgerId() { return ledgerId; } - long getEntryId() { + public long getEntryId() { return entryId; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 3ffb5d201b0..d024e39e908 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -745,9 +745,12 @@ public void processAddRequest(List msgs, Bookie } getRequestStats().getAddEntryRejectedCounter().addCount(msgs.size()); - write.sendWriteReqResponse(BookieProtocol.ETOOMANYREQUESTS, - 
ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, msgs), - requestStats.getAddRequestStats()); + for (BookieProtocol.ParsedAddRequest request : msgs) { + write.sendWriteReqResponse(BookieProtocol.ETOOMANYREQUESTS, + ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, request), + requestStats.getAddRequestStats()); + request.recycle(); + } } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index 29cfb649252..7dbc9ec2c19 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException; import org.apache.bookkeeper.net.BookieId; @@ -172,39 +171,35 @@ public void run() { } else if (requests != null && !requests.isEmpty()){ if (requestProcessor.getBookie().isReadOnly()) { LOG.warn("BookieServer is running in readOnly mode, so rejecting the request from the client!"); - sendWriteReqResponse(BookieProtocol.EREADONLY, - ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, requests), - requestProcessor.getRequestStats().getAddRequestStats()); - requests.forEach(ParsedAddRequest::release); - requests.forEach(ParsedAddRequest::recycle); + for (ParsedAddRequest r : requests) { + sendWriteReqResponse(BookieProtocol.EREADONLY, + ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, r), + requestProcessor.getRequestStats().getAddRequestStats()); + r.release(); + r.recycle(); + } return; } startTimeNanos = MathUtils.nowInNano(); int rc = BookieProtocol.EOK; try { - 
requestProcessor.getBookie().addEntry(requests, false, this, requestHandler); - } catch (OperationRejectedException e) { - requestProcessor.getRequestStats().getAddEntryRejectedCounter().addCount(requests.size()); - if (LOG.isDebugEnabled()) { - LOG.debug("Operation rejected while writing ", e); - } - rc = BookieProtocol.ETOOMANYREQUESTS; - } catch (IOException e) { - LOG.error("Error writing request ", e); - rc = BookieProtocol.EIO; - } catch ( Throwable t) { - LOG.error("Unexpected exception while writing requests {}", t.getMessage(), t); + requestProcessor.getBookie().addEntry(requests, false, this, requestHandler, + requestProcessor.getRequestStats()); + } catch (Throwable t) { + LOG.error("Unexpected exception while writing requests ", t); rc = BookieProtocol.EBADREQ; } if (rc != BookieProtocol.EOK) { - requestProcessor.getRequestStats().getAddRequestStats() - .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); - sendWriteReqResponse(rc, - ResponseBuilder.buildErrorResponse(rc, requests), - requestProcessor.getRequestStats().getAddRequestStats()); - requests.forEach(ParsedAddRequest::recycle); + requestProcessor.getRequestStats().getAddEntryStats() + .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + for (ParsedAddRequest request : requests) { + sendWriteReqResponse(rc, + ResponseBuilder.buildErrorResponse(rc, request), + requestProcessor.getRequestStats().getAddRequestStats()); + request.recycle(); + }; } } } From ec36364a9452756b8176d1aad117e1ab223961a7 Mon Sep 17 00:00:00 2001 From: chenhang Date: Tue, 7 Mar 2023 11:49:59 +0800 Subject: [PATCH 04/16] format code --- .../GrowableArrayBlockingQueue.java | 441 ------------------ .../proto/BookieRequestHandler.java | 4 +- 2 files changed, 2 insertions(+), 443 deletions(-) delete mode 100644 bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/GrowableArrayBlockingQueue.java diff --git 
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/GrowableArrayBlockingQueue.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/GrowableArrayBlockingQueue.java deleted file mode 100644 index 35705fae9ae..00000000000 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/GrowableArrayBlockingQueue.java +++ /dev/null @@ -1,441 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.bookkeeper.common.collections; - -import java.util.AbstractQueue; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Objects; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.StampedLock; -import java.util.function.Consumer; - -/** - * This implements a {@link BlockingQueue} backed by an array with no fixed capacity. - * - *

When the capacity is reached, data will be moved to a bigger array. - */ -public class GrowableArrayBlockingQueue extends AbstractQueue implements BlockingQueue { - - private final ReentrantLock headLock = new ReentrantLock(); - private final PaddedInt headIndex = new PaddedInt(); - private final PaddedInt tailIndex = new PaddedInt(); - private final StampedLock tailLock = new StampedLock(); - private final Condition isNotEmpty = headLock.newCondition(); - - private T[] data; - - @SuppressWarnings("rawtypes") - private static final AtomicIntegerFieldUpdater SIZE_UPDATER = AtomicIntegerFieldUpdater - .newUpdater(GrowableArrayBlockingQueue.class, "size"); - private volatile int size = 0; - - public GrowableArrayBlockingQueue() { - this(64); - } - - @SuppressWarnings("unchecked") - public GrowableArrayBlockingQueue(int initialCapacity) { - headIndex.value = 0; - tailIndex.value = 0; - - int capacity = io.netty.util.internal.MathUtil.findNextPositivePowerOfTwo(initialCapacity); - data = (T[]) new Object[capacity]; - } - - @Override - public T remove() { - T item = poll(); - if (item == null) { - throw new NoSuchElementException(); - } - - return item; - } - - @Override - public T poll() { - headLock.lock(); - try { - if (SIZE_UPDATER.get(this) > 0) { - T item = data[headIndex.value]; - data[headIndex.value] = null; - headIndex.value = (headIndex.value + 1) & (data.length - 1); - SIZE_UPDATER.decrementAndGet(this); - return item; - } else { - return null; - } - } finally { - headLock.unlock(); - } - } - - @Override - public T element() { - T item = peek(); - if (item == null) { - throw new NoSuchElementException(); - } - - return item; - } - - @Override - public T peek() { - headLock.lock(); - try { - if (SIZE_UPDATER.get(this) > 0) { - return data[headIndex.value]; - } else { - return null; - } - } finally { - headLock.unlock(); - } - } - - @Override - public boolean offer(T e) { - // Queue is unbounded and it will never reject new items - put(e); - return true; - } - 
- @Override - public void put(T e) { - long stamp = tailLock.writeLock(); - - boolean wasEmpty = false; - - try { - if (SIZE_UPDATER.get(this) == data.length) { - expandArray(); - } - - data[tailIndex.value] = e; - tailIndex.value = (tailIndex.value + 1) & (data.length - 1); - if (SIZE_UPDATER.getAndIncrement(this) == 0) { - wasEmpty = true; - } - } finally { - tailLock.unlockWrite(stamp); - } - - if (wasEmpty) { - headLock.lock(); - try { - isNotEmpty.signal(); - } finally { - headLock.unlock(); - } - } - } - - @Override - public boolean add(T e) { - put(e); - return true; - } - - @Override - public boolean offer(T e, long timeout, TimeUnit unit) { - // Queue is unbounded and it will never reject new items - put(e); - return true; - } - - @Override - public T take() throws InterruptedException { - headLock.lockInterruptibly(); - - try { - while (SIZE_UPDATER.get(this) == 0) { - isNotEmpty.await(); - } - - T item = data[headIndex.value]; - data[headIndex.value] = null; - headIndex.value = (headIndex.value + 1) & (data.length - 1); - if (SIZE_UPDATER.decrementAndGet(this) > 0) { - // There are still entries to consume - isNotEmpty.signal(); - } - return item; - } finally { - headLock.unlock(); - } - } - - @Override - public T poll(long timeout, TimeUnit unit) throws InterruptedException { - headLock.lockInterruptibly(); - - try { - long timeoutNanos = unit.toNanos(timeout); - while (SIZE_UPDATER.get(this) == 0) { - if (timeoutNanos <= 0) { - return null; - } - - timeoutNanos = isNotEmpty.awaitNanos(timeoutNanos); - } - - T item = data[headIndex.value]; - data[headIndex.value] = null; - headIndex.value = (headIndex.value + 1) & (data.length - 1); - if (SIZE_UPDATER.decrementAndGet(this) > 0) { - // There are still entries to consume - isNotEmpty.signal(); - } - return item; - } finally { - headLock.unlock(); - } - } - - @Override - public int remainingCapacity() { - return Integer.MAX_VALUE; - } - - @Override - public int drainTo(Collection c) { - return drainTo(c, 
Integer.MAX_VALUE); - } - - @Override - public int drainTo(Collection c, int maxElements) { - headLock.lock(); - - try { - int drainedItems = 0; - int size = SIZE_UPDATER.get(this); - - while (size > 0 && drainedItems < maxElements) { - T item = data[headIndex.value]; - data[headIndex.value] = null; - c.add(item); - - headIndex.value = (headIndex.value + 1) & (data.length - 1); - --size; - ++drainedItems; - } - - if (SIZE_UPDATER.addAndGet(this, -drainedItems) > 0) { - // There are still entries to consume - isNotEmpty.signal(); - } - - return drainedItems; - } finally { - headLock.unlock(); - } - } - - @Override - public void clear() { - headLock.lock(); - - try { - int size = SIZE_UPDATER.get(this); - - for (int i = 0; i < size; i++) { - data[headIndex.value] = null; - headIndex.value = (headIndex.value + 1) & (data.length - 1); - } - - if (SIZE_UPDATER.addAndGet(this, -size) > 0) { - // There are still entries to consume - isNotEmpty.signal(); - } - } finally { - headLock.unlock(); - } - } - - @Override - public boolean remove(Object o) { - long stamp = tailLock.writeLock(); - headLock.lock(); - - try { - int index = this.headIndex.value; - int size = this.size; - - for (int i = 0; i < size; i++) { - T item = data[index]; - - if (Objects.equals(item, o)) { - remove(index); - return true; - } - - index = (index + 1) & (data.length - 1); - } - } finally { - headLock.unlock(); - tailLock.unlockWrite(stamp); - } - - return false; - } - - private void remove(int index) { - int tailIndex = this.tailIndex.value; - - if (index < tailIndex) { - System.arraycopy(data, index + 1, data, index, tailIndex - index - 1); - this.tailIndex.value--; - } else { - System.arraycopy(data, index + 1, data, index, data.length - index - 1); - data[data.length - 1] = data[0]; - if (tailIndex > 0) { - System.arraycopy(data, 1, data, 0, tailIndex); - this.tailIndex.value--; - } else { - this.tailIndex.value = data.length - 1; - } - } - - if (tailIndex > 0) { - data[tailIndex - 1] = null; - 
} else { - data[data.length - 1] = null; - } - - SIZE_UPDATER.decrementAndGet(this); - } - - @Override - public int size() { - return SIZE_UPDATER.get(this); - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - - public List toList() { - List list = new ArrayList<>(size()); - forEach(list::add); - return list; - } - - @Override - public void forEach(Consumer action) { - long stamp = tailLock.writeLock(); - headLock.lock(); - - try { - int headIndex = this.headIndex.value; - int size = this.size; - - for (int i = 0; i < size; i++) { - T item = data[headIndex]; - - action.accept(item); - - headIndex = (headIndex + 1) & (data.length - 1); - } - - } finally { - headLock.unlock(); - tailLock.unlockWrite(stamp); - } - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - - long stamp = tailLock.writeLock(); - headLock.lock(); - - try { - int headIndex = this.headIndex.value; - int size = SIZE_UPDATER.get(this); - - sb.append('['); - - for (int i = 0; i < size; i++) { - T item = data[headIndex]; - if (i > 0) { - sb.append(", "); - } - - sb.append(item); - - headIndex = (headIndex + 1) & (data.length - 1); - } - - sb.append(']'); - } finally { - headLock.unlock(); - tailLock.unlockWrite(stamp); - } - return sb.toString(); - } - - @SuppressWarnings("unchecked") - private void expandArray() { - // We already hold the tailLock - headLock.lock(); - - try { - int size = SIZE_UPDATER.get(this); - int newCapacity = data.length * 2; - T[] newData = (T[]) new Object[newCapacity]; - - int oldHeadIndex = headIndex.value; - int newTailIndex = 0; - - for (int i = 0; i < size; i++) { - newData[newTailIndex++] = data[oldHeadIndex]; - oldHeadIndex = (oldHeadIndex + 1) & (data.length - 1); - } - - data = newData; - headIndex.value = 0; - tailIndex.value = size; - } finally { - headLock.unlock(); - } - } - - static final class PaddedInt { - private int value; - - // Padding to avoid false sharing - public 
volatile int pi1 = 1; - public volatile long p1 = 1L, p2 = 2L, p3 = 3L, p4 = 4L, p5 = 5L, p6 = 6L; - - public long exposeToAvoidOptimization() { - return pi1 + p1 + p2 + p3 + p4 + p5 + p6; - } - } -} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index dca802c6094..abfc1c88d2a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -32,8 +32,8 @@ import org.apache.bookkeeper.processor.RequestProcessor; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import org.apache.bookkeeper.common.collections.GrowableArrayBlockingQueue; /** * Serverside handler for bookkeeper requests. @@ -55,7 +55,7 @@ public class BookieRequestHandler extends ChannelInboundHandlerAdapter { BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) { this.requestProcessor = processor; this.allChannels = allChannels; - this.msgs = new GrowableArrayBlockingQueue<>(); + this.msgs = new ArrayBlockingQueue<>(10_000); } public ChannelHandlerContext ctx() { From 66779966618a6043f36f0a413b24b38c95349a76 Mon Sep 17 00:00:00 2001 From: chenhang Date: Tue, 7 Mar 2023 13:53:17 +0800 Subject: [PATCH 05/16] format code --- .../src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java | 3 ++- .../src/main/java/org/apache/bookkeeper/bookie/Journal.java | 3 ++- .../java/org/apache/bookkeeper/proto/WriteEntryProcessor.java | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index d30676e2b34..6a4df8c0c0e 100644 --- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -1105,7 +1105,8 @@ public void addEntry(List requests, boolean ackBeforeSync, throw BookieException.create(BookieException.Code.LedgerFencedException); } - addEntryInternalWithoutJournal(handle, request.getData(), ackBeforeSync, cb, ctx, request.getMasterKey()); + addEntryInternalWithoutJournal(handle, request.getData(), ackBeforeSync, + cb, ctx, request.getMasterKey()); } } catch (BookieException.OperationRejectedException e) { requestStats.getAddEntryRejectedCounter().inc(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 6291fc4350d..efa935c2953 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -897,7 +897,8 @@ public void logAddEntry(List entries, boolean ackBeforeSync, WriteCallb long entryId = entry.getLong(entry.readerIndex() + 8); entry.retain(); reserveMemory.addAndGet(entry.readableBytes()); - return QueueEntry.create(entry, ackBeforeSync, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(), journalStats.getJournalAddEntryStats(), callbackTime); + return QueueEntry.create(entry, ackBeforeSync, ledgerId, entryId, cb, ctx, + MathUtils.nowInNano(), journalStats.getJournalAddEntryStats(), callbackTime); }).collect(Collectors.toList()); memoryLimitController.releaseMemory(reserveMemory.get()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index 7dbc9ec2c19..40f061ce609 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -199,7 +199,7 @@ public void run() { ResponseBuilder.buildErrorResponse(rc, request), requestProcessor.getRequestStats().getAddRequestStats()); request.recycle(); - }; + } } } } From 5b39f4e2011d283138f88274dabef34669fb015f Mon Sep 17 00:00:00 2001 From: chenhang Date: Tue, 7 Mar 2023 23:28:21 +0800 Subject: [PATCH 06/16] streamline batch add requests --- .../apache/bookkeeper/bookie/BookieImpl.java | 6 + .../proto/BookieRequestHandler.java | 16 +- .../proto/BookieRequestProcessor.java | 2 +- .../bookkeeper/proto/PacketProcessorBase.java | 10 -- .../bookkeeper/proto/ResponseBuilder.java | 4 + .../proto/WriteBatchEntryProcessor.java | 140 ++++++++++++++++++ .../bookkeeper/proto/WriteEntryProcessor.java | 48 ------ .../client/BookieWriteLedgerTest.java | 21 +++ 8 files changed, 173 insertions(+), 74 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 6a4df8c0c0e..0b198c44094 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -1133,6 +1133,7 @@ public void addEntry(List requests, boolean ackBeforeSync, .registerFailedEvent(MathUtils.elapsedNanos(requestNans), TimeUnit.NANOSECONDS); cb.writeComplete(rc, request.getLedgerId(), request.getEntryId(), null, ctx); iter.remove(); + request.release(); request.recycle(); } } @@ -1141,6 +1142,11 @@ public void addEntry(List requests, boolean ackBeforeSync, List entries = requests.stream() .map(ParsedAddRequest::getData).collect(Collectors.toList()); getJournal(requests.get(0).getLedgerId()).logAddEntry(entries, ackBeforeSync, cb, ctx); + + requests.forEach(t -> { + 
t.release(); + t.recycle(); + }); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index abfc1c88d2a..b0cb90a3add 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -99,7 +99,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (msg instanceof BookieProtocol.ParsedAddRequest && ADDENTRY == ((BookieProtocol.ParsedAddRequest) msg).getOpCode() && !((BookieProtocol.ParsedAddRequest) msg).isHighPriority() - && isVersionCompatible((BookieProtocol.ParsedAddRequest) msg) + && ((BookieProtocol.ParsedAddRequest) msg).getProtocolVersion() == BookieProtocol.CURRENT_PROTOCOL_VERSION && !((BookieProtocol.ParsedAddRequest) msg).isRecoveryAdd()) { msgs.put((BookieProtocol.ParsedAddRequest) msg); } else { @@ -107,20 +107,6 @@ && isVersionCompatible((BookieProtocol.ParsedAddRequest) msg) } } - private boolean isVersionCompatible(BookieProtocol.ParsedAddRequest r) { - byte version = r.getProtocolVersion(); - if (version < BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION - || version > BookieProtocol.CURRENT_PROTOCOL_VERSION) { - log.error("Invalid protocol version, expected something between " - + BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION - + " & " + BookieProtocol.CURRENT_PROTOCOL_VERSION - + ". 
got " + r.getProtocolVersion()); - return false; - } else { - return true; - } - } - @Override public void channelReadComplete(ChannelHandlerContext ctx) { if (!msgs.isEmpty()) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index d024e39e908..70207317ea5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -733,7 +733,7 @@ public void handleNonWritableChannel(Channel channel) { @Override public void processAddRequest(List msgs, BookieRequestHandler requestHandler) { - WriteEntryProcessor write = WriteEntryProcessor.create(msgs, requestHandler, this); + WriteBatchEntryProcessor write = WriteBatchEntryProcessor.create(msgs, requestHandler, this); if (writeThreadPool == null) { write.run(); } else { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java index c3c478dc057..c9798156c25 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java @@ -20,7 +20,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; -import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.proto.BookieProtocol.Request; @@ -39,7 +38,6 @@ abstract class PacketProcessorBase implements Runnable { BookieRequestHandler requestHandler; BookieRequestProcessor requestProcessor; long enqueueNanos; - List requests; protected void init(T request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) 
{ this.request = request; @@ -48,14 +46,6 @@ protected void init(T request, BookieRequestHandler requestHandler, BookieReques this.enqueueNanos = MathUtils.nowInNano(); } - protected void init(List requests, BookieRequestHandler requestHandler, - BookieRequestProcessor requestProcessor) { - this.requests = requests; - this.requestHandler = requestHandler; - this.requestProcessor = requestProcessor; - this.enqueueNanos = MathUtils.nowInNano(); - } - protected void reset() { request = null; requestHandler = null; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java index 563c0a1352f..27bbb1f8b23 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java @@ -39,6 +39,10 @@ static BookieProtocol.Response buildAddResponse(BookieProtocol.Request r) { r.getEntryId()); } + static BookieProtocol.Response buildAddResponse(byte protocolVersion, long ledgerId, long entryId) { + return BookieProtocol.AddResponse.create(protocolVersion, BookieProtocol.EOK, ledgerId, entryId); + } + static BookieProtocol.Response buildReadResponse(ByteBuf data, BookieProtocol.Request r) { return new BookieProtocol.ReadResponse(r.getProtocolVersion(), BookieProtocol.EOK, r.getLedgerId(), r.getEntryId(), data); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java new file mode 100644 index 00000000000..dc4eb4ebe23 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.proto; + +import io.netty.util.Recycler; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; +import org.apache.bookkeeper.util.MathUtils; + +/** + * Processes batched add entry requests. 
+ */ +@Slf4j +public class WriteBatchEntryProcessor extends PacketProcessorBase implements WriteCallback { + long startTimeNanos; + List requests; + AtomicInteger requestCount; + + @Override + protected void reset() { + requests = null; + requestHandler = null; + requestProcessor = null; + startTimeNanos = -1L; + } + + public static WriteBatchEntryProcessor create(List requests, BookieRequestHandler requestHandler, + BookieRequestProcessor requestProcessor) { + WriteBatchEntryProcessor wbep = RECYCLER.get(); + wbep.init(requests, requestHandler, requestProcessor); + requestProcessor.onAddRequestStart(requestHandler.ctx().channel(), requests.size()); + return wbep; + } + + protected void init(List requests, BookieRequestHandler requestHandler, + BookieRequestProcessor requestProcessor) { + this.requests = requests; + this.requestHandler = requestHandler; + this.requestProcessor = requestProcessor; + this.enqueueNanos = MathUtils.nowInNano(); + this.requestCount = new AtomicInteger(requests.size()); + } + + @Override + protected void processPacket() { + + } + + @Override + public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) { + if (BookieProtocol.EOK == rc) { + requestProcessor.getRequestStats().getAddEntryStats() + .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + } else { + requestProcessor.getRequestStats().getAddEntryStats() + .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + } + + sendWriteReqResponse(rc, + ResponseBuilder.buildAddResponse(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId), + requestProcessor.getRequestStats().getAddRequestStats()); + + if (requestCount.decrementAndGet() == 0) { + recycle(); + } + } + + @Override + public void run() { + if (requestProcessor.getBookie().isReadOnly()) { + log.warn("BookieServer is running in readOnly mode, so rejecting the request from the client!"); + for (ParsedAddRequest r : requests) { + 
writeComplete(BookieProtocol.EREADONLY, r.getLedgerId(), r.getEntryId(), null, + requestHandler.ctx()); + r.release(); + r.recycle(); + } + return; + } + + startTimeNanos = MathUtils.nowInNano(); + int rc = BookieProtocol.EOK; + try { + requestProcessor.getBookie().addEntry(requests, false, this, requestHandler, + requestProcessor.getRequestStats()); + } catch (Throwable t) { + log.error("Unexpected exception while writing requests ", t); + rc = BookieProtocol.EBADREQ; + } + + if (rc != BookieProtocol.EOK) { + requestProcessor.getRequestStats().getAddEntryStats() + .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + for (ParsedAddRequest r : requests) { + writeComplete(rc, r.getLedgerId(), r.getEntryId(), null, requestHandler.ctx()); + r.release(); + r.recycle(); + } + } + } + + void recycle() { + reset(); + recyclerHandle.recycle(this); + } + + private final Recycler.Handle recyclerHandle; + private WriteBatchEntryProcessor(Recycler.Handle recycleHandle) { + this.recyclerHandle = recycleHandle; + } + private static final Recycler RECYCLER = new Recycler() { + @Override + protected WriteBatchEntryProcessor newObject(Recycler.Handle handle) { + return new WriteBatchEntryProcessor(handle); + } + }; +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index 40f061ce609..93a77203a5e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -21,7 +21,6 @@ import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; import java.io.IOException; -import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException; @@ -55,14 +54,6 @@ public static 
WriteEntryProcessor create(ParsedAddRequest request, BookieRequest return wep; } - public static WriteEntryProcessor create(List requests, BookieRequestHandler requestHandler, - BookieRequestProcessor requestProcessor) { - WriteEntryProcessor wep = RECYCLER.get(); - wep.init(requests, requestHandler, requestProcessor); - requestProcessor.onAddRequestStart(requestHandler.ctx().channel(), requests.size()); - return wep; - } - @Override protected void processPacket() { if (requestProcessor.getBookie().isReadOnly() @@ -164,43 +155,4 @@ protected WriteEntryProcessor newObject(Recycler.Handle han } }; - @Override - public void run() { - if (request != null) { - super.run(); - } else if (requests != null && !requests.isEmpty()){ - if (requestProcessor.getBookie().isReadOnly()) { - LOG.warn("BookieServer is running in readOnly mode, so rejecting the request from the client!"); - for (ParsedAddRequest r : requests) { - sendWriteReqResponse(BookieProtocol.EREADONLY, - ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, r), - requestProcessor.getRequestStats().getAddRequestStats()); - r.release(); - r.recycle(); - } - return; - } - - startTimeNanos = MathUtils.nowInNano(); - int rc = BookieProtocol.EOK; - try { - requestProcessor.getBookie().addEntry(requests, false, this, requestHandler, - requestProcessor.getRequestStats()); - } catch (Throwable t) { - LOG.error("Unexpected exception while writing requests ", t); - rc = BookieProtocol.EBADREQ; - } - - if (rc != BookieProtocol.EOK) { - requestProcessor.getRequestStats().getAddEntryStats() - .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); - for (ParsedAddRequest request : requests) { - sendWriteReqResponse(rc, - ResponseBuilder.buildErrorResponse(rc, request), - requestProcessor.getRequestStats().getAddRequestStats()); - request.recycle(); - } - } - } - } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java index 8ae0b720192..c46d9fa7c72 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java @@ -125,6 +125,7 @@ public BookieWriteLedgerTest() { */ baseConf.setSkipListSizeLimit(4 * 1024 * 1024); baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory); + baseClientConf.setUseV2WireProtocol(true); } /** @@ -1444,6 +1445,26 @@ public void testLedgerMetadataTest() throws Exception { lh.close(); } + @Test + public void testReadWriteEntry() throws Exception { + lh = bkc.createLedgerAdv(3, 3, 3, digestType, ledgerPassword); + LOG.info("Ledger ID: {}", lh.ledgerId); + CountDownLatch latch = new CountDownLatch(1000); + for (int i = 0; i < 1000; ++i) { + ByteBuffer entry = ByteBuffer.allocate(4); + entry.putInt(rng.nextInt(maxInt)); + entry.position(0); + lh.asyncAddEntry(i, entry.array(), new AddCallback() { + @Override + public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { + LOG.info("Write entry: {}:{}, result: {}", lh.ledgerId, entryId, rc); + latch.countDown(); + } + }, null); + } + latch.await(); + } + private void readEntries(LedgerHandle lh, List entries) throws InterruptedException, BKException { ls = lh.readEntries(0, numEntriesToWrite - 1); int index = 0; From 9db1e1d89c239d77c4fa00db7313f31572334504 Mon Sep 17 00:00:00 2001 From: chenhang Date: Tue, 7 Mar 2023 23:48:13 +0800 Subject: [PATCH 07/16] format code --- .../apache/bookkeeper/bookie/BookieImpl.java | 1 + .../proto/BookieRequestHandler.java | 23 +++++++++++++------ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 0b198c44094..014bb78f0e5 100644 --- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -71,6 +71,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNS; import org.apache.bookkeeper.processor.RequestProcessor; +import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.proto.RequestStats; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index b0cb90a3add..aaa739c2afb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -27,13 +27,13 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.group.ChannelGroup; import java.nio.channels.ClosedChannelException; -import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.processor.RequestProcessor; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.processor.RequestProcessor; /** * Serverside handler for bookkeeper requests. 
@@ -42,12 +42,13 @@ public class BookieRequestHandler extends ChannelInboundHandlerAdapter { static final Object EVENT_FLUSH_ALL_PENDING_RESPONSES = new Object(); + private static final int CAPACITY = 10_000; private final RequestProcessor requestProcessor; private final ChannelGroup allChannels; private ChannelHandlerContext ctx; - private BlockingQueue msgs; + private final BlockingQueue msgs; private ByteBuf pendingSendResponses = null; private int maxPendingResponsesSize; @@ -55,7 +56,7 @@ public class BookieRequestHandler extends ChannelInboundHandlerAdapter { BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) { this.requestProcessor = processor; this.allChannels = allChannels; - this.msgs = new ArrayBlockingQueue<>(10_000); + this.msgs = new ArrayBlockingQueue<>(CAPACITY); } public ChannelHandlerContext ctx() { @@ -102,6 +103,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception && ((BookieProtocol.ParsedAddRequest) msg).getProtocolVersion() == BookieProtocol.CURRENT_PROTOCOL_VERSION && !((BookieProtocol.ParsedAddRequest) msg).isRecoveryAdd()) { msgs.put((BookieProtocol.ParsedAddRequest) msg); + + if (msgs.size() == CAPACITY) { + int count = msgs.size(); + List c = new ArrayList<>(count); + msgs.drainTo(c, count); + requestProcessor.processAddRequest(c, this); + } } else { requestProcessor.processRequest(msg, this); } @@ -110,8 +118,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception @Override public void channelReadComplete(ChannelHandlerContext ctx) { if (!msgs.isEmpty()) { - List c = new ArrayList<>(); - msgs.drainTo(c); + int count = msgs.size(); + List c = new ArrayList<>(count); + msgs.drainTo(c, count); requestProcessor.processAddRequest(c, this); } } From 0bbed62681501b113590f9bf1a58604ff293b4db Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 9 Mar 2023 20:16:54 +0800 Subject: [PATCH 08/16] fix some bugs --- 
.../apache/bookkeeper/bookie/BookieImpl.java | 15 +++++--- .../org/apache/bookkeeper/bookie/Journal.java | 11 +++--- .../processor/RequestProcessor.java | 6 ++++ .../bookkeeper/proto/BookieProtoEncoding.java | 9 +++++ .../proto/BookieRequestHandler.java | 10 +++++- .../proto/BookieRequestProcessor.java | 10 ++++++ .../proto/WriteBatchEntryProcessor.java | 9 +++-- .../client/BookieWriteLedgerTest.java | 12 +++++-- .../BookieWriteLedgerWithV2ProtocolTest.java | 35 +++++++++++++++++++ 9 files changed, 101 insertions(+), 16 deletions(-) create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerWithV2ProtocolTest.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 014bb78f0e5..5f58f7664d1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -130,6 +130,7 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { private final ByteBufAllocator allocator; private final boolean writeDataToJournal; + private RequestProcessor requestProcessor; // Write Callback do nothing static class NopWriteCallback implements WriteCallback { @@ -1092,9 +1093,9 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Obj } public void addEntry(List requests, boolean ackBeforeSync, - WriteCallback cb, Object ctx, RequestStats requestStats) - throws InterruptedException { + WriteCallback cb, Object ctx, RequestStats requestStats) throws InterruptedException { long requestNans = MathUtils.nowInNano(); + boolean hasFailedRequests = false; ListIterator iter = requests.listIterator(); while (iter.hasNext()) { ParsedAddRequest request = iter.next(); @@ -1120,6 +1121,7 @@ public void addEntry(List requests, boolean ackBeforeSync, rc = BookieProtocol.EIO; } catch 
(BookieException.LedgerFencedException lfe) { LOG.error("Attempt to write to fenced ledger ", lfe); + rc = BookieProtocol.EFENCED; } catch (BookieException e) { LOG.error("Unauthorized access to ledger {}", request.getLedgerId(), e); rc = BookieProtocol.EUA; @@ -1130,6 +1132,7 @@ public void addEntry(List requests, boolean ackBeforeSync, } if (rc != BookieProtocol.EOK) { + hasFailedRequests = true; requestStats.getAddEntryStats() .registerFailedEvent(MathUtils.elapsedNanos(requestNans), TimeUnit.NANOSECONDS); cb.writeComplete(rc, request.getLedgerId(), request.getEntryId(), null, ctx); @@ -1139,7 +1142,11 @@ public void addEntry(List requests, boolean ackBeforeSync, } } - if (writeDataToJournal) { + if (hasFailedRequests && requestProcessor != null) { + requestProcessor.flushPendingResponses(); + } + + if (writeDataToJournal && !requests.isEmpty()) { List entries = requests.stream() .map(ParsedAddRequest::getData).collect(Collectors.toList()); getJournal(requests.get(0).getLedgerId()).logAddEntry(entries, ackBeforeSync, cb, ctx); @@ -1149,7 +1156,6 @@ public void addEntry(List requests, boolean ackBeforeSync, t.recycle(); }); } - } private void addEntryInternalWithoutJournal(LedgerDescriptor handle, ByteBuf entry, @@ -1387,5 +1393,6 @@ public void setRequestProcessor(RequestProcessor requestProcessor) { for (Journal journal : journals) { journal.setRequestProcessor(requestProcessor); } + this.requestProcessor = requestProcessor; } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index efa935c2953..3aea62e23ef 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -45,7 +45,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import 
java.util.stream.Collectors; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.bookie.stats.JournalStats; import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue; @@ -892,18 +891,20 @@ public void logAddEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, public void logAddEntry(List entries, boolean ackBeforeSync, WriteCallback cb, Object ctx) throws InterruptedException { AtomicLong reserveMemory = new AtomicLong(); - List queueEntries = entries.stream().map(entry -> { + QueueEntry[] queueEntries = new QueueEntry[entries.size()]; + for (int i = 0; i < entries.size(); ++i) { + ByteBuf entry = entries.get(i); long ledgerId = entry.getLong(entry.readerIndex()); long entryId = entry.getLong(entry.readerIndex() + 8); entry.retain(); reserveMemory.addAndGet(entry.readableBytes()); - return QueueEntry.create(entry, ackBeforeSync, ledgerId, entryId, cb, ctx, + queueEntries[i] = QueueEntry.create(entry, ackBeforeSync, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(), journalStats.getJournalAddEntryStats(), callbackTime); - }).collect(Collectors.toList()); + } memoryLimitController.releaseMemory(reserveMemory.get()); journalStats.getJournalQueueSize().addCount(entries.size()); - queue.putAll(queueEntries.toArray(new QueueEntry[0]), 0, queueEntries.size()); + queue.putAll(queueEntries, 0, queueEntries.length); } @VisibleForTesting diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java index d6888c477dd..798f5a7661b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java @@ -49,5 +49,11 @@ public interface RequestProcessor extends AutoCloseable { * Flush any pending response staged on all the client connections. 
*/ void flushPendingResponses(); + + /** + * Process a list of ParsedAddRequests. + * @param r + * @param channel + */ void processAddRequest(List r, BookieRequestHandler channel); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index edbffa5f431..276c38b9111 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -342,6 +342,15 @@ public static void serializeAddResponseInto(int rc, BookieProtocol.ParsedAddRequ buf.writeLong(req.getLedgerId()); buf.writeLong(req.getEntryId()); } + + public static void serializeAddResponseInto(int rc, byte version, byte opCode, + long ledgerId, long entryId, ByteBuf buf) { + buf.writeInt(RESPONSE_HEADERS_SIZE); // Frame size + buf.writeInt(PacketHeader.toInt(version, opCode, (short) 0)); + buf.writeInt(rc); // rc-code + buf.writeLong(ledgerId); + buf.writeLong(entryId); + } } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index aaa739c2afb..96c24b2f4e3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -103,7 +103,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception && ((BookieProtocol.ParsedAddRequest) msg).getProtocolVersion() == BookieProtocol.CURRENT_PROTOCOL_VERSION && !((BookieProtocol.ParsedAddRequest) msg).isRecoveryAdd()) { msgs.put((BookieProtocol.ParsedAddRequest) msg); - if (msgs.size() == CAPACITY) { int count = msgs.size(); List c = new ArrayList<>(count); @@ -134,6 +133,15 @@ public synchronized void prepareSendResponseV2(int rc, 
BookieProtocol.ParsedAddR BookieProtoEncoding.ResponseEnDeCoderPreV3.serializeAddResponseInto(rc, req, pendingSendResponses); } + public synchronized void prepareSendResponseV2(int rc, byte version, byte opCode, long ledgerId, long entryId) { + if (pendingSendResponses == null) { + pendingSendResponses = ctx.alloc().directBuffer(maxPendingResponsesSize != 0 + ? maxPendingResponsesSize : 256); + } + BookieProtoEncoding.ResponseEnDeCoderPreV3.serializeAddResponseInto(rc, version, opCode, ledgerId, entryId, + pendingSendResponses); + } + @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == EVENT_FLUSH_ALL_PENDING_RESPONSES) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 70207317ea5..60a5f511f25 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -230,6 +230,16 @@ protected void onAddRequestFinish() { } } + protected void onAddRequestFinishWithoutUnTrack() { + if (addsSemaphore != null) { + addsSemaphore.release(); + } + } + + protected void onAddRequestUnTrack() { + requestStats.untrackAddRequest(); + } + protected void onReadRequestStart(Channel channel) { if (readsSemaphore != null) { if (!readsSemaphore.tryAcquire()) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java index dc4eb4ebe23..6f5d522ccd4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java @@ -20,6 +20,8 @@ */ package org.apache.bookkeeper.proto; +import static 
org.apache.bookkeeper.proto.BookieProtocol.ADDENTRY; + import io.netty.util.Recycler; import java.util.List; import java.util.concurrent.TimeUnit; @@ -79,11 +81,11 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Ob .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } - sendWriteReqResponse(rc, - ResponseBuilder.buildAddResponse(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId), - requestProcessor.getRequestStats().getAddRequestStats()); + requestHandler.prepareSendResponseV2(rc, BookieProtocol.CURRENT_PROTOCOL_VERSION, ADDENTRY, ledgerId, entryId); + requestProcessor.onAddRequestFinishWithoutUnTrack(); if (requestCount.decrementAndGet() == 0) { + requestProcessor.onAddRequestUnTrack(); recycle(); } } @@ -119,6 +121,7 @@ public void run() { r.release(); r.recycle(); } + requestProcessor.flushPendingResponses(); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java index c46d9fa7c72..46aaa6820a2 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java @@ -1448,12 +1448,14 @@ public void testLedgerMetadataTest() throws Exception { @Test public void testReadWriteEntry() throws Exception { lh = bkc.createLedgerAdv(3, 3, 3, digestType, ledgerPassword); - LOG.info("Ledger ID: {}", lh.ledgerId); - CountDownLatch latch = new CountDownLatch(1000); - for (int i = 0; i < 1000; ++i) { + numEntriesToWrite = 15000; + List entries = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(numEntriesToWrite); + for (int i = 0; i < numEntriesToWrite; ++i) { ByteBuffer entry = ByteBuffer.allocate(4); entry.putInt(rng.nextInt(maxInt)); entry.position(0); + entries.add(entry.array()); lh.asyncAddEntry(i, entry.array(), new 
AddCallback() { @Override public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { @@ -1463,6 +1465,10 @@ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { }, null); } latch.await(); + + readEntries(lh, entries); + lh.close(); + } private void readEntries(LedgerHandle lh, List entries) throws InterruptedException, BKException { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerWithV2ProtocolTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerWithV2ProtocolTest.java new file mode 100644 index 00000000000..7d6e24b882e --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerWithV2ProtocolTest.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; + +/** + * Testing ledger write entry cases. 
+ */ +public class BookieWriteLedgerWithV2ProtocolTest extends + BookieWriteLedgerTest implements AddCallback { + + public BookieWriteLedgerWithV2ProtocolTest() { + super(); + baseClientConf.setUseV2WireProtocol(true); + } +} From d6574ce9e3a28a11dacac2d361d309711bc20853 Mon Sep 17 00:00:00 2001 From: chenhang Date: Fri, 10 Mar 2023 14:47:05 +0800 Subject: [PATCH 09/16] fix one bug --- .../apache/bookkeeper/proto/BookieRequestHandler.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index 96c24b2f4e3..3e57564ab0b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -40,9 +40,9 @@ */ @Slf4j public class BookieRequestHandler extends ChannelInboundHandlerAdapter { - + private static final int DEFAULT_CAPACITY = 1_000; static final Object EVENT_FLUSH_ALL_PENDING_RESPONSES = new Object(); - private static final int CAPACITY = 10_000; + private final int maxCapacity; private final RequestProcessor requestProcessor; private final ChannelGroup allChannels; @@ -56,7 +56,8 @@ public class BookieRequestHandler extends ChannelInboundHandlerAdapter { BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) { this.requestProcessor = processor; this.allChannels = allChannels; - this.msgs = new ArrayBlockingQueue<>(CAPACITY); + this.maxCapacity = conf.getMaxAddsInProgressLimit() > 0 ? 
conf.getMaxAddsInProgressLimit() : DEFAULT_CAPACITY; + this.msgs = new ArrayBlockingQueue<>(maxCapacity); } public ChannelHandlerContext ctx() { @@ -103,7 +104,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception && ((BookieProtocol.ParsedAddRequest) msg).getProtocolVersion() == BookieProtocol.CURRENT_PROTOCOL_VERSION && !((BookieProtocol.ParsedAddRequest) msg).isRecoveryAdd()) { msgs.put((BookieProtocol.ParsedAddRequest) msg); - if (msgs.size() == CAPACITY) { + if (msgs.size() >= maxCapacity) { int count = msgs.size(); List c = new ArrayList<>(count); msgs.drainTo(c, count); From 7b062fb165f4fc67fc142e65eae2824d4a01ed92 Mon Sep 17 00:00:00 2001 From: chenhang Date: Fri, 10 Mar 2023 16:14:07 +0800 Subject: [PATCH 10/16] tune the impl --- .../java/org/apache/bookkeeper/bookie/Bookie.java | 2 +- .../org/apache/bookkeeper/bookie/BookieImpl.java | 13 +++++++++++-- .../bookkeeper/proto/WriteBatchEntryProcessor.java | 2 +- .../bookkeeper/client/BookieWriteLedgerTest.java | 2 +- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index a9bcc267427..270f01380ef 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -47,7 +47,7 @@ public interface Bookie { // TODO: replace ackBeforeSync with flags void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException, InterruptedException; - void addEntry(List requests, boolean ackBeforeSync, WriteCallback cb, + void addEntryList(List requests, boolean ackBeforeSync, WriteCallback cb, Object ctx, RequestStats requestStats) throws InterruptedException; void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, 
BookieException, InterruptedException; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 5f58f7664d1..c3aed795e5b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -85,6 +85,7 @@ import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1092,16 +1093,23 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Obj } } - public void addEntry(List requests, boolean ackBeforeSync, + public void addEntryList(List requests, boolean ackBeforeSync, WriteCallback cb, Object ctx, RequestStats requestStats) throws InterruptedException { long requestNans = MathUtils.nowInNano(); boolean hasFailedRequests = false; + Map, LedgerDescriptor> handleMap = new HashMap<>(); ListIterator iter = requests.listIterator(); while (iter.hasNext()) { ParsedAddRequest request = iter.next(); int rc = BookieProtocol.EOK; try { - LedgerDescriptor handle = getLedgerForEntry(request.getData(), request.getMasterKey()); + Pair ledgerIdMasterKey = Pair.of(request.getLedgerId(), request.getMasterKey()); + LedgerDescriptor handle = handleMap.get(ledgerIdMasterKey); + if (handle == null) { + handle = getLedgerForEntry(request.getData(), request.getMasterKey()); + handleMap.put(ledgerIdMasterKey, handle); + } + synchronized (handle) { if (handle.isFenced()) { throw BookieException.create(BookieException.Code.LedgerFencedException); @@ -1141,6 +1149,7 @@ public void addEntry(List requests, boolean ackBeforeSync, request.recycle(); } } + handleMap.clear(); if (hasFailedRequests && requestProcessor != null) { 
requestProcessor.flushPendingResponses(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java index 6f5d522ccd4..7fb50c83139 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java @@ -106,7 +106,7 @@ public void run() { startTimeNanos = MathUtils.nowInNano(); int rc = BookieProtocol.EOK; try { - requestProcessor.getBookie().addEntry(requests, false, this, requestHandler, + requestProcessor.getBookie().addEntryList(requests, false, this, requestHandler, requestProcessor.getRequestStats()); } catch (Throwable t) { log.error("Unexpected exception while writing requests ", t); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java index 46aaa6820a2..c873f2fa40f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java @@ -1459,7 +1459,7 @@ public void testReadWriteEntry() throws Exception { lh.asyncAddEntry(i, entry.array(), new AddCallback() { @Override public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { - LOG.info("Write entry: {}:{}, result: {}", lh.ledgerId, entryId, rc); + assertEquals(0, rc); latch.countDown(); } }, null); From 9b6c3513632f8557d17bf7a77877355ff4abe870 Mon Sep 17 00:00:00 2001 From: chenhang Date: Mon, 13 Mar 2023 18:56:00 +0800 Subject: [PATCH 11/16] address comments --- .../apache/bookkeeper/bookie/BookieImpl.java | 51 ++++--------------- .../org/apache/bookkeeper/bookie/Journal.java | 7 ++- .../processor/RequestProcessor.java | 8 --- .../proto/BookieRequestHandler.java | 
18 +++---- .../proto/BookieRequestProcessor.java | 12 ++++- 5 files changed, 32 insertions(+), 64 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index c3aed795e5b..6f57acf81c0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -930,8 +930,8 @@ public ByteBuf createMasterKeyEntry(long ledgerId, byte[] masterKey) { * Add an entry to a ledger as specified by handle. */ private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, - boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey) - throws IOException, BookieException, InterruptedException { + boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey, + boolean writeJournal) throws IOException, BookieException, InterruptedException { long ledgerId = handle.getLedgerId(); long entryId = handle.addEntry(entry); @@ -961,7 +961,10 @@ private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, if (LOG.isTraceEnabled()) { LOG.trace("Adding {}@{}", entryId, ledgerId); } - getJournal(ledgerId).logAddEntry(entry, ackBeforeSync, cb, ctx); + + if (writeJournal) { + getJournal(ledgerId).logAddEntry(entry, ackBeforeSync, cb, ctx); + } } /** @@ -979,7 +982,7 @@ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] LedgerDescriptor handle = getLedgerForEntry(entry, masterKey); synchronized (handle) { entrySize = entry.readableBytes(); - addEntryInternal(handle, entry, false /* ackBeforeSync */, cb, ctx, masterKey); + addEntryInternal(handle, entry, false /* ackBeforeSync */, cb, ctx, masterKey, true); } success = true; } catch (NoWritableLedgerDirException e) { @@ -1073,7 +1076,7 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Obj 
.create(BookieException.Code.LedgerFencedException); } entrySize = entry.readableBytes(); - addEntryInternal(handle, entry, ackBeforeSync, cb, ctx, masterKey); + addEntryInternal(handle, entry, ackBeforeSync, cb, ctx, masterKey, true); } success = true; } catch (NoWritableLedgerDirException e) { @@ -1115,8 +1118,8 @@ public void addEntryList(List requests, boolean ackBeforeSync, throw BookieException.create(BookieException.Code.LedgerFencedException); } - addEntryInternalWithoutJournal(handle, request.getData(), ackBeforeSync, - cb, ctx, request.getMasterKey()); + addEntryInternal(handle, request.getData(), ackBeforeSync, + cb, ctx, request.getMasterKey(), false); } } catch (BookieException.OperationRejectedException e) { requestStats.getAddEntryRejectedCounter().inc(); @@ -1167,40 +1170,6 @@ public void addEntryList(List requests, boolean ackBeforeSync, } } - private void addEntryInternalWithoutJournal(LedgerDescriptor handle, ByteBuf entry, - boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey) - throws IOException, BookieException, InterruptedException { - long ledgerId = handle.getLedgerId(); - long entryId = handle.addEntry(entry); - - bookieStats.getWriteBytes().addCount(entry.readableBytes()); - - // journal `addEntry` should happen after the entry is added to ledger storage. - // otherwise the journal entry can potentially be rolled before the ledger is created in ledger storage. 
- if (masterKeyCache.get(ledgerId) == null) { - // Force the load into masterKey cache - byte[] oldValue = masterKeyCache.putIfAbsent(ledgerId, masterKey); - if (oldValue == null) { - ByteBuf masterKeyEntry = createMasterKeyEntry(ledgerId, masterKey); - try { - getJournal(ledgerId).logAddEntry( - masterKeyEntry, false /* ackBeforeSync */, new NopWriteCallback(), null); - } finally { - ReferenceCountUtil.safeRelease(masterKeyEntry); - } - } - } - - if (!writeDataToJournal) { - cb.writeComplete(0, ledgerId, entryId, null, ctx); - return; - } - - if (LOG.isTraceEnabled()) { - LOG.trace("Adding {}@{}", entryId, ledgerId); - } - } - /** * Fences a ledger. From this point on, clients will be unable to * write to this ledger. Only recoveryAddEntry will be diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 3aea62e23ef..da44a81c575 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -44,7 +44,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.bookie.stats.JournalStats; import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue; @@ -890,19 +889,19 @@ public void logAddEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, public void logAddEntry(List entries, boolean ackBeforeSync, WriteCallback cb, Object ctx) throws InterruptedException { - AtomicLong reserveMemory = new AtomicLong(); + long reserveMemory = 0; QueueEntry[] queueEntries = new QueueEntry[entries.size()]; for (int i = 0; i < entries.size(); ++i) { ByteBuf entry = entries.get(i); long ledgerId = 
entry.getLong(entry.readerIndex()); long entryId = entry.getLong(entry.readerIndex() + 8); entry.retain(); - reserveMemory.addAndGet(entry.readableBytes()); + reserveMemory += entry.readableBytes(); queueEntries[i] = QueueEntry.create(entry, ackBeforeSync, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(), journalStats.getJournalAddEntryStats(), callbackTime); } - memoryLimitController.releaseMemory(reserveMemory.get()); + memoryLimitController.releaseMemory(reserveMemory); journalStats.getJournalQueueSize().addCount(entries.size()); queue.putAll(queueEntries, 0, queueEntries.length); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java index 798f5a7661b..ced57f6e9fa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java @@ -20,8 +20,6 @@ */ package org.apache.bookkeeper.processor; -import java.util.List; -import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookieRequestHandler; /** @@ -50,10 +48,4 @@ public interface RequestProcessor extends AutoCloseable { */ void flushPendingResponses(); - /** - * Process a list of ParsedAddRequests. 
- * @param r - * @param channel - */ - void processAddRequest(List r, BookieRequestHandler channel); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index 3e57564ab0b..429da793f27 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -42,7 +42,6 @@ public class BookieRequestHandler extends ChannelInboundHandlerAdapter { private static final int DEFAULT_CAPACITY = 1_000; static final Object EVENT_FLUSH_ALL_PENDING_RESPONSES = new Object(); - private final int maxCapacity; private final RequestProcessor requestProcessor; private final ChannelGroup allChannels; @@ -56,7 +55,8 @@ public class BookieRequestHandler extends ChannelInboundHandlerAdapter { BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) { this.requestProcessor = processor; this.allChannels = allChannels; - this.maxCapacity = conf.getMaxAddsInProgressLimit() > 0 ? conf.getMaxAddsInProgressLimit() : DEFAULT_CAPACITY; + + int maxCapacity = conf.getMaxAddsInProgressLimit() > 0 ? 
conf.getMaxAddsInProgressLimit() : DEFAULT_CAPACITY; this.msgs = new ArrayBlockingQueue<>(maxCapacity); } @@ -103,12 +103,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception && !((BookieProtocol.ParsedAddRequest) msg).isHighPriority() && ((BookieProtocol.ParsedAddRequest) msg).getProtocolVersion() == BookieProtocol.CURRENT_PROTOCOL_VERSION && !((BookieProtocol.ParsedAddRequest) msg).isRecoveryAdd()) { - msgs.put((BookieProtocol.ParsedAddRequest) msg); - if (msgs.size() >= maxCapacity) { - int count = msgs.size(); - List c = new ArrayList<>(count); - msgs.drainTo(c, count); - requestProcessor.processAddRequest(c, this); + BookieProtocol.ParsedAddRequest request = (BookieProtocol.ParsedAddRequest) msg; + if (!msgs.offer(request)) { + ctx.fireChannelReadComplete(); + msgs.put(request); } } else { requestProcessor.processRequest(msg, this); @@ -121,7 +119,9 @@ public void channelReadComplete(ChannelHandlerContext ctx) { int count = msgs.size(); List c = new ArrayList<>(count); msgs.drainTo(c, count); - requestProcessor.processAddRequest(c, this); + if (!c.isEmpty()) { + requestProcessor.processRequest(c, this); + } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 60a5f511f25..3b835a0c54e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -316,6 +316,14 @@ private void shutdownExecutor(OrderedExecutor service) { @Override public void processRequest(Object msg, BookieRequestHandler requestHandler) { + if (msg instanceof List) { + if (!((List) msg).isEmpty() + && ((List) msg).get(0) instanceof BookieProtocol.ParsedAddRequest) { + processBatchAddRequest((List) msg, requestHandler); + } + return; + } + Channel channel = 
requestHandler.ctx().channel(); // If we can decode this packet as a Request protobuf packet, process // it as a version 3 packet. Else, just use the old protocol. @@ -741,8 +749,8 @@ public void handleNonWritableChannel(Channel channel) { onResponseTimeout.accept(channel); } - @Override - public void processAddRequest(List msgs, BookieRequestHandler requestHandler) { + private void processBatchAddRequest(List msgs, + BookieRequestHandler requestHandler) { WriteBatchEntryProcessor write = WriteBatchEntryProcessor.create(msgs, requestHandler, this); if (writeThreadPool == null) { write.run(); From b128308fe31d72d44da943addf970c803a8dbdab Mon Sep 17 00:00:00 2001 From: chenhang Date: Mon, 13 Mar 2023 20:26:14 +0800 Subject: [PATCH 12/16] fix failed CI --- .../java/org/apache/bookkeeper/proto/BookieRequestHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index 429da793f27..8e9f00d2a2c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -105,7 +105,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception && !((BookieProtocol.ParsedAddRequest) msg).isRecoveryAdd()) { BookieProtocol.ParsedAddRequest request = (BookieProtocol.ParsedAddRequest) msg; if (!msgs.offer(request)) { - ctx.fireChannelReadComplete(); + channelReadComplete(ctx); msgs.put(request); } } else { From 76db67b5769b2706fac5f35a9eefdda4e43dedb8 Mon Sep 17 00:00:00 2001 From: chenhang Date: Tue, 14 Mar 2023 12:24:24 +0800 Subject: [PATCH 13/16] address comments --- .../java/org/apache/bookkeeper/bookie/BookieImpl.java | 10 +++++----- .../java/org/apache/bookkeeper/bookie/Journal.java | 5 +++-- 
.../bookkeeper/proto/WriteBatchEntryProcessor.java | 1 + 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 6f57acf81c0..c539cab39ed 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -1162,12 +1162,12 @@ public void addEntryList(List requests, boolean ackBeforeSync, List entries = requests.stream() .map(ParsedAddRequest::getData).collect(Collectors.toList()); getJournal(requests.get(0).getLedgerId()).logAddEntry(entries, ackBeforeSync, cb, ctx); - - requests.forEach(t -> { - t.release(); - t.recycle(); - }); } + + requests.forEach(t -> { + t.release(); + t.recycle(); + }); } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index da44a81c575..e01a1a59e11 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -891,6 +891,7 @@ public void logAddEntry(List entries, boolean ackBeforeSync, WriteCallb throws InterruptedException { long reserveMemory = 0; QueueEntry[] queueEntries = new QueueEntry[entries.size()]; + long start = MathUtils.nowInNano(); for (int i = 0; i < entries.size(); ++i) { ByteBuf entry = entries.get(i); long ledgerId = entry.getLong(entry.readerIndex()); @@ -898,10 +899,10 @@ public void logAddEntry(List entries, boolean ackBeforeSync, WriteCallb entry.retain(); reserveMemory += entry.readableBytes(); queueEntries[i] = QueueEntry.create(entry, ackBeforeSync, ledgerId, entryId, cb, ctx, - MathUtils.nowInNano(), journalStats.getJournalAddEntryStats(), callbackTime); + start, journalStats.getJournalAddEntryStats(), callbackTime); } 
- memoryLimitController.releaseMemory(reserveMemory); + memoryLimitController.reserveMemory(reserveMemory); journalStats.getJournalQueueSize().addCount(entries.size()); queue.putAll(queueEntries, 0, queueEntries.length); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java index 7fb50c83139..cd1b77a5659 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java @@ -46,6 +46,7 @@ protected void reset() { requests = null; requestHandler = null; requestProcessor = null; + requestCount = null; startTimeNanos = -1L; } From 616bb437b08e7f9e39d351c1cabadd0602a64755 Mon Sep 17 00:00:00 2001 From: chenhang Date: Tue, 14 Mar 2023 14:49:07 +0800 Subject: [PATCH 14/16] address comments --- .../bookkeeper/proto/BookieRequestProcessor.java | 12 +----------- .../org/apache/bookkeeper/proto/RequestStats.java | 4 ++-- .../bookkeeper/proto/WriteBatchEntryProcessor.java | 9 ++++----- 3 files changed, 7 insertions(+), 18 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 3b835a0c54e..6c015809b62 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -220,7 +220,7 @@ protected void onAddRequestStart(Channel channel, int permits) { requestStats.unblockAddRequest(delayNanos); } } - requestStats.trackAddRequest(); + requestStats.trackAddRequest(permits); } protected void onAddRequestFinish() { @@ -230,16 +230,6 @@ protected void onAddRequestFinish() { } } - protected void onAddRequestFinishWithoutUnTrack() { - if 
(addsSemaphore != null) { - addsSemaphore.release(); - } - } - - protected void onAddRequestUnTrack() { - requestStats.untrackAddRequest(); - } - protected void onReadRequestStart(Channel channel) { if (readsSemaphore != null) { if (!readsSemaphore.tryAcquire()) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java index e31963edd14..48b29147ec7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java @@ -343,8 +343,8 @@ void unblockAddRequest(long delayNanos) { addsBlocked.decrementAndGet(); } - void trackAddRequest() { - final int curr = addsInProgress.incrementAndGet(); + void trackAddRequest(int permits) { + final int curr = addsInProgress.addAndGet(permits); maxAddsInProgress.accumulateAndGet(curr, Integer::max); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java index cd1b77a5659..5b3e1f7b0d8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java @@ -39,14 +39,14 @@ public class WriteBatchEntryProcessor extends PacketProcessorBase implements WriteCallback { long startTimeNanos; List requests; - AtomicInteger requestCount; + AtomicInteger requestCount = new AtomicInteger(0); @Override protected void reset() { requests = null; requestHandler = null; requestProcessor = null; - requestCount = null; + requestCount.set(0); startTimeNanos = -1L; } @@ -64,7 +64,7 @@ protected void init(List requests, BookieRequestHandler reques this.requestHandler = requestHandler; this.requestProcessor = requestProcessor; this.enqueueNanos = MathUtils.nowInNano(); - 
this.requestCount = new AtomicInteger(requests.size()); + this.requestCount.set(requests.size()); } @Override @@ -83,10 +83,9 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Ob } requestHandler.prepareSendResponseV2(rc, BookieProtocol.CURRENT_PROTOCOL_VERSION, ADDENTRY, ledgerId, entryId); - requestProcessor.onAddRequestFinishWithoutUnTrack(); + requestProcessor.onAddRequestFinish(); if (requestCount.decrementAndGet() == 0) { - requestProcessor.onAddRequestUnTrack(); recycle(); } } From 78495355d135d9525df6abb7f663627b0f5ad0b6 Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 16 Mar 2023 09:51:14 +0800 Subject: [PATCH 15/16] address comments --- .../org/apache/bookkeeper/bookie/Bookie.java | 2 +- .../apache/bookkeeper/bookie/BookieImpl.java | 28 ++++++++++--------- .../org/apache/bookkeeper/bookie/Journal.java | 2 +- .../proto/BookieRequestHandler.java | 1 + .../proto/WriteBatchEntryProcessor.java | 2 +- 5 files changed, 19 insertions(+), 16 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 270f01380ef..893b6ba22d3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -47,7 +47,7 @@ public interface Bookie { // TODO: replace ackBeforeSync with flags void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException, InterruptedException; - void addEntryList(List requests, boolean ackBeforeSync, WriteCallback cb, + void addEntries(List requests, boolean ackBeforeSync, WriteCallback cb, Object ctx, RequestStats requestStats) throws InterruptedException; void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException, InterruptedException; diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index c539cab39ed..fa7f25e49ce 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -1096,7 +1096,7 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Obj } } - public void addEntryList(List requests, boolean ackBeforeSync, + public void addEntries(List requests, boolean ackBeforeSync, WriteCallback cb, Object ctx, RequestStats requestStats) throws InterruptedException { long requestNans = MathUtils.nowInNano(); boolean hasFailedRequests = false; @@ -1154,20 +1154,22 @@ public void addEntryList(List requests, boolean ackBeforeSync, } handleMap.clear(); - if (hasFailedRequests && requestProcessor != null) { - requestProcessor.flushPendingResponses(); - } + try { + if (hasFailedRequests && requestProcessor != null) { + requestProcessor.flushPendingResponses(); + } - if (writeDataToJournal && !requests.isEmpty()) { - List entries = requests.stream() - .map(ParsedAddRequest::getData).collect(Collectors.toList()); - getJournal(requests.get(0).getLedgerId()).logAddEntry(entries, ackBeforeSync, cb, ctx); + if (writeDataToJournal && !requests.isEmpty()) { + List entries = requests.stream() + .map(ParsedAddRequest::getData).collect(Collectors.toList()); + getJournal(requests.get(0).getLedgerId()).logAddEntries(entries, ackBeforeSync, cb, ctx); + } + } finally { + requests.forEach(t -> { + t.release(); + t.recycle(); + }); } - - requests.forEach(t -> { - t.release(); - t.recycle(); - }); } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index e01a1a59e11..860bace5029 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java 
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -887,7 +887,7 @@ public void logAddEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx); } - public void logAddEntry(List entries, boolean ackBeforeSync, WriteCallback cb, Object ctx) + public void logAddEntries(List entries, boolean ackBeforeSync, WriteCallback cb, Object ctx) throws InterruptedException { long reserveMemory = 0; QueueEntry[] queueEntries = new QueueEntry[entries.size()]; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index 8e9f00d2a2c..5412c25cd9f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -117,6 +117,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception public void channelReadComplete(ChannelHandlerContext ctx) { if (!msgs.isEmpty()) { int count = msgs.size(); + log.info("[hangc] count: {}", count); List c = new ArrayList<>(count); msgs.drainTo(c, count); if (!c.isEmpty()) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java index 5b3e1f7b0d8..6ad4951827b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java @@ -106,7 +106,7 @@ public void run() { startTimeNanos = MathUtils.nowInNano(); int rc = BookieProtocol.EOK; try { - requestProcessor.getBookie().addEntryList(requests, false, this, requestHandler, + requestProcessor.getBookie().addEntries(requests, false, this, requestHandler, 
requestProcessor.getRequestStats()); } catch (Throwable t) { log.error("Unexpected exception while writing requests ", t); From df2f728e25d686edb074593a29f48072159d26d8 Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 16 Mar 2023 12:32:55 +0800 Subject: [PATCH 16/16] address comments --- .../java/org/apache/bookkeeper/proto/BookieRequestHandler.java | 1 - .../org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index 5412c25cd9f..8e9f00d2a2c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -117,7 +117,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception public void channelReadComplete(ChannelHandlerContext ctx) { if (!msgs.isEmpty()) { int count = msgs.size(); - log.info("[hangc] count: {}", count); List c = new ArrayList<>(count); msgs.drainTo(c, count); if (!c.isEmpty()) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java index 6ad4951827b..c7eb24122aa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java @@ -92,6 +92,9 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Ob @Override public void run() { + requestProcessor.getRequestStats().getWriteThreadQueuedLatency() + .registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); + if (requestProcessor.getBookie().isReadOnly()) { log.warn("BookieServer is 
running in readOnly mode, so rejecting the request from the client!"); for (ParsedAddRequest r : requests) {