diff --git a/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala b/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala index e4e6201..02d9241 100644 --- a/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala +++ b/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala @@ -42,7 +42,7 @@ class ShardManager( _ <- ManagerMetrics.pods.increment _ <- eventsHub.publish(ShardingEvent.PodRegistered(pod.address)) _ <- ZIO.when(state.unassignedShards.nonEmpty)(rebalance(rebalanceImmediately = false)) - _ <- persistPods.forkDaemon + _ <- persistPods } yield (), onFalse = ZIO.logWarning(s"Pod $pod requested to register but is not alive, ignoring") *> ZIO.fail(new RuntimeException(s"Pod $pod is not healthy, refusing to register")) @@ -160,14 +160,15 @@ class ShardManager( _ <- (Clock.sleep(config.rebalanceRetryInterval) *> rebalance(rebalanceImmediately)).forkDaemon .when(failedPods.nonEmpty && rebalanceImmediately) // persist state changes to Redis - _ <- persistAssignments.forkDaemon.when(areChanges) + _ <- persistAssignments.when(areChanges) } yield () } - private def withRetry[E, A](zio: IO[E, A]): UIO[Unit] = + private def withRetry[A](zio: Task[A]): UIO[Unit] = zio .retry[Any, Any](Schedule.spaced(config.persistRetryInterval) && Schedule.recurs(config.persistRetryCount)) - .ignore + .orDie + .unit private def persistAssignments: UIO[Unit] = withRetry( @@ -260,7 +261,7 @@ object ShardManager { ZIO.logWarningCause("Failed to persist pods on shutdown", cause) ) } - _ <- shardManager.persistPods.forkDaemon + _ <- shardManager.persistPods // rebalance immediately if there are unassigned shards _ <- shardManager.rebalance(rebalanceImmediately = initialState.unassignedShards.nonEmpty).forkDaemon // start a regular rebalance at the given interval