Skip to content

Commit 56eab69

Browse files
authored
Wait on previously terminating entities when terminating all entities (#87)
1 parent ef803c8 commit 56eab69

File tree

1 file changed

+21
-19
lines changed

1 file changed

+21
-19
lines changed

entities/src/main/scala/com/devsisters/shardcake/internal/EntityManager.scala

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,18 @@ private[shardcake] trait EntityManager[-Req] {
1818
}
1919

2020
private[shardcake] object EntityManager {
21+
private type Signal = Promise[Nothing, Unit]
22+
2123
def make[R, Req: Tag](
2224
recipientType: RecipientType[Req],
2325
behavior: (String, Queue[Req]) => RIO[R, Nothing],
24-
terminateMessage: Promise[Nothing, Unit] => Option[Req],
26+
terminateMessage: Signal => Option[Req],
2527
sharding: Sharding,
2628
config: Config,
2729
entityMaxIdleTime: Option[Duration]
2830
): URIO[R with Clock, EntityManager[Req]] =
2931
for {
30-
entities <- RefM.make[Map[String, (Option[Queue[Req]], Fiber[Nothing, Unit])]](Map())
32+
entities <- RefM.make[Map[String, (Either[Queue[Req], Signal], Fiber[Nothing, Unit])]](Map())
3133
env <- ZIO.environment[R]
3234
clock <- ZIO.service[Clock.Service]
3335
} yield new EntityManagerLive[Req](
@@ -44,8 +46,8 @@ private[shardcake] object EntityManager {
4446
class EntityManagerLive[Req](
4547
recipientType: RecipientType[Req],
4648
behavior: (String, Queue[Req]) => Task[Nothing],
47-
terminateMessage: Promise[Nothing, Unit] => Option[Req],
48-
entities: RefM[Map[String, (Option[Queue[Req]], Fiber[Nothing, Unit])]],
49+
terminateMessage: Signal => Option[Req],
50+
entities: RefM[Map[String, (Either[Queue[Req], Signal], Fiber[Nothing, Unit])]],
4951
sharding: Sharding,
5052
config: Config,
5153
entityMaxIdleTime: Option[Duration],
@@ -60,14 +62,14 @@ private[shardcake] object EntityManager {
6062
private def terminateEntity(entityId: String): UIO[Unit] =
6163
entities.update(map =>
6264
map.get(entityId) match {
63-
case Some((Some(queue), interruptionFiber)) =>
65+
case Some((Left(queue), interruptionFiber)) =>
6466
Promise
6567
.make[Nothing, Unit]
6668
.flatMap { p =>
6769
terminateMessage(p) match {
6870
case Some(msg) =>
6971
// if a queue is found, offer the termination message, and set the queue to None so that no new message is enqueued
70-
queue.offer(msg).run.as(map.updated(entityId, (None, interruptionFiber)))
72+
queue.offer(msg).run.as(map.updated(entityId, (Right(p), interruptionFiber)))
7173
case None =>
7274
queue.shutdown.as(map - entityId)
7375
}
@@ -96,13 +98,13 @@ private[shardcake] object EntityManager {
9698
// find the queue for that entity, or create it if needed
9799
queue <- entities.modify(map =>
98100
map.get(entityId) match {
99-
case Some((queue @ Some(_), expirationFiber)) =>
101+
case Some((queue @ Left(_), expirationFiber)) =>
100102
// queue exists, delay the interruption fiber and return the queue
101103
expirationFiber.interrupt *>
102104
startExpirationFiber(entityId).map(fiber => (queue, map.updated(entityId, (queue, fiber))))
103-
case Some((None, _)) =>
105+
case Some((p @ Right(_), _)) =>
104106
// the queue is shutting down, stash and retry
105-
ZIO.succeed((None, map))
107+
ZIO.succeed((p, map))
106108
case None =>
107109
sharding.isShuttingDown.flatMap {
108110
case true =>
@@ -121,16 +123,16 @@ private[shardcake] object EntityManager {
121123
queue.shutdown *> expirationFiber.interrupt
122124
)
123125
.forkDaemon
124-
someQueue = Some(queue)
125-
} yield (someQueue, map.updated(entityId, (someQueue, expirationFiber)))
126+
leftQueue = Left(queue)
127+
} yield (leftQueue, map.updated(entityId, (leftQueue, expirationFiber)))
126128
}
127129
}
128130
)
129131
_ <- queue match {
130-
case None =>
132+
case Right(_) =>
131133
// the queue is shutting down, try again a little later
132134
clock.sleep(100 millis) *> send(entityId, req, replyId, promise)
133-
case Some(queue) =>
135+
case Left(queue) =>
134136
// add the message to the queue and setup the response promise if needed
135137
(replyId match {
136138
case Some(replyId) => sharding.initReply(replyId, promise) *> queue.offer(req)
@@ -152,21 +154,21 @@ private[shardcake] object EntityManager {
152154
entities.getAndSet(Map()).flatMap(terminateEntities)
153155

154156
private def terminateEntities(
155-
entitiesToTerminate: Map[String, (Option[Queue[Req]], Fiber[Nothing, Unit])]
157+
entitiesToTerminate: Map[String, (Either[Queue[Req], Signal], Fiber[Nothing, Unit])]
156158
): UIO[Unit] =
157159
for {
158160
// send termination message to all entities
159161
promises <- ZIO.foreach(entitiesToTerminate.toList) { case (_, (queue, _)) =>
160162
Promise
161163
.make[Nothing, Unit]
162-
.tap(p =>
164+
.flatMap(p =>
163165
queue match {
164-
case Some(queue) =>
165-
terminateMessage(p) match {
166+
case Left(queue) =>
167+
(terminateMessage(p) match {
166168
case Some(terminate) => queue.offer(terminate).catchAllCause(_ => p.succeed(()))
167169
case None => queue.shutdown *> p.succeed(())
168-
}
169-
case None => p.succeed(())
170+
}).as(p)
171+
case Right(p) => ZIO.succeed(p)
170172
}
171173
)
172174
}

0 commit comments

Comments
 (0)