Skip to content

Commit 22b4fad

Browse files
committed
Support TLS for cproto connector
Closes gh-122
1 parent 2fc558b commit 22b4fad

File tree

14 files changed

+284
-15
lines changed

14 files changed

+284
-15
lines changed

builtin-adapter/CMakeLists.txt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
cmake_minimum_required(VERSION 3.0)
1+
cmake_minimum_required(VERSION 3.5)
22

33
project(builtin-adapter)
44

@@ -17,3 +17,18 @@ add_library(${TARGET} SHARED BuiltinAdapter.cpp)
1717
include_directories(${JNI_INCLUDE_DIRS} ${REINDEXER_INCLUDE_DIRS})
1818

1919
target_link_libraries(${TARGET} reindexer_server_library reindexer_server_resources ${REINDEXER_LIBRARIES})
20+
21+
# workaround for MacOS to propagate correct openssl path to Reindexer.
22+
if(APPLE)
23+
find_package(OpenSSL)
24+
if(OpenSSL_FOUND)
25+
if(NOT DEFINED OPENSSL_LIB_PATH)
26+
get_filename_component(OPENSSL_LIB_PATH ${OPENSSL_CRYPTO_LIBRARY} DIRECTORY)
27+
endif()
28+
message("-- Found package: OpenSSL@${OPENSSL_VERSION} - lib path: ${OPENSSL_LIB_PATH}")
29+
set_target_properties(${TARGET} PROPERTIES
30+
BUILD_WITH_INSTALL_RPATH TRUE
31+
INSTALL_RPATH ${OPENSSL_LIB_PATH}
32+
)
33+
endif()
34+
endif()

pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@
139139
<!-- Activate the use of TCP to transmit events to the plugin. -->
140140
<forkNode implementation="org.apache.maven.plugin.surefire.extensions.SurefireForkNodeFactory" />
141141
<groups>${tests}</groups>
142+
<systemPropertyVariables>
143+
<javax.net.ssl.trustStore>${project.basedir}/src/test/resources/builtin-server.jks</javax.net.ssl.trustStore>
144+
<javax.net.ssl.trustStorePassword>password</javax.net.ssl.trustStorePassword>
145+
</systemPropertyVariables>
142146
</configuration>
143147
</plugin>
144148

src/main/java/ru/rt/restream/reindexer/ReindexerConfiguration.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.util.Objects;
3434
import java.util.function.Consumer;
3535

