Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@
**/.settings/
**/.DS_Store/
/.idea/
**/*.iml
/.vscode/
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public CacheClient(String clientId, String sharedSecret, ServerLocator brokerLoc
}

private CacheClient(String clientId, String sharedSecret, ServerLocator brokerLocator,
boolean offHeap, ByteBufAllocator allocator, MetricsProvider metricsProvider) {
boolean offHeap, ByteBufAllocator allocator, MetricsProvider metricsProvider) {
this.offHeap = offHeap;
this.brokerLocator = brokerLocator;
this.sharedSecret = sharedSecret;
Expand Down Expand Up @@ -712,7 +712,7 @@ private void performEviction() throws InterruptedException {
@Override
public void accept(EntryHandle t) {
if ((maxMemory > 0 && releasedMemory < to_release)
|| (maxLocalEntryAge > 0 && t.getLastGetTime() < maxAgeTsNanos)) {
|| (maxLocalEntryAge > 0 && t.getLastGetTime() < maxAgeTsNanos)) {
evictable.add(t);
releasedMemory += t.getSerializedDataLength();
}
Expand Down Expand Up @@ -758,7 +758,7 @@ public void accept(EntryHandle t) {

private boolean checkPerformEvictionForMaxLocalEntryAge(final long now) {
return maxLocalEntryAge > 0
&& now - lastPerformedEvictionTimestamp >= maxLocalEntryAge / 2;
&& now - lastPerformedEvictionTimestamp >= maxLocalEntryAge / 2;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class CacheClientBuilder {
private Object cacheServer;
private int port = 1025;
private boolean ssl = false;
private boolean sslInsecure = true;
private boolean jmx = false;
private EntrySerializer entrySerializer = new JDKEntrySerializer();
private MetricsProvider metricsProvider;
Expand Down Expand Up @@ -290,11 +291,13 @@ public CacheClient build() {
locator = new NettyCacheServerLocator(host, port, ssl);
((GenericNettyBrokerLocator) locator).setConnectTimeout(connectTimeout);
((GenericNettyBrokerLocator) locator).setSocketTimeout(socketTimeout);
((GenericNettyBrokerLocator) locator).setSslInsecure(sslInsecure);
break;
case CLUSTERED:
locator = new ZKCacheServerLocator(zkConnectString, zkSessionTimeout, zkPath);
((GenericNettyBrokerLocator) locator).setConnectTimeout(connectTimeout);
((GenericNettyBrokerLocator) locator).setSocketTimeout(socketTimeout);
((GenericNettyBrokerLocator) locator).setSslInsecure(sslInsecure);
break;
case LOCAL:
if (cacheServer == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public abstract class GenericNettyBrokerLocator implements ServerLocator {

protected int connectTimeout = 60000;
protected int socketTimeout = 240000;
protected boolean sslInsecure = true;

public int getConnectTimeout() {
return connectTimeout;
Expand All @@ -61,6 +62,14 @@ public void setSocketTimeout(int socketTimeout) {
this.socketTimeout = socketTimeout;
}

public boolean isSslInsecure() {
return sslInsecure;
}

public void setSslInsecure(boolean sslInsecure) {
this.sslInsecure = sslInsecure;
}

@Override
public Channel connect(ChannelEventListener messageReceiver, ConnectionRequestInfo clientInfo) throws InterruptedException, ServerNotAvailableException, ServerRejectedConnectionException {
boolean ok = false;
Expand All @@ -82,6 +91,7 @@ public Channel connect(ChannelEventListener messageReceiver, ConnectionRequestIn
connector.setConnectTimeout(connectTimeout);
connector.setSocketTimeout(socketTimeout);
connector.setSsl(broker.isSsl());
connector.setSslInsecure(sslInsecure);
NettyChannel channel;
try {
channel = connector.connect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import java.io.File;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -48,6 +50,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLException;

/**
* Accepts connections from workers
Expand Down Expand Up @@ -165,21 +169,38 @@ public void start() throws Exception {
SelfSignedCertificate ssc = new SelfSignedCertificate();
try {
sslCtx = SslContextBuilder
.forServer(ssc.certificate(), ssc.privateKey())
.sslProvider(useOpenSSL ? SslProvider.OPENSSL : SslProvider.JDK)
.ciphers(sslCiphers)
.build();
.forServer(ssc.certificate(), ssc.privateKey())
.sslProvider(useOpenSSL ? SslProvider.OPENSSL : SslProvider.JDK)
.ciphers(sslCiphers)
.build();
} finally {
ssc.delete();
}
} else {
LOGGER.log(Level.SEVERE, "start SSL with certificate " + sslCertFile.getAbsolutePath() + " chain file " + sslCertChainFile.getAbsolutePath() + ", useOpenSSL:" + useOpenSSL);
LOGGER.log(Level.SEVERE, "start SSL with certificate " + sslCertFile.getAbsolutePath()
+ " chain file " + (sslCertChainFile == null ? "null" : sslCertChainFile.getAbsolutePath())
+ ", useOpenSSL:" + useOpenSSL);
if (sslCiphers != null) {
LOGGER.log(Level.SEVERE, "required sslCiphers " + sslCiphers);
}
sslCtx = SslContextBuilder.forServer(sslCertChainFile, sslCertFile, sslCertPassword)
.sslProvider(useOpenSSL ? SslProvider.OPENSSL : SslProvider.JDK)
.ciphers(sslCiphers).build();
SslContextBuilder builder;
if (sslCertFile.getName().endsWith(".p12") || sslCertFile.getName().endsWith(".pfx")) {
try (FileInputStream fis = new FileInputStream(sslCertFile)) {
KeyStore ks = KeyStore.getInstance("PKCS12");
ks.load(fis, sslCertPassword.toCharArray());

KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, sslCertPassword.toCharArray());

builder = SslContextBuilder.forServer(kmf);
} catch (Exception e) {
throw new SSLException("provided certFile looks like a PKCS12 file but could not be loaded", e);
}
} else {
builder = SslContextBuilder.forServer(sslCertChainFile, sslCertFile, sslCertPassword);
}
sslCtx = builder.sslProvider(useOpenSSL ? SslProvider.OPENSSL : SslProvider.JDK)
.ciphers(sslCiphers).build();
}

}
Expand All @@ -205,31 +226,31 @@ public Thread newThread(Runnable r) {
}
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NetworkUtils.isEnableEpollNative() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
NettyChannel session = new NettyChannel("unnamed", ch, callbackExecutor, null);
if (acceptor != null) {
acceptor.createConnection(session);
}
.channel(NetworkUtils.isEnableEpollNative() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
NettyChannel session = new NettyChannel("unnamed", ch, callbackExecutor, null);
if (acceptor != null) {
acceptor.createConnection(session);
}

// ch.pipeline().addLast(new LoggingHandler());
// Add SSL handler first to encrypt and decrypt everything.
if (ssl) {
ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()));
// Add SSL handler first to encrypt and decrypt everything.
if (ssl) {
ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()));
}

ch.pipeline().addLast("lengthprepender", new LengthFieldPrepender(4));
ch.pipeline().addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
ch.pipeline().addLast("messageencoder", new DataMessageEncoder());
ch.pipeline().addLast("messagedecoder", new DataMessageDecoder());
ch.pipeline().addLast(new InboundMessageHandler(session));
}

ch.pipeline().addLast("lengthprepender", new LengthFieldPrepender(4));
ch.pipeline().addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
ch.pipeline().addLast("messageencoder", new DataMessageEncoder());
ch.pipeline().addLast("messagedecoder", new DataMessageDecoder());
ch.pipeline().addLast(new InboundMessageHandler(session));
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
})
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture f = b.bind(host, port).sync(); // (7)
this.channel = f.channel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
Expand All @@ -59,7 +58,7 @@ public class NettyConnector implements AutoCloseable {
private EventLoopGroup group;
private SslContext sslCtx;
private boolean ssl;
private boolean sslUnsecure = true;
private boolean sslInsecure = true;
protected int connectTimeout = 60000;
protected int socketTimeout = 240000;
private final ExecutorService callbackExecutor = Executors.newCachedThreadPool();
Expand Down Expand Up @@ -100,12 +99,12 @@ public void setSsl(boolean ssl) {
this.ssl = ssl;
}

public boolean isSslUnsecure() {
return sslUnsecure;
public boolean isSslInsecure() {
return sslInsecure;
}

public void setSslUnsecure(boolean sslUnsecure) {
this.sslUnsecure = sslUnsecure;
public void setSslInsecure(boolean sslInsecure) {
this.sslInsecure = sslInsecure;
}

private ChannelEventListener receiver;
Expand All @@ -117,7 +116,7 @@ public NettyConnector(ChannelEventListener receiver) {
public NettyChannel connect() throws Exception {
if (ssl) {
boolean useOpenSSL = NetworkUtils.isOpenSslAvailable();
if (sslUnsecure) {
if (sslInsecure) {
this.sslCtx = SslContextBuilder
.forClient()
.sslProvider(useOpenSSL ? SslProvider.OPENSSL : SslProvider.JDK)
Expand Down
22 changes: 18 additions & 4 deletions blazingcache-core/src/test/java/blazingcache/SimpleSSLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import blazingcache.server.CacheServer;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import java.io.File;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -35,21 +36,34 @@ public class SimpleSSLTest {

@Test
public void basicTestSslSelfSigned() throws Exception {
basicTestSsl(null, null);
basicTestSsl(null, null, null);
}

@Test
public void basicTestSslWithCert() throws Exception {
SelfSignedCertificate ssc = new SelfSignedCertificate();
basicTestSsl(ssc.privateKey(), ssc.certificate());
basicTestSsl(ssc.privateKey(), ssc.certificate(), null);
}

private void basicTestSsl(File certificateFile, File certificateChain) throws Exception {
@Test
public void basicTestSslWithPwdProtectedCert() throws Exception {
File cert = new File(this.getClass().getClassLoader().getResource("cert1.key").getFile());
File chain = new File(this.getClass().getClassLoader().getResource("cert1_chain.pem").getFile());
basicTestSsl(cert, chain, "blazingcache1");
}

@Test
public void basicTestSslWithPKCS12() throws Exception {
File cert = new File(this.getClass().getClassLoader().getResource("cert1.p12").getFile());
basicTestSsl(cert, null, "blazingcache1");
}

private void basicTestSsl(File certificateFile, File certificateChain, String certificateFilePassword) throws Exception {
byte[] data = "testdata".getBytes(StandardCharsets.UTF_8);

ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", true, null);
try (CacheServer cacheServer = new CacheServer("ciao", serverHostData)) {
cacheServer.setupSsl(certificateFile, null, certificateChain, null);
cacheServer.setupSsl(certificateFile, certificateFilePassword, certificateChain, null);
cacheServer.start();
try (CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData));
CacheClient client2 = new CacheClient("theClient2", "ciao", new NettyCacheServerLocator(serverHostData));) {
Expand Down
30 changes: 30 additions & 0 deletions blazingcache-core/src/test/resources/cert1.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFLTBXBgkqhkiG9w0BBQ0wSjApBgkqhkiG9w0BBQwwHAQI0cy6qzGPdncCAggA
MAwGCCqGSIb3DQIJBQAwHQYJYIZIAWUDBAEqBBD4YpSngAfZjRa9qJ87LmpVBIIE
0JyIDCAHYufKL06QAJgmtoUjDxK28y7S7HkT/RB+6UDLtaWy6XDaFtXOfAymQsgx
L48rX9JkYjPR+O/A+3ddbA7yMiGUYnPH2mk0WCJdehGGI9D1oBkf18qyKIbV0fqE
W5wcpggFYY+issTxX5njhr4Q1iyiA94SHmZG/0TPL3dKlN9XkKVW2eNkm2xH6W3m
AEX4uRC3x2VH0sTW1i7eHhisSz6STlMtdzl4j/SzWsBwNnCHfu8v3GIvGwI9OQ63
OKean6Dv68vmTitFPIHEDeWiZ2dD2SbRJhANxjDrn6buT1IVLZ4AGvS0+MrmQZSf
oJan0z9A0G9q/xnwfAq9RGlswK2fDJCt+rtpVJdTSLr7OP0cWxqBaFFM3ZL+eoFK
wOxcI4hmNlnbvk6+lYUY7G2i+ZXcGQz+jE/ArJ07NJUH8YXbh7DDV69HqkilJRM4
f/JpVjMk0zT/ygzRosgGW2MndBGiBV1EP835Nd7zzkKfyvT98+o0zEp9r7FZP7mv
qLuQGSaiACgWlcc7YG9cjEPqbMQ/6jwlOACwLo7hvE/JcRZHN23ib7ssg8hvlpi7
EW60ZK9fGtJgMm9CDgn0UVdVaCXNXUonuI2r3AD6B7xhlV0ScEZ2T+myQ7vvMvNi
vlK6NbCmms0PgbG0jvzFlAeXw8hWq7cuTbv6dBDokNjQ9XM6GZPDG/ynYpFb7SnK
eznAHtWPYnDjSRtYWaqU3sTWqh0BkAR3xaL0/jRDm3MJBhcE8U3gEugVC5yHML7q
4Ux+aBt/Gq0jT6M3PZENheN8c5wso9shL3M8m+mQ760sHNYH3vTGF+4Z/AOBsgpU
sYTgrPLMScxDfTomf/srT9XG6P+Cd2X5d6pLMWy8FS5/R3GNRRDkOnI3BlkkTR8Y
0Kw8Z7CE05Uw8rUwlkpSGrGwWxq5+ZMN7YnvJH4I3P0FMzl8BSPmVI3XGyaibpBi
reU36l0QekkDIoVa3erMnB0FYO6R29XpY83BGbtiQlrS8Mw47zMHJRo3W4Zl7TdF
2vdNjoKw+Fe3tvMkOT1cNBzHyU00uSaKFB8kxTOv/sLkXeGUUT1n+SLscaxqfGV6
Ctxwims18/ItUsMkMAHX3fedRbBXRV99Cr5wBg9NPc3j+NAC2c5w/8Xt+e66C6lB
bFPRlRXyPJlWCEPrGQEX4OgVQI//JtiRaB39wjCDW+RCYlz2a1fDVy4mfOuu3jp5
RBz1WxL+gJWbpGbXwqBv8jofxm7DtQl8wghWbCGmNTj4pXtPQBA4zamdvi/s2qrL
IZerX0k5zZnY5KQYUgpxybpU28ua9fgrIxzA/Tg5V5dL34WjF3S4IQBqz4PYLVMh
t0Uz6l8s8ccLkZ0e0j9vsFM91oiAn2RIOBheoLg2vNJeLl5KGi+gl3SoiEigdwG8
hTAO8FR+iCoYz+ApOV6H1rqs3g4cWWP/RKWxjBFJ4qERXBDNvBtbA97b/yDnv7hh
X3ZtXtsml9sT4gOZmAtZD3VviJWiuSWpfEcN/DWaNtMLzyQ1WW8Ai+iKqMODjcw/
EnU+/kl2zWvOpbRGK2G8HI7FJVWo6dPfApXdY16ITMXKY35uQAD6HqP2vbPGeBrh
QcZh2pbF3v86Kmwlcj0dTWS4UBSPWJtyeIresTOp3+be
-----END ENCRYPTED PRIVATE KEY-----
Binary file added blazingcache-core/src/test/resources/cert1.p12
Binary file not shown.
21 changes: 21 additions & 0 deletions blazingcache-core/src/test/resources/cert1_chain.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDXzCCAkegAwIBAgIUI9HMRW+gfFWKIV3y+Nu3jCj+ZpowDQYJKoZIhvcNAQEL
BQAwPjELMAkGA1UEBhMCSVQxCzAJBgNVBAgMAlJBMRAwDgYDVQQKDAdEaWVubmVh
MRAwDgYDVQQLDAdEaWVubmVhMCAXDTI1MDMxMzExNDUwN1oYDzIxMjUwMjE3MTE0
NTA3WjA+MQswCQYDVQQGEwJJVDELMAkGA1UECAwCUkExEDAOBgNVBAoMB0RpZW5u
ZWExEDAOBgNVBAsMB0RpZW5uZWEwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQCtpMg1OcPN985H9cVRy9S9fqHZWCH9EuSHrtFgD+l+XA72lDVI6Z20NNQA
OJoKLEnpJyfeMTYVvr13zo49OefPsEh4xQTp4RnmYFynAptw/pI/7LjPe6w0jgsu
HhTKN9IHyoJykELkfTmGoj21aWP7dK4/OBZUuoIgbI26oWNRahHpsoc8pxf/M5tM
aZZxAQm0Vk2dA7U7qHWjFopwsEQwRGKSl1SRRtoIut5QYdN7SRRZeJXdzqdUmcjx
mEBsOiRoaCa+p6C6MKwhVCAPUdV3UM1v0W/jWVB/NVe/x/vjiqEJITbNvyFRxw57
c39A9TgqPFFV8cYNWJPEo+DyKJ4DAgMBAAGjUzBRMB0GA1UdDgQWBBRSxJLBO4Xu
H/ImY7eJ7nlm4+doajAfBgNVHSMEGDAWgBRSxJLBO4XuH/ImY7eJ7nlm4+doajAP
BgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQChpeQew+D/Vp15izU6
dOlrwVjeJfj5r2nkpQOAtx8G8lNjfXt7+5zKz/s4WsD+L8JrMLGPQy6roXP2kx2t
ZhClGTRojPhFiOdcJ2l2fwzXFW3kREPhvHuKOzr1O3U4nguKO9+vE4prlfX1hL48
N11WULA1vd5H6zI/Yn7l6i9RiCp9hwYZKGq9zgiZzhq5+YSVSN+nkEycEDmBDl4+
frBoamxgE1dapdfHcKrEz8HXplCzmkx1JEkAdvB/+Jwf4RkeIhqcPreyhBR7Axqj
U4iCziFwHkNuRa55T1YfZaZMo0zPOOHkb0OR/dRR3DYF5wwtO+tR11AKPUdj7sns
sdxJ
-----END CERTIFICATE-----
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public class BlazingCacheManager implements CacheManager {
String mode = properties_and_params.getProperty("blazingcache.mode", "local");
int sockettimeout = Integer.parseInt(properties_and_params.getProperty("blazingcache.zookeeper.sockettimeout", "0"));
int connecttimeout = Integer.parseInt(properties_and_params.getProperty("blazingcache.zookeeper.connecttimeout", "10000"));
boolean clientSSLInsecure = Boolean.parseBoolean(properties_and_params.getProperty("blazingcache.locator.client.sslinsecure", "true"));
switch (mode) {
case "clustered": {
String connect = properties_and_params.getProperty("blazingcache.zookeeper.connectstring", "localhost:1281");
Expand All @@ -118,6 +119,7 @@ public class BlazingCacheManager implements CacheManager {
locator = new ZKCacheServerLocator(connect, timeout, path);
((ZKCacheServerLocator) locator).setSocketTimeout(sockettimeout);
((ZKCacheServerLocator) locator).setConnectTimeout(connecttimeout);
((ZKCacheServerLocator) locator).setSslInsecure(clientSSLInsecure);
this.client = new CacheClient(clientId, secret, locator);
this.embeddedServer = null;
}
Expand All @@ -136,6 +138,7 @@ public class BlazingCacheManager implements CacheManager {
locator = new ZKCacheServerLocator(connect, timeout, path);
((ZKCacheServerLocator) locator).setSocketTimeout(sockettimeout);
((ZKCacheServerLocator) locator).setConnectTimeout(connecttimeout);
((ZKCacheServerLocator) locator).setSslInsecure(clientSSLInsecure);
this.client = new CacheClient(clientId, secret, locator);
ServerHostData hostData = new ServerHostData(host, port, "", ssl, new HashMap<>());
this.embeddedServer = new CacheServer(secret, hostData);
Expand All @@ -149,6 +152,7 @@ public class BlazingCacheManager implements CacheManager {
locator = new NettyCacheServerLocator(host, port, ssl);
((NettyCacheServerLocator) locator).setSocketTimeout(sockettimeout);
((NettyCacheServerLocator) locator).setConnectTimeout(connecttimeout);
((NettyCacheServerLocator) locator).setSslInsecure(clientSSLInsecure);
this.client = new CacheClient(clientId, secret, locator);
this.embeddedServer = null;
break;
Expand Down
Loading