Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.List;
import java.util.PrimitiveIterator;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.RequestStats;

/**
* Interface for the bookie.
Expand All @@ -43,6 +47,8 @@ public interface Bookie {
// TODO: replace ackBeforeSync with flags
/**
 * Add a single entry to a ledger.
 *
 * <p>In {@code BookieImpl} the entry is handed to the ledger handle and then logged to the
 * journal together with {@code cb}/{@code ctx}, so the outcome is reported through the callback.
 *
 * @param entry buffer holding the entry to add
 * @param ackBeforeSync forwarded unchanged to the journal
 *        (presumably: acknowledge before the journal is synced — TODO confirm)
 * @param cb callback notified about the result of the write
 * @param ctx opaque context passed back to {@code cb}
 * @param masterKey master key of the ledger the entry belongs to
 */
void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException, InterruptedException;
/**
 * Add a batch of parsed add requests.
 *
 * <p>In {@code BookieImpl} each request is applied individually; a request that fails is
 * answered right away through {@code cb} with an error code and removed from {@code requests},
 * while the remaining ones are logged to the journal as one batch. The implementation releases
 * and recycles every request it was given.
 *
 * @param requests mutable list of requests; the implementation removes failed requests from it
 * @param ackBeforeSync forwarded unchanged to the journal
 * @param cb callback notified about the result of each write
 * @param ctx opaque context passed back to {@code cb}
 * @param requestStats stats object used to record rejected and failed add operations
 */
void addEntries(List<BookieProtocol.ParsedAddRequest> requests, boolean ackBeforeSync, WriteCallback cb,
Object ctx, RequestStats requestStats) throws InterruptedException;
/**
 * Add an entry on behalf of a recovery operation.
 *
 * <p>{@code BookieImpl} performs this write with {@code ackBeforeSync} set to {@code false}.
 * NOTE(review): it appears to be the write path that stays usable once a ledger is fenced —
 * confirm against the implementation.
 *
 * @param entry buffer holding the entry to add
 * @param cb callback notified about the result of the write
 * @param ctx opaque context passed back to {@code cb}
 * @param masterKey master key of the ledger the entry belongs to
 */
void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException, InterruptedException;
/**
 * Force operation for the given ledger; the outcome is reported through {@code cb}.
 * NOTE(review): the exact durability guarantee is not visible from this interface — confirm
 * against the implementation before relying on it.
 *
 * @param ledgerId id of the ledger to force
 * @param cb callback notified when the operation completes
 * @param ctx opaque context passed back to {@code cb}
 */
void forceLedger(long ledgerId, WriteCallback cb, Object ctx);
Expand Down Expand Up @@ -86,6 +92,8 @@ void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
// TODO: Should be constructed and passed in as a parameter
/**
 * @return the {@link LedgerStorage} instance associated with this bookie
 */
LedgerStorage getLedgerStorage();

/**
 * Hand the bookie the request processor whose staged responses it may flush.
 *
 * <p>{@code BookieImpl} forwards the processor to each of its journals and also keeps a
 * reference for its own use in {@code addEntries}.
 *
 * @param requestProcessor processor to register
 */
void setRequestProcessor(RequestProcessor requestProcessor);

// TODO: Move these exceptions somewhere else
/**
* Exception thrown when no such ledger is found in this bookie.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.PrimitiveIterator.OfLong;
Expand All @@ -69,7 +70,11 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNS;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.RequestStats;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.ThreadRegistry;
Expand All @@ -80,6 +85,7 @@
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -125,6 +131,7 @@ public class BookieImpl extends BookieCriticalThread implements Bookie {
private final ByteBufAllocator allocator;

private final boolean writeDataToJournal;
// Assigned by setRequestProcessor(); addEntries() null-checks it before asking it to flush
// pending responses, so it may legitimately be unset.
private RequestProcessor requestProcessor;

// Write callback that does nothing
static class NopWriteCallback implements WriteCallback {
Expand Down Expand Up @@ -923,8 +930,8 @@ public ByteBuf createMasterKeyEntry(long ledgerId, byte[] masterKey) {
* Add an entry to a ledger as specified by handle.
*/
private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException, InterruptedException {
boolean ackBeforeSync, WriteCallback cb, Object ctx, byte[] masterKey,
boolean writeJournal) throws IOException, BookieException, InterruptedException {
long ledgerId = handle.getLedgerId();
long entryId = handle.addEntry(entry);

Expand Down Expand Up @@ -954,7 +961,10 @@ private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
if (LOG.isTraceEnabled()) {
LOG.trace("Adding {}@{}", entryId, ledgerId);
}
getJournal(ledgerId).logAddEntry(entry, ackBeforeSync, cb, ctx);

if (writeJournal) {
getJournal(ledgerId).logAddEntry(entry, ackBeforeSync, cb, ctx);
}
}

