-
Notifications
You must be signed in to change notification settings - Fork 145
Description
The latest gRPC library unsubscribes from a resource when its channel is no longer used, and re-subscribes to it later. This does not work properly when using V3DiscoveryServer with SimpleCache<>.
The question is what the protocol flow should look like in the SotW protocol variant.
Here is what is going on:
- subscribe resource "a" and "b"
- unsubscribe resource "b"
- subscribe resource "a" and "b": Here the library does not respond, although the xDS client has already removed "b" and is expecting a response. If "c" is subscribed instead of re-subscribing "b", the control plane responds.
According to https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#unsubscribing-from-resources
Unsubscribing From Resources
In the SotW protocol variants, each request must contain the full list of resource names being subscribed to in the resource_names field, so unsubscribing to a set of resources is done by sending a new request containing all resource names that are still being subscribed to but not containing the resource names being unsubscribed to. For example, if the client had previously been subscribed to resources A and B but wishes to unsubscribe from B, it must send a new request containing only resource A.
It looks like gRPC conforms to the specification when unsubscribing.
According to the specification https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return
When the client sends a new request that changes the set of resources being requested, the server must resend any newly requested resources, even if it previously sent those resources without having been asked for them and the resources have not changed since that time.
Here I cannot tell. The resource "b" is not exactly new: it is an already known resource that was unsubscribed.
I used this unit test to see how it is implemented.
package cz.seznam.profile.xds;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.InvalidProtocolBufferException;
import io.envoyproxy.controlplane.cache.NodeGroup;
import io.envoyproxy.controlplane.cache.v3.SimpleCache;
import io.envoyproxy.controlplane.cache.v3.Snapshot;
import io.envoyproxy.controlplane.server.V3DiscoveryServer;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.envoyproxy.controlplane.cache.Resources;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
public class ReSubscribeTest {
static {
var rootLogger = org.apache.log4j.Logger.getRootLogger();
rootLogger.setLevel(org.apache.log4j.Level.DEBUG);
rootLogger.addAppender(new org.apache.log4j.ConsoleAppender(
new org.apache.log4j.PatternLayout("%-6r [%p] %c - %m%n")));
}
/** {@link NodeGroup} that keys every node by the cluster name it reports. */
public static class StaticNodeGroup implements NodeGroup<String> {

