Skip to content

Commit 93293e3

Browse files
committed
Delta xds non breaking hash bytes refactor (#181)
* refactor delta xds Signed-off-by: radoslaw.chrzanowski <[email protected]> * refactor setDeltaWatch and remove code duplication Signed-off-by: radoslaw.chrzanowski <[email protected]> * resolve TODO in SnapshotTest Signed-off-by: radoslaw.chrzanowski <[email protected]>
1 parent e6e22eb commit 93293e3

File tree

12 files changed

+408
-475
lines changed

12 files changed

+408
-475
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package io.envoyproxy.controlplane.cache;
2+
3+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
4+
import java.util.function.Consumer;
5+
6+
public abstract class AbstractWatch<V, T> {
7+
8+
private static final AtomicIntegerFieldUpdater<AbstractWatch> isCancelledUpdater =
9+
AtomicIntegerFieldUpdater.newUpdater(AbstractWatch.class, "isCancelled");
10+
private final V request;
11+
private final Consumer<T> responseConsumer;
12+
private volatile int isCancelled = 0;
13+
private Runnable stop;
14+
15+
/**
16+
* Construct a watch.
17+
*
18+
* @param request the original request for the watch
19+
* @param responseConsumer handler for outgoing response messages
20+
*/
21+
public AbstractWatch(V request, Consumer<T> responseConsumer) {
22+
this.request = request;
23+
this.responseConsumer = responseConsumer;
24+
}
25+
26+
/**
27+
* Cancel the watch. A watch must be cancelled in order to complete its resource stream and free resources. Cancel
28+
* may be called multiple times, with each subsequent call being a no-op.
29+
*/
30+
public void cancel() {
31+
if (isCancelledUpdater.compareAndSet(this, 0, 1)) {
32+
if (stop != null) {
33+
stop.run();
34+
}
35+
}
36+
}
37+
38+
/**
39+
* Returns boolean indicating whether or not the watch has been cancelled.
40+
*/
41+
public boolean isCancelled() {
42+
return isCancelledUpdater.get(this) == 1;
43+
}
44+
45+
/**
46+
* Returns the original request for the watch.
47+
*/
48+
public V request() {
49+
return request;
50+
}
51+
52+
/**
53+
* Sends the given response to the watch's response handler.
54+
*
55+
* @param response the response to be handled
56+
* @throws WatchCancelledException if the watch has already been cancelled
57+
*/
58+
public void respond(T response) throws WatchCancelledException {
59+
if (isCancelled()) {
60+
throw new WatchCancelledException();
61+
}
62+
63+
responseConsumer.accept(response);
64+
}
65+
66+
/**
67+
* Sets the callback method to be executed when the watch is cancelled. Even if cancel is executed multiple times, it
68+
* ensures that this stop callback is only executed once.
69+
*/
70+
public void setStop(Runnable stop) {
71+
this.stop = stop;
72+
}
73+
}

cache/src/main/java/io/envoyproxy/controlplane/cache/Cache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@ public interface Cache<T> extends ConfigWatcher {
2020
*
2121
* @param group the node group whose status is being fetched
2222
*/
23-
StatusInfo statusInfo(T group);
23+
StatusInfo<T> statusInfo(T group);
2424
}
Lines changed: 2 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -1,152 +1,14 @@
11
package io.envoyproxy.controlplane.cache;
22

3-
import com.google.common.collect.ImmutableSet;
4-
import java.util.Set;
5-
import java.util.concurrent.ConcurrentHashMap;
6-
import java.util.concurrent.ConcurrentMap;
7-
import java.util.function.BiFunction;
83
import javax.annotation.concurrent.ThreadSafe;
94

105
/**
116
* {@code CacheStatusInfo} provides a default implementation of {@link StatusInfo} for use in {@link Cache}
127
* implementations.
138
*/
149
@ThreadSafe
15-
public class CacheStatusInfo<T> implements StatusInfo<T> {
16-
17-
private final T nodeGroup;
18-
19-
private final ConcurrentMap<Long, Watch> watches = new ConcurrentHashMap<>();
20-
private final ConcurrentMap<Long, DeltaWatch> deltaWatches = new ConcurrentHashMap<>();
21-
private volatile long lastWatchRequestTime;
22-
private volatile long lastDeltaWatchRequestTime;
23-
10+
public class CacheStatusInfo<T> extends MutableStatusInfo<T, Watch> {
2411
public CacheStatusInfo(T nodeGroup) {
25-
this.nodeGroup = nodeGroup;
26-
}
27-
28-
/**
29-
* {@inheritDoc}
30-
*/
31-
@Override
32-
public long lastWatchRequestTime() {
33-
return lastWatchRequestTime;
34-
}
35-
36-
@Override
37-
public long lastDeltaWatchRequestTime() {
38-
return lastDeltaWatchRequestTime;
39-
}
40-
41-
/**
42-
* {@inheritDoc}
43-
*/
44-
@Override
45-
public T nodeGroup() {
46-
return nodeGroup;
47-
}
48-
49-
/**
50-
* {@inheritDoc}
51-
*/
52-
@Override
53-
public int numWatches() {
54-
return watches.size();
55-
}
56-
57-
@Override
58-
public int numDeltaWatches() {
59-
return deltaWatches.size();
60-
}
61-
62-
/**
63-
* Removes the given watch from the tracked collection of watches.
64-
*
65-
* @param watchId the ID for the watch that should be removed
66-
*/
67-
public void removeWatch(long watchId) {
68-
watches.remove(watchId);
69-
}
70-
71-
/**
72-
* Removes the given delta watch from the tracked collection of watches.
73-
*
74-
* @param watchId the ID for the delta watch that should be removed
75-
*/
76-
public void removeDeltaWatch(long watchId) {
77-
deltaWatches.remove(watchId);
78-
}
79-
80-
/**
81-
* Sets the timestamp of the last discovery watch request.
82-
*
83-
* @param lastWatchRequestTime the latest watch request timestamp
84-
*/
85-
public void setLastWatchRequestTime(long lastWatchRequestTime) {
86-
this.lastWatchRequestTime = lastWatchRequestTime;
87-
}
88-
89-
/**
90-
* Sets the timestamp of the last discovery delta watch request.
91-
*
92-
* @param lastDeltaWatchRequestTime the latest delta watch request timestamp
93-
*/
94-
public void setLastDeltaWatchRequestTime(long lastDeltaWatchRequestTime) {
95-
this.lastDeltaWatchRequestTime = lastDeltaWatchRequestTime;
96-
}
97-
98-
/**
99-
* Adds the given watch to the tracked collection of watches.
100-
*
101-
* @param watchId the ID for the watch that should be added
102-
* @param watch the watch that should be added
103-
*/
104-
public void setWatch(long watchId, Watch watch) {
105-
watches.put(watchId, watch);
106-
}
107-
108-
/**
109-
* Adds the given watch to the tracked collection of watches.
110-
*
111-
* @param watchId the ID for the watch that should be added
112-
* @param watch the watch that should be added
113-
*/
114-
public void setDeltaWatch(long watchId, DeltaWatch watch) {
115-
deltaWatches.put(watchId, watch);
116-
}
117-
118-
/**
119-
* Returns the set of IDs for all watched currently being tracked.
120-
*/
121-
public Set<Long> watchIds() {
122-
return ImmutableSet.copyOf(watches.keySet());
123-
}
124-
125-
/**
126-
* Returns the set of IDs for all watched currently being tracked.
127-
*/
128-
public Set<Long> deltaWatchIds() {
129-
return ImmutableSet.copyOf(deltaWatches.keySet());
130-
}
131-
132-
/**
133-
* Iterate over all tracked watches and execute the given function. If it returns {@code true}, then the watch is
134-
* removed from the tracked collection. If it returns {@code false}, then the watch is not removed.
135-
*
136-
* @param filter the function to execute on each watch
137-
*/
138-
public void watchesRemoveIf(BiFunction<Long, Watch, Boolean> filter) {
139-
watches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue()));
140-
}
141-
142-
/**
143-
* Iterate over all tracked delta watches and execute the given function. If it returns {@code true},
144-
* then the watch is removed from the tracked collection. If it returns {@code false}, then
145-
* the watch is not removed.
146-
*
147-
* @param filter the function to execute on each delta watch
148-
*/
149-
public void deltaWatchesRemoveIf(BiFunction<Long, DeltaWatch, Boolean> filter) {
150-
deltaWatches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue()));
12+
super(nodeGroup);
15113
}
15214
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package io.envoyproxy.controlplane.cache;
2+
3+
import java.util.Collection;
4+
import java.util.Map;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.ConcurrentMap;
7+
import java.util.stream.Collectors;
8+
import java.util.stream.Stream;
9+
10+
class CacheStatusInfoAggregator<T> {
11+
private final ConcurrentMap<T, ConcurrentMap<Resources.ResourceType, CacheStatusInfo<T>>> statuses =
12+
new ConcurrentHashMap<>();
13+
private final ConcurrentMap<T, ConcurrentMap<Resources.ResourceType, DeltaCacheStatusInfo<T>>> deltaStatuses =
14+
new ConcurrentHashMap<>();
15+
16+
public Collection<T> groups() {
17+
return Stream.concat(statuses.keySet().stream(), deltaStatuses.keySet().stream()).collect(Collectors.toSet());
18+
}
19+
20+
void remove(T group) {
21+
statuses.remove(group);
22+
deltaStatuses.remove(group);
23+
}
24+
25+
/**
26+
* Returns map of delta status infos for group identifier.
27+
*
28+
* @param group group identifier.
29+
*/
30+
Map<Resources.ResourceType, DeltaCacheStatusInfo<T>> getDeltaStatus(T group) {
31+
return deltaStatuses.getOrDefault(group, new ConcurrentHashMap<>());
32+
}
33+
34+
/**
35+
* Returns map of status infos for group identifier.
36+
*
37+
* @param group group identifier.
38+
*/
39+
Map<Resources.ResourceType, CacheStatusInfo<T>> getStatus(T group) {
40+
return statuses.getOrDefault(group, new ConcurrentHashMap<>());
41+
}
42+
43+
/**
44+
* Check if statuses for specific group have any watcher.
45+
*
46+
* @param group group identifier.
47+
* @return true if statuses for specific group have any watcher.
48+
*/
49+
boolean hasStatuses(T group) {
50+
Map<Resources.ResourceType, CacheStatusInfo<T>> status = getStatus(group);
51+
Map<Resources.ResourceType, DeltaCacheStatusInfo<T>> deltaStatus = getDeltaStatus(group);
52+
return status.values().stream().mapToLong(CacheStatusInfo::numWatches).sum()
53+
+ deltaStatus.values().stream().mapToLong(DeltaCacheStatusInfo::numWatches).sum() > 0;
54+
}
55+
56+
/**
57+
* Returns delta status info for group identifier and creates new one if it doesn't exist.
58+
*
59+
* @param group group identifier.
60+
* @param resourceType resource type.
61+
*/
62+
DeltaCacheStatusInfo<T> getOrAddDeltaStatusInfo(T group, Resources.ResourceType resourceType) {
63+
return deltaStatuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>())
64+
.computeIfAbsent(resourceType, s -> new DeltaCacheStatusInfo<>(group));
65+
}
66+
67+
/**
68+
* Returns status info for group identifier and creates new one if it doesn't exist.
69+
*
70+
* @param group group identifier.
71+
* @param resourceType resource type.
72+
*/
73+
CacheStatusInfo<T> getOrAddStatusInfo(T group, Resources.ResourceType resourceType) {
74+
return statuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>())
75+
.computeIfAbsent(resourceType, s -> new CacheStatusInfo<>(group));
76+
}
77+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.envoyproxy.controlplane.cache;
2+
3+
public class DeltaCacheStatusInfo<T> extends MutableStatusInfo<T, DeltaWatch> {
4+
5+
public DeltaCacheStatusInfo(T nodeGroup) {
6+
super(nodeGroup);
7+
}
8+
}

0 commit comments

Comments
 (0)