/**
Expand All @@ -972,7 +982,7 @@ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[]
LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
synchronized (handle) {
entrySize = entry.readableBytes();
addEntryInternal(handle, entry, false /* ackBeforeSync */, cb, ctx, masterKey);
addEntryInternal(handle, entry, false /* ackBeforeSync */, cb, ctx, masterKey, true);
}
success = true;
} catch (NoWritableLedgerDirException e) {
Expand Down Expand Up @@ -1066,7 +1076,7 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Obj
.create(BookieException.Code.LedgerFencedException);
}
entrySize = entry.readableBytes();
addEntryInternal(handle, entry, ackBeforeSync, cb, ctx, masterKey);
addEntryInternal(handle, entry, ackBeforeSync, cb, ctx, masterKey, true);
}
success = true;
} catch (NoWritableLedgerDirException e) {
Expand All @@ -1086,6 +1096,82 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, Obj
}
}

/**
 * Add a batch of entries.
 *
 * <p>Every request is first applied to its ledger without touching the journal. A request that
 * fails is answered immediately through {@code cb} with the matching error code, removed from
 * {@code requests}, released and recycled. If any request failed, the request processor (when
 * set) is asked to flush pending responses. The surviving requests are then logged in one batch
 * to the journal selected by the ledger id of the first surviving request (only when data is
 * written to the journal). All surviving requests are released and recycled before returning.
 *
 * @param requests mutable list of requests; failed requests are removed from it
 * @param ackBeforeSync forwarded to the ledger write and to the journal
 * @param cb callback notified about the result of each write
 * @param ctx opaque context passed back to {@code cb}
 * @param requestStats stats used to record rejected and failed add operations
 * @throws InterruptedException if interrupted while handing the batch to the journal
 */