36+
import javax.net.ssl.SSLSocketFactory;
37+
3638
/**
3739
* Represents approach for bootstrapping Reindexer.
3840
*/
@@ -54,6 +56,8 @@ public final class ReindexerConfiguration {
5456

5557
private String serverConfigFile = "default-builtin-server-config.yml";
5658

59+
private SSLSocketFactory sslSocketFactory;
60+
5761
private ReindexerConfiguration() {
5862

5963
}
@@ -161,6 +165,17 @@ public ReindexerConfiguration serverConfigFile(String serverConfigFile) {
161165
return this;
162166
}
163167

168+
/**
169+
* Configure an {@link SSLSocketFactory}.
170+
*
171+
* @param sslSocketFactory the {@link SSLSocketFactory} to use
172+
* @return the {@link ReindexerConfiguration} for further customizations
173+
*/
174+
public ReindexerConfiguration sslSocketFactory(SSLSocketFactory sslSocketFactory) {
175+
this.sslSocketFactory = sslSocketFactory;
176+
return this;
177+
}
178+
164179
/**
165180
* Build and return reindexer connector instance.
166181
*
@@ -186,18 +201,23 @@ public Reindexer getReindexer() {
186201

187202
private Binding getBinding(String protocol, List<URI> uris) {
188203
switch (protocol) {
204+
case "cprotos":
205+
if (sslSocketFactory == null) {
206+
sslSocketFactory = (SSLSocketFactory) SSLSocketFactory.getDefault();
207+
}
189208
case "cproto":
190209
DataSourceConfiguration dataSourceConfig = DataSourceConfiguration.builder()
191210
.urls(urls)
192211
.allowUnlistedDataSource(allowUnlistedDataSource)
212+
.sslSocketFactory(sslSocketFactory)
193213
.build();
194214
return new Cproto(dataSourceFactory, dataSourceConfig, connectionPoolSize, requestTimeout);
195215
case "builtin":
196216
return new Builtin(uris.get(0), requestTimeout);
197217
case "builtinserver":
198218
return new BuiltinServer(uris.get(0), serverConfigFile, serverStartupTimeout, requestTimeout);
199219
default:
200-
throw new UnimplementedException("Protocol: '" + protocol + "' is not suppored");
220+
throw new UnimplementedException("Protocol: '" + protocol + "' is not supported");
201221
}
202222
}
203223

src/main/java/ru/rt/restream/reindexer/binding/cproto/DataSourceConfiguration.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.List;
2323
import java.util.Objects;
2424

25+
import javax.net.ssl.SSLSocketFactory;
26+
2527
/**
2628
* A {@link DataSource} configuration.
2729
*/
@@ -37,6 +39,11 @@ public class DataSourceConfiguration {
3739
*/
3840
private final boolean allowUnlistedDataSource;
3941

42+
/**
43+
* An {@link SSLSocketFactory} to connect to Reindexer using cprotos (SSL/TLS) protocol.
44+
*/
45+
private final SSLSocketFactory sslSocketFactory;
46+
4047
/**
4148
* An index of the current active data source.
4249
*/
@@ -46,6 +53,7 @@ private DataSourceConfiguration(Builder builder) {
4653
urls = builder.urls;
4754
allowUnlistedDataSource = builder.allowUnlistedDataSource;
4855
active = builder.active;
56+
sslSocketFactory = builder.sslSocketFactory;
4957
}
5058

5159
public static Builder builder() {
@@ -69,6 +77,15 @@ public boolean isAllowUnlistedDataSource() {
6977
return allowUnlistedDataSource;
7078
}
7179

80+
/**
81+
* Returns an {@link SSLSocketFactory} to connect to Reindexer using cprotos (SSL/TLS) protocol.
82+
*
83+
* @return the {@link SSLSocketFactory} to use
84+
*/
85+
public SSLSocketFactory getSslSocketFactory() {
86+
return sslSocketFactory;
87+
}
88+
7289
/**
7390
* Returns the index of the current active data source.
7491
*
@@ -102,6 +119,11 @@ public static class Builder {
102119
*/
103120
private boolean allowUnlistedDataSource = true;
104121

122+
/**
123+
* An {@link SSLSocketFactory} to connect to Reindexer using cprotos (SSL/TLS) protocol.
124+
*/
125+
private SSLSocketFactory sslSocketFactory;
126+
105127
/**
106128
* An index of the current active data source.
107129
*/
@@ -157,6 +179,17 @@ public Builder allowUnlistedDataSource(boolean allowUnlistedDataSource) {
157179
return this;
158180
}
159181

182+
/**
183+
* Configure an {@link SSLSocketFactory} to connect to Reindexer using cprotos (TLS) protocol.
184+
*
185+
* @param sslSocketFactory the {@link SSLSocketFactory} to use
186+
* @return the {@link Builder} for further customizations
187+
*/
188+
public Builder sslSocketFactory(SSLSocketFactory sslSocketFactory) {
189+
this.sslSocketFactory = sslSocketFactory;
190+
return this;
191+
}
192+
160193
/**
161194
* Build and return a {@link DataSource} configuration.
162195
*

src/main/java/ru/rt/restream/reindexer/binding/cproto/DataSourceFactoryStrategy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public enum DataSourceFactoryStrategy implements DataSourceFactory {
4343
public DataSource getDataSource(DataSourceConfiguration configuration) {
4444
List<String> urls = configuration.getUrls();
4545
configuration.setActive((configuration.getActive() + 1) % urls.size());
46-
return new PhysicalDataSource(urls.get(configuration.getActive()));
46+
return new PhysicalDataSource(urls.get(configuration.getActive()), configuration.getSslSocketFactory());
4747
}
4848
},
4949

@@ -55,7 +55,7 @@ public DataSource getDataSource(DataSourceConfiguration configuration) {
5555
public DataSource getDataSource(DataSourceConfiguration configuration) {
5656
List<String> urls = configuration.getUrls();
5757
configuration.setActive(ThreadLocalRandom.current().nextInt(urls.size()));
58-
return new PhysicalDataSource(urls.get(configuration.getActive()));
58+
return new PhysicalDataSource(urls.get(configuration.getActive()), configuration.getSslSocketFactory());
5959
}
6060
},
6161

src/main/java/ru/rt/restream/reindexer/binding/cproto/PhysicalConnection.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@
4646
import java.util.concurrent.locks.ReentrantLock;
4747
import java.util.concurrent.locks.ReentrantReadWriteLock;
4848

49+
import javax.net.ssl.SSLSocket;
50+
import javax.net.ssl.SSLSocketFactory;
51+
4952
import static ru.rt.restream.reindexer.binding.Consts.APP_PROPERTY_NAME;
5053
import static ru.rt.restream.reindexer.binding.Consts.BINDING_CAPABILITY_COMPLEX_RANK;
5154
import static ru.rt.restream.reindexer.binding.Consts.BINDING_CAPABILITY_NAMESPACE_INCARNATIONS;
@@ -106,9 +109,18 @@ public class PhysicalConnection implements Connection {
106109
private final ScheduledFuture<?> writeTaskFuture;
107110

108111
public PhysicalConnection(String host, int port, String user, String password, String database,
112+
SSLSocketFactory sslSocketFactory,
109113
Duration requestTimeout, ScheduledExecutorService scheduler) {
110114
try {
111-
clientSocket = new Socket(host, port);
115+
if (sslSocketFactory != null) {
116+
LOGGER.debug("rx: using SSL/TLS connection to {}:{}", host, port);
117+
SSLSocket sslSocket = (SSLSocket) sslSocketFactory.createSocket(host, port);
118+
// Fail fast if SSL/TLS handshake fails.
119+
sslSocket.startHandshake();
120+
clientSocket = sslSocket;
121+
} else {
122+
clientSocket = new Socket(host, port);
123+
}
112124
output = new DataOutputStream(clientSocket.getOutputStream());
113125
input = new DataInputStream(clientSocket.getInputStream());
114126
timeout = requestTimeout;

src/main/java/ru/rt/restream/reindexer/binding/cproto/PhysicalDataSource.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.time.Duration;
2121
import java.util.concurrent.ScheduledThreadPoolExecutor;
2222

23+
import javax.net.ssl.SSLSocketFactory;
24+
2325
/**
2426
* A {@link DataSource} that creates a {@link PhysicalConnection}.
2527
*/
@@ -35,12 +37,27 @@ public class PhysicalDataSource implements DataSource {
3537

3638
private final String database;
3739

40+
private final SSLSocketFactory sslSocketFactory;
41+
3842
/**
3943
* Creates an instance.
4044
*
4145
* @param url the URL to use
46+
* @deprecated Use {@link #PhysicalDataSource(String, SSLSocketFactory)}
47+
* to connect to Reindexer using cprotos (SSL/TLS) protocol.
4248
*/
49+
@Deprecated
4350
public PhysicalDataSource(String url) {
51+
this(url, null);
52+
}
53+
54+
/**
55+
* Creates an instance.
56+
*
57+
* @param url the URL to use
58+
* @param sslSocketFactory the {@link SSLSocketFactory} socket factory to use
59+
*/
60+
public PhysicalDataSource(String url, SSLSocketFactory sslSocketFactory) {
4461
URI uri = URI.create(url);
4562
host = uri.getHost();
4663
port = uri.getPort();
@@ -58,11 +75,12 @@ public PhysicalDataSource(String url) {
5875
password = "";
5976
}
6077
database = uri.getPath().substring(1);
78+
this.sslSocketFactory = sslSocketFactory;
6179
}
6280

6381
@Override
6482
public Connection getConnection(Duration timeout, ScheduledThreadPoolExecutor scheduler) {
65-
return new PhysicalConnection(host, port, user, password, database, timeout, scheduler);
83+
return new PhysicalConnection(host, port, user, password, database, sslSocketFactory, timeout, scheduler);
6684
}
6785

6886
@Override
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2020 Restream
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package ru.rt.restream.reindexer.connector;
17+
18+
import org.junit.jupiter.api.BeforeAll;
19+
import ru.rt.restream.category.CprotoTest;
20+
import ru.rt.restream.reindexer.db.DbLocator;
21+
import ru.rt.restream.reindexer.db.DbLocator.Type;
22+
23+
/**
24+
* Tests for Cprotos (SSL/TLS) protocol implementation.
25+
*/
26+
@CprotoTest
27+
public class CprotosReindexerTest extends ReindexerTest {
28+
29+
@BeforeAll
30+
@Override
31+
protected void initDb() {
32+
db = DbLocator.getDb(Type.CPROTOS);
33+
}
34+
35+
}

src/test/java/ru/rt/restream/reindexer/db/DbBaseTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public abstract class DbBaseTest {
4141
* Initializes the Reindexer instance before all tests.
4242
*/
4343
@BeforeAll
44-
void initDb() {
44+
protected void initDb() {
4545
if (this.getClass().isAnnotationPresent(CprotoTest.class)) {
4646
db = DbLocator.getDb(CPROTO);
4747
} else if (this.getClass().isAnnotationPresent(BuiltinTest.class)) {

0 commit comments

Comments
 (0)