6
6
import io .envoyproxy .envoy .api .v2 .DiscoveryRequest ;
7
7
import java .util .Collection ;
8
8
import java .util .HashMap ;
9
- import java .util .HashSet ;
10
9
import java .util .Map ;
11
10
import java .util .Objects ;
12
11
import java .util .Set ;
12
+ import java .util .concurrent .ConcurrentHashMap ;
13
+ import java .util .concurrent .ConcurrentMap ;
14
+ import java .util .concurrent .atomic .AtomicLong ;
13
15
import java .util .concurrent .locks .Lock ;
14
16
import java .util .concurrent .locks .ReadWriteLock ;
15
17
import java .util .concurrent .locks .ReentrantReadWriteLock ;
@@ -40,11 +42,9 @@ public class SimpleCache<T> implements SnapshotCache<T> {
40
42
41
43
@ GuardedBy ("lock" )
42
44
private final Map <T , Snapshot > snapshots = new HashMap <>();
43
- @ GuardedBy ("lock" )
44
- private final Map <T , CacheStatusInfo <T >> statuses = new HashMap <>();
45
+ private final ConcurrentMap <T , CacheStatusInfo <T >> statuses = new ConcurrentHashMap <>();
45
46
46
- @ GuardedBy ("lock" )
47
- private long watchCount ;
47
+ private AtomicLong watchCount = new AtomicLong ();
48
48
49
49
/**
50
50
* Constructs a simple cache.
@@ -58,9 +58,10 @@ public SimpleCache(NodeGroup<T> groups) {
58
58
/**
59
59
* {@inheritDoc}
60
60
*/
61
- @ Override public boolean clearSnapshot (T group ) {
61
+ @ Override
62
+ public boolean clearSnapshot (T group ) {
63
+ // we take a writeLock to prevent watches from being created
62
64
writeLock .lock ();
63
-
64
65
try {
65
66
CacheStatusInfo <T > status = statuses .get (group );
66
67
@@ -91,12 +92,11 @@ public Watch createWatch(
91
92
Consumer <Response > responseConsumer ) {
92
93
93
94
T group = groups .hash (request .getNode ());
94
-
95
- writeLock . lock ();
96
-
95
+ // even though we're modifying, we take a readLock to allow multiple watches to be created in parallel since it
96
+ // doesn't conflict
97
+ readLock . lock ();
97
98
try {
98
99
CacheStatusInfo <T > status = statuses .computeIfAbsent (group , g -> new CacheStatusInfo <>(group ));
99
-
100
100
status .setLastWatchRequestTime (System .currentTimeMillis ());
101
101
102
102
Snapshot snapshot = snapshots .get (group );
@@ -105,7 +105,7 @@ public Watch createWatch(
105
105
Watch watch = new Watch (ads , request , responseConsumer );
106
106
107
107
if (snapshot != null ) {
108
- HashSet <String > requestedResources = new HashSet <> (request .getResourceNamesList ());
108
+ Set <String > requestedResources = ImmutableSet . copyOf (request .getResourceNamesList ());
109
109
110
110
// If the request is asking for resources we haven't sent to the proxy yet, see if we have additional resources.
111
111
if (!knownResourceNames .equals (requestedResources )) {
@@ -126,9 +126,7 @@ public Watch createWatch(
126
126
127
127
// If the requested version is up-to-date or missing a response, leave an open watch.
128
128
if (snapshot == null || request .getVersionInfo ().equals (version )) {
129
- watchCount ++;
130
-
131
- long watchId = watchCount ;
129
+ long watchId = watchCount .incrementAndGet ();
132
130
133
131
if (LOGGER .isDebugEnabled ()) {
134
132
LOGGER .debug ("open watch {} for {}[{}] from node {} for version {}" ,
@@ -150,33 +148,33 @@ public Watch createWatch(
150
148
boolean responded = respond (watch , snapshot , group );
151
149
152
150
if (!responded ) {
153
- watchCount ++ ;
151
+ long watchId = watchCount . incrementAndGet () ;
154
152
155
153
if (LOGGER .isDebugEnabled ()) {
156
154
LOGGER .debug ("did not respond immediately, leaving open watch {} for {}[{}] from node {} for version {}" ,
157
- watchCount ,
155
+ watchId ,
158
156
request .getTypeUrl (),
159
157
String .join (", " , request .getResourceNamesList ()),
160
158
group ,
161
159
request .getVersionInfo ());
162
160
}
163
161
164
- status .setWatch (watchCount , watch );
162
+ status .setWatch (watchId , watch );
165
163
166
- watch .setStop (() -> status .removeWatch (watchCount ));
164
+ watch .setStop (() -> status .removeWatch (watchId ));
167
165
}
168
166
169
167
return watch ;
170
-
171
168
} finally {
172
- writeLock .unlock ();
169
+ readLock .unlock ();
173
170
}
174
171
}
175
172
176
173
/**
177
174
* {@inheritDoc}
178
175
*/
179
- @ Override public Snapshot getSnapshot (T group ) {
176
+ @ Override
177
+ public Snapshot getSnapshot (T group ) {
180
178
readLock .lock ();
181
179
182
180
try {
@@ -189,56 +187,51 @@ public Watch createWatch(
189
187
/**
190
188
* {@inheritDoc}
191
189
*/
192
- @ Override public Collection <T > groups () {
193
- readLock .lock ();
194
-
195
- try {
196
- return ImmutableSet .copyOf (statuses .keySet ());
197
- } finally {
198
- readLock .unlock ();
199
- }
190
+ @ Override
191
+ public Collection <T > groups () {
192
+ return ImmutableSet .copyOf (statuses .keySet ());
200
193
}
201
194
202
195
/**
203
196
* {@inheritDoc}
204
197
*/
205
198
@ Override
206
199
public void setSnapshot (T group , Snapshot snapshot ) {
200
+ // we take a writeLock to prevent watches from being created while we update the snapshot
201
+ CacheStatusInfo <T > status ;
207
202
writeLock .lock ();
208
-
209
203
try {
210
204
// Update the existing snapshot entry.
211
205
snapshots .put (group , snapshot );
206
+ status = statuses .get (group );
207
+ } finally {
208
+ writeLock .unlock ();
209
+ }
212
210
213
- CacheStatusInfo <T > status = statuses .get (group );
214
-
215
- if (status == null ) {
216
- return ;
217
- }
211
+ if (status == null ) {
212
+ return ;
213
+ }
218
214
219
- status .watchesRemoveIf ((id , watch ) -> {
220
- String version = snapshot .version (watch .request ().getTypeUrl (), watch .request ().getResourceNamesList ());
215
+ status .watchesRemoveIf ((id , watch ) -> {
216
+ String version = snapshot .version (watch .request ().getTypeUrl (), watch .request ().getResourceNamesList ());
221
217
222
- if (!watch .request ().getVersionInfo ().equals (version )) {
223
- if (LOGGER .isDebugEnabled ()) {
224
- LOGGER .debug ("responding to open watch {}[{}] with new version {}" ,
225
- id ,
226
- String .join (", " , watch .request ().getResourceNamesList ()),
227
- version );
228
- }
218
+ if (!watch .request ().getVersionInfo ().equals (version )) {
219
+ if (LOGGER .isDebugEnabled ()) {
220
+ LOGGER .debug ("responding to open watch {}[{}] with new version {}" ,
221
+ id ,
222
+ String .join (", " , watch .request ().getResourceNamesList ()),
223
+ version );
224
+ }
229
225
230
- respond (watch , snapshot , group );
226
+ respond (watch , snapshot , group );
231
227
232
- // Discard the watch. A new watch will be created for future snapshots once envoy ACKs the response.
233
- return true ;
234
- }
228
+ // Discard the watch. A new watch will be created for future snapshots once envoy ACKs the response.
229
+ return true ;
230
+ }
235
231
236
- // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond.
237
- return false ;
238
- });
239
- } finally {
240
- writeLock .unlock ();
241
- }
232
+ // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond.
233
+ return false ;
234
+ });
242
235
}
243
236
244
237
/**
@@ -259,9 +252,9 @@ private Response createResponse(DiscoveryRequest request, Map<String, ? extends
259
252
Collection <? extends Message > filtered = request .getResourceNamesList ().isEmpty ()
260
253
? resources .values ()
261
254
: request .getResourceNamesList ().stream ()
262
- .map (resources ::get )
263
- .filter (Objects ::nonNull )
264
- .collect (Collectors .toList ());
255
+ .map (resources ::get )
256
+ .filter (Objects ::nonNull )
257
+ .collect (Collectors .toList ());
265
258
266
259
return Response .create (request , filtered , version );
267
260
}
0 commit comments