@@ -266,8 +266,8 @@ public DeltaWatch createDeltaWatch(
266
266
if (!isWildcard && watch .pendingResources ().size () != 0 ) {
267
267
// If any of the pending resources are in the snapshot respond immediately. If not we'll fall back to
268
268
// version comparisons.
269
- Map <String , SnapshotResource <?>> resources = snapshot .resources (request .getResourceType ());
270
- Map <String , SnapshotResource <?>> requestedResources = watch .pendingResources ()
269
+ Map <String , VersionedResource <?>> resources = snapshot .versionedResources (request .getResourceType ());
270
+ Map <String , VersionedResource <?>> requestedResources = watch .pendingResources ()
271
271
.stream ()
272
272
.filter (resources ::containsKey )
273
273
.collect (Collectors .toMap (Function .identity (), resources ::get ));
@@ -280,10 +280,10 @@ public DeltaWatch createDeltaWatch(
280
280
return watch ;
281
281
}
282
282
} else if (hasClusterChanged && requestResourceType .equals (ResourceType .ENDPOINT )) {
283
- Map <String , SnapshotResource <?>> snapshotResources = snapshot .resources (request .getResourceType ());
283
+ Map <String , VersionedResource <?>> snapshotResources = snapshot .versionedResources (request .getResourceType ());
284
284
List <String > removedResources = findRemovedResources (watch ,
285
285
snapshotResources );
286
- Map <String , SnapshotResource <?>> changedResources = findChangedResources (watch , snapshotResources );
286
+ Map <String , VersionedResource <?>> changedResources = findChangedResources (watch , snapshotResources );
287
287
ResponseState responseState = respondDelta (
288
288
watch ,
289
289
changedResources ,
@@ -309,10 +309,10 @@ public DeltaWatch createDeltaWatch(
309
309
}
310
310
311
311
// Otherwise, version is different, the watch may be responded immediately
312
- Map <String , SnapshotResource <?>> snapshotResources = snapshot .resources (request .getResourceType ());
312
+ Map <String , VersionedResource <?>> snapshotResources = snapshot .versionedResources (request .getResourceType ());
313
313
List <String > removedResources = findRemovedResources (watch ,
314
314
snapshotResources );
315
- Map <String , SnapshotResource <?>> changedResources = findChangedResources (watch , snapshotResources );
315
+ Map <String , VersionedResource <?>> changedResources = findChangedResources (watch , snapshotResources );
316
316
ResponseState responseState = respondDelta (watch ,
317
317
changedResources ,
318
318
removedResources ,
@@ -414,8 +414,8 @@ public StatusInfo statusInfo(T group) {
414
414
415
415
@ VisibleForTesting
416
416
protected void respondWithSpecificOrder (T group ,
417
- U previousSnapshot , U snapshot ,
418
- ConcurrentMap <ResourceType , CacheStatusInfo <T >> statusMap ) {
417
+ U previousSnapshot , U snapshot ,
418
+ ConcurrentMap <ResourceType , CacheStatusInfo <T >> statusMap ) {
419
419
for (ResourceType resourceType : RESOURCE_TYPES_IN_ORDER ) {
420
420
CacheStatusInfo <T > status = statusMap .get (resourceType );
421
421
if (status == null ) {
@@ -447,16 +447,17 @@ protected void respondWithSpecificOrder(T group,
447
447
return false ;
448
448
});
449
449
450
- Map <String , SnapshotResource <?>> previousResources = previousSnapshot == null
450
+ Map <String , VersionedResource <?>> previousResources = previousSnapshot == null
451
451
? Collections .emptyMap ()
452
- : previousSnapshot .resources (resourceType );
453
- Map <String , SnapshotResource <?>> snapshotResources = snapshot .resources (resourceType );
452
+ : previousSnapshot .versionedResources (resourceType );
453
+ Map <String , VersionedResource <?>> snapshotResources = snapshot .versionedResources (resourceType );
454
454
455
- Map <String , SnapshotResource <?>> snapshotChangedResources = snapshotResources .entrySet ()
455
+ Map <String , VersionedResource <?>> snapshotChangedResources = snapshotResources .entrySet ()
456
456
.stream ()
457
457
.filter (entry -> {
458
- SnapshotResource <?> snapshotResource = previousResources .get (entry .getKey ());
459
- return snapshotResource == null || !snapshotResource .version ().equals (entry .getValue ().version ());
458
+ VersionedResource <?> versionedResource = previousResources .get (entry .getKey ());
459
+ return versionedResource == null || !versionedResource
460
+ .version ().equals (entry .getValue ().version ());
460
461
})
461
462
.collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
462
463
@@ -480,7 +481,7 @@ protected void respondWithSpecificOrder(T group,
480
481
.filter (s -> watch .trackedResources ().get (s ) != null )
481
482
.collect (Collectors .toList ());
482
483
483
- Map <String , SnapshotResource <?>> changedResources = findChangedResources (watch , snapshotChangedResources );
484
+ Map <String , VersionedResource <?>> changedResources = findChangedResources (watch , snapshotChangedResources );
484
485
485
486
ResponseState responseState = respondDelta (watch ,
486
487
changedResources ,
@@ -498,23 +499,24 @@ protected void respondWithSpecificOrder(T group,
498
499
}
499
500
}
500
501
501
- private Response createResponse (XdsRequest request , Map <String , SnapshotResource <?>> resources ,
502
- String version ) {
502
+ private Response createResponse (XdsRequest request , Map <String , VersionedResource <?>> resources ,
503
+ String version ) {
503
504
Collection <? extends Message > filtered = request .getResourceNamesList ().isEmpty ()
504
505
? resources .values ().stream ()
505
- .map (SnapshotResource ::resource )
506
+ .map (VersionedResource ::resource )
506
507
.collect (Collectors .toList ())
507
508
: request .getResourceNamesList ().stream ()
508
- .map (resources ::get )
509
- .filter (Objects ::nonNull )
510
- .map (SnapshotResource ::resource )
511
- .collect (Collectors .toList ());
509
+ .map (resources ::get )
510
+ .filter (Objects ::nonNull )
511
+ .map (VersionedResource ::resource )
512
+ .collect (Collectors .toList ());
512
513
513
514
return Response .create (request , filtered , version );
514
515
}
515
516
516
517
private boolean respond (Watch watch , U snapshot , T group ) {
517
- Map <String , SnapshotResource <?>> snapshotResources = snapshot .resources (watch .request ().getResourceType ());
518
+ Map <String , VersionedResource <?>> snapshotResources =
519
+ snapshot .versionedResources (watch .request ().getResourceType ());
518
520
519
521
if (!watch .request ().getResourceNamesList ().isEmpty () && watch .ads ()) {
520
522
Collection <String > missingNames = watch .request ().getResourceNamesList ().stream ()
@@ -563,16 +565,16 @@ private boolean respond(Watch watch, U snapshot, T group) {
563
565
return false ;
564
566
}
565
567
566
- private List <String > findRemovedResources (DeltaWatch watch , Map <String , SnapshotResource <?>> snapshotResources ) {
568
+ private List <String > findRemovedResources (DeltaWatch watch , Map <String , VersionedResource <?>> snapshotResources ) {
567
569
// remove resources for which client has a tracked version but do not exist in snapshot
568
570
return watch .trackedResources ().keySet ()
569
571
.stream ()
570
572
.filter (s -> !snapshotResources .containsKey (s ))
571
573
.collect (Collectors .toList ());
572
574
}
573
575
574
- private Map <String , SnapshotResource <?>> findChangedResources (DeltaWatch watch ,
575
- Map <String , SnapshotResource <?>> snapshotResources ) {
576
+ private Map <String , VersionedResource <?>> findChangedResources (DeltaWatch watch ,
577
+ Map <String , VersionedResource <?>> snapshotResources ) {
576
578
return snapshotResources .entrySet ()
577
579
.stream ()
578
580
.filter (entry -> {
@@ -590,12 +592,12 @@ private Map<String, SnapshotResource<?>> findChangedResources(DeltaWatch watch,
590
592
}
591
593
592
594
private ResponseState respondDeltaTracked (DeltaWatch watch ,
593
- Map <String , SnapshotResource <?>> snapshotResources ,
594
- List <String > removedResources ,
595
- String version ,
596
- T group ) {
595
+ Map <String , VersionedResource <?>> snapshotResources ,
596
+ List <String > removedResources ,
597
+ String version ,
598
+ T group ) {
597
599
598
- Map <String , SnapshotResource <?>> resources = snapshotResources .entrySet ()
600
+ Map <String , VersionedResource <?>> resources = snapshotResources .entrySet ()
599
601
.stream ()
600
602
.filter (entry -> {
601
603
if (watch .pendingResources ().contains (entry .getKey ())) {
@@ -614,10 +616,10 @@ private ResponseState respondDeltaTracked(DeltaWatch watch,
614
616
}
615
617
616
618
private ResponseState respondDelta (DeltaWatch watch ,
617
- Map <String , SnapshotResource <?>> resources ,
618
- List <String > removedResources ,
619
- String version ,
620
- T group ) {
619
+ Map <String , VersionedResource <?>> resources ,
620
+ List <String > removedResources ,
621
+ String version ,
622
+ T group ) {
621
623
if (resources .isEmpty () && removedResources .isEmpty ()) {
622
624
return ResponseState .UNRESPONDED ;
623
625
}
0 commit comments