diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 90c8acf5af4..893b6ba22d3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -20,10 +20,14 @@ import io.netty.buffer.ByteBuf; import java.io.IOException; +import java.util.List; import java.util.PrimitiveIterator; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.common.util.Watcher; +import org.apache.bookkeeper.processor.RequestProcessor; +import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; +import org.apache.bookkeeper.proto.RequestStats; /** * Interface for the bookie. @@ -43,6 +47,8 @@ public interface Bookie { // TODO: replace ackBeforeSync with flags void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException, InterruptedException; + void addEntries(List requests, boolean ackBeforeSync, WriteCallback cb, + Object ctx, RequestStats requestStats) throws InterruptedException; void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException, InterruptedException; void forceLedger(long ledgerId, WriteCallback cb, Object ctx); @@ -86,6 +92,8 @@ void cancelWaitForLastAddConfirmedUpdate(long ledgerId, // TODO: Should be constructed and passed in as a parameter LedgerStorage getLedgerStorage(); + void setRequestProcessor(RequestProcessor requestProcessor); + // TODO: Move this exceptions somewhere else /** * Exception is thrown when no such a ledger is found in this bookie. diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 0db230d9d3d..fa7f25e49ce 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -45,6 +45,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Optional; import java.util.PrimitiveIterator.OfLong; @@ -69,7 +70,11 @@ import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNS; +import org.apache.bookkeeper.processor.RequestProcessor; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; +import org.apache.bookkeeper.proto.RequestStats; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stats.ThreadRegistry; @@ -80,6 +85,7 @@ import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,6 +131,7 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { private final ByteBufAllocator allocator; private final boolean writeDataToJournal; + private RequestProcessor requestProcessor; // Write Callback do nothing static class NopWriteCallback implements WriteCallback { @@ -923,8 +930,8 @@ public ByteBuf createMasterKeyEntry(long ledgerId, byte[] masterKey) { * Add an entry to a ledger as specified by handle. */ private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, - boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey) - throws IOException, BookieException, InterruptedException { + boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey, + boolean writeJournal) throws IOException, BookieException, InterruptedException { long ledgerId = handle.getLedgerId(); long entryId = handle.addEntry(entry); @@ -954,7 +961,10 @@ private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, if (LOG.isTraceEnabled()) { LOG.trace("Adding {}@{}", entryId, ledgerId); } - getJournal(ledgerId).logAddEntry(entry, ackBeforeSync, cb, ctx); + + if (writeJournal) { + getJournal(ledgerId).logAddEntry(entry, ackBeforeSync, cb, ctx); + } } /** @@ -972,7 +982,7 @@ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] LedgerDescriptor handle = getLedgerForEntry(entry, masterKey); synchronized (handle) { entrySize = entry.readableBytes(); - addEntryInternal(handle, entry, false /* ackBeforeSync */, cb, ctx, masterKey); + addEntryInternal(handle, entry, false /* ackBeforeSync */, cb, ctx, masterKey, true); } success = true; } catch (NoWritableLedgerDirException e) { @@ -1066,7 +1076,7 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Obj .create(BookieException.Code.LedgerFencedException); } entrySize = entry.readableBytes(); - addEntryInternal(handle, entry, ackBeforeSync, cb, ctx, masterKey); + addEntryInternal(handle, entry, ackBeforeSync, cb, ctx, masterKey, true); } success = true; } catch (NoWritableLedgerDirException e) { @@ -1086,6 +1096,82 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Obj } } + public void addEntries(List requests, boolean ackBeforeSync, + WriteCallback cb, Object ctx, RequestStats requestStats) throws InterruptedException { + long requestNans = MathUtils.nowInNano(); + boolean hasFailedRequests = false; + Map, LedgerDescriptor> handleMap = new HashMap<>(); + ListIterator iter = requests.listIterator(); + while (iter.hasNext()) { + ParsedAddRequest request = iter.next(); + int rc = BookieProtocol.EOK; + try { + Pair ledgerIdMasterKey = Pair.of(request.getLedgerId(), request.getMasterKey()); + LedgerDescriptor handle = handleMap.get(ledgerIdMasterKey); + if (handle == null) { + handle = getLedgerForEntry(request.getData(), request.getMasterKey()); + handleMap.put(ledgerIdMasterKey, handle); + } + + synchronized (handle) { + if (handle.isFenced()) { + throw BookieException.create(BookieException.Code.LedgerFencedException); + } + + addEntryInternal(handle, request.getData(), ackBeforeSync, + cb, ctx, request.getMasterKey(), false); + } + } catch (BookieException.OperationRejectedException e) { + requestStats.getAddEntryRejectedCounter().inc(); + if (LOG.isDebugEnabled()) { + LOG.debug("Operation rejected while writing {} ", request, e); + } + rc = BookieProtocol.ETOOMANYREQUESTS; + } catch (IOException e) { + LOG.error("Error writing {}", request, e); + rc = BookieProtocol.EIO; + } catch (BookieException.LedgerFencedException lfe) { + LOG.error("Attempt to write to fenced ledger ", lfe); + rc = BookieProtocol.EFENCED; + } catch (BookieException e) { + LOG.error("Unauthorized access to ledger {}", request.getLedgerId(), e); + rc = BookieProtocol.EUA; + } catch (Throwable t) { + LOG.error("Unexpected exception while writing {}@{} : {} ", + request.getLedgerId(), request.getEntryId(), t.getMessage(), t); + rc = BookieProtocol.EBADREQ; + } + + if (rc != BookieProtocol.EOK) { + hasFailedRequests = true; + requestStats.getAddEntryStats() + .registerFailedEvent(MathUtils.elapsedNanos(requestNans), TimeUnit.NANOSECONDS); + cb.writeComplete(rc, request.getLedgerId(), request.getEntryId(), null, ctx); + iter.remove(); + request.release(); + request.recycle(); + } + } + handleMap.clear(); + + try { + if (hasFailedRequests && requestProcessor != null) { + requestProcessor.flushPendingResponses(); + } + + if (writeDataToJournal && !requests.isEmpty()) { + List entries = requests.stream() + .map(ParsedAddRequest::getData).collect(Collectors.toList()); + getJournal(requests.get(0).getLedgerId()).logAddEntries(entries, ackBeforeSync, cb, ctx); + } + } finally { + requests.forEach(t -> { + t.release(); + t.recycle(); + }); + } + } + /** * Fences a ledger. From this point on, clients will be unable to * write to this ledger. Only recoveryAddEntry will be @@ -1281,4 +1367,12 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedg } } } + + @Override + public void setRequestProcessor(RequestProcessor requestProcessor) { + for (Journal journal : journals) { + journal.setRequestProcessor(requestProcessor); + } + this.requestProcessor = requestProcessor; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index eff1673edb0..860bace5029 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -46,11 +46,14 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.bookie.stats.JournalStats; +import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue; +import org.apache.bookkeeper.common.collections.BatchedBlockingQueue; import org.apache.bookkeeper.common.collections.BlockingMpscQueue; import org.apache.bookkeeper.common.collections.RecyclableArrayList; import org.apache.bookkeeper.common.util.MemoryLimitController; import org.apache.bookkeeper.common.util.affinity.CpuAffinity; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.processor.RequestProcessor; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.NullStatsLogger; @@ -444,6 +447,8 @@ private class ForceWriteThread extends BookieCriticalThread { // This holds the queue entries that should be notified after a // successful force write Thread threadToNotifyOnEx; + + RequestProcessor requestProcessor; // should we group force writes private final boolean enableGroupForceWrites; private final Counter forceWriteThreadTime; @@ -499,6 +504,10 @@ public void run() { journalStats.getForceWriteGroupingCountStats() .registerSuccessfulValue(numReqInLastForceWrite); + if (requestProcessor != null) { + requestProcessor.flushPendingResponses(); + } + } catch (IOException ioe) { LOG.error("I/O exception in ForceWrite thread", ioe); running = false; @@ -629,7 +638,7 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour private static final String journalThreadName = "BookieJournal"; // journal entry queue to commit - final BlockingQueue queue; + final BatchedBlockingQueue queue; final BlockingQueue forceWriteRequests; volatile boolean running = true; @@ -660,7 +669,7 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf queue = new BlockingMpscQueue<>(conf.getJournalQueueSize()); forceWriteRequests = new BlockingMpscQueue<>(conf.getJournalQueueSize()); } else { - queue = new ArrayBlockingQueue<>(conf.getJournalQueueSize()); + queue = new BatchedArrayBlockingQueue<>(conf.getJournalQueueSize()); forceWriteRequests = new ArrayBlockingQueue<>(conf.getJournalQueueSize()); } @@ -878,6 +887,26 @@ public void logAddEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx); } + public void logAddEntries(List entries, boolean ackBeforeSync, WriteCallback cb, Object ctx) + throws InterruptedException { + long reserveMemory = 0; + QueueEntry[] queueEntries = new QueueEntry[entries.size()]; + long start = MathUtils.nowInNano(); + for (int i = 0; i < entries.size(); ++i) { + ByteBuf entry = entries.get(i); + long ledgerId = entry.getLong(entry.readerIndex()); + long entryId = entry.getLong(entry.readerIndex() + 8); + entry.retain(); + reserveMemory += entry.readableBytes(); + queueEntries[i] = QueueEntry.create(entry, ackBeforeSync, ledgerId, entryId, cb, ctx, + start, journalStats.getJournalAddEntryStats(), callbackTime); + } + + memoryLimitController.reserveMemory(reserveMemory); + journalStats.getJournalQueueSize().addCount(entries.size()); + queue.putAll(queueEntries, 0, queueEntries.length); + } + @VisibleForTesting public void logAddEntry(long ledgerId, long entryId, ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx) @@ -1093,6 +1122,10 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(), numEntriesToFlush--; entry.run(); } + + if (forceWriteThread.requestProcessor != null) { + forceWriteThread.requestProcessor.flushPendingResponses(); + } } lastFlushPosition = bc.position(); @@ -1211,6 +1244,10 @@ public BufferedChannelBuilder getBufferedChannelBuilder() { return (FileChannel fc, int capacity) -> new BufferedChannel(allocator, fc, capacity); } + public void setRequestProcessor(RequestProcessor requestProcessor) { + forceWriteThread.requestProcessor = requestProcessor; + } + /** * Shuts down the journal. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java index 5a4238e64d6..ced57f6e9fa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java @@ -42,4 +42,10 @@ public interface RequestProcessor extends AutoCloseable { * channel received the given request r */ void processRequest(Object r, BookieRequestHandler channel); + + /** + * Flush any pending response staged on all the client connections. + */ + void flushPendingResponses(); + } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 3e41a3f5ea7..276c38b9111 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -334,6 +334,23 @@ public Object decode(ByteBuf buffer) throw new IllegalStateException("Received unknown response : op code = " + opCode); } } + + public static void serializeAddResponseInto(int rc, BookieProtocol.ParsedAddRequest req, ByteBuf buf) { + buf.writeInt(RESPONSE_HEADERS_SIZE); // Frame size + buf.writeInt(PacketHeader.toInt(req.getProtocolVersion(), req.getOpCode(), (short) 0)); + buf.writeInt(rc); // rc-code + buf.writeLong(req.getLedgerId()); + buf.writeLong(req.getEntryId()); + } + + public static void serializeAddResponseInto(int rc, byte version, byte opCode, + long ledgerId, long entryId, ByteBuf buf) { + buf.writeInt(RESPONSE_HEADERS_SIZE); // Frame size + buf.writeInt(PacketHeader.toInt(version, opCode, (short) 0)); + buf.writeInt(rc); // rc-code + buf.writeLong(ledgerId); + buf.writeLong(entryId); + } } /** @@ -504,7 +521,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (LOG.isTraceEnabled()) { LOG.trace("Encode response {} to channel {}.", msg, ctx.channel()); } - if (msg instanceof BookkeeperProtocol.Response) { + + if (msg instanceof ByteBuf) { + ctx.write(msg, promise); + } else if (msg instanceof BookkeeperProtocol.Response) { ctx.write(repV3.encode(msg, ctx.alloc()), promise); } else if (msg instanceof BookieProtocol.Response) { ctx.write(repPreV3.encode(msg, ctx.alloc()), promise); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 89654449aad..e53ef9e5b15 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -219,11 +219,11 @@ byte getOpCode() { return opCode; } - long getLedgerId() { + public long getLedgerId() { return ledgerId; } - long getEntryId() { + public long getEntryId() { return entryId; } @@ -235,7 +235,7 @@ boolean hasMasterKey() { return masterKey != null; } - byte[] getMasterKey() { + public byte[] getMasterKey() { assert hasMasterKey(); return masterKey; } @@ -323,7 +323,7 @@ static ParsedAddRequest create(byte protocolVersion, long ledgerId, long entryId return add; } - ByteBuf getData() { + public ByteBuf getData() { // We need to have different ByteBufList instances for each bookie write return data; } @@ -332,7 +332,7 @@ boolean isRecoveryAdd() { return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD; } - void release() { + public void release() { ReferenceCountUtil.release(data); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java index c9d65a73174..8e9f00d2a2c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java @@ -20,29 +20,44 @@ */ package org.apache.bookkeeper.proto; +import static org.apache.bookkeeper.proto.BookieProtocol.ADDENTRY; + +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.group.ChannelGroup; import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.processor.RequestProcessor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Serverside handler for bookkeeper requests. */ +@Slf4j public class BookieRequestHandler extends ChannelInboundHandlerAdapter { + private static final int DEFAULT_CAPACITY = 1_000; + static final Object EVENT_FLUSH_ALL_PENDING_RESPONSES = new Object(); - private static final Logger LOG = LoggerFactory.getLogger(BookieRequestHandler.class); private final RequestProcessor requestProcessor; private final ChannelGroup allChannels; private ChannelHandlerContext ctx; + private final BlockingQueue msgs; + + private ByteBuf pendingSendResponses = null; + private int maxPendingResponsesSize; BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) { this.requestProcessor = processor; this.allChannels = allChannels; + + int maxCapacity = conf.getMaxAddsInProgressLimit() > 0 ? conf.getMaxAddsInProgressLimit() : DEFAULT_CAPACITY; + this.msgs = new ArrayBlockingQueue<>(maxCapacity); } public ChannelHandlerContext ctx() { @@ -51,7 +66,7 @@ public ChannelHandlerContext ctx() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - LOG.info("Channel connected {}", ctx.channel()); + log.info("Channel connected {}", ctx.channel()); this.ctx = ctx; super.channelActive(ctx); } @@ -63,16 +78,16 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - LOG.info("Channels disconnected: {}", ctx.channel()); + log.info("Channels disconnected: {}", ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof ClosedChannelException) { - LOG.info("Client died before request could be completed on {}", ctx.channel(), cause); + log.info("Client died before request could be completed on {}", ctx.channel(), cause); return; } - LOG.error("Unhandled exception occurred in I/O thread or handler on {}", ctx.channel(), cause); + log.error("Unhandled exception occurred in I/O thread or handler on {}", ctx.channel(), cause); ctx.close(); } @@ -82,6 +97,70 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception ctx.fireChannelRead(msg); return; } - requestProcessor.processRequest(msg, this); + + if (msg instanceof BookieProtocol.ParsedAddRequest + && ADDENTRY == ((BookieProtocol.ParsedAddRequest) msg).getOpCode() + && !((BookieProtocol.ParsedAddRequest) msg).isHighPriority() + && ((BookieProtocol.ParsedAddRequest) msg).getProtocolVersion() == BookieProtocol.CURRENT_PROTOCOL_VERSION + && !((BookieProtocol.ParsedAddRequest) msg).isRecoveryAdd()) { + BookieProtocol.ParsedAddRequest request = (BookieProtocol.ParsedAddRequest) msg; + if (!msgs.offer(request)) { + channelReadComplete(ctx); + msgs.put(request); + } + } else { + requestProcessor.processRequest(msg, this); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + if (!msgs.isEmpty()) { + int count = msgs.size(); + List c = new ArrayList<>(count); + msgs.drainTo(c, count); + if (!c.isEmpty()) { + requestProcessor.processRequest(c, this); + } + } + } + + public synchronized void prepareSendResponseV2(int rc, BookieProtocol.ParsedAddRequest req) { + if (pendingSendResponses == null) { + pendingSendResponses = ctx.alloc().directBuffer(maxPendingResponsesSize != 0 + ? maxPendingResponsesSize : 256); + } + + BookieProtoEncoding.ResponseEnDeCoderPreV3.serializeAddResponseInto(rc, req, pendingSendResponses); + } + + public synchronized void prepareSendResponseV2(int rc, byte version, byte opCode, long ledgerId, long entryId) { + if (pendingSendResponses == null) { + pendingSendResponses = ctx.alloc().directBuffer(maxPendingResponsesSize != 0 + ? maxPendingResponsesSize : 256); + } + BookieProtoEncoding.ResponseEnDeCoderPreV3.serializeAddResponseInto(rc, version, opCode, ledgerId, entryId, + pendingSendResponses); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt == EVENT_FLUSH_ALL_PENDING_RESPONSES) { + synchronized (this) { + if (pendingSendResponses != null) { + maxPendingResponsesSize = Math.max(maxPendingResponsesSize, + pendingSendResponses.readableBytes()); + if (ctx.channel().isActive()) { + ctx.writeAndFlush(pendingSendResponses, ctx.voidPromise()); + } else { + pendingSendResponses.release(); + } + + pendingSendResponses = null; + } + } + } else { + super.userEventTriggered(ctx, evt); + } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 9237c451ed6..6c015809b62 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -30,10 +30,12 @@ import com.google.protobuf.ByteString; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; +import io.netty.channel.group.ChannelGroup; import io.netty.handler.ssl.SslHandler; import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; @@ -118,6 +120,8 @@ public class BookieRequestProcessor implements RequestProcessor { final Semaphore addsSemaphore; final Semaphore readsSemaphore; + final ChannelGroup allChannels; + // to temporary blacklist channels final Optional> blacklistedChannels; final Consumer onResponseTimeout; @@ -127,9 +131,11 @@ public class BookieRequestProcessor implements RequestProcessor { private final boolean throttleReadResponses; public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger, - SecurityHandlerFactory shFactory, ByteBufAllocator allocator) throws SecurityException { + SecurityHandlerFactory shFactory, ByteBufAllocator allocator, + ChannelGroup allChannels) throws SecurityException { this.serverCfg = serverCfg; this.allocator = allocator; + this.allChannels = allChannels; this.waitTimeoutOnBackpressureMillis = serverCfg.getWaitTimeoutOnResponseBackpressureMillis(); this.preserveMdcForTaskExecution = serverCfg.getPreserveMdcForTaskExecution(); this.bookie = bookie; @@ -200,21 +206,21 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, Stat readsSemaphore = maxReads > 0 ? new Semaphore(maxReads, true) : null; } - protected void onAddRequestStart(Channel channel) { + protected void onAddRequestStart(Channel channel, int permits) { if (addsSemaphore != null) { - if (!addsSemaphore.tryAcquire()) { + if (!addsSemaphore.tryAcquire(permits)) { final long throttlingStartTimeNanos = MathUtils.nowInNano(); channel.config().setAutoRead(false); LOG.info("Too many add requests in progress, disabling autoread on channel {}", channel); requestStats.blockAddRequest(); - addsSemaphore.acquireUninterruptibly(); + addsSemaphore.acquireUninterruptibly(permits); channel.config().setAutoRead(true); final long delayNanos = MathUtils.elapsedNanos(throttlingStartTimeNanos); LOG.info("Re-enabled autoread on channel {} after AddRequest delay of {} nanos", channel, delayNanos); requestStats.unblockAddRequest(delayNanos); } } - requestStats.trackAddRequest(); + requestStats.trackAddRequest(permits); } protected void onAddRequestFinish() { @@ -300,6 +306,14 @@ private void shutdownExecutor(OrderedExecutor service) { @Override public void processRequest(Object msg, BookieRequestHandler requestHandler) { + if (msg instanceof List) { + if (!((List) msg).isEmpty() + && ((List) msg).get(0) instanceof BookieProtocol.ParsedAddRequest) { + processBatchAddRequest((List) msg, requestHandler); + } + return; + } + Channel channel = requestHandler.ctx().channel(); // If we can decode this packet as a Request protobuf packet, process // it as a version 3 packet. Else, just use the old protocol. @@ -694,6 +708,13 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Bookie } } + @Override + public void flushPendingResponses() { + for (Channel c : allChannels) { + c.pipeline().fireUserEventTriggered(BookieRequestHandler.EVENT_FLUSH_ALL_PENDING_RESPONSES); + } + } + public long getWaitTimeoutOnBackpressureMillis() { return waitTimeoutOnBackpressureMillis; } @@ -717,4 +738,28 @@ public boolean isBlacklisted(Channel channel) { public void handleNonWritableChannel(Channel channel) { onResponseTimeout.accept(channel); } + + private void processBatchAddRequest(List msgs, + BookieRequestHandler requestHandler) { + WriteBatchEntryProcessor write = WriteBatchEntryProcessor.create(msgs, requestHandler, this); + if (writeThreadPool == null) { + write.run(); + } else { + try { + writeThreadPool.execute(write); + } catch (RejectedExecutionException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to process request to add entry. Too many pending requests ", e); + } + getRequestStats().getAddEntryRejectedCounter().addCount(msgs.size()); + + for (BookieProtocol.ParsedAddRequest request : msgs) { + write.sendWriteReqResponse(BookieProtocol.ETOOMANYREQUESTS, + ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, request), + requestStats.getAddRequestStats()); + request.recycle(); + } + } + } + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java index 64f439b2134..caff467db36 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java @@ -102,9 +102,11 @@ public BookieServer(ServerConfiguration conf, shFactory = SecurityProviderFactoryFactory .getSecurityProviderFactory(conf.getTLSProviderFactoryClass()); + this.requestProcessor = new BookieRequestProcessor(conf, bookie, - statsLogger.scope(SERVER_SCOPE), shFactory, allocator); + statsLogger.scope(SERVER_SCOPE), shFactory, allocator, nettyServer.allChannels); this.nettyServer.setRequestProcessor(this.requestProcessor); + this.bookie.setRequestProcessor(this.requestProcessor); } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java index e31963edd14..48b29147ec7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestStats.java @@ -343,8 +343,8 @@ void unblockAddRequest(long delayNanos) { addsBlocked.decrementAndGet(); } - void trackAddRequest() { - final int curr = addsInProgress.incrementAndGet(); + void trackAddRequest(int permits) { + final int curr = addsInProgress.addAndGet(permits); maxAddsInProgress.accumulateAndGet(curr, Integer::max); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java index 563c0a1352f..27bbb1f8b23 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java @@ -39,6 +39,10 @@ static BookieProtocol.Response buildAddResponse(BookieProtocol.Request r) { r.getEntryId()); } + static BookieProtocol.Response buildAddResponse(byte protocolVersion, long ledgerId, long entryId) { + return BookieProtocol.AddResponse.create(protocolVersion, BookieProtocol.EOK, ledgerId, entryId); + } + static BookieProtocol.Response buildReadResponse(ByteBuf data, BookieProtocol.Request r) { return new BookieProtocol.ReadResponse(r.getProtocolVersion(), BookieProtocol.EOK, r.getLedgerId(), r.getEntryId(), data); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java new file mode 100644 index 00000000000..c7eb24122aa --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteBatchEntryProcessor.java @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.proto; + +import static org.apache.bookkeeper.proto.BookieProtocol.ADDENTRY; + +import io.netty.util.Recycler; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; +import org.apache.bookkeeper.util.MathUtils; + +/** + * Processes batched add entry requests. + */ +@Slf4j +public class WriteBatchEntryProcessor extends PacketProcessorBase implements WriteCallback { + long startTimeNanos; + List requests; + AtomicInteger requestCount = new AtomicInteger(0); + + @Override + protected void reset() { + requests = null; + requestHandler = null; + requestProcessor = null; + requestCount.set(0); + startTimeNanos = -1L; + } + + public static WriteBatchEntryProcessor create(List requests, BookieRequestHandler requestHandler, + BookieRequestProcessor requestProcessor) { + WriteBatchEntryProcessor wbep = RECYCLER.get(); + wbep.init(requests, requestHandler, requestProcessor); + requestProcessor.onAddRequestStart(requestHandler.ctx().channel(), requests.size()); + return wbep; + } + + protected void init(List requests, BookieRequestHandler requestHandler, + BookieRequestProcessor requestProcessor) { + this.requests = requests; + this.requestHandler = requestHandler; + this.requestProcessor = requestProcessor; + this.enqueueNanos = MathUtils.nowInNano(); + this.requestCount.set(requests.size()); + } + + @Override + protected void processPacket() { + + } + + @Override + public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) { + if (BookieProtocol.EOK == rc) { + requestProcessor.getRequestStats().getAddEntryStats() + .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + } else { + requestProcessor.getRequestStats().getAddEntryStats() + .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + } + + requestHandler.prepareSendResponseV2(rc, BookieProtocol.CURRENT_PROTOCOL_VERSION, ADDENTRY, ledgerId, entryId); + requestProcessor.onAddRequestFinish(); + + if (requestCount.decrementAndGet() == 0) { + recycle(); + } + } + + @Override + public void run() { + requestProcessor.getRequestStats().getWriteThreadQueuedLatency() + .registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); + + if (requestProcessor.getBookie().isReadOnly()) { + log.warn("BookieServer is running in readOnly mode, so rejecting the request from the client!"); + for (ParsedAddRequest r : requests) { + writeComplete(BookieProtocol.EREADONLY, r.getLedgerId(), r.getEntryId(), null, + requestHandler.ctx()); + r.release(); + r.recycle(); + } + return; + } + + startTimeNanos = MathUtils.nowInNano(); + int rc = BookieProtocol.EOK; + try { + requestProcessor.getBookie().addEntries(requests, false, this, requestHandler, + requestProcessor.getRequestStats()); + } catch (Throwable t) { + log.error("Unexpected exception while writing requests ", t); + rc = BookieProtocol.EBADREQ; + } + + if (rc != BookieProtocol.EOK) { + requestProcessor.getRequestStats().getAddEntryStats() + .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + for (ParsedAddRequest r : requests) { + writeComplete(rc, r.getLedgerId(), r.getEntryId(), null, requestHandler.ctx()); + r.release(); + r.recycle(); + } + requestProcessor.flushPendingResponses(); + } + } + + void recycle() { + reset(); + recyclerHandle.recycle(this); + } + + private final Recycler.Handle recyclerHandle; + private WriteBatchEntryProcessor(Recycler.Handle recycleHandle) { + this.recyclerHandle = recycleHandle; + } + private static final Recycler RECYCLER = new Recycler() { + @Override + protected WriteBatchEntryProcessor newObject(Recycler.Handle handle) { + return new WriteBatchEntryProcessor(handle); + } + }; +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index 7e8f9fa768d..93a77203a5e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -50,7 +50,7 @@ public static WriteEntryProcessor create(ParsedAddRequest request, BookieRequest BookieRequestProcessor requestProcessor) { WriteEntryProcessor wep = RECYCLER.get(); wep.init(request, requestHandler, requestProcessor); - requestProcessor.onAddRequestStart(requestHandler.ctx().channel()); + requestProcessor.onAddRequestStart(requestHandler.ctx().channel(), 1); return wep; } @@ -122,9 +122,10 @@ public void writeComplete(int rc, long ledgerId, long entryId, requestProcessor.getRequestStats().getAddEntryStats() .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } - sendWriteReqResponse(rc, - ResponseBuilder.buildAddResponse(request), - requestProcessor.getRequestStats().getAddRequestStats()); + + requestHandler.prepareSendResponseV2(rc, request); + requestProcessor.onAddRequestFinish(); + request.recycle(); recycle(); } @@ -153,4 +154,5 @@ protected WriteEntryProcessor newObject(Recycler.Handle han return new WriteEntryProcessor(handle); } }; + } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java index 36aff7ad924..f02cec90c43 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java @@ -45,7 +45,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 { public WriteEntryProcessorV3(Request request, BookieRequestHandler requestHandler, BookieRequestProcessor requestProcessor) { super(request, requestHandler, requestProcessor); - requestProcessor.onAddRequestStart(requestHandler.ctx().channel()); + requestProcessor.onAddRequestStart(requestHandler.ctx().channel(), 1); } // Returns null if there is no exception thrown diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java index 8ae0b720192..c873f2fa40f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java @@ -125,6 +125,7 @@ public BookieWriteLedgerTest() { */ baseConf.setSkipListSizeLimit(4 * 1024 * 1024); baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory); + baseClientConf.setUseV2WireProtocol(true); } /** @@ -1444,6 +1445,32 @@ public void testLedgerMetadataTest() throws Exception { lh.close(); } + @Test + public void testReadWriteEntry() throws Exception { + lh = bkc.createLedgerAdv(3, 3, 3, digestType, ledgerPassword); + numEntriesToWrite = 15000; + List entries = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(numEntriesToWrite); + for (int i = 0; i < numEntriesToWrite; ++i) { + ByteBuffer entry = ByteBuffer.allocate(4); + entry.putInt(rng.nextInt(maxInt)); + entry.position(0); + entries.add(entry.array()); + lh.asyncAddEntry(i, entry.array(), new AddCallback() { + @Override + public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { + assertEquals(0, rc); + latch.countDown(); + } + }, null); + } + latch.await(); + + readEntries(lh, entries); + lh.close(); + + } + private void readEntries(LedgerHandle lh, List entries) throws InterruptedException, BKException { ls = lh.readEntries(0, numEntriesToWrite - 1); int index = 0; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerWithV2ProtocolTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerWithV2ProtocolTest.java new file mode 100644 index 00000000000..7d6e24b882e --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerWithV2ProtocolTest.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; + +/** + * Testing ledger write entry cases. + */ +public class BookieWriteLedgerWithV2ProtocolTest extends + BookieWriteLedgerTest implements AddCallback { + + public BookieWriteLedgerWithV2ProtocolTest() { + super(); + baseClientConf.setUseV2WireProtocol(true); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java index d5ee8f52750..46304023433 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java @@ -33,6 +33,8 @@ import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest; @@ -53,12 +55,15 @@ public class TestBookieRequestProcessor { final BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class); + private final ChannelGroup channelGroup = new DefaultChannelGroup(null); + @Test public void testConstructLongPollThreads() throws Exception { // long poll threads == read threads ServerConfiguration conf = new ServerConfiguration(); try (BookieRequestProcessor processor = new BookieRequestProcessor( - conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) { + conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT, + channelGroup)) { assertSame(processor.getReadThreadPool(), processor.getLongPollThreadPool()); } @@ -66,7 +71,8 @@ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocat conf = new ServerConfiguration(); conf.setNumReadWorkerThreads(0); try (BookieRequestProcessor processor = new BookieRequestProcessor( - conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) { + conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT, + channelGroup)) { assertNull(processor.getReadThreadPool()); assertNotNull(processor.getLongPollThreadPool()); } @@ -76,7 +82,8 @@ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocat conf.setNumReadWorkerThreads(2); conf.setNumLongPollWorkerThreads(2); try (BookieRequestProcessor processor = new BookieRequestProcessor( - conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) { + conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT, + channelGroup)) { assertNotNull(processor.getReadThreadPool()); assertNotNull(processor.getLongPollThreadPool()); assertNotSame(processor.getReadThreadPool(), processor.getLongPollThreadPool()); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java index 8fc3a89f004..a02cde4ab99 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doAnswer; @@ -181,28 +182,25 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed( return null; }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0])); - AtomicReference writtenObject = new AtomicReference<>(); + AtomicReference writtenObject = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); doAnswer(invocationOnMock -> { writtenObject.set(invocationOnMock.getArgument(0)); latch.countDown(); return null; - }).when(channel).writeAndFlush(any(), any()); + }).when(requestHandler).prepareSendResponseV2(anyInt(), any()); processor.run(); verify(bookie, times(1)) .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0])); - verify(channel, times(1)).writeAndFlush(any(), any()); + verify(requestHandler, times(1)).prepareSendResponseV2(anyInt(), any()); +// verify(channel, times(1)).writeAndFlush(any(), any()); latch.await(); - assertTrue(writtenObject.get() instanceof Response); - Response response = (Response) writtenObject.get(); - assertEquals(BookieProtocol.EOK, response.getErrorCode()); - - response.release(); - response.recycle(); + assertTrue(writtenObject.get() instanceof Integer); + assertEquals(BookieProtocol.EOK, (int) writtenObject.get()); } @Test @@ -216,28 +214,23 @@ public void testNormalWritesOnWritableBookie() throws Exception { return null; }).when(bookie).addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0])); - AtomicReference writtenObject = new AtomicReference<>(); + AtomicReference writtenObject = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); doAnswer(invocationOnMock -> { writtenObject.set(invocationOnMock.getArgument(0)); latch.countDown(); return null; - }).when(channel).writeAndFlush(any(), any()); + }).when(requestHandler).prepareSendResponseV2(anyInt(), any()); processor.run(); verify(bookie, times(1)) .addEntry(any(ByteBuf.class), eq(false), same(processor), same(requestHandler), eq(new byte[0])); - verify(channel, times(1)).writeAndFlush(any(), any()); + verify(requestHandler, times(1)).prepareSendResponseV2(anyInt(), any()); latch.await(); - assertTrue(writtenObject.get() instanceof Response); - Response response = (Response) writtenObject.get(); - assertEquals(BookieProtocol.EOK, response.getErrorCode()); - - response.release(); - response.recycle(); + assertEquals(BookieProtocol.EOK, (int) writtenObject.get()); } @Test