  /**
   * Returns the group identifier for {@code node}.
   *
   * @param node the node sent by the xDS client
   * @return the value of the node's {@code cluster} field
   */
  @Override
  public String hash(Node node) {
    final String clusterName = node.getCluster();
    return clusterName;
  }
}
// Class-level JUnit hook; the body is empty, so no shared setup is performed here.
@BeforeAll
public static void setup() throws Exception {
}
/** Cluster name reported by the test node; also used as the node-group key (see StaticNodeGroup). */
private static final String CLUSTER_NAME = "default-cluster";

/** Node identity attached to every request; it identifies itself as a gRPC C-core client. */
static final Node node = Node.newBuilder()
    .setCluster(CLUSTER_NAME)
    .setUserAgentName("gRPC C-core linux")
    .setUserAgentVersion("C-core 37.0.0")
    .addClientFeatures("envoy.lb.does_not_support_overprovisioning")
    .addClientFeatures("xds.config.resource-in-sotw")
    .build();

/** Initial LDS request subscribing to two listeners. */
static final DiscoveryRequest drListener1 = DiscoveryRequest.newBuilder()
    .setTypeUrl(Resources.V3.LISTENER_TYPE_URL)
    .setNode(node)
    .addResourceNames("my-echo-server")
    .addResourceNames("my-echo-server2")
    .build();

/** Initial RDS request subscribing to two route configurations. */
static final DiscoveryRequest drRoute1 = DiscoveryRequest.newBuilder()
    .setTypeUrl(Resources.V3.ROUTE_TYPE_URL)
    .setNode(node)
    .addResourceNames("my-echo-server-route")
    .addResourceNames("my-echo-server2-route")
    .build();

/** Initial CDS request subscribing to two clusters. */
static final DiscoveryRequest drCluster1 = DiscoveryRequest.newBuilder()
    .setTypeUrl(Resources.V3.CLUSTER_TYPE_URL)
    .setNode(node)
    .addResourceNames("my-echo-server-cluster")
    .addResourceNames("my-echo-server2-cluster")
    .build();

/** Initial EDS request subscribing to the load assignments of the same two clusters. */
static final DiscoveryRequest drEndpoint1 = DiscoveryRequest.newBuilder()
    .setTypeUrl(Resources.V3.ENDPOINT_TYPE_URL)
    .setNode(node)
    .addResourceNames("my-echo-server-cluster")
    .addResourceNames("my-echo-server2-cluster")
    .build();

/** Version string given to {@link #SNAPSHOT1}. */
private static final String VERSION1 = "88eb5ecd-1e5f-484f-9522-d5be08dd166a";

/**
 * Snapshot holding three clusters, three load assignments, three listeners and three route
 * configurations (no secrets). The "...3" resources are not part of the initial requests above.
 */
@SuppressWarnings("null")
private static final Snapshot SNAPSHOT1 = Snapshot.create(
    ImmutableList.of(
        Cluster.newBuilder().setName("my-echo-server-cluster").build(),
        Cluster.newBuilder().setName("my-echo-server2-cluster").build(),
        Cluster.newBuilder().setName("my-echo-server3-cluster").build()),
    ImmutableList.of(
        ClusterLoadAssignment.newBuilder().setClusterName("my-echo-server-cluster").build(),
        ClusterLoadAssignment.newBuilder().setClusterName("my-echo-server2-cluster").build(),
        ClusterLoadAssignment.newBuilder().setClusterName("my-echo-server3-cluster").build()),
    ImmutableList.of(
        Listener.newBuilder().setName("my-echo-server").build(),
        Listener.newBuilder().setName("my-echo-server2").build(),
        Listener.newBuilder().setName("my-echo-server3").build()),
    ImmutableList.of(
        RouteConfiguration.newBuilder().setName("my-echo-server-route").build(),
        RouteConfiguration.newBuilder().setName("my-echo-server2-route").build(),
        RouteConfiguration.newBuilder().setName("my-echo-server3-route").build()),
    ImmutableList.<Secret>of(),
    VERSION1);
/**
 * Walks one ADS stream through subscribe, ACK, unsubscribe and re-subscribe against a
 * {@link V3DiscoveryServer} backed by a {@link SimpleCache}, asserting which requests produce a
 * response.
 *
 * <p>Steps: (1) subscribe LDS/RDS/CDS/EDS to two resources each and expect nonces 0..3;
 * (2) ACK all four and expect no further response; (3) drop the second resource of each type
 * and expect no further response; (4) re-add the dropped listener and observe that the LDS
 * nonce is still "0", i.e. no response arrived; (5) add a never-requested listener instead and
 * expect a response with nonce "4".
 *
 * <p>Both the in-process server and channel use direct executors, so the assertions read
 * {@code xdsConfig} immediately after each {@code onNext} call.
 *
 * @throws IOException if the in-process server fails to start
 */
@Test
void test() throws IOException {
  final var cache = new SimpleCache<>(new StaticNodeGroup());
  cache.setSnapshot(CLUSTER_NAME, SNAPSHOT1);
  final var discoveryServer = new V3DiscoveryServer(cache);

  final var serverName = InProcessServerBuilder.generateName();
  final var server = InProcessServerBuilder
      .forName(serverName)
      .directExecutor()
      .addService(discoveryServer.getAggregatedDiscoveryServiceImpl())
      .build();
  server.start();
  // Keep a handle on the channel so it can be released in the finally block.
  final var channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
  try {
    final var stub = AggregatedDiscoveryServiceGrpc.newStub(channel);
    final var xdsConfig = new XdsConfig();
    final var requestObserver =
        stub.streamAggregatedResources(new ResponseStreamObserver(xdsConfig));

    // LDS->RDS->CDS->EDS
    // LDS
    requestObserver.onNext(drListener1);
    Assertions.assertEquals("0", xdsConfig.ldsNonce);
    Assertions.assertEquals(
        List.of("my-echo-server", "my-echo-server2"), xdsConfig.ldsResourcesNames);
    // RDS
    requestObserver.onNext(drRoute1);
    Assertions.assertEquals("1", xdsConfig.rdsNonce);
    Assertions.assertEquals(
        List.of("my-echo-server-route", "my-echo-server2-route"), xdsConfig.rdsResourcesNames);
    // CDS
    requestObserver.onNext(drCluster1);
    Assertions.assertEquals("2", xdsConfig.cdsNonce);
    Assertions.assertEquals(
        List.of("my-echo-server-cluster", "my-echo-server2-cluster"),
        xdsConfig.cdsResourcesNames);
    // EDS
    requestObserver.onNext(drEndpoint1);
    Assertions.assertEquals("3", xdsConfig.edsNonce);
    Assertions.assertEquals(
        List.of("my-echo-server-cluster", "my-echo-server2-cluster"),
        xdsConfig.edsResourcesNames);

    // ACK configuration
    final var drListener2 = ack(drListener1, xdsConfig.ldsNonce, xdsConfig.ldsVersionInfo);
    final var drRoute2 = ack(drRoute1, xdsConfig.rdsNonce, xdsConfig.rdsVersionInfo);
    final var drCluster2 = ack(drCluster1, xdsConfig.cdsNonce, xdsConfig.cdsVersionInfo);
    final var drEndpoint2 = ack(drEndpoint1, xdsConfig.edsNonce, xdsConfig.edsVersionInfo);
    requestObserver.onNext(drListener2);
    requestObserver.onNext(drRoute2);
    requestObserver.onNext(drCluster2);
    requestObserver.onNext(drEndpoint2);
    // Unchanged nonces mean no further response was received after the ACKs.
    Assertions.assertEquals("0", xdsConfig.ldsNonce);
    Assertions.assertEquals("1", xdsConfig.rdsNonce);
    Assertions.assertEquals("2", xdsConfig.cdsNonce);
    Assertions.assertEquals("3", xdsConfig.edsNonce);

    // Unsubscribe one resource per type by resending the request with a shorter name list.
    printBanner(" unsubscribe one resource ");
    final var drListener3 = DiscoveryRequest.newBuilder(drListener2).clearResourceNames()
        .addResourceNames("my-echo-server").build();
    final var drRoute3 = DiscoveryRequest.newBuilder(drRoute2).clearResourceNames()
        .addResourceNames("my-echo-server-route").build();
    final var drCluster3 = DiscoveryRequest.newBuilder(drCluster2).clearResourceNames()
        .addResourceNames("my-echo-server-cluster").build();
    final var drEndpoint3 = DiscoveryRequest.newBuilder(drEndpoint2).clearResourceNames()
        .addResourceNames("my-echo-server-cluster").build();
    requestObserver.onNext(drListener3);
    requestObserver.onNext(drRoute3);
    requestObserver.onNext(drCluster3);
    requestObserver.onNext(drEndpoint3);
    // Unchanged nonces: the unsubscribe requests did not trigger any response.
    Assertions.assertEquals("0", xdsConfig.ldsNonce);
    Assertions.assertEquals("1", xdsConfig.rdsNonce);
    Assertions.assertEquals("2", xdsConfig.cdsNonce);
    Assertions.assertEquals("3", xdsConfig.edsNonce);

    printBanner(" re-subscribe the resource my-echo-server2");
    final var drListener4 = DiscoveryRequest.newBuilder(drListener3)
        .addResourceNames("my-echo-server2").build();
    requestObserver.onNext(drListener4);
    // The LDS nonce is still "0": re-subscribing the dropped listener produced no response.
    // This is the behavior the test was written to demonstrate.
    Assertions.assertEquals("0", xdsConfig.ldsNonce);

    printBanner(" subscribe new resource my-echo-server3");
    final var drListener5 = DiscoveryRequest.newBuilder(drListener3)
        .addResourceNames("my-echo-server3").build();
    requestObserver.onNext(drListener5);
    // Subscribing to a never-requested listener does produce a response (nonce "4").
    Assertions.assertEquals("4", xdsConfig.ldsNonce);
    Assertions.assertEquals(
        List.of("my-echo-server", "my-echo-server3"), xdsConfig.ldsResourcesNames);
    // NOTE(review): the original comment here said only "my-echo-server3" was sent, which
    // disagrees with the assertion above expecting both names - confirm which is intended.

    requestObserver.onCompleted();
  } finally {
    // Forceful shutdown so a failed assertion (stream still open) cannot block termination.
    channel.shutdownNow();
    server.shutdownNow();
    try {
      server.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      // Preserve the interrupt for the test runner instead of swallowing it.
      Thread.currentThread().interrupt();
    }
  }
  System.out.println("-- end --");
}

/** Separator line printed above and below each phase title. */
private static final String BANNER_RULE =
    "----------------------------------------------------------------------";

/** Prints {@code title} framed by two separator lines to mark a test phase on stdout. */
private static void printBanner(String title) {
  System.out.println(BANNER_RULE);
  System.out.println(title);
  System.out.println(BANNER_RULE);
}

/** Copies {@code request} and sets the response nonce and version info being acknowledged. */
private static DiscoveryRequest ack(DiscoveryRequest request, String nonce, String versionInfo) {
  return DiscoveryRequest.newBuilder(request)
      .setResponseNonce(nonce)
      .setVersionInfo(versionInfo)
      .build();
}
/**
 * Mutable holder for the latest version, nonce and resource names seen per xDS resource type.
 * Written by ResponseStreamObserver.onNext and read by the test assertions. Version and nonce
 * fields have no initializer (null until a response of that type is recorded); the name lists
 * start out as empty immutable lists.
 */
static class XdsConfig {
// The flow used in gRPC is `LDS->RDS->CDS->EDS`
// LDS (listeners)
String ldsVersionInfo;
String ldsNonce;
List<String> ldsResourcesNames = List.of();
// RDS (route configurations)
String rdsVersionInfo;
String rdsNonce;
List<String> rdsResourcesNames = List.of();
// CDS (clusters)
String cdsVersionInfo;
String cdsNonce;
List<String> cdsResourcesNames = List.of();
// EDS (cluster load assignments)
String edsVersionInfo;
String edsNonce;
List<String> edsResourcesNames = List.of();
}
static class ResponseStreamObserver implements StreamObserver<DiscoveryResponse> {
XdsConfig xdsConfig;
public ResponseStreamObserver(XdsConfig xdsConfig) {
this.xdsConfig = xdsConfig;
}
@Override
public void onNext(DiscoveryResponse value) {
System.out.println(value.toString());
if (value.getTypeUrl().equals(Resources.V3.LISTENER_TYPE_URL)) {
xdsConfig.ldsVersionInfo = value.getVersionInfo();
xdsConfig.ldsNonce = value.getNonce();
var resourceNames = new ArrayList<String>();
for (var any : value.getResourcesList()) {
try {
var listener = any.unpack(Listener.class);
resourceNames.add(listener.getName());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
xdsConfig.ldsResourcesNames = resourceNames;
} else if (value.getTypeUrl().equals(Resources.V3.ROUTE_TYPE_URL)) {
xdsConfig.rdsVersionInfo = value.getVersionInfo();
xdsConfig.rdsNonce = value.getNonce();
var resourceNames = new ArrayList<String>();
for (var any : value.getResourcesList()) {
try {
var route = any.unpack(RouteConfiguration.class);
resourceNames.add(route.getName());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
xdsConfig.rdsResourcesNames = resourceNames;
} else if (value.getTypeUrl().equals(Resources.V3.CLUSTER_TYPE_URL)) {
xdsConfig.cdsVersionInfo = value.getVersionInfo();
xdsConfig.cdsNonce = value.getNonce();
var resourceNames = new ArrayList<String>();
for (var any : value.getResourcesList()) {
try {
var cluster = any.unpack(Cluster.class);
resourceNames.add(cluster.getName());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
xdsConfig.cdsResourcesNames = resourceNames;
} else if (value.getTypeUrl().equals(Resources.V3.ENDPOINT_TYPE_URL)) {
xdsConfig.edsVersionInfo = value.getVersionInfo();
xdsConfig.edsNonce = value.getNonce();
var resourceNames = new ArrayList<String>();
for (var any : value.getResourcesList()) {
try {
var cla = any.unpack(ClusterLoadAssignment.class);
resourceNames.add(cla.getClusterName());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
xdsConfig.edsResourcesNames = resourceNames;
}
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
}
}