Skip to content

Commit 73b4d53

Browse files
authored
make direct remote channel use manual flow control with a larger flow control window (#34035)
1 parent 228c878 commit 73b4d53

File tree

1 file changed

+27
-44
lines changed
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs

1 file changed

+27
-44
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java

Lines changed: 27 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,27 @@
1717
*/
1818
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
1919

20-
import java.net.Inet6Address;
21-
import java.net.InetSocketAddress;
2220
import java.util.concurrent.TimeUnit;
2321
import javax.net.ssl.SSLException;
2422
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
2523
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress.AuthenticatedGcpServiceAddress;
2624
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Channel;
27-
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingChannelBuilder2;
2825
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
29-
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.alts.AltsChannelBuilder;
26+
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.alts.AltsChannelCredentials;
3027
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessChannelBuilder;
3128
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.netty.GrpcSslContexts;
3229
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.netty.NegotiationType;
3330
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.netty.NettyChannelBuilder;
31+
import org.apache.beam.vendor.grpc.v1p69p0.io.netty.handler.ssl.SslContext;
3432
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
3533

3634
/** Utility class used to create different RPC Channels. */
3735
public final class WindmillChannelFactory {
3836
public static final String LOCALHOST = "localhost";
3937
private static final int MAX_REMOTE_TRACE_EVENTS = 100;
38+
// 10MiB.
39+
private static final int WINDMILL_MAX_FLOW_CONTROL_WINDOW =
40+
NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW * 10;
4041

4142
private WindmillChannelFactory() {}
4243

@@ -69,55 +70,42 @@ public static ManagedChannel remoteChannel(
6970
}
7071
}
7172

72-
static ManagedChannel remoteDirectChannel(
73+
private static ManagedChannel remoteDirectChannel(
7374
AuthenticatedGcpServiceAddress authenticatedGcpServiceAddress,
7475
int windmillServiceRpcChannelTimeoutSec) {
7576
return withDefaultChannelOptions(
76-
AltsChannelBuilder.forAddress(
77+
NettyChannelBuilder.forAddress(
7778
authenticatedGcpServiceAddress.gcpServiceAddress().getHost(),
78-
authenticatedGcpServiceAddress.gcpServiceAddress().getPort())
79+
authenticatedGcpServiceAddress.gcpServiceAddress().getPort(),
80+
new AltsChannelCredentials.Builder().build())
7981
.overrideAuthority(authenticatedGcpServiceAddress.authenticatingService()),
8082
windmillServiceRpcChannelTimeoutSec)
8183
.build();
8284
}
8385

8486
public static ManagedChannel remoteChannel(
8587
HostAndPort endpoint, int windmillServiceRpcChannelTimeoutSec) {
86-
try {
87-
return createRemoteChannel(
88-
NettyChannelBuilder.forAddress(endpoint.getHost(), endpoint.getPort()),
89-
windmillServiceRpcChannelTimeoutSec);
90-
} catch (SSLException sslException) {
91-
throw new WindmillChannelCreationException(endpoint, sslException);
92-
}
88+
return withDefaultChannelOptions(
89+
NettyChannelBuilder.forAddress(endpoint.getHost(), endpoint.getPort()),
90+
windmillServiceRpcChannelTimeoutSec)
91+
.negotiationType(NegotiationType.TLS)
92+
.sslContext(dataflowGrpcSslContext(endpoint))
93+
.build();
9394
}
9495

95-
public static Channel remoteChannel(
96-
Inet6Address directEndpoint, int port, int windmillServiceRpcChannelTimeoutSec) {
96+
@SuppressWarnings("nullness")
97+
private static SslContext dataflowGrpcSslContext(HostAndPort endpoint) {
9798
try {
98-
return createRemoteChannel(
99-
NettyChannelBuilder.forAddress(new InetSocketAddress(directEndpoint, port)),
100-
windmillServiceRpcChannelTimeoutSec);
99+
// Set ciphers(null) to not use GCM, which is disabled for Dataflow
100+
// due to it being horribly slow.
101+
return GrpcSslContexts.forClient().ciphers(null).build();
101102
} catch (SSLException sslException) {
102-
throw new WindmillChannelCreationException(directEndpoint.toString(), sslException);
103+
throw new WindmillChannelCreationException(endpoint, sslException);
103104
}
104105
}
105106

106-
@SuppressWarnings("nullness")
107-
private static ManagedChannel createRemoteChannel(
108-
NettyChannelBuilder channelBuilder, int windmillServiceRpcChannelTimeoutSec)
109-
throws SSLException {
110-
return withDefaultChannelOptions(channelBuilder, windmillServiceRpcChannelTimeoutSec)
111-
.flowControlWindow(10 * 1024 * 1024)
112-
.negotiationType(NegotiationType.TLS)
113-
// Set ciphers(null) to not use GCM, which is disabled for Dataflow
114-
// due to it being horribly slow.
115-
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
116-
.build();
117-
}
118-
119-
private static <T extends ForwardingChannelBuilder2<T>> T withDefaultChannelOptions(
120-
T channelBuilder, int windmillServiceRpcChannelTimeoutSec) {
107+
private static NettyChannelBuilder withDefaultChannelOptions(
108+
NettyChannelBuilder channelBuilder, int windmillServiceRpcChannelTimeoutSec) {
121109
if (windmillServiceRpcChannelTimeoutSec > 0) {
122110
channelBuilder
123111
.keepAliveTime(windmillServiceRpcChannelTimeoutSec, TimeUnit.SECONDS)
@@ -128,23 +116,18 @@ private static <T extends ForwardingChannelBuilder2<T>> T withDefaultChannelOpti
128116
return channelBuilder
129117
.maxInboundMessageSize(Integer.MAX_VALUE)
130118
.maxTraceEvents(MAX_REMOTE_TRACE_EVENTS)
131-
.maxInboundMetadataSize(1024 * 1024);
119+
// 1MiB
120+
.maxInboundMetadataSize(1024 * 1024)
121+
.flowControlWindow(WINDMILL_MAX_FLOW_CONTROL_WINDOW);
132122
}
133123

134-
public static class WindmillChannelCreationException extends IllegalStateException {
124+
private static class WindmillChannelCreationException extends IllegalStateException {
135125
private WindmillChannelCreationException(HostAndPort endpoint, SSLException sourceException) {
136126
super(
137127
String.format(
138128
"Exception thrown when trying to create channel to endpoint={host:%s; port:%d}",
139129
endpoint.getHost(), endpoint.getPort()),
140130
sourceException);
141131
}
142-
143-
WindmillChannelCreationException(String directEndpoint, Throwable sourceException) {
144-
super(
145-
String.format(
146-
"Exception thrown when trying to create channel to endpoint={%s}", directEndpoint),
147-
sourceException);
148-
}
149132
}
150133
}

0 commit comments

Comments
 (0)