@@ -3,11 +3,13 @@ package io.chrisdavenport.mules
33import cats .effect ._
44import cats .effect .implicits ._
55import cats .implicits ._
6- import scala .collection .immutable .Map
76
7+ import scala .collection .immutable .Map
88import io .chrisdavenport .mapref .MapRef
9+ import io .chrisdavenport .mapref .MapRef .fromSeqRefs
910
1011import java .util .concurrent .ConcurrentHashMap
12+ import scala .collection .mutable
1113
1214final class DispatchOneCache [F [_], K , V ] private [DispatchOneCache ] (
1315 private val mapRef : MapRef [F , K , Option [DispatchOneCache .DispatchOneCacheItem [F , V ]]],
@@ -17,6 +19,7 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
1719 import DispatchOneCache .DispatchOneCacheItem
1820 import DispatchOneCache .CancelationDuringDispatchOneCacheInsertProcessing
1921
22+ // Note the default will not actually purge any entries
2023 private val purgeExpiredEntries : Long => F [List [K ]] =
2124 purgeExpiredEntriesOpt.getOrElse({(_ : Long ) => List .empty[K ].pure[F ]})
2225
@@ -250,71 +253,148 @@ object DispatchOneCache {
250253 Ref .of[F , Map [K , DispatchOneCacheItem [F , V ]]](Map .empty[K , DispatchOneCacheItem [F , V ]])
251254 .map(ref => new DispatchOneCache [F , K , V ](
252255 MapRef .fromSingleImmutableMapRef(ref),
253- {(l : Long ) => SingleRef .purgeExpiredEntries(ref)(l)}.some,
256+ {(l : Long ) => SingleRef .purgeExpiredEntries[ F , K , DispatchOneCacheItem [ F , V ]] (ref, isExpired )(l)}.some,
254257 defaultExpiration
255258 ))
256259
260+ // We can't key the keys from the mapref construction, so we have to repeat the construction hereto retain access to the underlying data
257261 def ofShardedImmutableMap [F [_]: Async , K , V ](
258262 shardCount : Int ,
259263 defaultExpiration : Option [TimeSpec ]
260- ): F [DispatchOneCache [F , K , V ]] =
261- MapRef .ofShardedImmutableMap[F , K , DispatchOneCacheItem [F , V ]](shardCount).map{
264+ ): F [DispatchOneCache [F , K , V ]] = {
265+ PurgeableMapRef .ofShardedImmutableMap[F ,K , DispatchOneCacheItem [F , V ]](shardCount, isExpired ).map{ smr =>
262266 new DispatchOneCache [F , K , V ](
263- _ ,
264- None ,
265- defaultExpiration,
267+ smr.mapRef ,
268+ Some (smr.purgeExpiredEntries) ,
269+ defaultExpiration
266270 )
267271 }
272+ }
268273
269274 def ofConcurrentHashMap [F [_]: Async , K , V ](
270275 defaultExpiration : Option [TimeSpec ],
271276 initialCapacity : Int = 16 ,
272277 loadFactor : Float = 0.75f ,
273278 concurrencyLevel : Int = 16 ,
274- ): F [DispatchOneCache [F , K , V ]] = Sync [F ].delay{
275- val chm = new ConcurrentHashMap [K , DispatchOneCacheItem [F , V ]](initialCapacity, loadFactor, concurrencyLevel)
279+ ): F [DispatchOneCache [F , K , V ]] =
280+ PurgeableMapRef .ofConcurrentHashMap[F ,K , DispatchOneCacheItem [F , V ]](
281+ initialCapacity,
282+ loadFactor,
283+ concurrencyLevel,
284+ isExpired).map {pmr =>
285+ new DispatchOneCache (
286+ pmr.mapRef,
287+ Some (pmr.purgeExpiredEntries),
288+ defaultExpiration
289+ )
290+ }
291+
292+
293+ // No access to keys here by default, so cache entries will only be cleared on overwrite.
294+ // kept separate from method below for bincompat
295+ def ofMapRef [F [_]: Concurrent : Clock , K , V ](
296+ mr : MapRef [F , K , Option [DispatchOneCacheItem [F , V ]]],
297+ defaultExpiration : Option [TimeSpec ]
298+ ): DispatchOneCache [F , K , V ] = {
276299 new DispatchOneCache [F , K , V ](
277- MapRef .fromConcurrentHashMap(chm) ,
300+ mr ,
278301 None ,
279302 defaultExpiration
280303 )
281304 }
282-
283305 def ofMapRef [F [_]: Concurrent : Clock , K , V ](
284306 mr : MapRef [F , K , Option [DispatchOneCacheItem [F , V ]]],
285- defaultExpiration : Option [TimeSpec ]
307+ defaultExpiration : Option [TimeSpec ],
308+ purgeExpiredEntries : Option [Long => F [List [K ]]]
286309 ): DispatchOneCache [F , K , V ] = {
287310 new DispatchOneCache [F , K , V ](
288311 mr,
289- None ,
312+ purgeExpiredEntries ,
290313 defaultExpiration
291314 )
292315 }
293316
317+ private [mules] def isExpired [F [_], A ](checkAgainst : Long , cacheItem : DispatchOneCacheItem [F , A ]): Boolean = {
318+ cacheItem.itemExpiration match {
319+ case Some (e) if e.nanos < checkAgainst => true
320+ case _ => false
321+ }
322+ }
323+ }
294324
295- private object SingleRef {
296-
297- def purgeExpiredEntries [F [_], K , V ](ref : Ref [F , Map [K , DispatchOneCacheItem [F , V ]]])(now : Long ): F [List [K ]] = {
298- ref.modify(
299- m => {
300- val l = scala.collection.mutable.ListBuffer .empty[K ]
301- m.foreach{ case (k, item) =>
302- if (isExpired(now, item)) {
303- l.+= (k)
304- }
325+ private [mules] object SingleRef {
326+ def purgeExpiredEntries [F [_], K , V ](ref : Ref [F , Map [K , V ]], isExpired : (Long , V ) => Boolean )(now : Long ): F [List [K ]] = {
327+ ref.modify(
328+ m => {
329+ val l = scala.collection.mutable.ListBuffer .empty[K ]
330+ m.foreach { case (k, item) =>
331+ if (isExpired(now, item)) {
332+ l.+= (k)
305333 }
306- val remove = l.result()
307- val finalMap = m -- remove
308- (finalMap, remove)
309334 }
335+ val remove = l.result()
336+ val finalMap = m -- remove
337+ (finalMap, remove)
338+ }
339+ )
340+ }
341+ }
342+
343+ private [mules] case class PurgeableMapRef [F [_],K ,V ](mapRef : MapRef [F ,K ,V ], purgeExpiredEntries : Long => F [List [K ]])
344+
345+ private [mules] object PurgeableMapRef {
346+ def ofShardedImmutableMap [F [_]: Concurrent , K , V ](
347+ shardCount : Int ,
348+ isExpired : (Long , V ) => Boolean
349+ ): F [PurgeableMapRef [F , K , Option [V ]]] = {
350+ assert(shardCount >= 1 , " MapRef.sharded should have at least 1 shard" )
351+ val shards : F [List [Ref [F , Map [K , V ]]]] = List .fill(shardCount)(())
352+ .traverse(_ => Concurrent [F ].ref[Map [K , V ]](Map .empty))
353+
354+ def purgeExpiredEntries (shards: List [Ref [F , Map [K , V ]]])(now : Long ) = shards.parFlatTraverse(SingleRef .purgeExpiredEntries(_, isExpired)(now))
355+
356+ shards.map{ s =>
357+ PurgeableMapRef (
358+ fromSeqRefs(s),
359+ purgeExpiredEntries(s)
310360 )
311361 }
312362 }
313363
314- private def isExpired [F [_], A ](checkAgainst : Long , cacheItem : DispatchOneCacheItem [F , A ]): Boolean = {
315- cacheItem.itemExpiration match {
316- case Some (e) if e.nanos < checkAgainst => true
317- case _ => false
364+ def ofConcurrentHashMap [F [_]: Concurrent , K , V ](
365+ initialCapacity : Int = 16 ,
366+ loadFactor : Float = 0.75f ,
367+ concurrencyLevel : Int = 16 ,
368+ isExpired : (Long , V ) => Boolean
369+ ): F [PurgeableMapRef [F , K , Option [V ]]] = Concurrent [F ].unit.map{ _ => // replaced Sync[F].delay. Needed?
370+ val chm = new ConcurrentHashMap [K , V ](initialCapacity, loadFactor, concurrencyLevel)
371+ val mapRef : MapRef [F , K , Option [V ]] = MapRef .fromConcurrentHashMap(chm)
372+ val getKeys : () => F [List [K ]] = () => Concurrent [F ].unit.map{ _ =>
373+ val k = chm.keys()
374+ val builder = new mutable.ListBuffer [K ]
375+ if (k != null ){
376+ while (k.hasMoreElements()){
377+ val next = k.nextElement()
378+ builder.+= (next)
379+ }
380+ }
381+ builder.result()
318382 }
383+ def purgeExpiredEntries (now : Long ): F [List [K ]] = {
384+ val keys : F [List [K ]] = getKeys()
385+ keys.flatMap(l =>
386+ l.flatTraverse(k =>
387+ mapRef(k).modify(optItem =>
388+ optItem.map(item =>
389+ if (isExpired(now, item))
390+ (None , List (k))
391+ else
392+ (optItem, List .empty)
393+ ).getOrElse((optItem, List .empty))
394+ )
395+ )
396+ )
397+ }
398+ PurgeableMapRef (mapRef, purgeExpiredEntries)
319399 }
320- }
400+ }
0 commit comments