Skip to content

Commit 219c02d

Browse files
authored
Terminate entities when no termination message is provided (#30)
* Terminate entities when no termination message is provided * Fix test * Fix tests
1 parent 8d9dc8e commit 219c02d

File tree

5 files changed

+28
-8
lines changed

5 files changed

+28
-8
lines changed

entities/src/main/scala/com/devsisters/shardcake/Sharding.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class Sharding private (
4040

4141
val register: Task[Unit] =
4242
logger.logDebug(s"Registering pod $address to Shard Manager") *>
43+
isShuttingDownRef.set(false) *>
4344
shardManager.register(address)
4445

4546
val unregister: Task[Unit] =

entities/src/main/scala/com/devsisters/shardcake/internal/EntityManager.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,17 @@ private[shardcake] object EntityManager {
5555
entities.update(map =>
5656
map.get(entityId) match {
5757
case Some((Some(queue), interruptionFiber)) =>
58-
// if a queue is found, offer the termination message, and set the queue to None so that no new message is enqueued
5958
Promise
6059
.make[Nothing, Unit]
61-
.flatMap(p => ZIO.foreach(terminateMessage(p))(queue.offer).run)
62-
.as(map.updated(entityId, (None, interruptionFiber)))
60+
.flatMap { p =>
61+
terminateMessage(p) match {
62+
case Some(msg) =>
63+
// if a queue is found, offer the termination message, and set the queue to None so that no new message is enqueued
64+
queue.offer(msg).run.as(map.updated(entityId, (None, interruptionFiber)))
65+
case None =>
66+
queue.shutdown.as(map - entityId)
67+
}
68+
}
6369
case _ =>
6470
// if no queue is found, do nothing
6571
ZIO.succeed(map)
@@ -145,7 +151,7 @@ private[shardcake] object EntityManager {
145151
case Some(queue) =>
146152
terminateMessage(p) match {
147153
case Some(terminate) => queue.offer(terminate).catchAllCause(_ => p.succeed(()))
148-
case None => p.succeed(())
154+
case None => queue.shutdown *> p.succeed(())
149155
}
150156
case None => p.succeed(())
151157
}

entities/src/test/scala/com/devsisters/shardcake/ShardingSpec.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import zio.test.environment.TestEnvironment
1515
object ShardingSpec extends DefaultRunnableSpec {
1616

1717
private val layer =
18-
(Clock.live ++ Random.live ++ ZLayer.succeed(Config.default) >+>
18+
(Clock.live ++ Random.live ++ ZLayer.succeed(Config.default.copy(entityMaxIdleTime = 2 seconds)) >+>
1919
ShardManagerClient.local ++ Logging.debug ++ Pods.noop ++ Storage.memory ++ Serialization.javaSerialization >+>
2020
Sharding.live).mapError(TestFailure.fail)
2121

@@ -36,6 +36,16 @@ object ShardingSpec extends DefaultRunnableSpec {
3636
} yield assertTrue(c1 == 2) && assertTrue(c2 == 1)
3737
}
3838
},
39+
testM("Entity termination") {
40+
(Sharding.registerEntity(Counter, behavior) *> Sharding.registerManaged).use { _ =>
41+
for {
42+
counter <- Sharding.messenger(Counter)
43+
_ <- counter.send("c3")(GetCounter.apply)
44+
_ <- clock.sleep(3 seconds)
45+
c <- counter.send("c3")(GetCounter.apply)
46+
} yield assertTrue(c == 0)
47+
}
48+
},
3949
testM("Cluster singleton") {
4050
Sharding.registerManaged.use(_ =>
4151
for {

examples/src/main/scala/example/complex/GuildApp.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package example.complex
22

33
import com.devsisters.shardcake._
44
import com.devsisters.shardcake.interfaces.Logging
5-
import example.complex.GuildBehavior.GuildMessage.Join
5+
import example.complex.GuildBehavior.GuildMessage.{ Join, Terminate }
66
import example.complex.GuildBehavior._
77
import zio._
88
import zio.clock.Clock
@@ -16,7 +16,7 @@ object GuildApp extends zio.App {
1616
.toLayer
1717

1818
val program =
19-
(Sharding.registerEntity(Guild, behavior) *> Sharding.registerManaged).use { _ =>
19+
(Sharding.registerEntity(Guild, behavior, p => Some(Terminate(p))) *> Sharding.registerManaged).use { _ =>
2020
Sharding.messenger(Guild).flatMap { guild =>
2121
for {
2222
user1 <- random.nextUUID.map(_.toString)

examples/src/main/scala/example/complex/GuildBehavior.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package example.complex
33
import com.devsisters.shardcake.Messenger.Replier
44
import com.devsisters.shardcake.{ EntityType, Sharding }
55
import dev.profunktor.redis4cats.RedisCommands
6-
import zio.{ Dequeue, Has, RIO, Task, ZIO }
6+
import zio.{ Dequeue, Has, Promise, RIO, Task, ZIO }
77

88
import scala.util.{ Failure, Success, Try }
99

@@ -13,6 +13,7 @@ object GuildBehavior {
1313
object GuildMessage {
1414
case class Join(userId: String, replier: Replier[Try[Set[String]]]) extends GuildMessage
1515
case class Leave(userId: String) extends GuildMessage
16+
case class Terminate(p: Promise[Nothing, Unit]) extends GuildMessage
1617
}
1718

1819
object Guild extends EntityType[GuildMessage]("guild")
@@ -47,5 +48,7 @@ object GuildBehavior {
4748
)
4849
case GuildMessage.Leave(userId) =>
4950
redis.lRem(entityId, 1, userId).unit
51+
case GuildMessage.Terminate(p) =>
52+
p.succeed(()) *> ZIO.interrupt
5053
}
5154
}

0 commit comments

Comments
 (0)