Skip to content

Commit fc180c1

Browse files
authored
backport (#49)
1 parent 619173c commit fc180c1

File tree

3 files changed

+24
-12
lines changed

3 files changed

+24
-12
lines changed

entities/src/main/scala/com/devsisters/shardcake/Sharding.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -323,9 +323,10 @@ class Sharding private (
323323
private def registerEntity[R, Req: Tag](
324324
entityType: EntityType[Req],
325325
behavior: (String, Dequeue[Req]) => RIO[R, Nothing],
326-
terminateMessage: Promise[Nothing, Unit] => Option[Req] = (_: Promise[Nothing, Unit]) => None
326+
terminateMessage: Promise[Nothing, Unit] => Option[Req] = (_: Promise[Nothing, Unit]) => None,
327+
entityMaxIdleTime: Option[Duration] = None
327328
): ZManaged[Clock with R, Nothing, Unit] =
328-
registerRecipient(entityType, behavior, terminateMessage) *>
329+
registerRecipient(entityType, behavior, terminateMessage, entityMaxIdleTime) *>
329330
eventsHub.publish(ShardingRegistrationEvent.EntityRegistered(entityType)).unit.toManaged_
330331

331332
private def registerTopic[R, Req: Tag](
@@ -341,10 +342,12 @@ class Sharding private (
341342
def registerRecipient[R, Req: Tag](
342343
recipientType: RecipientType[Req],
343344
behavior: (String, Dequeue[Req]) => RIO[R, Nothing],
344-
terminateMessage: Promise[Nothing, Unit] => Option[Req] = (_: Promise[Nothing, Unit]) => None
345+
terminateMessage: Promise[Nothing, Unit] => Option[Req] = (_: Promise[Nothing, Unit]) => None,
346+
entityMaxIdleTime: Option[Duration] = None
345347
): ZManaged[Clock with R, Nothing, Unit] =
346348
for {
347-
entityManager <- EntityManager.make(recipientType, behavior, terminateMessage, self, config).toManaged_
349+
entityManager <-
350+
EntityManager.make(recipientType, behavior, terminateMessage, self, config, entityMaxIdleTime).toManaged_
348351
binaryQueue <- Queue
349352
.unbounded[(BinaryMessage, Promise[Throwable, Option[Array[Byte]]], Promise[Nothing, Unit])]
350353
.toManaged(_.shutdown)
@@ -482,11 +485,12 @@ object Sharding {
482485
def registerEntity[R, Req: Tag](
483486
entityType: EntityType[Req],
484487
behavior: (String, Dequeue[Req]) => RIO[R, Nothing],
485-
terminateMessage: Promise[Nothing, Unit] => Option[Req] = (_: Promise[Nothing, Unit]) => None
488+
terminateMessage: Promise[Nothing, Unit] => Option[Req] = (_: Promise[Nothing, Unit]) => None,
489+
entityMaxIdleTime: Option[Duration] = None
486490
): ZManaged[Has[Sharding] with R with Clock, Nothing, Unit] =
487491
for {
488492
sharding <- ZIO.service[Sharding].toManaged_
489-
_ <- sharding.registerEntity[R, Req](entityType, behavior, terminateMessage)
493+
_ <- sharding.registerEntity[R, Req](entityType, behavior, terminateMessage, entityMaxIdleTime)
490494
} yield ()
491495

492496
/**

entities/src/main/scala/com/devsisters/shardcake/internal/EntityManager.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ private[shardcake] object EntityManager {
2323
behavior: (String, Queue[Req]) => RIO[R, Nothing],
2424
terminateMessage: Promise[Nothing, Unit] => Option[Req],
2525
sharding: Sharding,
26-
config: Config
26+
config: Config,
27+
entityMaxIdleTime: Option[Duration]
2728
): URIO[R with Clock, EntityManager[Req]] =
2829
for {
2930
entities <- RefM.make[Map[String, (Option[Queue[Req]], Fiber[Nothing, Unit])]](Map())
@@ -36,6 +37,7 @@ private[shardcake] object EntityManager {
3637
entities,
3738
sharding,
3839
config,
40+
entityMaxIdleTime,
3941
clock
4042
)
4143

@@ -46,11 +48,12 @@ private[shardcake] object EntityManager {
4648
entities: RefM[Map[String, (Option[Queue[Req]], Fiber[Nothing, Unit])]],
4749
sharding: Sharding,
4850
config: Config,
51+
entityMaxIdleTime: Option[Duration],
4952
clock: Clock.Service
5053
) extends EntityManager[Req] {
5154
private def startExpirationFiber(entityId: String): UIO[Fiber[Nothing, Unit]] =
5255
(for {
53-
_ <- clock.sleep(config.entityMaxIdleTime)
56+
_ <- clock.sleep(entityMaxIdleTime getOrElse config.entityMaxIdleTime)
5457
_ <- terminateEntity(entityId).forkDaemon.unit // fork daemon otherwise it will interrupt itself
5558
} yield ()).forkDaemon
5659

entities/src/test/scala/com/devsisters/shardcake/ShardingSpec.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,18 @@ object ShardingSpec extends DefaultRunnableSpec {
3636
}
3737
},
3838
testM("Entity termination") {
39-
(Sharding.registerEntity(Counter, behavior) *> Sharding.registerManaged).use { _ =>
39+
(Sharding.registerEntity(
40+
Counter,
41+
behavior,
42+
entityMaxIdleTime = Some(1.seconds)
43+
) *> Sharding.registerManaged).use { _ =>
4044
for {
4145
counter <- Sharding.messenger(Counter)
42-
_ <- counter.send("c3")(GetCounter.apply)
46+
_ <- counter.sendDiscard("c3")(IncrementCounter)
47+
c0 <- counter.send("c3")(GetCounter.apply)
4348
_ <- clock.sleep(3 seconds)
44-
c <- counter.send("c3")(GetCounter.apply)
45-
} yield assertTrue(c == 0)
49+
c1 <- counter.send("c3")(GetCounter.apply) // counter should be restarted
50+
} yield assertTrue(c0 == 1, c1 == 0)
4651
}
4752
},
4853
testM("Cluster singleton") {

0 commit comments

Comments
 (0)