9
9
import com .google .common .collect .Sets ;
10
10
import com .google .protobuf .Message ;
11
11
import io .envoyproxy .controlplane .cache .Resources .ResourceType ;
12
+
12
13
import java .util .Collection ;
13
14
import java .util .Collections ;
14
15
import java .util .HashMap ;
25
26
import java .util .stream .Collectors ;
26
27
import java .util .stream .Stream ;
27
28
import javax .annotation .concurrent .GuardedBy ;
29
+
28
30
import org .slf4j .Logger ;
29
31
import org .slf4j .LoggerFactory ;
30
32
@@ -238,7 +240,7 @@ public DeltaWatch createDeltaWatch(
238
240
return watch ;
239
241
}
240
242
} else if (hasClusterChanged && requestResourceType .equals (ResourceType .ENDPOINT )) {
241
- ResponseState responseState = respondDelta (request , group , snapshot , version , watch );
243
+ ResponseState responseState = respondDelta (request , watch , snapshot , version , group );
242
244
if (responseState .isFinished ()) {
243
245
return watch ;
244
246
}
@@ -250,7 +252,7 @@ public DeltaWatch createDeltaWatch(
250
252
}
251
253
252
254
// Otherwise, version is different, the watch may be responded immediately
253
- ResponseState responseState = respondDelta (request , group , snapshot , version , watch );
255
+ ResponseState responseState = respondDelta (request , watch , snapshot , version , group );
254
256
if (responseState .isFinished ()) {
255
257
return watch ;
256
258
}
@@ -357,8 +359,9 @@ public StatusInfo<T> statusInfo(T group) {
357
359
358
360
@ VisibleForTesting
359
361
protected void respondWithSpecificOrder (T group ,
360
- U previousSnapshot , U snapshot ,
361
- Map <ResourceType , CacheStatusInfo <T >> statusMap , Map <ResourceType , DeltaCacheStatusInfo <T >> deltaStatusMap ) {
362
+ U previousSnapshot , U snapshot ,
363
+ Map <ResourceType , CacheStatusInfo <T >> statusMap ,
364
+ Map <ResourceType , DeltaCacheStatusInfo <T >> deltaStatusMap ) {
362
365
for (ResourceType resourceType : RESOURCE_TYPES_IN_ORDER ) {
363
366
CacheStatusInfo <T > status = statusMap .get (resourceType );
364
367
if (status != null ) {
@@ -425,6 +428,7 @@ protected void respondWithSpecificOrder(T group,
425
428
426
429
Map <String , VersionedResource <?>> changedResources = findChangedResources (watch , snapshotChangedResources );
427
430
431
+
428
432
ResponseState responseState = respondDelta (watch ,
429
433
changedResources ,
430
434
removedResources ,
@@ -443,16 +447,16 @@ protected void respondWithSpecificOrder(T group,
443
447
}
444
448
445
449
private Response createResponse (XdsRequest request , Map <String , VersionedResource <?>> resources ,
446
- String version ) {
450
+ String version ) {
447
451
Collection <? extends Message > filtered = request .getResourceNamesList ().isEmpty ()
448
452
? resources .values ().stream ()
449
453
.map (VersionedResource ::resource )
450
454
.collect (Collectors .toList ())
451
455
: request .getResourceNamesList ().stream ()
452
- .map (resources ::get )
453
- .filter (Objects ::nonNull )
454
- .map (VersionedResource ::resource )
455
- .collect (Collectors .toList ());
456
+ .map (resources ::get )
457
+ .filter (Objects ::nonNull )
458
+ .map (VersionedResource ::resource )
459
+ .collect (Collectors .toList ());
456
460
457
461
return Response .create (request , filtered , version );
458
462
}
@@ -517,7 +521,7 @@ private List<String> findRemovedResources(DeltaWatch watch, Map<String, Versione
517
521
}
518
522
519
523
private Map <String , VersionedResource <?>> findChangedResources (DeltaWatch watch ,
520
- Map <String , VersionedResource <?>> snapshotResources ) {
524
+ Map <String , VersionedResource <?>> snapshotResources ) {
521
525
return snapshotResources .entrySet ()
522
526
.stream ()
523
527
.filter (entry -> {
@@ -534,24 +538,24 @@ private Map<String, VersionedResource<?>> findChangedResources(DeltaWatch watch,
534
538
.collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
535
539
}
536
540
537
- private ResponseState respondDelta (DeltaXdsRequest request , T group , U snapshot , String version , DeltaWatch watch ) {
541
+
542
+ private ResponseState respondDelta (DeltaXdsRequest request , DeltaWatch watch , U snapshot , String version , T group ) {
538
543
Map <String , VersionedResource <?>> snapshotResources = snapshot .versionedResources (request .getResourceType ());
539
544
List <String > removedResources = findRemovedResources (watch ,
540
545
snapshotResources );
541
546
Map <String , VersionedResource <?>> changedResources = findChangedResources (watch , snapshotResources );
542
- return respondDelta (
543
- watch ,
547
+ return respondDelta (watch ,
544
548
changedResources ,
545
549
removedResources ,
546
550
version ,
547
551
group );
548
552
}
549
553
550
554
private ResponseState respondDelta (DeltaWatch watch ,
551
- Map <String , VersionedResource <?>> resources ,
552
- List <String > removedResources ,
553
- String version ,
554
- T group ) {
555
+ Map <String , VersionedResource <?>> resources ,
556
+ List <String > removedResources ,
557
+ String version ,
558
+ T group ) {
555
559
if (resources .isEmpty () && removedResources .isEmpty ()) {
556
560
return ResponseState .UNRESPONDED ;
557
561
}
0 commit comments