Skip to content

Commit b0b8a8d

Browse files
Add support for ReactorNettyHttpClient
This new client is based on reactor-netty library. https://github.com/reactor/reactor-netty It supports both HTTP1_1 and HTTP2 communication with the workers and uses netty under the cover. We observed 10%-15% CPU improvement on coordinator when we enabled HTTP2 via ReactorNettyHttpClient.
1 parent 70e1d5e commit b0b8a8d

File tree

10 files changed

+918
-9
lines changed

10 files changed

+918
-9
lines changed

presto-main/pom.xml

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,78 @@
258258
<artifactId>fastutil</artifactId>
259259
</dependency>
260260

261+
<dependency>
262+
<groupId>io.projectreactor.netty</groupId>
263+
<artifactId>reactor-netty-core</artifactId>
264+
<version>1.1.29</version>
265+
</dependency>
266+
267+
<dependency>
268+
<groupId>io.projectreactor.netty</groupId>
269+
<artifactId>reactor-netty-http</artifactId>
270+
<version>1.1.29</version>
271+
</dependency>
272+
273+
<dependency>
274+
<groupId>io.micrometer</groupId>
275+
<artifactId>micrometer-core</artifactId>
276+
<version>1.11.0</version>
277+
<exclusions>
278+
<exclusion>
279+
<groupId>org.hdrhistogram</groupId>
280+
<artifactId>HdrHistogram</artifactId>
281+
</exclusion>
282+
</exclusions>
283+
</dependency>
284+
<dependency>
285+
<groupId>io.micrometer</groupId>
286+
<artifactId>micrometer-registry-jmx</artifactId>
287+
<version>1.11.0</version>
288+
</dependency>
289+
290+
<dependency>
291+
<groupId>io.projectreactor</groupId>
292+
<artifactId>reactor-core</artifactId>
293+
<version>3.8.0-M2</version>
294+
</dependency>
295+
296+
<dependency>
297+
<groupId>org.reactivestreams</groupId>
298+
<artifactId>reactive-streams</artifactId>
299+
<version>1.0.4</version>
300+
</dependency>
301+
302+
<dependency>
303+
<groupId>io.netty</groupId>
304+
<artifactId>netty-codec-http</artifactId>
305+
</dependency>
306+
307+
<dependency>
308+
<groupId>io.netty</groupId>
309+
<artifactId>netty-transport-classes-epoll</artifactId>
310+
</dependency>
311+
312+
<dependency>
313+
<groupId>io.netty</groupId>
314+
<artifactId>netty-handler</artifactId>
315+
</dependency>
316+
317+
<!-- Use Native Epoll event loop -->
318+
<dependency>
319+
<groupId>io.netty</groupId>
320+
<artifactId>netty-transport-native-epoll</artifactId>
321+
<version>${dep.netty.version}</version>
322+
<classifier>linux-x86_64</classifier>
323+
<scope>runtime</scope>
324+
</dependency>
325+
326+
<!-- Use OpenSSL for ssl connections -->
327+
<dependency>
328+
<groupId>io.netty</groupId>
329+
<artifactId>netty-tcnative-boringssl-static</artifactId>
330+
<scope>runtime</scope>
331+
</dependency>
332+
261333
<!-- Testing dependencies -->
262334
<dependency>
263335
<groupId>com.facebook.presto</groupId>
@@ -311,7 +383,7 @@
311383
<groupId>io.netty</groupId>
312384
<artifactId>netty-common</artifactId>
313385
</dependency>
314-
386+
315387
<dependency>
316388
<groupId>com.squareup.okhttp3</groupId>
317389
<artifactId>mockwebserver</artifactId>

presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.facebook.airlift.concurrent.BoundedExecutor;
1717
import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
1818
import com.facebook.airlift.discovery.server.EmbeddedDiscoveryModule;
19+
import com.facebook.airlift.http.client.HttpClient;
1920
import com.facebook.airlift.http.server.HttpServerBinder.HttpResourceBinding;
2021
import com.facebook.presto.client.QueryResults;
2122
import com.facebook.presto.cost.CostCalculator;
@@ -83,6 +84,8 @@
8384
import com.facebook.presto.server.protocol.QueuedStatementResource;
8485
import com.facebook.presto.server.protocol.RetryCircuitBreaker;
8586
import com.facebook.presto.server.remotetask.HttpRemoteTaskFactory;
87+
import com.facebook.presto.server.remotetask.ReactorNettyHttpClient;
88+
import com.facebook.presto.server.remotetask.ReactorNettyHttpClientConfig;
8689
import com.facebook.presto.server.remotetask.RemoteTaskStats;
8790
import com.facebook.presto.spi.memory.ClusterMemoryPoolManager;
8891
import com.facebook.presto.spi.security.SelectedRole;
@@ -142,7 +145,7 @@ public class CoordinatorModule
142145
{
143146
private static final String DEFAULT_WEBUI_CSP =
144147
"default-src 'self'; style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; " +
145-
"font-src 'self' https://fonts.gstatic.com; frame-ancestors 'self'; img-src http: https: data:";
148+
"font-src 'self' https://fonts.gstatic.com; frame-ancestors 'self'; img-src http: https: data:";
146149

147150
public static HttpResourceBinding webUIBinder(Binder binder, String path, String classPathResourceBase)
148151
{
@@ -268,13 +271,20 @@ protected void setup(Binder binder)
268271
binder.bind(RemoteTaskStats.class).in(Scopes.SINGLETON);
269272
newExporter(binder).export(RemoteTaskStats.class).withGeneratedName();
270273

271-
httpClientBinder(binder).bindHttpClient("scheduler", ForScheduler.class)
272-
.withTracing()
273-
.withFilter(GenerateTraceTokenRequestFilter.class)
274-
.withConfigDefaults(config -> {
275-
config.setRequestTimeout(new Duration(10, SECONDS));
276-
config.setMaxConnectionsPerServer(250);
277-
});
274+
ReactorNettyHttpClientConfig reactorNettyHttpClientConfig = buildConfigObject(ReactorNettyHttpClientConfig.class);
275+
if (reactorNettyHttpClientConfig.isReactorNettyHttpClientEnabled()) {
276+
binder.bind(ReactorNettyHttpClient.class).in(Scopes.SINGLETON);
277+
binder.bind(HttpClient.class).annotatedWith(ForScheduler.class).to(ReactorNettyHttpClient.class);
278+
}
279+
else {
280+
httpClientBinder(binder).bindHttpClient("scheduler", ForScheduler.class)
281+
.withTracing()
282+
.withFilter(GenerateTraceTokenRequestFilter.class)
283+
.withConfigDefaults(config -> {
284+
config.setRequestTimeout(new Duration(10, SECONDS));
285+
config.setMaxConnectionsPerServer(250);
286+
});
287+
}
278288

279289
binder.bind(ScheduledExecutorService.class).annotatedWith(ForScheduler.class)
280290
.toInstance(newSingleThreadScheduledExecutor(threadsNamed("stage-scheduler")));

presto-main/src/main/java/com/facebook/presto/server/InternalCommunicationModule.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
1717
import com.facebook.airlift.http.client.HttpClientConfig;
1818
import com.facebook.airlift.http.client.spnego.KerberosConfig;
19+
import com.facebook.presto.server.remotetask.ReactorNettyHttpClientConfig;
1920
import com.facebook.presto.server.security.InternalAuthenticationFilter;
2021
import com.google.inject.Binder;
2122
import com.google.inject.Module;
@@ -53,6 +54,16 @@ protected void setup(Binder binder)
5354
}
5455
});
5556

57+
configBinder(binder).bindConfigGlobalDefaults(ReactorNettyHttpClientConfig.class, config -> {
58+
config.setHttpsEnabled(internalCommunicationConfig.isHttpsRequired());
59+
config.setKeyStorePath(internalCommunicationConfig.getKeyStorePath());
60+
config.setKeyStorePassword(internalCommunicationConfig.getKeyStorePassword());
61+
config.setTrustStorePath(internalCommunicationConfig.getTrustStorePath());
62+
if (internalCommunicationConfig.getIncludedCipherSuites().isPresent()) {
63+
config.setCipherSuites(internalCommunicationConfig.getIncludedCipherSuites().get());
64+
}
65+
});
66+
5667
install(installModuleIf(InternalCommunicationConfig.class, InternalCommunicationConfig::isKerberosEnabled, kerberosInternalCommunicationModule()));
5768
binder.bind(InternalAuthenticationManager.class);
5869
httpClientBinder(binder).bindGlobalFilter(InternalAuthenticationManager.class);

presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@
140140
import com.facebook.presto.resourcemanager.ResourceManagerInconsistentException;
141141
import com.facebook.presto.resourcemanager.ResourceManagerResourceGroupService;
142142
import com.facebook.presto.server.remotetask.HttpLocationFactory;
143+
import com.facebook.presto.server.remotetask.ReactorNettyHttpClientConfig;
143144
import com.facebook.presto.server.thrift.FixedAddressSelector;
144145
import com.facebook.presto.server.thrift.HandleThriftModule;
145146
import com.facebook.presto.server.thrift.ThriftServerInfoClient;
@@ -544,6 +545,7 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
544545
binder.bind(PageFunctionCompiler.class).in(Scopes.SINGLETON);
545546
newExporter(binder).export(PageFunctionCompiler.class).withGeneratedName();
546547
configBinder(binder).bindConfig(TaskManagerConfig.class);
548+
configBinder(binder).bindConfig(ReactorNettyHttpClientConfig.class);
547549
binder.bind(IndexJoinLookupStats.class).in(Scopes.SINGLETON);
548550
newExporter(binder).export(IndexJoinLookupStats.class).withGeneratedName();
549551
binder.bind(AsyncHttpExecutionMBean.class).in(Scopes.SINGLETON);

0 commit comments

Comments
 (0)