Skip to content

Commit c051b5b

Browse files
snowpjoeyb
authored andcommitted
server: add a snapshot collecting DiscoveryServerCallbacks (#73)
Adds a callback that watches the streams for requests and stream closes and cleans up unreferenced snapshots after some time period. Signed-off-by: Snow Pettersen <[email protected]>
1 parent ed1c340 commit c051b5b

File tree

3 files changed

+240
-0
lines changed

3 files changed

+240
-0
lines changed

cache/src/main/java/io/envoyproxy/controlplane/cache/Snapshot.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import envoy.api.v2.Lds.Listener;
1616
import envoy.api.v2.Rds.RouteConfiguration;
1717
import envoy.api.v2.auth.Cert.Secret;
18+
import java.util.Collections;
1819
import java.util.Map;
1920
import java.util.Set;
2021

@@ -83,6 +84,16 @@ public static Snapshot create(
8384
SnapshotResources.create(secrets, secretsVersion));
8485
}
8586

87+
/**
88+
* Creates an empty snapshot with the given version.
89+
*
90+
* @param version the version of the snapshot resources
91+
*/
92+
public static Snapshot createEmpty(String version) {
93+
return create(Collections.emptySet(), Collections.emptySet(),
94+
Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), version);
95+
}
96+
8697
/**
8798
* Returns all cluster items in the CDS payload.
8899
*/
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package io.envoyproxy.controlplane.server.callback;
2+
3+
import com.google.common.annotations.VisibleForTesting;
4+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
5+
import envoy.api.v2.Discovery;
6+
import io.envoyproxy.controlplane.cache.NodeGroup;
7+
import io.envoyproxy.controlplane.cache.Snapshot;
8+
import io.envoyproxy.controlplane.cache.SnapshotCache;
9+
import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks;
10+
import java.time.Clock;
11+
import java.time.Instant;
12+
import java.time.temporal.ChronoUnit;
13+
import java.util.LinkedHashSet;
14+
import java.util.Map;
15+
import java.util.Set;
16+
import java.util.concurrent.ConcurrentHashMap;
17+
import java.util.concurrent.Executors;
18+
import java.util.concurrent.ScheduledExecutorService;
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.function.Consumer;
21+
22+
/**
23+
* Callback that keeps track of the number of streams associated with each node group and periodically clears
24+
* out {@link Snapshot}s from the cache that are no longer referenced by any streams.
25+
*
26+
* <p>Works by monitoring the stream to determine what group they belong to and keeps a running count as well
27+
* as when a request is seen that targets a given node group.
28+
*
29+
* <p>Every {@code collectionIntervalMillis} milliseconds a cleanup job runs which looks for snapshots with no
30+
* active streams that haven't been updated within the configured time frame. Checking the time since last update
31+
* is done to prevent snapshots from being prematurely removed from the cache. It ensures that a group must have
32+
* no active streams for {@code collectAfterMillis} milliseconds before being collected.
33+
*
34+
* <p>To be notified of snapshots that are removed, a set of callbacks may be provided which will be triggered
35+
* whenever a snapshot is removed from the cache. Any other callback which maintains state about the snapshots
36+
* that is cleaned up by one of these callbacks should be run *after* this callback. This helps ensure that
37+
* if state is cleaned up while a request in inbound, the request will be blocked by the lock in this callback
38+
* until collection finishes and the subsequent callbacks will see the new request come in after collection. If the
39+
* order is reversed, another callback might have seen the new request but the refcount here hasn't been incremented,
40+
* causing it to get cleaned up and wipe the state of the other callback even though we now have an active stream
41+
* for that group.
42+
*/
43+
public class SnapshotCollectingCallback<T> implements DiscoveryServerCallbacks {
44+
private static class SnapshotState {
45+
int streamCount;
46+
Instant lastSeen;
47+
}
48+
49+
private final SnapshotCache<T> snapshotCache;
50+
private final NodeGroup<T> nodeGroup;
51+
private final Clock clock;
52+
private final Set<Consumer<T>> collectorCallbacks;
53+
private final long collectAfterMillis;
54+
private final Map<T, SnapshotState> snapshotStates = new ConcurrentHashMap<>();
55+
private final Map<Long, T> groupByStream = new ConcurrentHashMap<>();
56+
57+
/**
58+
* Creates the callback.
59+
*
60+
* @param snapshotCache the cache to evict snapshots from
61+
* @param nodeGroup the node group used to map requests to groups
62+
* @param clock system clock
63+
* @param collectorCallbacks the callbacks to invoke when snapshot is collected
64+
* @param collectAfterMillis how long a snapshot must be referenced for before being collected
65+
* @param collectionIntervalMillis how often the collection background action should run
66+
*/
67+
public SnapshotCollectingCallback(SnapshotCache<T> snapshotCache,
68+
NodeGroup<T> nodeGroup, Clock clock, Set<Consumer<T>> collectorCallbacks,
69+
long collectAfterMillis, long collectionIntervalMillis) {
70+
this.snapshotCache = snapshotCache;
71+
this.nodeGroup = nodeGroup;
72+
this.clock = clock;
73+
this.collectorCallbacks = collectorCallbacks;
74+
this.collectAfterMillis = collectAfterMillis;
75+
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
76+
new ThreadFactoryBuilder().setNameFormat("snapshot-gc-%d").build());
77+
executorService.scheduleAtFixedRate(() -> deleteUnreferenced(clock), collectionIntervalMillis,
78+
collectionIntervalMillis, TimeUnit.MILLISECONDS);
79+
}
80+
81+
@Override
82+
public synchronized void onStreamRequest(long streamId, Discovery.DiscoveryRequest request) {
83+
T groupIdentifier = nodeGroup.hash(request.getNode());
84+
85+
SnapshotState snapshotState =
86+
this.snapshotStates.computeIfAbsent(groupIdentifier, x -> new SnapshotState());
87+
snapshotState.lastSeen = clock.instant();
88+
89+
if (groupByStream.put(streamId, groupIdentifier) == null) {
90+
snapshotState.streamCount++;
91+
}
92+
}
93+
94+
@Override public void onStreamClose(long streamId, String typeUrl) {
95+
onStreamCloseHelper(streamId);
96+
}
97+
98+
@Override public void onStreamCloseWithError(long streamId, String typeUrl, Throwable error) {
99+
onStreamCloseHelper(streamId);
100+
}
101+
102+
@VisibleForTesting
103+
synchronized void deleteUnreferenced(Clock clock) {
104+
// Keep track of snapshots to delete to avoid CME.
105+
Set<T> toDelete = new LinkedHashSet<>();
106+
107+
for (Map.Entry<T, SnapshotState> entry : snapshotStates.entrySet()) {
108+
if (entry.getValue().streamCount == 0 && entry.getValue().lastSeen.isBefore(
109+
clock.instant().minus(collectAfterMillis, ChronoUnit.MILLIS))) {
110+
111+
// clearSnapshot will do nothing and return false if there are any pending watches - this
112+
// ensures that we don't actually remove a snapshot that's in use.
113+
T groupIdentifier = entry.getKey();
114+
if (snapshotCache.clearSnapshot(groupIdentifier)) {
115+
toDelete.add(groupIdentifier);
116+
}
117+
}
118+
}
119+
120+
toDelete.forEach(group -> {
121+
snapshotStates.remove(group);
122+
collectorCallbacks.forEach(cb -> cb.accept(group));
123+
});
124+
}
125+
126+
private synchronized void onStreamCloseHelper(long streamId) {
127+
T removed = groupByStream.remove(streamId);
128+
if (removed == null) {
129+
// This will happen if the stream closed before we received the first request.
130+
return;
131+
}
132+
133+
SnapshotState snapshotState = snapshotStates.get(removed);
134+
snapshotState.streamCount--;
135+
snapshotState.lastSeen = clock.instant();
136+
}
137+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package io.envoyproxy.controlplane.server.callback;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import com.google.common.collect.ImmutableSet;
6+
import envoy.api.v2.Discovery;
7+
import io.envoyproxy.controlplane.cache.NodeGroup;
8+
import io.envoyproxy.controlplane.cache.SimpleCache;
9+
import io.envoyproxy.controlplane.cache.Snapshot;
10+
import java.time.Clock;
11+
import java.time.Duration;
12+
import java.time.Instant;
13+
import java.time.ZoneId;
14+
import java.util.ArrayList;
15+
import java.util.Collections;
16+
import java.util.concurrent.CountDownLatch;
17+
import java.util.concurrent.TimeUnit;
18+
import org.junit.Before;
19+
import org.junit.Test;
20+
21+
public class SnapshotCollectingCallbackTest {
22+
23+
private static final Clock CLOCK = Clock.fixed(Instant.now(), ZoneId.systemDefault());
24+
private static final NodeGroup<String> NODE_GROUP = node -> "group";
25+
private final ArrayList<String> collectedGroups = new ArrayList<>();
26+
private SnapshotCollectingCallback<String> callback;
27+
private SimpleCache<String> cache;
28+
29+
@Before
30+
public void setUp() {
31+
collectedGroups.clear();
32+
cache = new SimpleCache<>(NODE_GROUP);
33+
cache.setSnapshot("group", Snapshot.createEmpty(""));
34+
callback = new SnapshotCollectingCallback<>(cache, NODE_GROUP, CLOCK,
35+
Collections.singleton(collectedGroups::add), 3, 100);
36+
}
37+
38+
@Test
39+
public void testSingleSnapshot() {
40+
callback.onStreamRequest(0, Discovery.DiscoveryRequest.getDefaultInstance());
41+
callback.onStreamRequest(1, Discovery.DiscoveryRequest.getDefaultInstance());
42+
43+
// We have 2 references to the snapshot, this should do nothing.
44+
callback.deleteUnreferenced(Clock.offset(CLOCK, Duration.ofMillis(5)));
45+
assertThat(collectedGroups).isEmpty();
46+
47+
callback.onStreamClose(0, "");
48+
49+
// We have 1 reference to the snapshot, this should do nothing.
50+
callback.deleteUnreferenced(Clock.offset(CLOCK, Duration.ofMillis(5)));
51+
assertThat(collectedGroups).isEmpty();
52+
53+
callback.onStreamCloseWithError(1, "", new RuntimeException());
54+
55+
// We have 0 references to the snapshot, but 1 < 3 so it's too early to collect the snapshot.
56+
callback.deleteUnreferenced(Clock.offset(CLOCK, Duration.ofMillis(1)));
57+
assertThat(collectedGroups).isEmpty();
58+
59+
// We have 0 references to the snapshot, and 5 > 3 so we clear out the snapshot.
60+
callback.deleteUnreferenced(Clock.offset(CLOCK, Duration.ofMillis(5)));
61+
assertThat(collectedGroups).containsExactly("group");
62+
}
63+
64+
@Test
65+
public void testAsyncCollection() throws InterruptedException {
66+
CountDownLatch snapshotCollectedLatch = new CountDownLatch(1);
67+
CountDownLatch deleteUnreferencedLatch = new CountDownLatch(1);
68+
69+
// Create a cache with 0 expiry delay, which means the snapshot should get collected immediately.
70+
callback = new SnapshotCollectingCallback<String>(cache, NODE_GROUP, CLOCK,
71+
ImmutableSet.of(collectedGroups::add, group -> snapshotCollectedLatch.countDown()), -3, 1) {
72+
@Override synchronized void deleteUnreferenced(Clock clock) {
73+
super.deleteUnreferenced(clock);
74+
deleteUnreferencedLatch.countDown();;
75+
}
76+
};
77+
78+
callback.onStreamRequest(0, Discovery.DiscoveryRequest.getDefaultInstance());
79+
assertThat(deleteUnreferencedLatch.await(100, TimeUnit.MILLISECONDS)).isTrue();
80+
assertThat(collectedGroups).isEmpty();
81+
82+
callback.onStreamClose(0, "");
83+
assertThat(snapshotCollectedLatch.await(100,TimeUnit.MILLISECONDS)).isTrue();
84+
assertThat(collectedGroups).containsExactly("group");
85+
}
86+
87+
@Test
88+
public void testCloseBeforeRequest() {
89+
callback.onStreamClose(0, "");
90+
assertThat(collectedGroups).isEmpty();
91+
}
92+
}

0 commit comments

Comments
 (0)