Skip to content

Commit e47926b

Browse files
authored
[fix] Write stuck due to pending add callback by multiple threads (#4557)
[fix] Write stuck due to pending add callback by multiple threads (#4557)
1 parent 22c45f9 commit e47926b

File tree

3 files changed

+171
-11
lines changed

3 files changed

+171
-11
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import static java.nio.charset.StandardCharsets.UTF_8;
2424

25+
import com.google.common.annotations.VisibleForTesting;
2526
import com.google.common.collect.Lists;
2627
import com.google.protobuf.ExtensionRegistry;
2728
import io.netty.buffer.ByteBuf;
@@ -370,7 +371,9 @@ private void completeBatchRead(final int rc,
370371
}
371372
}
372373

373-
private static class ChannelReadyForAddEntryCallback
374+
// Without test, this class should be modifier with "private".
375+
@VisibleForTesting
376+
static class ChannelReadyForAddEntryCallback
374377
implements GenericCallback<PerChannelBookieClient> {
375378
private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
376379

@@ -380,7 +383,9 @@ private static class ChannelReadyForAddEntryCallback
380383
private long entryId;
381384
private BookieId addr;
382385
private Object ctx;
383-
private WriteCallback cb;
386+
// Without test, this class should be modifier with "private".
387+
@VisibleForTesting
388+
WriteCallback cb;
384389
private int options;
385390
private byte[] masterKey;
386391
private boolean allowFastFail;
@@ -409,17 +414,24 @@ static ChannelReadyForAddEntryCallback create(
409414
@Override
410415
public void operationComplete(final int rc,
411416
PerChannelBookieClient pcbc) {
412-
try {
413-
if (rc != BKException.Code.OK) {
414-
bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
415-
} else {
417+
if (rc != BKException.Code.OK) {
418+
bookieClient.executor.executeOrdered(ledgerId, () -> {
419+
try {
420+
bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
421+
} finally {
422+
ReferenceCountUtil.release(toSend);
423+
}
424+
recycle();
425+
});
426+
} else {
427+
try {
416428
pcbc.addEntry(ledgerId, masterKey, entryId,
417429
toSend, cb, ctx, options, allowFastFail, writeFlags);
430+
} finally {
431+
ReferenceCountUtil.release(toSend);
418432
}
419-
} finally {
420-
ReferenceCountUtil.release(toSend);
433+
recycle();
421434
}
422-
recycle();
423435
}
424436

425437
private ChannelReadyForAddEntryCallback(

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -616,11 +616,15 @@ protected void initChannel(Channel ch) throws Exception {
616616
}
617617

618618
ChannelFuture future = bootstrap.connect(bookieAddr);
619-
future.addListener(contextPreservingListener(new ConnectionFutureListener(startTime)));
620-
future.addListener(x -> makeWritable());
619+
addChannelListeners(future, startTime);
621620
return future;
622621
}
623622

623+
protected void addChannelListeners(ChannelFuture future, long connectStartTime) {
624+
future.addListener(contextPreservingListener(new ConnectionFutureListener(connectStartTime)));
625+
future.addListener(x -> makeWritable());
626+
}
627+
624628
void cleanDisconnectAndClose() {
625629
disconnect();
626630
close();
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
package org.apache.bookkeeper.proto;
22+
23+
import io.netty.channel.ChannelFuture;
24+
import io.netty.channel.ChannelFutureListener;
25+
import io.netty.util.concurrent.DefaultThreadFactory;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.atomic.AtomicInteger;
31+
import lombok.extern.slf4j.Slf4j;
32+
import org.apache.bookkeeper.client.LedgerHandle;
33+
import org.apache.bookkeeper.client.api.BookKeeper;
34+
import org.apache.bookkeeper.client.api.DigestType;
35+
import org.apache.bookkeeper.conf.ClientConfiguration;
36+
import org.apache.bookkeeper.net.BookieId;
37+
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
38+
import org.apache.bookkeeper.tls.SecurityException;
39+
import org.apache.bookkeeper.util.EventLoopUtil;
40+
import org.junit.Assert;
41+
import org.junit.jupiter.api.Test;
42+
43+
@Slf4j
44+
public class ClientSocketDisconnectTest extends BookKeeperClusterTestCase {
45+
46+
public ClientSocketDisconnectTest() {
47+
super(1);
48+
this.useUUIDasBookieId = true;
49+
}
50+
51+
public static class PerChannelBookieClientDecorator extends PerChannelBookieClient {
52+
53+
private final ThreadCounter threadCounter;
54+
private final AtomicInteger failurePredicate = new AtomicInteger();
55+
56+
public PerChannelBookieClientDecorator(PerChannelBookieClient client, BookieId addr, ThreadCounter tCounter)
57+
throws SecurityException {
58+
super(client.executor, client.eventLoopGroup, addr, client.bookieAddressResolver);
59+
this.threadCounter = tCounter;
60+
}
61+
62+
// Inject a disconnection per two connections.
63+
protected void addChannelListeners(ChannelFuture future, long connectStartTime) {
64+
future.addListener((ChannelFutureListener) future1 -> {
65+
if (failurePredicate.incrementAndGet() % 2 == 1) {
66+
future1.channel().close();
67+
}
68+
});
69+
super.addChannelListeners(future, connectStartTime);
70+
}
71+
72+
// Records the thread who running "PendingAddOp.writeComplete".
73+
@Override
74+
protected void connectIfNeededAndDoOp(BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient> op) {
75+
BookieClientImpl.ChannelReadyForAddEntryCallback callback =
76+
(BookieClientImpl.ChannelReadyForAddEntryCallback) op;
77+
BookkeeperInternalCallbacks.WriteCallback originalCallback = callback.cb;
78+
callback.cb = (rc, ledgerId, entryId, addr, ctx) -> {
79+
threadCounter.record();
80+
originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
81+
};
82+
super.connectIfNeededAndDoOp(op);
83+
}
84+
}
85+
86+
private static class ThreadCounter {
87+
88+
private final Map<Thread, AtomicInteger> records = new ConcurrentHashMap<>();
89+
90+
public void record() {
91+
Thread currentThread = Thread.currentThread();
92+
records.computeIfAbsent(currentThread, k -> new AtomicInteger());
93+
records.get(currentThread).incrementAndGet();
94+
}
95+
}
96+
97+
@Test
98+
public void testAddEntriesCallbackWithBKClientThread() throws Exception {
99+
// Create BKC and a ledger handle.
100+
ClientConfiguration conf = new ClientConfiguration();
101+
conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
102+
org.apache.bookkeeper.client.BookKeeper bkc =
103+
(org.apache.bookkeeper.client.BookKeeper) BookKeeper.newBuilder(conf)
104+
.eventLoopGroup(
105+
EventLoopUtil.getClientEventLoopGroup(conf, new DefaultThreadFactory("test-io")))
106+
.build();
107+
final BookieClientImpl bookieClient = (BookieClientImpl) bkc.getClientCtx().getBookieClient();
108+
LedgerHandle lh = (LedgerHandle) bkc.newCreateLedgerOp()
109+
.withEnsembleSize(1)
110+
.withWriteQuorumSize(1)
111+
.withAckQuorumSize(1)
112+
.withDigestType(DigestType.CRC32C)
113+
.withPassword(new byte[0])
114+
.execute().join();
115+
116+
// Inject two operations.
117+
// 1. Inject a disconnection when connecting successfully.
118+
// 2. Records the thread who running "PendingAddOp.writeComplete".
119+
final ThreadCounter callbackThreadRecorder = new ThreadCounter();
120+
List<BookieId> ensemble = lh.getLedgerMetadata()
121+
.getAllEnsembles().entrySet().iterator().next().getValue();
122+
DefaultPerChannelBookieClientPool clientPool =
123+
(DefaultPerChannelBookieClientPool) bookieClient.lookupClient(ensemble.get(0));
124+
PerChannelBookieClient[] clients = clientPool.clients;
125+
126+
// Write 100 entries and wait for finishing.
127+
for (int i = 0; i < clients.length; i++) {
128+
clients[i] = new PerChannelBookieClientDecorator(clients[i], ensemble.get(0), callbackThreadRecorder);
129+
}
130+
int addCount = 1000;
131+
CountDownLatch countDownLatch = new CountDownLatch(addCount);
132+
for (int i = 0; i < addCount; i++) {
133+
lh.asyncAddEntry(new byte[]{1}, (rc, lh1, entryId, ctx) -> {
134+
countDownLatch.countDown();
135+
}, i);
136+
}
137+
countDownLatch.await();
138+
139+
// Verify: all callback will run in the "BookKeeperClientWorker" thread.
140+
for (Thread callbackThread : callbackThreadRecorder.records.keySet()) {
141+
Assert.assertTrue(callbackThread.getName(), callbackThread.getName().startsWith("BookKeeperClientWorker"));
142+
}
143+
}
144+
}

0 commit comments

Comments
 (0)