Skip to content

Commit 51ce993

Browse files
committed
fix a bug
1 parent 23d4d09 commit 51ce993

File tree

1 file changed

+37
-35
lines changed

1 file changed

+37
-35
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java

Lines changed: 37 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,10 +1163,11 @@ private void writeAndFlush(final Channel channel,
11631163

11641164
try {
11651165
if (request instanceof ByteBuf || request instanceof ByteBufList) {
1166-
prepareSendRequests(channel, request, key);
1167-
if (pendingSendRequests.readableBytes() > MAX_PENDING_REQUEST_SIZE) {
1166+
if (prepareSendRequests(channel, request, key)) {
11681167
flushPendingRequests();
1169-
} else if (nextScheduledFlush == null) {
1168+
}
1169+
1170+
if (nextScheduledFlush == null) {
11701171
nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
11711172
1, 1, TimeUnit.MILLISECONDS);
11721173
}
@@ -1192,47 +1193,48 @@ private void writeAndFlush(final Channel channel,
11921193
}
11931194
}
11941195

1195-
public synchronized void prepareSendRequests(Channel channel, Object request, CompletionKey key) {
1196-
if (pendingSendRequests == null) {
1197-
pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
1198-
}
1199-
BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
1200-
pendingSendKeys.add(key);
1196+
public synchronized boolean prepareSendRequests(Channel channel, Object request, CompletionKey key) {
1197+
if (pendingSendRequests == null) {
1198+
pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
12011199
}
1200+
BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
1201+
pendingSendKeys.add(key);
1202+
return pendingSendRequests.readableBytes() > MAX_PENDING_REQUEST_SIZE;
1203+
}
12021204

1203-
public synchronized void flushPendingRequests() {
1204-
final long startTime = MathUtils.nowInNano();
1205-
Set<CompletionKey> keys = new HashSet<>(pendingSendKeys);
1206-
ChannelPromise promise = channel.newPromise().addListener(future -> {
1207-
if (future.isSuccess()) {
1208-
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
1209-
for (CompletionKey completionKey : keys) {
1210-
CompletionValue completion = completionObjects.get(completionKey);
1211-
if (completion != null) {
1212-
completion.setOutstanding();
1213-
}
1205+
public synchronized void flushPendingRequests() {
1206+
final long startTime = MathUtils.nowInNano();
1207+
Set<CompletionKey> keys = new HashSet<>(pendingSendKeys);
1208+
ChannelPromise promise = channel.newPromise().addListener(future -> {
1209+
if (future.isSuccess()) {
1210+
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
1211+
for (CompletionKey completionKey : keys) {
1212+
CompletionValue completion = completionObjects.get(completionKey);
1213+
if (completion != null) {
1214+
completion.setOutstanding();
12141215
}
1215-
} else {
1216-
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
12171216
}
1218-
});
1217+
} else {
1218+
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
1219+
}
1220+
});
12191221

1220-
if (pendingSendRequests != null) {
1221-
maxPendingRequestsSize = (int) Math.max(
1222-
maxPendingRequestsSize * 0.5 + pendingSendRequests.readableBytes() * 0.5,
1223-
DEFAULT_PENDING_REQUEST_SIZE);
1222+
if (pendingSendRequests != null) {
1223+
maxPendingRequestsSize = (int) Math.max(
1224+
maxPendingRequestsSize * 0.5 + pendingSendRequests.readableBytes() * 0.5,
1225+
DEFAULT_PENDING_REQUEST_SIZE);
12241226

1225-
if (channel != null && channel.isActive()) {
1226-
channel.writeAndFlush(pendingSendRequests, promise);
1227-
} else {
1228-
pendingSendRequests.release();
1229-
keys.forEach(key -> errorOut(key, BKException.Code.TooManyRequestsException));
1227+
if (channel != null && channel.isActive()) {
1228+
channel.writeAndFlush(pendingSendRequests, promise);
1229+
} else {
1230+
pendingSendRequests.release();
1231+
keys.forEach(key -> errorOut(key, BKException.Code.TooManyRequestsException));
12301232

1231-
}
1232-
pendingSendRequests = null;
1233-
pendingSendKeys.clear();
12341233
}
1234+
pendingSendRequests = null;
1235+
pendingSendKeys.clear();
12351236
}
1237+
}
12361238

12371239
void errorOut(final CompletionKey key) {
12381240
if (LOG.isDebugEnabled()) {

0 commit comments

Comments
 (0)