Skip to content

Commit a6a041e

Browse files
authored
xds: Support filter state retention
This PR adds support filter state retention in Java. The mechanism will be similar to the one described in [A83] (https://github.com/grpc/proposal/blob/master/A83-xds-gcp-authn-filter.md#filter-call-credentials-cache) for C-core, and will serve the same purpose. However, the implementation details are very different due to the different nature of xDS HTTP filter support in C-core and Java. ### Filter instance lifecycle #### xDS gRPC clients New filter instances are created per combination of: 1. `XdsNameResolver` instance, 2. Filter name+typeUrl as configured in HttpConnectionManager (HCM) http_filters. Existing client-side filter instances are shutdown: - A single a filter instance is shutdown when an LDS update contains HCM that is missing filter configuration for name+typeUrl combination of this instance. - All filter instances when watched LDS resource is missing from an LDS update. - All filter instances name resolver shutdown. #### xDS-enabled gRPC servers New filter instances are created per combination of: 1. Server instance, 2. FilterChain name, 3. Filter name+typeUrl as configured in FilterChain's HCM.http_filters Filter instances of Default Filter Chain is tracked separately per: 1. Server instance, 2. Filter name+typeUrl in default_filter_chain's HCM.http_filters. Existing server-side filter instances are shutdown: - A single a filter instance is shutdown when an LDS update contains FilterChain with HCM.http_filters that is missing configuration for filter name+typeUrl. - All filter instances associated with the FilterChain when an LDS update no longer contains FilterChain's name. - All filter instances when watched LDS resource is missing from an LDS update. - All filter instances on server shutdown. ### Related - Part 1: #11883
1 parent 602aece commit a6a041e

11 files changed

+1345
-78
lines changed

xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ public static FilterChainMatch create(int destinationPort,
206206
@AutoValue
207207
abstract static class FilterChain {
208208

209-
// possibly empty
209+
// Must be unique per server instance (except the default chain).
210210
abstract String name();
211211

212212
// TODO(sanjaypujare): flatten structure by moving FilterChainMatch class members here.

xds/src/main/java/io/grpc/xds/Filter.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.protobuf.Message;
2121
import io.grpc.ClientInterceptor;
2222
import io.grpc.ServerInterceptor;
23+
import java.io.Closeable;
2324
import java.util.Objects;
2425
import java.util.concurrent.ScheduledExecutorService;
2526
import javax.annotation.Nullable;
@@ -32,7 +33,7 @@
3233
* {@link Provider#isClientFilter()}, {@link Provider#isServerFilter()} to indicate that the filter
3334
* is capable of working on the client side or server side or both, respectively.
3435
*/
35-
interface Filter {
36+
interface Filter extends Closeable {
3637

3738
/** Represents an opaque data structure holding configuration for a filter. */
3839
interface FilterConfig {
@@ -72,6 +73,19 @@ default boolean isServerFilter() {
7273
*
7374
* <p>Returns a filter instance registered with the same typeUrls as the provider,
7475
* capable of working with the same FilterConfig type returned by provider's parse functions.
76+
*
77+
* <p>For xDS gRPC clients, new filter instances are created per combination of:
78+
* <ol>
79+
* <li><code>XdsNameResolver</code> instance,</li>
80+
* <li>Filter name+typeUrl in HttpConnectionManager (HCM) http_filters.</li>
81+
* </ol>
82+
*
83+
* <p>For xDS-enabled gRPC servers, new filter instances are created per combination of:
84+
* <ol>
85+
* <li>Server instance,</li>
86+
* <li>FilterChain name,</li>
87+
* <li>Filter name+typeUrl in FilterChain's HCM.http_filters.</li>
88+
* </ol>
7589
*/
7690
Filter newInstance();
7791

@@ -103,6 +117,14 @@ default ServerInterceptor buildServerInterceptor(
103117
return null;
104118
}
105119

120+
/**
121+
* Releases filter resources like shared resources and remote connections.
122+
*
123+
* <p>See {@link Provider#newInstance()} for details on filter instance creation.
124+
*/
125+
@Override
126+
default void close() {}
127+
106128
/** Filter config with instance name. */
107129
final class NamedFilterConfig {
108130
// filter instance name
@@ -114,6 +136,10 @@ final class NamedFilterConfig {
114136
this.filterConfig = filterConfig;
115137
}
116138

139+
String filterStateKey() {
140+
return name + "_" + filterConfig.typeUrl();
141+
}
142+
117143
@Override
118144
public boolean equals(Object o) {
119145
if (this == o) {

xds/src/main/java/io/grpc/xds/FilterChainMatchingProtocolNegotiators.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ static final class FilterChainSelector {
151151
this.defaultRoutingConfig = checkNotNull(defaultRoutingConfig, "defaultRoutingConfig");
152152
}
153153

154+
FilterChainSelector(Map<FilterChain, AtomicReference<ServerRoutingConfig>> routingConfigs) {
155+
this(routingConfigs, null, new AtomicReference<>());
156+
}
157+
154158
@VisibleForTesting
155159
Map<FilterChain, AtomicReference<ServerRoutingConfig>> getRoutingConfigs() {
156160
return routingConfigs;

xds/src/main/java/io/grpc/xds/XdsListenerResource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ static EnvoyServerProtoData.Listener parseServerSideListener(
178178
}
179179

180180
ImmutableList.Builder<FilterChain> filterChains = ImmutableList.builder();
181+
Set<String> filterChainNames = new HashSet<>();
181182
Set<FilterChainMatch> filterChainMatchSet = new HashSet<>();
182183
int i = 0;
183184
for (io.envoyproxy.envoy.config.listener.v3.FilterChain fc : proto.getFilterChainsList()) {
@@ -187,6 +188,10 @@ static EnvoyServerProtoData.Listener parseServerSideListener(
187188
// Generate a name, so we can identify it in the logs.
188189
filterChainName = "chain_" + i;
189190
}
191+
if (!filterChainNames.add(filterChainName)) {
192+
throw new ResourceInvalidException("Filter chain names must be unique. "
193+
+ "Found duplicate: " + filterChainName);
194+
}
190195
filterChains.add(
191196
parseFilterChain(fc, filterChainName, tlsContextManager, filterRegistry,
192197
filterChainMatchSet, certProviderInstances, args));

xds/src/main/java/io/grpc/xds/XdsNameResolver.java

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ final class XdsNameResolver extends NameResolver {
127127
private final ConfigSelector configSelector = new ConfigSelector();
128128
private final long randomChannelId;
129129
private final MetricRecorder metricRecorder;
130+
// Must be accessed in syncContext.
131+
// Filter instances are unique per channel, and per filter (name+typeUrl).
132+
// NamedFilterConfig.filterStateKey -> filter_instance.
133+
private final HashMap<String, Filter> activeFilters = new HashMap<>();
130134

131135
private volatile RoutingConfig routingConfig = RoutingConfig.EMPTY;
132136
private Listener2 listener;
@@ -658,18 +662,23 @@ public void onChanged(final XdsListenerResource.LdsUpdate update) {
658662
HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
659663
List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
660664
String rdsName = httpConnectionManager.rdsName();
665+
ImmutableList<NamedFilterConfig> filterConfigs = httpConnectionManager.httpFilterConfigs();
666+
long streamDurationNano = httpConnectionManager.httpMaxStreamDurationNano();
667+
668+
// Create/update HCM-bound state.
661669
cleanUpRouteDiscoveryState();
670+
updateActiveFilters(filterConfigs);
671+
672+
// Routes specified directly in LDS.
662673
if (virtualHosts != null) {
663-
updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(),
664-
httpConnectionManager.httpFilterConfigs());
665-
} else {
666-
routeDiscoveryState = new RouteDiscoveryState(
667-
rdsName, httpConnectionManager.httpMaxStreamDurationNano(),
668-
httpConnectionManager.httpFilterConfigs());
669-
logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
670-
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
671-
rdsName, routeDiscoveryState, syncContext);
674+
updateRoutes(virtualHosts, streamDurationNano, filterConfigs);
675+
return;
672676
}
677+
// Routes provided by RDS.
678+
routeDiscoveryState = new RouteDiscoveryState(rdsName, streamDurationNano, filterConfigs);
679+
logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
680+
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
681+
rdsName, routeDiscoveryState, syncContext);
673682
}
674683

675684
@Override
@@ -690,6 +699,7 @@ public void onResourceDoesNotExist(final String resourceName) {
690699
String error = "LDS resource does not exist: " + resourceName;
691700
logger.log(XdsLogLevel.INFO, error);
692701
cleanUpRouteDiscoveryState();
702+
updateActiveFilters(null);
693703
cleanUpRoutes(error);
694704
}
695705

@@ -703,9 +713,35 @@ private void stop() {
703713
logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName);
704714
stopped = true;
705715
cleanUpRouteDiscoveryState();
716+
updateActiveFilters(null);
706717
xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), ldsResourceName, this);
707718
}
708719

720+
// called in syncContext
721+
private void updateActiveFilters(@Nullable List<NamedFilterConfig> filterConfigs) {
722+
if (filterConfigs == null) {
723+
filterConfigs = ImmutableList.of();
724+
}
725+
Set<String> filtersToShutdown = new HashSet<>(activeFilters.keySet());
726+
for (NamedFilterConfig namedFilter : filterConfigs) {
727+
String typeUrl = namedFilter.filterConfig.typeUrl();
728+
String filterKey = namedFilter.filterStateKey();
729+
730+
Filter.Provider provider = filterRegistry.get(typeUrl);
731+
checkNotNull(provider, "provider %s", typeUrl);
732+
Filter filter = activeFilters.computeIfAbsent(filterKey, k -> provider.newInstance());
733+
checkNotNull(filter, "filter %s", filterKey);
734+
filtersToShutdown.remove(filterKey);
735+
}
736+
737+
// Shutdown filters not present in current HCM.
738+
for (String filterKey : filtersToShutdown) {
739+
Filter filterToShutdown = activeFilters.remove(filterKey);
740+
checkNotNull(filterToShutdown, "filterToShutdown %s", filterKey);
741+
filterToShutdown.close();
742+
}
743+
}
744+
709745
// called in syncContext
710746
private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDurationNano,
711747
@Nullable List<NamedFilterConfig> filterConfigs) {
@@ -836,19 +872,16 @@ private ClientInterceptor createFilters(
836872

837873
ImmutableList.Builder<ClientInterceptor> filterInterceptors = ImmutableList.builder();
838874
for (NamedFilterConfig namedFilter : filterConfigs) {
839-
FilterConfig config = namedFilter.filterConfig;
840875
String name = namedFilter.name;
841-
String typeUrl = config.typeUrl();
842-
843-
Filter.Provider provider = filterRegistry.get(typeUrl);
844-
if (provider == null || !provider.isClientFilter()) {
845-
continue;
846-
}
847-
848-
Filter filter = provider.newInstance();
876+
FilterConfig config = namedFilter.filterConfig;
877+
FilterConfig overrideConfig = selectedOverrideConfigs.get(name);
878+
String filterKey = namedFilter.filterStateKey();
849879

880+
Filter filter = activeFilters.get(filterKey);
881+
checkNotNull(filter, "activeFilters.get(%s)", filterKey);
850882
ClientInterceptor interceptor =
851-
filter.buildClientInterceptor(config, selectedOverrideConfigs.get(name), scheduler);
883+
filter.buildClientInterceptor(config, overrideConfig, scheduler);
884+
852885
if (interceptor != null) {
853886
filterInterceptors.add(interceptor);
854887
}

0 commit comments

Comments
 (0)