@Override
public void addEntries(List<ParsedAddRequest> requests, boolean ackBeforeSync,
        WriteCallback cb, Object ctx, RequestStats requestStats) throws InterruptedException {
    long requestNans = MathUtils.nowInNano();
    boolean hasFailedRequests = false;
    // Set when an InterruptedException is absorbed inside the loop; the interrupt flag is
    // restored on exit so the caller can still observe the interruption.
    boolean interrupted = false;
    // byte[] only has identity equals/hashCode, so a key holding the raw array would never match
    // a second request carrying an equal master key. ByteBuffer compares by content.
    Map<Pair<Long, java.nio.ByteBuffer>, LedgerDescriptor> handleMap = new HashMap<>();
    ListIterator<ParsedAddRequest> iter = requests.listIterator();
    while (iter.hasNext()) {
        ParsedAddRequest request = iter.next();
        int rc = BookieProtocol.EOK;
        try {
            byte[] masterKey = request.getMasterKey();
            Pair<Long, java.nio.ByteBuffer> ledgerIdMasterKey = Pair.of(request.getLedgerId(),
                    masterKey == null ? null : java.nio.ByteBuffer.wrap(masterKey));
            LedgerDescriptor handle = handleMap.get(ledgerIdMasterKey);
            if (handle == null) {
                handle = getLedgerForEntry(request.getData(), masterKey);
                handleMap.put(ledgerIdMasterKey, handle);
            }

            synchronized (handle) {
                if (handle.isFenced()) {
                    throw BookieException.create(BookieException.Code.LedgerFencedException);
                }

                // The journal write is deferred so that all entries can be logged as one batch.
                addEntryInternal(handle, request.getData(), ackBeforeSync,
                        cb, ctx, masterKey, false);
            }
        } catch (BookieException.OperationRejectedException e) {
            requestStats.getAddEntryRejectedCounter().inc();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Operation rejected while writing {} ", request, e);
            }
            rc = BookieProtocol.ETOOMANYREQUESTS;
        } catch (IOException e) {
            LOG.error("Error writing {}", request, e);
            rc = BookieProtocol.EIO;
        } catch (BookieException.LedgerFencedException lfe) {
            LOG.error("Attempt to write to fenced ledger ", lfe);
            rc = BookieProtocol.EFENCED;
        } catch (BookieException e) {
            LOG.error("Unauthorized access to ledger {}", request.getLedgerId(), e);
            rc = BookieProtocol.EUA;
        } catch (InterruptedException ie) {
            // Same reporting as any other unexpected failure, but remember the interruption
            // instead of silently dropping it.
            interrupted = true;
            LOG.error("Unexpected exception while writing {}@{} : {} ",
                    request.getLedgerId(), request.getEntryId(), ie.getMessage(), ie);
            rc = BookieProtocol.EBADREQ;
        } catch (Throwable t) {
            LOG.error("Unexpected exception while writing {}@{} : {} ",
                    request.getLedgerId(), request.getEntryId(), t.getMessage(), t);
            rc = BookieProtocol.EBADREQ;
        }

        if (rc != BookieProtocol.EOK) {
            hasFailedRequests = true;
            requestStats.getAddEntryStats()
                    .registerFailedEvent(MathUtils.elapsedNanos(requestNans), TimeUnit.NANOSECONDS);
            cb.writeComplete(rc, request.getLedgerId(), request.getEntryId(), null, ctx);
            // Drop the failed request so it is neither journaled nor released a second time below.
            iter.remove();
            request.release();
            request.recycle();
        }
    }
    handleMap.clear();

    try {
        // Read the field once: it is not final and may be reassigned by setRequestProcessor().
        RequestProcessor processor = requestProcessor;
        if (hasFailedRequests && processor != null) {
            processor.flushPendingResponses();
        }

        if (writeDataToJournal && !requests.isEmpty()) {
            List<ByteBuf> entries = requests.stream()
                    .map(ParsedAddRequest::getData).collect(Collectors.toList());
            getJournal(requests.get(0).getLedgerId()).logAddEntries(entries, ackBeforeSync, cb, ctx);
        }
    } finally {
        requests.forEach(t -> {
            t.release();
            t.recycle();
        });
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

/**
* Fences a ledger. From this point on, clients will be unable to
* write to this ledger. Only recoveryAddEntry will be
Expand Down Expand Up @@ -1281,4 +1367,12 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedg
}
}
}

/**
 * Registers the request processor with every journal of this bookie, then keeps a reference
 * to it for use by {@code addEntries}.
 *
 * @param requestProcessor processor to register
 */
@Override
public void setRequestProcessor(RequestProcessor requestProcessor) {
    for (final Journal each : journals) {
        each.setRequestProcessor(requestProcessor);
    }
    this.requestProcessor = requestProcessor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,14 @@
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.bookie.stats.JournalStats;
import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue;
import org.apache.bookkeeper.common.collections.BatchedBlockingQueue;
import org.apache.bookkeeper.common.collections.BlockingMpscQueue;
import org.apache.bookkeeper.common.collections.RecyclableArrayList;
import org.apache.bookkeeper.common.util.MemoryLimitController;
import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
Expand Down Expand Up @@ -444,6 +447,8 @@ private class ForceWriteThread extends BookieCriticalThread {
// This holds the queue entries that should be notified after a
// successful force write
Thread threadToNotifyOnEx;

// Assigned through Journal.setRequestProcessor(); null-checked before each use. When set, its
// pending responses are flushed after a force write and after the journal runs flushed entries.
// NOTE(review): plain (non-volatile) field that is written via the setter and read by the
// journal threads — confirm it is always assigned before those threads start.
RequestProcessor requestProcessor;
// should we group force writes
private final boolean enableGroupForceWrites;
private final Counter forceWriteThreadTime;
Expand Down Expand Up @@ -499,6 +504,10 @@ public void run() {
journalStats.getForceWriteGroupingCountStats()
.registerSuccessfulValue(numReqInLastForceWrite);

if (requestProcessor != null) {
requestProcessor.flushPendingResponses();
}

} catch (IOException ioe) {
LOG.error("I/O exception in ForceWrite thread", ioe);
running = false;
Expand Down Expand Up @@ -629,7 +638,7 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour
private static final String journalThreadName = "BookieJournal";

// journal entry queue to commit
final BlockingQueue<QueueEntry> queue;
final BatchedBlockingQueue<QueueEntry> queue;
final BlockingQueue<ForceWriteRequest> forceWriteRequests;

volatile boolean running = true;
Expand Down Expand Up @@ -660,7 +669,7 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf
queue = new BlockingMpscQueue<>(conf.getJournalQueueSize());
forceWriteRequests = new BlockingMpscQueue<>(conf.getJournalQueueSize());
} else {
queue = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
queue = new BatchedArrayBlockingQueue<>(conf.getJournalQueueSize());
forceWriteRequests = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
}

Expand Down Expand Up @@ -878,6 +887,26 @@ public void logAddEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb,
logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx);
}

/**
 * Enqueue a batch of entries on the journal queue with a single {@code putAll}.
 *
 * <p>The ledger id and entry id are read from the first 16 readable bytes of each buffer
 * without moving its reader index. Every buffer is retained, the combined readable size is
 * reserved from the memory limit controller, and the queue-size stat is bumped by the batch
 * size before the batch is handed to the queue.
 *
 * @param entries buffers to log; each one is retained by this call
 * @param ackBeforeSync stored on every queue entry
 * @param cb callback stored on every queue entry
 * @param ctx opaque context stored on every queue entry
 * @throws InterruptedException declared for the blocking memory reservation and queue insertion
 */
public void logAddEntries(List<ByteBuf> entries, boolean ackBeforeSync, WriteCallback cb, Object ctx)
        throws InterruptedException {
    long bytesToReserve = 0;
    final QueueEntry[] batch = new QueueEntry[entries.size()];
    // One enqueue timestamp is shared by the whole batch.
    final long enqueueNanos = MathUtils.nowInNano();
    int slot = 0;
    for (ByteBuf buf : entries) {
        final int base = buf.readerIndex();
        final long ledgerId = buf.getLong(base);
        final long entryId = buf.getLong(base + 8);
        buf.retain();
        bytesToReserve += buf.readableBytes();
        batch[slot++] = QueueEntry.create(buf, ackBeforeSync, ledgerId, entryId, cb, ctx,
                enqueueNanos, journalStats.getJournalAddEntryStats(), callbackTime);
    }

    memoryLimitController.reserveMemory(bytesToReserve);
    journalStats.getJournalQueueSize().addCount(entries.size());
    queue.putAll(batch, 0, batch.length);
}

@VisibleForTesting
public void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
boolean ackBeforeSync, WriteCallback cb, Object ctx)
Expand Down Expand Up @@ -1093,6 +1122,10 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
numEntriesToFlush--;
entry.run();
}

