Skip to content

Commit 450d0b3

Browse files
jakubdyszkiewiczsnowp
authored andcommitted
server: Cache serialized protos (#98)
Signed-off-by: Jakub Dyszkiewicz <[email protected]>
1 parent b29dc60 commit 450d0b3

File tree

6 files changed

+198
-3
lines changed

6 files changed

+198
-3
lines changed

server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServer.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import io.envoyproxy.controlplane.cache.Resources;
99
import io.envoyproxy.controlplane.cache.Response;
1010
import io.envoyproxy.controlplane.cache.Watch;
11+
import io.envoyproxy.controlplane.server.serializer.DefaultProtoResourcesSerializer;
12+
import io.envoyproxy.controlplane.server.serializer.ProtoResourcesSerializer;
1113
import io.envoyproxy.envoy.api.v2.ClusterDiscoveryServiceGrpc.ClusterDiscoveryServiceImplBase;
1214
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
1315
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
@@ -20,6 +22,7 @@
2022
import io.grpc.StatusRuntimeException;
2123
import io.grpc.stub.ServerCallStreamObserver;
2224
import io.grpc.stub.StreamObserver;
25+
import java.util.Collection;
2326
import java.util.Collections;
2427
import java.util.List;
2528
import java.util.Map;
@@ -41,6 +44,7 @@ public class DiscoveryServer {
4144
private final ConfigWatcher configWatcher;
4245
private final ExecutorGroup executorGroup;
4346
private final AtomicLong streamCount = new AtomicLong();
47+
private final ProtoResourcesSerializer protoResourcesSerializer;
4448

4549
public DiscoveryServer(ConfigWatcher configWatcher) {
4650
this(Collections.emptyList(), configWatcher);
@@ -56,25 +60,29 @@ public DiscoveryServer(DiscoveryServerCallbacks callbacks, ConfigWatcher configW
5660
* @param configWatcher source of configuration updates
5761
*/
5862
public DiscoveryServer(List<DiscoveryServerCallbacks> callbacks, ConfigWatcher configWatcher) {
59-
this(callbacks, configWatcher, new DefaultExecutorGroup());
63+
this(callbacks, configWatcher, new DefaultExecutorGroup(), new DefaultProtoResourcesSerializer());
6064
}
6165

6266
/**
6367
* Creates the server.
6468
* @param callbacks server callbacks
6569
* @param configWatcher source of configuration updates
6670
* @param executorGroup executor group to use for responding stream requests
71+
* @param protoResourcesSerializer serializer of proto buffer messages
6772
*/
6873
public DiscoveryServer(List<DiscoveryServerCallbacks> callbacks,
6974
ConfigWatcher configWatcher,
70-
ExecutorGroup executorGroup) {
75+
ExecutorGroup executorGroup,
76+
ProtoResourcesSerializer protoResourcesSerializer) {
7177
Preconditions.checkNotNull(callbacks, "callbacks cannot be null");
7278
Preconditions.checkNotNull(configWatcher, "configWatcher cannot be null");
7379
Preconditions.checkNotNull(executorGroup, "executorGroup cannot be null");
80+
Preconditions.checkNotNull(protoResourcesSerializer, "protoResourcesSerializer cannot be null");
7481

7582
this.callbacks = callbacks;
7683
this.configWatcher = configWatcher;
7784
this.executorGroup = executorGroup;
85+
this.protoResourcesSerializer = protoResourcesSerializer;
7886
}
7987

8088
/**
@@ -306,9 +314,10 @@ private void cancel() {
306314
private void send(Response response, String typeUrl) {
307315
String nonce = Long.toString(streamNonce.getAndIncrement());
308316

317+
Collection<Any> resources = protoResourcesSerializer.serialize(response.resources());
309318
DiscoveryResponse discoveryResponse = DiscoveryResponse.newBuilder()
310319
.setVersionInfo(response.version())
311-
.addAllResources(response.resources().stream().map(Any::pack).collect(Collectors.toList()))
320+
.addAllResources(resources)
312321
.setTypeUrl(typeUrl)
313322
.setNonce(nonce)
314323
.build();
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.envoyproxy.controlplane.server.serializer;
2+
3+
import com.google.common.cache.Cache;
4+
import com.google.common.cache.CacheBuilder;
5+
import com.google.protobuf.Any;
6+
import com.google.protobuf.Message;
7+
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
8+
import java.util.concurrent.ExecutionException;
9+
10+
/**
11+
* Cached version of the {@link ProtoResourcesSerializer}. It uses Guava Cache with weak values to store the serialized
12+
* messages. The weak values are used so it's possible to share the same proto instance between snapshots that are kept
13+
* in the memory. The improvement especially visible when the same message is send to multiple Envoys.
14+
* The message is then only serialized once as long as it's referenced anywhere else.
15+
* The same instance is used not only between snapshots for different groups but also between subsequent snapshots
16+
* for the same group, because the last serialized proto instance of {@link DiscoveryResponse} is set in
17+
* DiscoveryRequestStreamObserver#latestResponse.
18+
*/
19+
public class CachedProtoResourcesSerializer implements ProtoResourcesSerializer {
20+
21+
private static final Cache<Message, Any> cache = CacheBuilder.newBuilder()
22+
.weakValues()
23+
.build();
24+
25+
/**
26+
* {@inheritDoc}
27+
*/
28+
@Override
29+
public Any serialize(Message resource) {
30+
try {
31+
return cache.get(resource, () -> Any.pack(resource));
32+
} catch (ExecutionException e) {
33+
throw new ProtoSerializerException("Error while serializing resources", e);
34+
}
35+
}
36+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.envoyproxy.controlplane.server.serializer;
2+
3+
import com.google.protobuf.Any;
4+
import com.google.protobuf.Message;
5+
6+
/**
7+
* Default implementation of ProtoResourcesSerializer that uses {@link Any#pack(Message)} method on {@link Message}.
8+
*/
9+
public class DefaultProtoResourcesSerializer implements ProtoResourcesSerializer {
10+
11+
/**
12+
* {@inheritDoc}
13+
*/
14+
@Override
15+
public Any serialize(Message resource) {
16+
return Any.pack(resource);
17+
}
18+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.envoyproxy.controlplane.server.serializer;
2+
3+
import com.google.protobuf.Any;
4+
import com.google.protobuf.Message;
5+
6+
import java.util.Collection;
7+
import java.util.stream.Collectors;
8+
9+
/**
10+
* Serializer of the proto buffers resource messages.
11+
*/
12+
public interface ProtoResourcesSerializer {
13+
14+
/**
15+
* Serialize messages to proto buffers.
16+
* @param resources list of resources to serialize
17+
* @return serialized resources
18+
*/
19+
default Collection<Any> serialize(Collection<? extends Message> resources) {
20+
return resources.stream().map(this::serialize).collect(Collectors.toList());
21+
}
22+
23+
/**
24+
* Serialize message to proto buffers.
25+
* @param resource the resource to serialize
26+
* @return serialized resource
27+
*/
28+
Any serialize(Message resource);
29+
30+
class ProtoSerializerException extends RuntimeException {
31+
ProtoSerializerException(String message, Throwable cause) {
32+
super(message, cause);
33+
}
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.envoyproxy.controlplane.server.serializer;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import com.google.common.collect.Lists;
6+
import com.google.protobuf.Any;
7+
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
8+
import java.util.Collection;
9+
import java.util.List;
10+
import org.junit.Test;
11+
12+
public class CachedProtoResourcesSerializerTest {
13+
14+
CachedProtoResourcesSerializer serializer = new CachedProtoResourcesSerializer();
15+
16+
@Test
17+
public void shouldKeepCachedProtoWhenSerializingSameMessage() {
18+
ClusterLoadAssignment endpoint = ClusterLoadAssignment.newBuilder()
19+
.setClusterName("service1")
20+
.build();
21+
22+
Any serializedEndpoint = serializer.serialize(endpoint);
23+
Any serializedSameEndpoint = serializer.serialize(endpoint);
24+
25+
assertThat(serializedEndpoint).isSameAs(serializedSameEndpoint);
26+
}
27+
28+
@Test
29+
public void shouldKeepCachedProtoWhenSerializingSameMessages() {
30+
List<ClusterLoadAssignment> endpoints = Lists.newArrayList(
31+
ClusterLoadAssignment.newBuilder()
32+
.setClusterName("service1")
33+
.build(),
34+
ClusterLoadAssignment.newBuilder()
35+
.setClusterName("service2")
36+
.build()
37+
);
38+
39+
Collection<Any> serializedEndpoints = serializer.serialize(endpoints);
40+
Collection<Any> serializedSameEndpoints = serializer.serialize(endpoints);
41+
42+
assertThat(serializedEndpoints).isEqualTo(serializedSameEndpoints);
43+
assertThat(serializedEndpoints).isNotSameAs(serializedSameEndpoints);
44+
assertThat(serializedEndpoints) // elements are the same instances
45+
.usingElementComparator((x, y) -> x == y ? 0 : 1)
46+
.hasSameElementsAs(serializedSameEndpoints);
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package io.envoyproxy.controlplane.server.serializer;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import com.google.common.collect.Lists;
6+
import com.google.protobuf.Any;
7+
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
8+
import java.util.Collection;
9+
import java.util.List;
10+
import org.junit.Test;
11+
12+
public class DefaultProtoResourcesSerializerTest {
13+
14+
DefaultProtoResourcesSerializer serializer = new DefaultProtoResourcesSerializer();
15+
16+
@Test
17+
public void shouldReturnDifferentInstanceOfSerializedProtoWhenResourcesAreTheSame() {
18+
ClusterLoadAssignment endpoint = ClusterLoadAssignment.newBuilder()
19+
.setClusterName("service1")
20+
.build();
21+
22+
Any serializedEndpoint = serializer.serialize(endpoint);
23+
Any serializedSameEndpoint = serializer.serialize(endpoint);
24+
25+
assertThat(serializedEndpoint).isEqualTo(serializedSameEndpoint);
26+
assertThat(serializedEndpoint).isNotSameAs(serializedSameEndpoint);
27+
}
28+
29+
@Test
30+
public void shouldReturnDifferentInstancesOfSerializedProtoWhenResourcesAreTheSame() {
31+
List<ClusterLoadAssignment> endpoints = Lists.newArrayList(
32+
ClusterLoadAssignment.newBuilder()
33+
.setClusterName("service1")
34+
.build(),
35+
ClusterLoadAssignment.newBuilder()
36+
.setClusterName("service2")
37+
.build()
38+
);
39+
40+
Collection<Any> serializedEndpoints = serializer.serialize(endpoints);
41+
Collection<Any> serializedSameEndpoints = serializer.serialize(endpoints);
42+
43+
assertThat(serializedEndpoints).isEqualTo(serializedSameEndpoints);
44+
assertThat(serializedEndpoints).isNotSameAs(serializedSameEndpoints);
45+
assertThat(serializedEndpoints) // elements are not the same instances
46+
.usingElementComparator((x, y) -> x == y ? 0 : 1)
47+
.doesNotContainAnyElementsOf(serializedSameEndpoints);
48+
}
49+
}

0 commit comments

Comments
 (0)