Skip to content

Commit 9e2cca8

Browse files
committed
fix(map,reactive): make timeouts more resilient during processing, also handle the case that the map crashes and restart it
1 parent 85bbeb8 commit 9e2cca8

File tree

2 files changed

+59
-37
lines changed

2 files changed

+59
-37
lines changed

mini-reactive/src/main/scala/org/updraft0/minireactive/ReactiveEntity.scala

Lines changed: 53 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import zio.metrics.*
77
import java.util.concurrent.TimeoutException
88

99
// TODO:
10-
// - [ ] Monitoring
11-
// - [ ] Timeouts for mailbox processing
1210
// - [ ] Comprehensive tests
1311

1412
/** A "reactive entity" is like a "persistent actor" - it has a typed {in,out}box, a hydrate/onStart state recovery
@@ -52,7 +50,9 @@ trait MiniReactive[K, I, O]:
5250

5351
def subscribe(key: K): URIO[Scope, Dequeue[O]]
5452

55-
def destroy(key: K): UIO[Boolean]
53+
def destroy(key: K): UIO[Unit]
54+
55+
private[minireactive] def restart(key: K): UIO[Unit]
5656

5757
object MiniReactive:
5858
private case class State[K, S, I, O](
@@ -76,45 +76,39 @@ object MiniReactive:
7676
entity: ReactiveEntity[R, K, S, I, O],
7777
config: MiniReactiveConfig
7878
): URIO[Scope & R, MiniReactive[K, I, O]] =
79-
for
79+
(for
8080
scope <- ZIO.environment[Scope]
8181
env <- ZIO.environment[R]
82-
s <- Semaphore.make(1)
83-
refs <- Ref.make(Map.empty[K, State[K, S, I, O]])
84-
sup <- Supervisor.track(true) // TODO - is this necessary?
85-
yield new MiniReactive[K, I, O]:
82+
refs <- Ref.Synchronized.make(Map.empty[K, State[K, S, I, O]])
83+
// there's a queue to restart
84+
restartQ <- Queue.unbounded[K]
85+
yield (restartQ -> new MiniReactive[K, I, O]:
8686

8787
override def enqueue(key: K): UIO[Enqueue[I]] =
88-
lookupOrCreate(key).map(_.inbox) @@ ReactiveEntity.Tag(entity.tag) @@ ReactiveEntity.Id[K].apply(key)
88+
lookupOrCreate(key).map(_.inbox) @@ ReactiveEntity.Tag(entity.tag) @@ ReactiveEntity.Id(key)
8989

9090
override def subscribe(key: K): URIO[Scope, Dequeue[O]] =
91-
lookupOrCreate(key).flatMap(_.outbox.subscribe)
91+
lookupOrCreate(key).flatMap(_.outbox.subscribe) @@ ReactiveEntity.Tag(entity.tag) @@ ReactiveEntity.Id(key)
9292

93-
override def destroy(key: K): UIO[Boolean] =
93+
override def destroy(key: K): UIO[Unit] =
9494
refs
95-
.modify(refs => refs.get(key) -> (refs - key))
96-
.flatMap(ZIO.fromOption)
97-
.flatMap(stop)
98-
.as(true)
99-
.orElseSucceed(false) @@ ReactiveEntity.Tag(entity.tag) @@ ReactiveEntity.Id[K].apply(key)
95+
.updateSomeZIO:
96+
case m if m.contains(key) =>
97+
stop(m(key)).as(m - key)
98+
@@ ReactiveEntity.Tag(entity.tag) @@ ReactiveEntity.Id(key)
10099

101100
private def lookupOrCreate(key: K): UIO[State[K, S, I, O]] =
102-
s.withPermit(
103-
refs.get.flatMap(allRefs =>
104-
allRefs.get(key) match
105-
case Some(state) => ZIO.succeed(state)
106-
case None => create(key).provideEnvironment(scope).tap(state => refs.update(m => m + (key -> state)))
107-
)
108-
)
101+
refs
102+
.updateSomeAndGetZIO:
103+
case m if !m.contains(key) => create(key).provideEnvironment(scope).map(state => m + (key -> state))
104+
.map(m => m(key))
109105

110106
private def create(key: K): URIO[Scope, State[K, S, I, O]] =
111107
for
112-
inbox <- Queue.bounded[I](config.mailboxSize)
113-
outbox <- Hub.bounded[O](config.mailboxSize)
114-
initialState <- entity
115-
.hydrate(key, inbox)
116-
.provideEnvironment(env.add(scope.get))
117-
.flatMap(Ref.make) // TODO add timeout here too
108+
_ <- ZIO.logInfo(s"starting entity")
109+
inbox <- Queue.bounded[I](config.mailboxSize)
110+
outbox <- Hub.bounded[O](config.mailboxSize)
111+
initialState <- runHydration(key, inbox)
118112
labels = Set(MetricLabel("entity_tag", entity.tag), MetricLabel("key", key.toString))
119113
inboxCounter = Metric.counter("entity_inbox_count").fromConst(1).tagged(labels)
120114
outboxCounter = Metric.counter("entity_outbox_count").tagged(labels)
@@ -124,6 +118,32 @@ object MiniReactive:
124118
fiber <- runEntity(key, inbox, outbox, initialState, inboxCounter, outboxCounter, processingTime)
125119
yield State(inbox, outbox, key, initialState, fiber, inboxCounter, outboxCounter, processingTime)
126120

121+
private def runHydration(key: K, inbox: Queue[I]) =
122+
entity
123+
.hydrate(key, inbox)
124+
.timeoutFailCause(Cause.die(new TimeoutException("entity failed to hydrate in time")))(
125+
config.handlerTimeout
126+
)
127+
.provideEnvironment(env.add(scope.get))
128+
.flatMap(Ref.make)
129+
130+
private[minireactive] def restart(key: K): UIO[Unit] =
131+
ZIO.logDebug("restarting entity") *> refs.updateZIO: m =>
132+
for
133+
prev = m(key)
134+
initialState <- runHydration(key, prev.inbox)
135+
fiber <- runEntity(
136+
key,
137+
prev.inbox,
138+
prev.outbox,
139+
initialState,
140+
prev.inboxCounter,
141+
prev.outboxCounter,
142+
prev.processingTime
143+
).provideEnvironment(env.add(scope.get))
144+
next = prev.copy(current = initialState, fiber = fiber)
145+
yield m.updated(key, next)
146+
127147
private def runEntity(
128148
key: K,
129149
inbox: Queue[I],
@@ -140,21 +160,21 @@ object MiniReactive:
140160
resT <- entity
141161
.handle(key, state, inMsg)
142162
.provideEnvironment(env)
143-
.timeoutFailCause(Cause.die(new TimeoutException("entity failed to process message")))(
163+
.timeoutFailCause(Cause.die(new TimeoutException("entity failed to process message in time")))(
144164
config.handlerTimeout
145165
)
146166
.timed
147167
(took, res) = resT
148168
_ <- processingTime.update(took.toMillis.toDouble)
149-
// FIXME do timeout
150169
(nextState, msgs) = res
151170
_ <- stateRef.set(nextState)
152171
_ <- outboxCounter.update(msgs.size)
153-
_ <- ZIO.iterate(msgs)(_.nonEmpty)(outbox.offerAll)
154-
yield ()).forever.supervised(sup).forkScoped
172+
_ <- outbox.offerAll(msgs)
173+
yield ()).forever.forkScoped.onError(c => ZIO.logDebugCause(s"failed for $key", c) *> restartQ.offer(key))
155174

156175
private def stop(state: State[K, S, I, O]): UIO[Unit] =
157176
ZIO.logInfo("stopping entity") *>
158177
state.fiber.interrupt.ignoreLogged *>
159178
state.inbox.shutdown *>
160179
state.outbox.shutdown
180+
)).flatMap((restartQ, res) => restartQ.take.flatMap(res.restart).forever.forkScoped.as(res))

server/src/main/scala/org/updraft0/controltower/server/map/MapReactive.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1110,16 +1110,18 @@ object MapEntity extends ReactiveEntity[MapEnv, MapId, MapState, Identified[MapR
11101110
rSigCount <- query.map
11111111
.hardDeleteMapWormholeSignatures(mapId, Chunk.fromIterable(hardDeleteSignatures))
11121112
.when(hardDeleteSignatures.nonEmpty)
1113-
.someOrElse(0)
1113+
.someOrElse(0L)
11141114
rCJumpCount <- query.map
11151115
.hardDeleteMapWormholeConnectionJumps(Chunk.fromIterable(hardDeleteConnectionIds))
11161116
.when(hardDeleteConnectionIds.nonEmpty)
1117-
.someOrElse(0)
1117+
.someOrElse(0L)
11181118
rConnCount <- query.map
11191119
.hardDeleteMapWormholeConnections(mapId, Chunk.fromIterable(hardDeleteConnectionIds))
11201120
.when(hardDeleteConnectionIds.nonEmpty)
1121-
.someOrElse(0)
1122-
_ <- ZIO.logTrace(s"Hard deleted $rSigCount sigs, $rCJumpCount jumps and $rConnCount connections")
1121+
.someOrElse(0L)
1122+
_ <- ZIO.when(rSigCount + rCJumpCount + rConnCount > 0L)(
1123+
ZIO.logTrace(s"Hard deleted $rSigCount sigs, $rCJumpCount jumps and $rConnCount connections")
1124+
)
11231125
// get expired connections
11241126
expiredConnections <- MapQueries.getWormholeConnectionsWithSigsExpiredOrEol(
11251127
mapId,

0 commit comments

Comments
 (0)