Skip to content

Commit 6f30f00

Browse files
lhotariZhangJian He
authored andcommitted
Fix TLS stability issues with V2 protocol that caused data corruption (#4404)
* Fix TLS stability issues with V2 protocol that caused data corruption - add the TLS handler after the FlushConsolidationHandler - This makes TLS connections from Pulsar Broker to Bookkeeper stable when bookkeeperUseV2WireProtocol=true is used - Fix test TestTLS for V2 - Fix inconsistency in client configuration in BookKeeperClusterTestCase (cherry picked from commit 5f73147)
1 parent aff2fb6 commit 6f30f00

File tree

5 files changed

+27
-16
lines changed

5 files changed

+27
-16
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
class BookieNettyServer {
9393

9494
private static final Logger LOG = LoggerFactory.getLogger(BookieNettyServer.class);
95+
public static final String CONSOLIDATION_HANDLER_NAME = "consolidation";
9596

9697
final int maxFrameSize;
9798
final ServerConfiguration conf;
@@ -344,7 +345,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
344345
new BookieSideConnectionPeerContextHandler();
345346
ChannelPipeline pipeline = ch.pipeline();
346347

347-
pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true));
348+
pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true));
348349

349350
pipeline.addLast("bytebufList", ByteBufList.ENCODER);
350351

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
public class BookieRequestProcessor implements RequestProcessor {
6767

6868
private static final Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class);
69+
public static final String TLS_HANDLER_NAME = "tls";
6970

7071
/**
7172
* The server configuration. We use this for getting the number of add and read
@@ -576,9 +577,15 @@ private void processStartTLSRequestV3(final BookkeeperProtocol.Request r,
576577
response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
577578
writeAndFlush(c, response.build());
578579
} else {
580+
LOG.info("Starting TLS handshake with client on channel {}", c);
579581
// there is no need to execute in a different thread as this operation is light
580582
SslHandler sslHandler = shFactory.newTLSHandler();
581-
c.pipeline().addFirst("tls", sslHandler);
583+
if (c.pipeline().names().contains(BookieNettyServer.CONSOLIDATION_HANDLER_NAME)) {
584+
c.pipeline().addAfter(BookieNettyServer.CONSOLIDATION_HANDLER_NAME, TLS_HANDLER_NAME, sslHandler);
585+
} else {
586+
// local transport doesn't contain FlushConsolidationHandler
587+
c.pipeline().addFirst(TLS_HANDLER_NAME, sslHandler);
588+
}
582589

583590
response.setStatus(BookkeeperProtocol.StatusCode.EOK);
584591
BookkeeperProtocol.StartTLSResponse.Builder builder = BookkeeperProtocol.StartTLSResponse.newBuilder();

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
174174
BKException.Code.WriteOnReadOnlyBookieException));
175175
private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.
176176
private static final AtomicLong txnIdGenerator = new AtomicLong(0);
177+
static final String CONSOLIDATION_HANDLER_NAME = "consolidation";
177178

178179
final BookieId bookieId;
179180
final BookieAddressResolver bookieAddressResolver;
@@ -594,7 +595,7 @@ protected ChannelFuture connect() {
594595
@Override
595596
protected void initChannel(Channel ch) throws Exception {
596597
ChannelPipeline pipeline = ch.pipeline();
597-
pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true));
598+
pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true));
598599
pipeline.addLast("bytebufList", ByteBufList.ENCODER);
599600
pipeline.addLast("lengthbasedframedecoder",
600601
new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
@@ -1522,9 +1523,16 @@ void initTLSHandshake() {
15221523
} else {
15231524
throw new RuntimeException("Unexpected socket address type");
15241525
}
1525-
SslHandler handler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
1526-
channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler);
1527-
handler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
1526+
LOG.info("Starting TLS handshake with {}:{}", address.getHostString(), address.getPort());
1527+
SslHandler sslHandler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
1528+
String sslHandlerName = parentObj.shFactory.getHandlerName();
1529+
if (channel.pipeline().names().contains(CONSOLIDATION_HANDLER_NAME)) {
1530+
channel.pipeline().addAfter(CONSOLIDATION_HANDLER_NAME, sslHandlerName, sslHandler);
1531+
} else {
1532+
// local transport doesn't contain FlushConsolidationHandler
1533+
channel.pipeline().addFirst(sslHandlerName, sslHandler);
1534+
}
1535+
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
15281536
@Override
15291537
public void operationComplete(Future<Channel> future) throws Exception {
15301538
int rc;

bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception {
301301
}
302302

303303
protected ClientConfiguration newClientConfiguration() {
304-
return new ClientConfiguration(baseConf);
304+
return new ClientConfiguration(baseClientConf);
305305
}
306306

307307
protected ServerConfiguration newServerConfiguration(int port, File journalDir, File[] ledgerDirs) {

bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -349,11 +349,6 @@ public void testConnectToTLSClusterTLSClient() throws Exception {
349349
*/
350350
@Test
351351
public void testConnectToLocalTLSClusterTLSClient() throws Exception {
352-
// skip test
353-
if (useV2Protocol) {
354-
return;
355-
}
356-
357352
restartBookies(c -> {
358353
c.setDisableServerSocketBind(true);
359354
c.setEnableLocalTransport(true);
@@ -621,10 +616,6 @@ public void testBookieAuthPluginRequireClientTLSAuthentication() throws Exceptio
621616
*/
622617
@Test
623618
public void testBookieAuthPluginRequireClientTLSAuthenticationLocal() throws Exception {
624-
if (useV2Protocol) {
625-
return;
626-
}
627-
628619
restartBookies(c -> {
629620
c.setBookieAuthProviderFactoryClass(
630621
AllowOnlyClientsWithX509Certificates.class.getName());
@@ -755,6 +746,10 @@ public void testBookieAuthPluginDenyAccessToClientWithoutTLS() throws Exception
755746
testClient(clientConf, numBookies);
756747
fail("Shouldn't be able to connect");
757748
} catch (BKException.BKUnauthorizedAccessException authFailed) {
749+
} catch (BKException.BKNotEnoughBookiesException notEnoughBookiesException) {
750+
if (!useV2Protocol) {
751+
fail("Unexpected exception occurred.");
752+
}
758753
}
759754

760755
assertFalse(secureBookieSideChannel);

0 commit comments

Comments
 (0)