if (forceWriteThread.requestProcessor != null) {
forceWriteThread.requestProcessor.flushPendingResponses();
}
}

lastFlushPosition = bc.position();
Expand Down Expand Up @@ -1211,6 +1244,10 @@ public BufferedChannelBuilder getBufferedChannelBuilder() {
return (FileChannel fc, int capacity) -> new BufferedChannel(allocator, fc, capacity);
}

/**
 * Stores the request processor on the force-write thread, where it is used to flush pending
 * responses.
 *
 * @param requestProcessor processor to register
 */
public void setRequestProcessor(RequestProcessor requestProcessor) {
    this.forceWriteThread.requestProcessor = requestProcessor;
}

/**
* Shuts down the journal.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,10 @@ public interface RequestProcessor extends AutoCloseable {
* channel received the given request <i>r</i>
*/
void processRequest(Object r, BookieRequestHandler channel);

/**
* Flush any pending responses staged on all the client connections.
*
* <p>Callers visible in this change: the journal's force-write thread after a force write, the
* journal thread after running the flushed entries, and {@code BookieImpl.addEntries} when at
* least one request of a batch failed.
*/
void flushPendingResponses();

}
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,23 @@ public Object decode(ByteBuf buffer)
throw new IllegalStateException("Received unknown response : op code = " + opCode);
}
}

