@@ -416,7 +416,7 @@ public boolean beginInvalidatingRegion() {
416
416
PendingPutMap entry = it .next ();
417
417
if (entry .acquireLock (60 , TimeUnit .SECONDS )) {
418
418
try {
419
- entry .invalidate (now , expirationPeriod );
419
+ entry .invalidate (now );
420
420
}
421
421
finally {
422
422
entry .releaseLock ();
@@ -548,7 +548,7 @@ public boolean beginInvalidatingWithPFER(Object lockOwner, Object key, Object va
548
548
continue ;
549
549
}
550
550
long now = System .currentTimeMillis ();
551
- pending .invalidate (now , expirationPeriod );
551
+ pending .invalidate (now );
552
552
pending .addInvalidator (lockOwner , valueForPFER , now );
553
553
}
554
554
finally {
@@ -638,6 +638,8 @@ private static String lockOwnerToString(Object lockOwner) {
638
638
* This class is NOT THREAD SAFE. All operations on it must be performed with the lock held.
639
639
*/
640
640
private class PendingPutMap extends Lock {
641
+ // Number of pending puts which trigger garbage collection
642
+ private static final int GC_THRESHOLD = 10 ;
641
643
private PendingPut singlePendingPut ;
642
644
private Map <Object , PendingPut > fullMap ;
643
645
private final java .util .concurrent .locks .Lock lock = new ReentrantLock ();
@@ -705,6 +707,9 @@ public void put(PendingPut pendingPut) {
705
707
}
706
708
else {
707
709
fullMap .put ( pendingPut .owner , pendingPut );
710
+ if (fullMap .size () >= GC_THRESHOLD ) {
711
+ gc ();
712
+ }
708
713
}
709
714
}
710
715
else {
@@ -750,7 +755,7 @@ public void releaseLock() {
750
755
lock .unlock ();
751
756
}
752
757
753
- public void invalidate (long now , long expirationPeriod ) {
758
+ public void invalidate (long now ) {
754
759
if ( singlePendingPut != null ) {
755
760
if (singlePendingPut .invalidate (now , expirationPeriod )) {
756
761
singlePendingPut = null ;
@@ -766,6 +771,27 @@ else if ( fullMap != null ) {
766
771
}
767
772
}
768
773
774
+ /**
775
+ * Running {@link #gc()} is important when the key is regularly queried but it is not
776
+ * present in DB. In such case, the putFromLoad would not be called at all and we would
777
+ * leak pending puts. Cache expiration should handle the case when the pending puts
778
+ * are not accessed frequently; when these are accessed, we have to do the housekeeping
779
+ * internally to prevent unlimited growth of the map.
780
+ * The pending puts will get their timestamps when the map reaches {@link #GC_THRESHOLD}
781
+ * entries; after expiration period these will be removed completely either through
782
+ * invalidation or when we try to register next pending put.
783
+ */
784
+ private void gc () {
785
+ assert fullMap != null ;
786
+ long now = System .currentTimeMillis ();
787
+ for ( Iterator <PendingPut > it = fullMap .values ().iterator (); it .hasNext (); ) {
788
+ PendingPut pp = it .next ();
789
+ if (pp .gc (now , expirationPeriod )) {
790
+ it .remove ();
791
+ }
792
+ }
793
+ }
794
+
769
795
public void addInvalidator (Object owner , Object valueForPFER , long now ) {
770
796
assert owner != null ;
771
797
if (invalidators == null ) {
@@ -885,6 +911,10 @@ public String toString() {
885
911
886
912
public boolean invalidate (long now , long expirationPeriod ) {
887
913
completed = true ;
914
+ return gc (now , expirationPeriod );
915
+ }
916
+
917
+ public boolean gc (long now , long expirationPeriod ) {
888
918
if (registeredTimestamp == Long .MIN_VALUE ) {
889
919
registeredTimestamp = now ;
890
920
}
0 commit comments