Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
class BookieNettyServer {

private static final Logger LOG = LoggerFactory.getLogger(BookieNettyServer.class);
public static final String CONSOLIDATION_HANDLER_NAME = "consolidation";

final int maxFrameSize;
final ServerConfiguration conf;
Expand Down Expand Up @@ -344,7 +345,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
new BookieSideConnectionPeerContextHandler();
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true));
pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true));

pipeline.addLast("bytebufList", ByteBufList.ENCODER);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
public class BookieRequestProcessor implements RequestProcessor {

private static final Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class);
public static final String TLS_HANDLER_NAME = "tls";

/**
* The server configuration. We use this for getting the number of add and read
Expand Down Expand Up @@ -580,9 +581,15 @@ private void processStartTLSRequestV3(final BookkeeperProtocol.Request r,
response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
writeAndFlush(c, response.build());
} else {
LOG.info("Starting TLS handshake with client on channel {}", c);
// there is no need to execute in a different thread as this operation is light
SslHandler sslHandler = shFactory.newTLSHandler();
c.pipeline().addFirst("tls", sslHandler);
if (c.pipeline().names().contains(BookieNettyServer.CONSOLIDATION_HANDLER_NAME)) {
c.pipeline().addAfter(BookieNettyServer.CONSOLIDATION_HANDLER_NAME, TLS_HANDLER_NAME, sslHandler);
} else {
// local transport doesn't contain FlushConsolidationHandler
c.pipeline().addFirst(TLS_HANDLER_NAME, sslHandler);
}

response.setStatus(BookkeeperProtocol.StatusCode.EOK);
BookkeeperProtocol.StartTLSResponse.Builder builder = BookkeeperProtocol.StartTLSResponse.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
BKException.Code.WriteOnReadOnlyBookieException));
private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.
private static final AtomicLong txnIdGenerator = new AtomicLong(0);
static final String CONSOLIDATION_HANDLER_NAME = "consolidation";

final BookieId bookieId;
final BookieAddressResolver bookieAddressResolver;
Expand Down Expand Up @@ -595,7 +596,7 @@ protected ChannelFuture connect() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true));
pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true));
pipeline.addLast("bytebufList", ByteBufList.ENCODER);
pipeline.addLast("lengthbasedframedecoder",
new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
Expand Down Expand Up @@ -1573,9 +1574,16 @@ void initTLSHandshake() {
} else {
throw new RuntimeException("Unexpected socket address type");
}
SslHandler handler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler);
handler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
LOG.info("Starting TLS handshake with {}:{}", address.getHostString(), address.getPort());
SslHandler sslHandler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
String sslHandlerName = parentObj.shFactory.getHandlerName();
if (channel.pipeline().names().contains(CONSOLIDATION_HANDLER_NAME)) {
channel.pipeline().addAfter(CONSOLIDATION_HANDLER_NAME, sslHandlerName, sslHandler);
} else {
// local transport doesn't contain FlushConsolidationHandler
channel.pipeline().addFirst(sslHandlerName, sslHandler);
}
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
int rc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception {
}

protected ClientConfiguration newClientConfiguration() {
return new ClientConfiguration(baseConf);
return new ClientConfiguration(baseClientConf);
}

protected ServerConfiguration newServerConfiguration(int port, File journalDir, File[] ledgerDirs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,6 @@ public void testConnectToTLSClusterTLSClient() throws Exception {
*/
@Test
public void testConnectToLocalTLSClusterTLSClient() throws Exception {
// skip test
if (useV2Protocol) {
return;
}

restartBookies(c -> {
c.setDisableServerSocketBind(true);
c.setEnableLocalTransport(true);
Expand Down Expand Up @@ -622,10 +617,6 @@ public void testBookieAuthPluginRequireClientTLSAuthentication() throws Exceptio
*/
@Test
public void testBookieAuthPluginRequireClientTLSAuthenticationLocal() throws Exception {
if (useV2Protocol) {
return;
}

restartBookies(c -> {
c.setBookieAuthProviderFactoryClass(
AllowOnlyClientsWithX509Certificates.class.getName());
Expand Down Expand Up @@ -756,6 +747,10 @@ public void testBookieAuthPluginDenyAccessToClientWithoutTLS() throws Exception
testClient(clientConf, numBookies);
fail("Shouldn't be able to connect");
} catch (BKException.BKUnauthorizedAccessException authFailed) {
} catch (BKException.BKNotEnoughBookiesException notEnoughBookiesException) {
if (!useV2Protocol) {
fail("Unexpected exception occurred.");
}
}

assertFalse(secureBookieSideChannel);
Expand Down