/**
 * Serialize the response for an add request straight into {@code buf}.
 *
 * <p>Protocol version, op code, ledger id and entry id are taken from {@code req}; the bytes
 * written are exactly those produced by the field-wise overload.
 *
 * @param rc response code to encode
 * @param req request being answered
 * @param buf destination buffer
 */
public static void serializeAddResponseInto(int rc, BookieProtocol.ParsedAddRequest req, ByteBuf buf) {
    serializeAddResponseInto(rc, req.getProtocolVersion(), req.getOpCode(),
            req.getLedgerId(), req.getEntryId(), buf);
}

/**
 * Serialize an add response straight into {@code buf}.
 *
 * <p>Written in order: frame size ({@code RESPONSE_HEADERS_SIZE}) as int, packet header as int
 * (built from {@code version}, {@code opCode} and zero flags), {@code rc} as int,
 * {@code ledgerId} as long and {@code entryId} as long.
 *
 * @param rc response code to encode
 * @param version protocol version placed in the packet header
 * @param opCode op code placed in the packet header
 * @param ledgerId ledger id of the answered request
 * @param entryId entry id of the answered request
 * @param buf destination buffer
 */
public static void serializeAddResponseInto(int rc, byte version, byte opCode,
        long ledgerId, long entryId, ByteBuf buf) {
    buf.writeInt(RESPONSE_HEADERS_SIZE) // Frame size
            .writeInt(PacketHeader.toInt(version, opCode, (short) 0))
            .writeInt(rc) // rc-code
            .writeLong(ledgerId)
            .writeLong(entryId);
}
}

/**
Expand Down Expand Up @@ -504,7 +521,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (LOG.isTraceEnabled()) {
LOG.trace("Encode response {} to channel {}.", msg, ctx.channel());
}
if (msg instanceof BookkeeperProtocol.Response) {

if (msg instanceof ByteBuf) {
ctx.write(msg, promise);
} else if (msg instanceof BookkeeperProtocol.Response) {
ctx.write(repV3.encode(msg, ctx.alloc()), promise);
} else if (msg instanceof BookieProtocol.Response) {
ctx.write(repPreV3.encode(msg, ctx.alloc()), promise);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,11 @@ byte getOpCode() {
return opCode;
}

long getLedgerId() {
/**
 * @return the ledger id carried by this request
 */
public long getLedgerId() {
    return this.ledgerId;
}

long getEntryId() {
/**
 * @return the entry id carried by this request
 */
public long getEntryId() {
    return this.entryId;
}

Expand All @@ -235,7 +235,7 @@ boolean hasMasterKey() {
return masterKey != null;
}

byte[] getMasterKey() {
/**
 * Returns the master key held by this request (the stored array itself, not a copy).
 * Asserts that a master key is present.
 *
 * @return the master key bytes
 */
public byte[] getMasterKey() {
    assert hasMasterKey();
    return this.masterKey;
}
Expand Down Expand Up @@ -323,7 +323,7 @@ static ParsedAddRequest create(byte protocolVersion, long ledgerId, long entryId
return add;
}

ByteBuf getData() {
/**
 * @return the buffer holding the data of this add request
 */
public ByteBuf getData() {
    // We need to have different ByteBufList instances for each bookie write
    return this.data;
}
Expand All @@ -332,7 +332,7 @@ boolean isRecoveryAdd() {
return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD;
}

void release() {
/**
 * Releases the data buffer of this request through {@code ReferenceCountUtil}.
 */
public void release() {
    ReferenceCountUtil.release(this.data);
}

Expand Down
Loading