Skip to content

Commit 26ed6f9

Browse files
committed
Terminate entities when no termination message is provided
1 parent d415359 commit 26ed6f9

File tree

5 files changed

+29
-7
lines changed

5 files changed

+29
-7
lines changed

entities/src/main/scala/com/devsisters/shardcake/Sharding.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class Sharding private (
3434

3535
val register: Task[Unit] =
3636
ZIO.logDebug(s"Registering pod $address to Shard Manager") *>
37+
isShuttingDownRef.set(false) *>
3738
shardManager.register(address)
3839

3940
val unregister: Task[Unit] =

entities/src/main/scala/com/devsisters/shardcake/internal/EntityManager.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,17 @@ private[shardcake] object EntityManager {
5050
entities.updateZIO(map =>
5151
map.get(entityId) match {
5252
case Some((Some(queue), interruptionFiber)) =>
53-
// if a queue is found, offer the termination message, and set the queue to None so that no new message is enqueued
5453
Promise
5554
.make[Nothing, Unit]
56-
.flatMap(p => ZIO.foreach(terminateMessage(p))(queue.offer).exit)
57-
.as(map.updated(entityId, (None, interruptionFiber)))
55+
.flatMap { p =>
56+
terminateMessage(p) match {
57+
case Some(msg) =>
58+
// if a queue is found, offer the termination message, and set the queue to None so that no new message is enqueued
59+
queue.offer(msg).exit.as(map.updated(entityId, (None, interruptionFiber)))
60+
case None =>
61+
queue.shutdown.as(map - entityId)
62+
}
63+
}
5864
case _ =>
5965
// if no queue is found, do nothing
6066
ZIO.succeed(map)
@@ -139,7 +145,7 @@ private[shardcake] object EntityManager {
139145
case Some(queue) =>
140146
terminateMessage(p) match {
141147
case Some(terminate) => queue.offer(terminate).catchAllCause(_ => p.succeed(()))
142-
case None => p.succeed(())
148+
case None => queue.shutdown *> p.succeed(())
143149
}
144150
case None => p.succeed(())
145151
}

entities/src/test/scala/com/devsisters/shardcake/ShardingSpec.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,18 @@ object ShardingSpec extends ZIOSpecDefault {
2828
} yield assertTrue(c1 == 2) && assertTrue(c2 == 1)
2929
}
3030
},
31+
test("Entity termination") {
32+
ZIO.scoped {
33+
for {
34+
_ <- Sharding.registerEntity(Counter, behavior)
35+
_ <- Sharding.registerScoped
36+
counter <- Sharding.messenger(Counter)
37+
_ <- counter.send("c3")(GetCounter.apply)
38+
_ <- Clock.sleep(3 seconds)
39+
c <- counter.send("c3")(GetCounter.apply)
40+
} yield assertTrue(c == 0)
41+
}
42+
},
3143
test("Cluster singleton") {
3244
ZIO.scoped {
3345
for {

examples/src/main/scala/example/complex/GuildApp.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package example.complex
33
import com.devsisters.shardcake._
44
import com.devsisters.shardcake.interfaces.Serialization
55
import dev.profunktor.redis4cats.RedisCommands
6-
import example.complex.GuildBehavior.GuildMessage.Join
6+
import example.complex.GuildBehavior.GuildMessage.{ Join, Terminate }
77
import example.complex.GuildBehavior._
88
import zio._
99

@@ -17,7 +17,7 @@ object GuildApp extends ZIOAppDefault {
1717

1818
val program: ZIO[Sharding with Scope with Serialization with RedisCommands[Task, String, String], Throwable, Unit] =
1919
for {
20-
_ <- Sharding.registerEntity(Guild, behavior)
20+
_ <- Sharding.registerEntity(Guild, behavior, p => Some(Terminate(p)))
2121
_ <- Sharding.registerScoped
2222
guild <- Sharding.messenger(Guild)
2323
user1 <- Random.nextUUID.map(_.toString)

examples/src/main/scala/example/complex/GuildBehavior.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package example.complex
33
import com.devsisters.shardcake.Messenger.Replier
44
import com.devsisters.shardcake.{ EntityType, Sharding }
55
import dev.profunktor.redis4cats.RedisCommands
6-
import zio.{ Dequeue, RIO, Task, ZIO }
6+
import zio.{ Dequeue, Promise, RIO, Task, ZIO }
77

88
import scala.util.{ Failure, Success, Try }
99

@@ -13,6 +13,7 @@ object GuildBehavior {
1313
object GuildMessage {
1414
case class Join(userId: String, replier: Replier[Try[Set[String]]]) extends GuildMessage
1515
case class Leave(userId: String) extends GuildMessage
16+
case class Terminate(p: Promise[Nothing, Unit]) extends GuildMessage
1617
}
1718

1819
object Guild extends EntityType[GuildMessage]("guild")
@@ -45,5 +46,7 @@ object GuildBehavior {
4546
)
4647
case GuildMessage.Leave(userId) =>
4748
redis.lRem(entityId, 1, userId).unit
49+
case GuildMessage.Terminate(p) =>
50+
p.succeed(()) *> ZIO.interrupt
4851
}
4952
}

0 commit comments

Comments
 (0)