Skip to content

Commit a289605

Browse files
authored
xds: integration of XdsClientImpl with XdsServerBuilder to deliver Listener updates (#6838)
1 parent 47b6b39 commit a289605

File tree

7 files changed

+803
-31
lines changed

7 files changed

+803
-31
lines changed

xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ static final class FilterChainMatch {
9191
private final List<CidrRange> prefixRanges;
9292
private final List<String> applicationProtocols;
9393

94-
private FilterChainMatch(int destinationPort,
94+
@VisibleForTesting
95+
FilterChainMatch(int destinationPort,
9596
List<CidrRange> prefixRanges, List<String> applicationProtocols) {
9697
this.destinationPort = destinationPort;
9798
this.prefixRanges = Collections.unmodifiableList(prefixRanges);
@@ -164,7 +165,8 @@ static final class FilterChain {
164165
// TODO(sanjaypujare): remove dependency on envoy data type along with rest of the code.
165166
private final io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext downstreamTlsContext;
166167

167-
private FilterChain(FilterChainMatch filterChainMatch,
168+
@VisibleForTesting
169+
FilterChain(FilterChainMatch filterChainMatch,
168170
io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext downstreamTlsContext) {
169171
this.filterChainMatch = filterChainMatch;
170172
this.downstreamTlsContext = downstreamTlsContext;
@@ -223,7 +225,8 @@ static final class Listener {
223225
private final String address;
224226
private final List<FilterChain> filterChains;
225227

226-
private Listener(String name, String address,
228+
@VisibleForTesting
229+
Listener(String name, String address,
227230
List<FilterChain> filterChains) {
228231
this.name = name;
229232
this.address = address;
Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
/*
2+
* Copyright 2020 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.xds;
18+
19+
import static com.google.common.base.Preconditions.checkNotNull;
20+
import static com.google.common.base.Preconditions.checkState;
21+
22+
import com.google.common.annotations.VisibleForTesting;
23+
import io.envoyproxy.envoy.api.v2.auth.DownstreamTlsContext;
24+
import io.envoyproxy.envoy.api.v2.core.Node;
25+
import io.grpc.Internal;
26+
import io.grpc.Status;
27+
import io.grpc.SynchronizationContext;
28+
import io.grpc.internal.ExponentialBackoffPolicy;
29+
import io.grpc.internal.GrpcUtil;
30+
import io.grpc.internal.SharedResourceHolder;
31+
import io.grpc.xds.EnvoyServerProtoData.CidrRange;
32+
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
33+
import io.grpc.xds.EnvoyServerProtoData.FilterChainMatch;
34+
import io.netty.channel.Channel;
35+
import io.netty.channel.epoll.Epoll;
36+
import io.netty.channel.epoll.EpollEventLoopGroup;
37+
import io.netty.util.concurrent.DefaultThreadFactory;
38+
import java.io.IOException;
39+
import java.net.InetAddress;
40+
import java.net.InetSocketAddress;
41+
import java.net.SocketAddress;
42+
import java.net.UnknownHostException;
43+
import java.util.Collections;
44+
import java.util.Comparator;
45+
import java.util.List;
46+
import java.util.NoSuchElementException;
47+
import java.util.concurrent.Executors;
48+
import java.util.concurrent.ScheduledExecutorService;
49+
import java.util.concurrent.ThreadFactory;
50+
import java.util.concurrent.TimeUnit;
51+
import java.util.logging.Level;
52+
import java.util.logging.Logger;
53+
import javax.annotation.Nullable;
54+
55+
/**
56+
* Serves as a wrapper for {@link XdsClientImpl} used on the server side by {@link
57+
* io.grpc.xds.internal.sds.XdsServerBuilder}.
58+
*/
59+
@Internal
60+
public final class XdsClientWrapperForServerSds {
61+
private static final Logger logger =
62+
Logger.getLogger(XdsClientWrapperForServerSds.class.getName());
63+
64+
private static final TimeServiceResource timeServiceResource =
65+
new TimeServiceResource("GrpcServerXdsClient");
66+
67+
private EnvoyServerProtoData.Listener curListener;
68+
// TODO(sanjaypujare): implement shutting down XdsServer which will need xdsClient reference
69+
@SuppressWarnings("unused")
70+
@Nullable private XdsClient xdsClient;
71+
private final int port;
72+
private final ScheduledExecutorService timeService;
73+
74+
/**
75+
* Factory method for creating a {@link XdsClientWrapperForServerSds}.
76+
*
77+
* @param port server's port for which listener config is needed.
78+
* @param bootstrapper {@link Bootstrapper} instance to load bootstrap config.
79+
* @param syncContext {@link SynchronizationContext} needed by {@link XdsClient}.
80+
*/
81+
public static XdsClientWrapperForServerSds newInstance(
82+
int port, Bootstrapper bootstrapper, SynchronizationContext syncContext) throws IOException {
83+
Bootstrapper.BootstrapInfo bootstrapInfo = bootstrapper.readBootstrap();
84+
final List<Bootstrapper.ServerInfo> serverList = bootstrapInfo.getServers();
85+
if (serverList.isEmpty()) {
86+
throw new NoSuchElementException("No management server provided by bootstrap");
87+
}
88+
final Node node = bootstrapInfo.getNode();
89+
ScheduledExecutorService timeService = SharedResourceHolder.get(timeServiceResource);
90+
XdsClientImpl xdsClientImpl =
91+
new XdsClientImpl(
92+
"",
93+
serverList,
94+
XdsClient.XdsChannelFactory.getInstance(),
95+
node,
96+
syncContext,
97+
timeService,
98+
new ExponentialBackoffPolicy.Provider(),
99+
GrpcUtil.STOPWATCH_SUPPLIER);
100+
return new XdsClientWrapperForServerSds(port, xdsClientImpl, timeService);
101+
}
102+
103+
@VisibleForTesting
104+
XdsClientWrapperForServerSds(int port, XdsClient xdsClient,
105+
ScheduledExecutorService timeService) {
106+
this.port = port;
107+
this.xdsClient = xdsClient;
108+
this.timeService = timeService;
109+
xdsClient.watchListenerData(
110+
port,
111+
new XdsClient.ListenerWatcher() {
112+
@Override
113+
public void onListenerChanged(XdsClient.ListenerUpdate update) {
114+
logger.log(
115+
Level.INFO,
116+
"Setting myListener from ConfigUpdate listener :{0}",
117+
update.getListener().toString());
118+
curListener = update.getListener();
119+
}
120+
121+
@Override
122+
public void onError(Status error) {
123+
// In order to distinguish between IO error and resource not found, set curListener
124+
// to null in case of NOT_FOUND
125+
if (error.getCode().equals(Status.Code.NOT_FOUND)) {
126+
curListener = null;
127+
}
128+
// TODO(sanjaypujare): Implement logic for other cases based on final design.
129+
logger.log(Level.SEVERE, "ListenerWatcher in XdsClientWrapperForServerSds:{0}", error);
130+
}
131+
});
132+
}
133+
134+
/**
135+
* Locates the best matching FilterChain to the channel from the current listener and if found
136+
* returns the DownstreamTlsContext from that FilterChain, else null.
137+
*/
138+
@Nullable
139+
public DownstreamTlsContext getDownstreamTlsContext(Channel channel) {
140+
if (curListener != null && channel != null) {
141+
SocketAddress localAddress = channel.localAddress();
142+
checkState(
143+
localAddress instanceof InetSocketAddress,
144+
"Channel localAddress is expected to be InetSocketAddress");
145+
InetSocketAddress localInetAddr = (InetSocketAddress) localAddress;
146+
checkState(
147+
port == localInetAddr.getPort(),
148+
"Channel localAddress port does not match requested listener port");
149+
List<FilterChain> filterChains = curListener.getFilterChains();
150+
FilterChainComparator comparator = new FilterChainComparator(localInetAddr);
151+
FilterChain bestMatch =
152+
filterChains.isEmpty() ? null : Collections.max(filterChains, comparator);
153+
if (bestMatch != null && comparator.isMatching(bestMatch.getFilterChainMatch())) {
154+
return bestMatch.getDownstreamTlsContext();
155+
}
156+
}
157+
return null;
158+
}
159+
160+
private static final class FilterChainComparator implements Comparator<FilterChain> {
161+
private InetSocketAddress localAddress;
162+
163+
private enum Match {
164+
NO_MATCH,
165+
EMPTY_PREFIX_RANGE_MATCH,
166+
IPANY_MATCH,
167+
EXACT_ADDRESS_MATCH
168+
}
169+
170+
private FilterChainComparator(InetSocketAddress localAddress) {
171+
checkNotNull(localAddress, "localAddress cannot be null");
172+
this.localAddress = localAddress;
173+
}
174+
175+
@Override
176+
public int compare(FilterChain first, FilterChain second) {
177+
checkNotNull(first, "first arg cannot be null");
178+
checkNotNull(second, "second arg cannot be null");
179+
FilterChainMatch firstMatch = first.getFilterChainMatch();
180+
FilterChainMatch secondMatch = second.getFilterChainMatch();
181+
182+
if (firstMatch == null) {
183+
return (secondMatch == null) ? 0 : (isMatching(secondMatch) ? -1 : 1);
184+
} else {
185+
return (secondMatch == null)
186+
? (isMatching(firstMatch) ? 1 : -1)
187+
: compare(firstMatch, secondMatch);
188+
}
189+
}
190+
191+
private int compare(FilterChainMatch first, FilterChainMatch second) {
192+
int channelPort = localAddress.getPort();
193+
194+
if (first.getDestinationPort() == channelPort) {
195+
return (second.getDestinationPort() == channelPort)
196+
? compare(first.getPrefixRanges(), second.getPrefixRanges())
197+
: (isInetAddressMatching(first.getPrefixRanges()) ? 1 : 0);
198+
} else {
199+
return (second.getDestinationPort() == channelPort)
200+
? (isInetAddressMatching(second.getPrefixRanges()) ? -1 : 0)
201+
: 0;
202+
}
203+
}
204+
205+
private int compare(List<CidrRange> first, List<CidrRange> second) {
206+
return getInetAddressMatch(first).ordinal() - getInetAddressMatch(second).ordinal();
207+
}
208+
209+
private boolean isInetAddressMatching(List<CidrRange> prefixRanges) {
210+
return getInetAddressMatch(prefixRanges).ordinal() > Match.NO_MATCH.ordinal();
211+
}
212+
213+
private Match getInetAddressMatch(List<CidrRange> prefixRanges) {
214+
if (prefixRanges == null || prefixRanges.isEmpty()) {
215+
return Match.EMPTY_PREFIX_RANGE_MATCH;
216+
}
217+
InetAddress localInetAddress = localAddress.getAddress();
218+
for (CidrRange cidrRange : prefixRanges) {
219+
if (cidrRange.getPrefixLen() == 32) {
220+
try {
221+
InetAddress cidrAddr = InetAddress.getByName(cidrRange.getAddressPrefix());
222+
if (cidrAddr.isAnyLocalAddress()) {
223+
return Match.IPANY_MATCH;
224+
}
225+
if (cidrAddr.equals(localInetAddress)) {
226+
return Match.EXACT_ADDRESS_MATCH;
227+
}
228+
} catch (UnknownHostException e) {
229+
logger.log(Level.WARNING, "cidrRange address parsing", e);
230+
// continue
231+
}
232+
}
233+
// TODO(sanjaypujare): implement prefix match logic as needed
234+
}
235+
return Match.NO_MATCH;
236+
}
237+
238+
private boolean isMatching(FilterChainMatch filterChainMatch) {
239+
if (filterChainMatch == null) {
240+
return true;
241+
}
242+
int destPort = filterChainMatch.getDestinationPort();
243+
if (destPort != localAddress.getPort()) {
244+
return false;
245+
}
246+
return isInetAddressMatching(filterChainMatch.getPrefixRanges());
247+
}
248+
}
249+
250+
/** Shutdown this instance and release resources. */
251+
public void shutdown() {
252+
logger.log(Level.FINER, "Shutdown");
253+
if (xdsClient != null) {
254+
xdsClient.shutdown();
255+
}
256+
if (timeService != null) {
257+
timeServiceResource.close(timeService);
258+
}
259+
}
260+
261+
private static final class TimeServiceResource
262+
implements SharedResourceHolder.Resource<ScheduledExecutorService> {
263+
264+
private final String name;
265+
266+
TimeServiceResource(String name) {
267+
this.name = name;
268+
}
269+
270+
@Override
271+
public ScheduledExecutorService create() {
272+
// Use Netty's DefaultThreadFactory in order to get the benefit of FastThreadLocal.
273+
ThreadFactory threadFactory = new DefaultThreadFactory(name, /* daemon= */ true);
274+
if (Epoll.isAvailable()) {
275+
return new EpollEventLoopGroup(1, threadFactory);
276+
} else {
277+
return Executors.newSingleThreadScheduledExecutor(threadFactory);
278+
}
279+
}
280+
281+
@SuppressWarnings("FutureReturnValueIgnored")
282+
@Override
283+
public void close(ScheduledExecutorService instance) {
284+
try {
285+
if (instance instanceof EpollEventLoopGroup) {
286+
((EpollEventLoopGroup)instance).shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
287+
} else {
288+
instance.shutdown();
289+
}
290+
} catch (InterruptedException e) {
291+
logger.log(Level.SEVERE, "Interrupted during shutdown", e);
292+
Thread.currentThread().interrupt();
293+
}
294+
}
295+
}
296+
}

0 commit comments

Comments
 (0)