Skip to content

Commit 3eba300

Browse files
Merge branch 'master' into caffeineIntegration
2 parents 4c9f3b4 + 0ff3bc1 commit 3eba300

File tree

3 files changed

+96
-59
lines changed

3 files changed

+96
-59
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ lazy val commonSettings = Seq(
6161
libraryDependencies ++= Seq(
6262
"org.typelevel" %% "cats-core" % catsV,
6363
"org.typelevel" %% "cats-effect" % catsEffectV,
64-
"io.chrisdavenport" %% "mapref" % "0.1.0",
64+
"io.chrisdavenport" %% "mapref" % "0.1.1",
6565

6666
"org.typelevel" %% "cats-effect-laws" % catsEffectV % Test,
6767
"com.codecommit" %% "cats-effect-testing-specs2" % "0.3.0" % Test,

modules/core/src/main/scala/io/chrisdavenport/mules/DispatchOneCache.scala

Lines changed: 70 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@ import java.util.concurrent.ConcurrentHashMap
1414
final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
1515
private val mapRef: MapRef[F, K, Option[DispatchOneCache.DispatchOneCacheItem[F, V]]],
1616
private val purgeExpiredEntriesOpt : Option[Long => F[List[K]]], // Optional Performance Improvement over Default
17-
val defaultExpiration: Option[TimeSpec],
18-
private val createItem: K => F[V]
19-
)(implicit val F: Concurrent[F], val C: Clock[F]) extends Cache[F, K, V] with Get[F, K, V] {
17+
val defaultExpiration: Option[TimeSpec]
18+
)(implicit val F: Concurrent[F], val C: Clock[F]) extends Cache[F, K, V]{
2019
import DispatchOneCache.DispatchOneCacheItem
2120
import DispatchOneCache.CancelationDuringDispatchOneCacheInsertProcessing
2221

@@ -54,7 +53,7 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
5453
(k, cacheItem) => cacheItem.item.tryGet.flatMap{
5554
case Some(Left(_)) =>
5655
mapRef(k).modify{
57-
case Some(cacheItemNow) if (cacheItem.itemExpiration === cacheItemNow.itemExpiration) =>
56+
case Some(cacheItemNow) if (cacheItem.itemExpiration.map(_.nanos) === cacheItemNow.itemExpiration.map(_.nanos)) =>
5857
(None, createEmptyIfUnset(k))
5958
case otherwise =>
6059
(otherwise, emptyFV)
@@ -63,15 +62,15 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
6362
emptyFV
6463
}
6564

66-
private def insertAtomic(k: K): F[Unit] = {
65+
private def insertAtomic(k: K, action: K => F[V]): F[Unit] = {
6766
mapRef(k).modify{
6867
case None =>
6968
(None, createEmptyIfUnset(k))
7069
case s@Some(cacheItem) =>
7170
(s, updateIfFailedThenCreate(k, cacheItem))
7271
}.flatMap{ maybeDeferred =>
7372
maybeDeferred.bracketCase(_.traverse_{ deferred =>
74-
createItem(k).attempt.flatMap(e => deferred.complete(e).attempt.void)
73+
action(k).attempt.flatMap(e => deferred.complete(e).attempt.void)
7574
}){
7675
case (Some(deferred), ExitCase.Canceled) => deferred.complete(CancelationDuringDispatchOneCacheInsertProcessing.asLeft).attempt.void
7776
case (Some(deferred), ExitCase.Error(e)) => deferred.complete(e.asLeft).attempt.void
@@ -80,6 +79,57 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
8079
}
8180
}
8281

82+
/**
83+
* Gives an atomic only once loading function, or
84+
* gets the value in the system
85+
**/
86+
def lookupOrLoad(k: K, action: K => F[V]): F[V] = {
87+
C.monotonic(NANOSECONDS)
88+
.flatMap{now =>
89+
mapRef(k).modify[Option[DispatchOneCacheItem[F, V]]]{
90+
case s@Some(value) =>
91+
if (DispatchOneCache.isExpired(now, value)){
92+
(None, None)
93+
} else {
94+
(s, s)
95+
}
96+
case None =>
97+
(None, None)
98+
}
99+
}
100+
.flatMap{
101+
case Some(s) => s.item.get.flatMap{
102+
case Left(_) => insertAtomic(k, action) >> lookupOrLoad(k, action)
103+
case Right(v) => F.pure(v)
104+
}
105+
case None => insertAtomic(k, action) >> lookupOrLoad(k, action)
106+
}
107+
}
108+
109+
def insertWith(k: K, action: K => F[V]): F[Unit] = {
110+
for {
111+
defer <- Deferred.tryable[F, Either[Throwable, V]]
112+
now <- Clock[F].monotonic(NANOSECONDS)
113+
item = DispatchOneCacheItem(defer, defaultExpiration.map(spec => TimeSpec.unsafeFromNanos(now + spec.nanos))).some
114+
out <- mapRef(k).getAndSet(item)
115+
.bracketCase{oldDeferOpt =>
116+
action(k).flatMap[Unit]{ a =>
117+
val set = a.asRight
118+
oldDeferOpt.traverse_(oldDefer => oldDefer.item.complete(set)).attempt >>
119+
defer.complete(set)
120+
}
121+
}{
122+
case (_, ExitCase.Completed) => F.unit
123+
case (oldItem, ExitCase.Canceled) =>
124+
val set = CancelationDuringDispatchOneCacheInsertProcessing.asLeft
125+
oldItem.traverse_(_.item.complete(set)).attempt >> defer.complete(set).attempt.void
126+
case (oldItem, ExitCase.Error(e)) =>
127+
val set = e.asLeft
128+
oldItem.traverse_(_.item.complete(set)).attempt >> defer.complete(set).attempt.void
129+
}
130+
} yield out
131+
}
132+
83133
/**
84134
* Overrides any background insert
85135
**/
@@ -106,7 +156,7 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
106156
defered <- Deferred.tryable[F, Either[Throwable, V]]
107157
setAs = v.asRight
108158
_ <- defered.complete(setAs)
109-
now <- C.monotonic(NANOSECONDS)
159+
now <- C.monotonic(NANOSECONDS)
110160
item = DispatchOneCacheItem(defered, optionTimeout.map(spec => TimeSpec.unsafeFromNanos(now + spec.nanos))).some
111161
action <- mapRef(k).modify{
112162
case None =>
@@ -117,18 +167,7 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
117167
out <- action
118168
} yield out
119169

120-
def lookup(k: K): F[Option[V]] = get(k).map(_.some)
121-
122-
def delete(k: K): F[Unit] = mapRef(k).set(None)
123-
124-
/**
125-
* Lookup an item with the given key, and delete it if it is expired.
126-
*
127-
* The function will only return a value if it is present in the cache and if the item is not expired.
128-
*
129-
* The function will eagerly delete the item from the cache if it is expired.
130-
**/
131-
def get(k: K): F[V] = {
170+
def lookup(k: K): F[Option[V]] = {
132171
C.monotonic(NANOSECONDS)
133172
.flatMap{now =>
134173
mapRef(k).modify[Option[DispatchOneCacheItem[F, V]]]{
@@ -142,14 +181,16 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
142181
(None, None)
143182
}
144183
}
145-
.flatMap{
146-
case Some(s) => s.item.get.flatMap{
147-
case Left(_) => insertAtomic(k) >> get(k)
148-
case Right(v) => F.pure(v)
184+
.flatMap{
185+
case Some(s) => s.item.get.map{
186+
case Left(_) => None
187+
case Right(v) => v.some
149188
}
150-
case None => insertAtomic(k) >> get(k)
151-
}
152-
}
189+
case None => F.pure(None)
190+
}
191+
}
192+
193+
def delete(k: K): F[Unit] = mapRef(k).set(None)
153194

154195
/**
155196
* Change the default expiration value of newly added cache items. Shares an underlying reference
@@ -160,7 +201,6 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
160201
mapRef,
161202
purgeExpiredEntriesOpt,
162203
defaultExpiration,
163-
createItem
164204
)
165205

166206
/**
@@ -209,19 +249,16 @@ object DispatchOneCache {
209249
* If the specified default expiration value is None, items inserted by insert will never expire.
210250
**/
211251
def ofSingleImmutableMap[F[_]: Concurrent: Clock, K, V](
212-
createAction: K => F[V],
213252
defaultExpiration: Option[TimeSpec]
214253
): F[DispatchOneCache[F, K, V]] =
215254
Ref.of[F, Map[K, DispatchOneCacheItem[F, V]]](Map.empty[K, DispatchOneCacheItem[F, V]])
216255
.map(ref => new DispatchOneCache[F, K, V](
217256
MapRef.fromSingleImmutableMapRef(ref),
218257
{l: Long => SingleRef.purgeExpiredEntries(ref)(l)}.some,
219-
defaultExpiration,
220-
createAction
258+
defaultExpiration
221259
))
222260

223261
def ofShardedImmutableMap[F[_]: Concurrent : Clock, K, V](
224-
createAction: K => F[V],
225262
shardCount: Int,
226263
defaultExpiration: Option[TimeSpec]
227264
): F[DispatchOneCache[F, K, V]] =
@@ -230,12 +267,10 @@ object DispatchOneCache {
230267
_,
231268
None,
232269
defaultExpiration,
233-
createAction
234270
)
235271
}
236272

237273
def ofConcurrentHashMap[F[_]: Concurrent: Clock, K, V](
238-
createAction: K => F[V],
239274
defaultExpiration: Option[TimeSpec],
240275
initialCapacity: Int = 16,
241276
loadFactor: Float = 0.75f,
@@ -245,21 +280,18 @@ object DispatchOneCache {
245280
new DispatchOneCache[F, K, V](
246281
MapRef.fromConcurrentHashMap(chm),
247282
None,
248-
defaultExpiration,
249-
createAction
283+
defaultExpiration
250284
)
251285
}
252286

253287
def ofMapRef[F[_]: Concurrent: Clock, K, V](
254-
createAction: K => F[V],
255288
mr: MapRef[F, K, Option[DispatchOneCacheItem[F, V]]],
256289
defaultExpiration: Option[TimeSpec]
257290
): DispatchOneCache[F, K, V] = {
258291
new DispatchOneCache[F, K, V](
259292
mr,
260293
None,
261-
defaultExpiration,
262-
createAction
294+
defaultExpiration
263295
)
264296
}
265297

modules/core/src/test/scala/io/chrisdavenport/mules/DispatchOneCacheSpec.scala

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ class DispatchOneCacheSpec extends Specification with CatsIO {
1414
"only run once" in {
1515
for {
1616
ref <- Ref[IO].of(0)
17-
cache <- DispatchOneCache.ofSingleImmutableMap[IO, Unit, Int](_ => Timer[IO].sleep(1.second) >> ref.modify(i => (i+1, i)), None)
18-
first <- cache.get(()).start
19-
second <- cache.get(()).start
20-
third <- cache.get(()).start
17+
cache <- DispatchOneCache.ofSingleImmutableMap[IO, Unit, Int](None)
18+
action = {_: Unit => Timer[IO].sleep(1.second) >> ref.modify(i => (i+1, i))}
19+
first <- cache.lookupOrLoad((), action).start
20+
second <- cache.lookupOrLoad((), action).start
21+
third <- cache.lookupOrLoad((), action).start
2122
_ <- first.join
2223
_ <- second.join
2324
_ <- third.join
@@ -29,10 +30,10 @@ class DispatchOneCacheSpec extends Specification with CatsIO {
2930
for {
3031
ref <- Ref[IO].of(0)
3132
errorFunction = ref.modify(i => (i+1, if (i > 3) i.pure[IO] else Timer[IO].sleep(1.second) >> IO.raiseError(new Throwable("whoopsie")))).flatten
32-
cache <- DispatchOneCache.ofSingleImmutableMap[IO, Unit, Int](_ => errorFunction, None)
33-
first <- cache.get(()).start
34-
second <- cache.get(()).start
35-
third <- cache.get(()).start
33+
cache <- DispatchOneCache.ofSingleImmutableMap[IO, Unit, Int](None)
34+
first <- cache.lookupOrLoad((), { _ => errorFunction}).start
35+
second <- cache.lookupOrLoad((), { _ => errorFunction}).start
36+
third <- cache.lookupOrLoad((), { _ => errorFunction}).start
3637
_ <- first.join
3738
_ <- second.join
3839
_ <- third.join
@@ -42,30 +43,33 @@ class DispatchOneCacheSpec extends Specification with CatsIO {
4243

4344
"insert places a value" in {
4445
for {
45-
cache <- DispatchOneCache.ofSingleImmutableMap[IO, Unit, Int](_ => IO.pure(5), None)
46+
cache <- DispatchOneCache.ofSingleImmutableMap[IO, Unit, Int](None)
47+
action = {_: Unit => IO.pure(5)}
4648
_ <- cache.insert((), 1)
47-
now <- cache.get(())
49+
now <- cache.lookupOrLoad((), action)
4850
} yield {
4951
now must_=== 1
5052
}
5153
}
5254

53-
"insert overrides background action for first action get" in {
55+
"insert overrides background action for first action" in {
5456
for {
55-
cache <- DispatchOneCache.ofSingleImmutableMap[IO, Unit, Int](_ => Timer[IO].sleep(5.seconds).as(5), None)
56-
first <- cache.get(()).start
57+
cache <- DispatchOneCache.ofSingleImmutableMap[IO, Unit, Int](None)
58+
action = {_: Unit => Timer[IO].sleep(5.seconds).as(5)}
59+
first <- cache.lookupOrLoad((), action).start
5760
_ <- cache.insert((), 1)
5861
value <- first.join
5962
} yield {
6063
value must_=== 1
6164
}
6265
}
6366

64-
"insert overrides background action for secondary action get" in {
67+
"insert overrides background action for secondary action" in {
6568
for {
66-
cache <- DispatchOneCache.ofSingleImmutableMap[IO, Unit, Int](_ => Timer[IO].sleep(5.seconds).as(5), None)
67-
first <- cache.get(()).start
68-
second <- cache.get(()).start
69+
cache <- DispatchOneCache.ofSingleImmutableMap[IO, Unit, Int](None)
70+
action = {_: Unit => Timer[IO].sleep(5.seconds).as(5)}
71+
first <- cache.lookupOrLoad((),action).start
72+
second <- cache.lookupOrLoad((), action).start
6973
_ <- cache.insert((), 1)
7074
resultSecond <- second.join
7175
_ <- first.cancel.timeout(1.second).attempt.void
@@ -77,10 +81,11 @@ class DispatchOneCacheSpec extends Specification with CatsIO {
7781

7882
"insert overrides set value" in {
7983
for {
80-
cache <- DispatchOneCache.ofSingleImmutableMap[IO, Unit, Int](_ => IO.pure(2), None)
81-
first <- cache.get(())
84+
cache <- DispatchOneCache.ofSingleImmutableMap[IO, Unit, Int](None)
85+
action = {_: Unit => IO.pure(2)}
86+
first <- cache.lookupOrLoad((), action)
8287
_ <- cache.insert((), 1)
83-
second <- cache.get(())
88+
second <- cache.lookupOrLoad((), action)
8489
} yield {
8590
(first,second).must_===((2, 1))
8691

0 commit comments

Comments
 (0)