Skip to content

Commit f54d234

Browse files
slonkasschepensmgajdawookieJenvoy-bot
authored
Delta XDS - improved (#166)
* delta xds Signed-off-by: Sebastian Schepens <[email protected]> Signed-off-by: slonka <[email protected]> * fixes Signed-off-by: Sebastian Schepens <[email protected]> Signed-off-by: slonka <[email protected]> * method naming Signed-off-by: Sebastian Schepens <[email protected]> Signed-off-by: slonka <[email protected]> * Building version of non-breaking changes Signed-off-by: slonka <[email protected]> * Get integration tests passing Signed-off-by: slonka <[email protected]> * Add delta tests back Signed-off-by: slonka <[email protected]> * Rewrite V3DiscoveryServerAdsDeltaResourcesIT to have a 2 second poll delay to account for potential delays in writing to the wire (we don't want that to happen) Signed-off-by: slonka <[email protected]> * Add comment to trigger PR build Signed-off-by: slonka <[email protected]> * Get V3DiscoveryServerXdsDeltaResourcesIT passing Signed-off-by: slonka <[email protected]> * Update protobuf to envoy 1.16.0 (#155) * Update protobuf to envoy 1.16.0 Signed-off-by: Lukasz Jedryczka <[email protected]> * Fixing test by setting -boostrap-version 2 flag Signed-off-by: Lukasz Jedryczka <[email protected]> * Update protobuf to envoy 1.16.0 Signed-off-by: Lukasz Jedryczka <[email protected]> * Information about update envoy image version in README.md Signed-off-by: wookieJ <[email protected]> Signed-off-by: slonka <[email protected]> * release: prepare release v0.1.25 Signed-off-by: slonka <[email protected]> * release: prepare for next development iteration Signed-off-by: slonka <[email protected]> * release: prepare release v0.1.26 Signed-off-by: slonka <[email protected]> * release: prepare for next development iteration Signed-off-by: slonka <[email protected]> * Bump nexus release plugin timeout to 20 minutes (#156) Signed-off-by: slonka <[email protected]> * release: prepare release v0.1.27 Signed-off-by: slonka <[email protected]> * release: prepare for next development iteration Signed-off-by: slonka <[email protected]> * ci: fixes javadoc and jacoco plugin issues (#158) Signed-off-by: karthik <[email protected]> Signed-off-by: slonka <[email protected]> * Update api to v1 17 (#159) * Update protobuf to envoy 1.17 Signed-off-by: Lukasz Dziedziak <[email protected]> * Use v2 version Signed-off-by: Lukasz Dziedziak <[email protected]> * Support V2/V3 in Envoy - remove V2 in separate PR Signed-off-by: Lukasz Dziedziak <[email protected]> * UDPA download - split directory create/copy Signed-off-by: Lukasz Dziedziak <[email protected]> Signed-off-by: slonka <[email protected]> * Fix references to main branch after rename (#160) Signed-off-by: slonka <[email protected]> * release: prepare release v0.1.28 Signed-off-by: slonka <[email protected]> * release: prepare for next development iteration Signed-off-by: slonka <[email protected]> * Change version to 0.1.29-delta-xds-slonka-SNAPSHOT Signed-off-by: slonka <[email protected]> * Revert ads configs to main Signed-off-by: slonka <[email protected]> * Hash bytes array not string Signed-off-by: slonka <[email protected]> * Remove unused import Signed-off-by: slonka <[email protected]> * Revert snapshot name Signed-off-by: slonka <[email protected]> * Remove respondDeltaTracked since it's not used anywhere Signed-off-by: slonka <[email protected]> * Delta xds non breaking hash bytes refactor (#181) * refactor delta xds Signed-off-by: radoslaw.chrzanowski <[email protected]> * refactor setDeltaWatch and remove code duplication Signed-off-by: radoslaw.chrzanowski <[email protected]> * resolve TODO in SnapshotTest Signed-off-by: radoslaw.chrzanowski <[email protected]> * Change version to 0.1.29-delta-xds-slonka-SNAPSHOT Signed-off-by: radoslaw.chrzanowski <[email protected]> * DEPLOY_BRANCH added for snapshot deploy Signed-off-by: radoslaw.chrzanowski <[email protected]> * changes after merge master with new envoy api Signed-off-by: radoslaw.chrzanowski <[email protected]> * make CacheStatusInfoAggregator public Signed-off-by: radoslaw.chrzanowski <[email protected]> * make GroupCacheStatusInfo and MutableStatusInfo public Signed-off-by: radoslaw.chrzanowski <[email protected]> * refactor SimpleCache Signed-off-by: radoslaw.chrzanowski <[email protected]> * create resources map only once Signed-off-by: radoslaw.chrzanowski <[email protected]> * build snapshot with improved performance Signed-off-by: radoslaw.chrzanowski <[email protected]> * refactor creating snapshot resources due to performance improvements Signed-off-by: radoslaw.chrzanowski <[email protected]> * create hash version from string Signed-off-by: radoslaw.chrzanowski <[email protected]> * remove custom snapshot version and deploy branch env Signed-off-by: radoslaw.chrzanowski <[email protected]> * fix test after merge Signed-off-by: radoslaw.chrzanowski <[email protected]> * fix V3DeltaDiscoveryServerCallbacks description Signed-off-by: radoslaw.chrzanowski <[email protected]> Signed-off-by: Sebastian Schepens <[email protected]> Signed-off-by: slonka <[email protected]> Signed-off-by: wookieJ <[email protected]> Signed-off-by: karthik <[email protected]> Signed-off-by: Lukasz Dziedziak <[email protected]> Signed-off-by: radoslaw.chrzanowski <[email protected]> Co-authored-by: Sebastian Schepens <[email protected]> Co-authored-by: mgajda <[email protected]> Co-authored-by: Łukasz Jędryczka <[email protected]> Co-authored-by: envoy-bot <[email protected]> Co-authored-by: Karthik Ram <[email protected]> Co-authored-by: Łukasz Dziedziak <[email protected]> Co-authored-by: Radek Chrzanowski <[email protected]> Co-authored-by: radoslaw.chrzanowski <[email protected]>
1 parent 76383ff commit f54d234

File tree

48 files changed

+2418
-447
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+2418
-447
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package io.envoyproxy.controlplane.cache;
2+
3+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
4+
import java.util.function.Consumer;
5+
6+
public abstract class AbstractWatch<V, T> {
7+
8+
private static final AtomicIntegerFieldUpdater<AbstractWatch> isCancelledUpdater =
9+
AtomicIntegerFieldUpdater.newUpdater(AbstractWatch.class, "isCancelled");
10+
private final V request;
11+
private final Consumer<T> responseConsumer;
12+
private volatile int isCancelled = 0;
13+
private Runnable stop;
14+
15+
/**
16+
* Construct a watch.
17+
*
18+
* @param request the original request for the watch
19+
* @param responseConsumer handler for outgoing response messages
20+
*/
21+
public AbstractWatch(V request, Consumer<T> responseConsumer) {
22+
this.request = request;
23+
this.responseConsumer = responseConsumer;
24+
}
25+
26+
/**
27+
* Cancel the watch. A watch must be cancelled in order to complete its resource stream and free resources. Cancel
28+
* may be called multiple times, with each subsequent call being a no-op.
29+
*/
30+
public void cancel() {
31+
if (isCancelledUpdater.compareAndSet(this, 0, 1)) {
32+
if (stop != null) {
33+
stop.run();
34+
}
35+
}
36+
}
37+
38+
/**
39+
* Returns boolean indicating whether or not the watch has been cancelled.
40+
*/
41+
public boolean isCancelled() {
42+
return isCancelledUpdater.get(this) == 1;
43+
}
44+
45+
/**
46+
* Returns the original request for the watch.
47+
*/
48+
public V request() {
49+
return request;
50+
}
51+
52+
/**
53+
* Sends the given response to the watch's response handler.
54+
*
55+
* @param response the response to be handled
56+
* @throws WatchCancelledException if the watch has already been cancelled
57+
*/
58+
public void respond(T response) throws WatchCancelledException {
59+
if (isCancelled()) {
60+
throw new WatchCancelledException();
61+
}
62+
63+
responseConsumer.accept(response);
64+
}
65+
66+
/**
67+
* Sets the callback method to be executed when the watch is cancelled. Even if cancel is executed multiple times, it
68+
* ensures that this stop callback is only executed once.
69+
*/
70+
public void setStop(Runnable stop) {
71+
this.stop = stop;
72+
}
73+
}

cache/src/main/java/io/envoyproxy/controlplane/cache/Cache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@ public interface Cache<T> extends ConfigWatcher {
2020
*
2121
* @param group the node group whose status is being fetched
2222
*/
23-
StatusInfo statusInfo(T group);
23+
StatusInfo<T> statusInfo(T group);
2424
}
Lines changed: 2 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,94 +1,14 @@
11
package io.envoyproxy.controlplane.cache;
22

3-
import com.google.common.collect.ImmutableSet;
4-
import java.util.Set;
5-
import java.util.concurrent.ConcurrentHashMap;
6-
import java.util.concurrent.ConcurrentMap;
7-
import java.util.function.BiFunction;
83
import javax.annotation.concurrent.ThreadSafe;
94

105
/**
116
* {@code CacheStatusInfo} provides a default implementation of {@link StatusInfo} for use in {@link Cache}
127
* implementations.
138
*/
149
@ThreadSafe
15-
public class CacheStatusInfo<T> implements StatusInfo<T> {
16-
17-
private final T nodeGroup;
18-
19-
private final ConcurrentMap<Long, Watch> watches = new ConcurrentHashMap<>();
20-
private volatile long lastWatchRequestTime;
21-
10+
public class CacheStatusInfo<T> extends MutableStatusInfo<T, Watch> {
2211
public CacheStatusInfo(T nodeGroup) {
23-
this.nodeGroup = nodeGroup;
24-
}
25-
26-
/**
27-
* {@inheritDoc}
28-
*/
29-
@Override
30-
public long lastWatchRequestTime() {
31-
return lastWatchRequestTime;
32-
}
33-
34-
/**
35-
* {@inheritDoc}
36-
*/
37-
@Override
38-
public T nodeGroup() {
39-
return nodeGroup;
40-
}
41-
42-
/**
43-
* {@inheritDoc}
44-
*/
45-
@Override
46-
public int numWatches() {
47-
return watches.size();
48-
}
49-
50-
/**
51-
* Removes the given watch from the tracked collection of watches.
52-
*
53-
* @param watchId the ID for the watch that should be removed
54-
*/
55-
public void removeWatch(long watchId) {
56-
watches.remove(watchId);
57-
}
58-
59-
/**
60-
* Sets the timestamp of the last discovery watch request.
61-
*
62-
* @param lastWatchRequestTime the latest watch request timestamp
63-
*/
64-
public void setLastWatchRequestTime(long lastWatchRequestTime) {
65-
this.lastWatchRequestTime = lastWatchRequestTime;
66-
}
67-
68-
/**
69-
* Adds the given watch to the tracked collection of watches.
70-
*
71-
* @param watchId the ID for the watch that should be added
72-
* @param watch the watch that should be added
73-
*/
74-
public void setWatch(long watchId, Watch watch) {
75-
watches.put(watchId, watch);
76-
}
77-
78-
/**
79-
* Returns the set of IDs for all watched currently being tracked.
80-
*/
81-
public Set<Long> watchIds() {
82-
return ImmutableSet.copyOf(watches.keySet());
83-
}
84-
85-
/**
86-
* Iterate over all tracked watches and execute the given function. If it returns {@code true}, then the watch is
87-
* removed from the tracked collection. If it returns {@code false}, then the watch is not removed.
88-
*
89-
* @param filter the function to execute on each watch
90-
*/
91-
public void watchesRemoveIf(BiFunction<Long, Watch, Boolean> filter) {
92-
watches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue()));
12+
super(nodeGroup);
9313
}
9414
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package io.envoyproxy.controlplane.cache;
2+
3+
import java.util.Collection;
4+
import java.util.Map;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.ConcurrentMap;
7+
import java.util.stream.Collectors;
8+
import java.util.stream.Stream;
9+
10+
public class CacheStatusInfoAggregator<T> {
11+
private final ConcurrentMap<T, ConcurrentMap<Resources.ResourceType, CacheStatusInfo<T>>> statuses =
12+
new ConcurrentHashMap<>();
13+
private final ConcurrentMap<T, ConcurrentMap<Resources.ResourceType, DeltaCacheStatusInfo<T>>> deltaStatuses =
14+
new ConcurrentHashMap<>();
15+
16+
public Collection<T> groups() {
17+
return Stream.concat(statuses.keySet().stream(), deltaStatuses.keySet().stream()).collect(Collectors.toSet());
18+
}
19+
20+
public void remove(T group) {
21+
statuses.remove(group);
22+
deltaStatuses.remove(group);
23+
}
24+
25+
/**
26+
* Returns map of delta status infos for group identifier.
27+
*
28+
* @param group group identifier.
29+
*/
30+
public Map<Resources.ResourceType, DeltaCacheStatusInfo<T>> getDeltaStatus(T group) {
31+
return deltaStatuses.getOrDefault(group, new ConcurrentHashMap<>());
32+
}
33+
34+
/**
35+
* Returns map of status infos for group identifier.
36+
*
37+
* @param group group identifier.
38+
*/
39+
public Map<Resources.ResourceType, CacheStatusInfo<T>> getStatus(T group) {
40+
return statuses.getOrDefault(group, new ConcurrentHashMap<>());
41+
}
42+
43+
/**
44+
* Check if statuses for specific group have any watcher.
45+
*
46+
* @param group group identifier.
47+
* @return true if statuses for specific group have any watcher.
48+
*/
49+
public boolean hasStatuses(T group) {
50+
Map<Resources.ResourceType, CacheStatusInfo<T>> status = getStatus(group);
51+
Map<Resources.ResourceType, DeltaCacheStatusInfo<T>> deltaStatus = getDeltaStatus(group);
52+
return status.values().stream().mapToLong(CacheStatusInfo::numWatches).sum()
53+
+ deltaStatus.values().stream().mapToLong(DeltaCacheStatusInfo::numWatches).sum() > 0;
54+
}
55+
56+
/**
57+
* Returns delta status info for group identifier and creates new one if it doesn't exist.
58+
*
59+
* @param group group identifier.
60+
* @param resourceType resource type.
61+
*/
62+
public DeltaCacheStatusInfo<T> getOrAddDeltaStatusInfo(T group, Resources.ResourceType resourceType) {
63+
return deltaStatuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>())
64+
.computeIfAbsent(resourceType, s -> new DeltaCacheStatusInfo<>(group));
65+
}
66+
67+
/**
68+
* Returns status info for group identifier and creates new one if it doesn't exist.
69+
*
70+
* @param group group identifier.
71+
* @param resourceType resource type.
72+
*/
73+
public CacheStatusInfo<T> getOrAddStatusInfo(T group, Resources.ResourceType resourceType) {
74+
return statuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>())
75+
.computeIfAbsent(resourceType, s -> new CacheStatusInfo<>(group));
76+
}
77+
}

cache/src/main/java/io/envoyproxy/controlplane/cache/ConfigWatcher.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.envoyproxy.controlplane.cache;
22

3+
import java.util.Map;
34
import java.util.Set;
45
import java.util.function.Consumer;
56
import javax.annotation.concurrent.ThreadSafe;
@@ -28,4 +29,25 @@ Watch createWatch(
2829
Set<String> knownResourceNames,
2930
Consumer<Response> responseConsumer,
3031
boolean hasClusterChanged);
32+
33+
/**
34+
* Returns a new configuration resource {@link Watch} for the given discovery request.
35+
*
36+
* @param request the discovery request (node, names, etc.) to use to generate the watch
37+
* @param requesterVersion the last version applied by the requester
38+
* @param resourceVersions resources that are already known to the requester
39+
* @param pendingResources resources that the caller is waiting for
40+
* @param isWildcard indicates if the stream is in wildcard mode
41+
* @param responseConsumer the response handler, used to process outgoing response messages
42+
* @param hasClusterChanged indicates if EDS should be sent immediately, even if version has not been changed.
43+
* Supported in ADS mode.
44+
*/
45+
DeltaWatch createDeltaWatch(
46+
DeltaXdsRequest request,
47+
String requesterVersion,
48+
Map<String, String> resourceVersions,
49+
Set<String> pendingResources,
50+
boolean isWildcard,
51+
Consumer<DeltaResponse> responseConsumer,
52+
boolean hasClusterChanged);
3153
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.envoyproxy.controlplane.cache;
2+
3+
public class DeltaCacheStatusInfo<T> extends MutableStatusInfo<T, DeltaWatch> {
4+
5+
public DeltaCacheStatusInfo(T nodeGroup) {
6+
super(nodeGroup);
7+
}
8+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.envoyproxy.controlplane.cache;
2+
3+
import com.google.auto.value.AutoValue;
4+
import com.google.protobuf.Message;
5+
import java.util.List;
6+
import java.util.Map;
7+
8+
/**
9+
* {@code Response} is a data class that contains the response for an assumed configuration type.
10+
*/
11+
@AutoValue
12+
public abstract class DeltaResponse {
13+
14+
public static DeltaResponse create(DeltaXdsRequest request,
15+
Map<String, VersionedResource<?>> resources,
16+
List<String> removedResources,
17+
String version) {
18+
return new AutoValue_DeltaResponse(request, resources, removedResources, version);
19+
}
20+
21+
/**
22+
* Returns the original request associated with the response.
23+
*/
24+
public abstract DeltaXdsRequest request();
25+
26+
/**
27+
* Returns the resources to include in the response.
28+
*/
29+
public abstract Map<String, VersionedResource<? extends Message>> resources();
30+
31+
/**
32+
* Returns the removed resources to include in the response.
33+
*/
34+
public abstract List<String> removedResources();
35+
36+
/**
37+
* Returns the version of the resources as tracked by the cache for the given type. Envoy responds with this version
38+
* as an acknowledgement.
39+
*/
40+
public abstract String version();
41+
}

0 commit comments

Comments
 (0)