# Patch summary (annotation only; text before the first "diff --git" header is not part of any hunk).
#
# app/models/job/Worker.scala
#   - WorkerLivenessService gains two constructor parameters:
#     reportedAsDeadTemporaryStore: TemporaryStore[ObjectId, Unit] and conf: WkConf.
#   - The private mutable Set `reportedAsDead` is removed. reportIfLivenessChanged now calls
#     contains / insert / remove on reportedAsDeadTemporaryStore instead.
#   - On insert, `to = Some(conf.Jobs.workerLivenessReReportInterval)` is passed.
#     Presumably `to` is an expiry timeout, so a still-dead worker is reported again once the
#     entry lapses -- TODO confirm against TemporaryStore.insert.
#   - reportAsDead and reportAsResurrected now build the heartbeat part of their message through
#     the new private helper formatHeartbeat, which appends
#     formatDuration(Instant.since(worker.lastHeartBeat)) followed by " ago".
#
# app/utils/WkConf.scala
#   - Jobs.workerLivenessReReportInterval is read from the key "jobs.workerLivenessReReportInterval".
#
# conf/application.conf
#   - jobs.workerLivenessReReportInterval is set to 1 hour.
#
# NOTE(review): formatHeartbeat uses the f"" interpolator although the literal contains no format
#   specifiers; the other messages in this patch use s"". Looks unintentional -- confirm.
# NOTE(review): if store entries do expire, a worker whose heartbeat resumes after its entry has
#   lapsed but before it is re-reported would not pass the
#   `heartBeatIsRecent && reportedAsDeadTemporaryStore.contains(...)` check, so no
#   "reporting again" message would be sent for it -- verify whether that is acceptable.
# NOTE(review): formatDuration is not defined in the visible hunks, and the argument type accepted
#   by Instant.since is not shown here -- confirm both resolve in the enclosing class.
# NOTE(review): this copy of the patch appears to have had its line breaks collapsed; it would need
#   its original newlines restored before it could be applied.
diff --git a/app/models/job/Worker.scala b/app/models/job/Worker.scala index 174e02fb6c6..ded33b2ec28 100644 --- a/app/models/job/Worker.scala +++ b/app/models/job/Worker.scala @@ -7,6 +7,7 @@ import com.scalableminds.util.objectid.ObjectId import com.scalableminds.util.time.Instant import com.scalableminds.util.tools.Fox import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler +import com.scalableminds.webknossos.datastore.storage.TemporaryStore import com.scalableminds.webknossos.schema.Tables._ import com.typesafe.scalalogging.LazyLogging import models.job.JobCommand.JobCommand @@ -101,6 +102,8 @@ class WorkerService @Inject()(conf: WkConf) { class WorkerLivenessService @Inject()(workerService: WorkerService, workerDAO: WorkerDAO, slackNotificationService: SlackNotificationService, + reportedAsDeadTemporaryStore: TemporaryStore[ObjectId, Unit], + conf: WkConf, val lifecycle: ApplicationLifecycle, val actorSystem: ActorSystem)(implicit val ec: ExecutionContext) extends IntervalScheduler @@ -117,32 +120,33 @@ class WorkerLivenessService @Inject()(workerService: WorkerService, _ = workers.foreach(reportIfLivenessChanged) } yield () - private val reportedAsDead: scala.collection.mutable.Set[ObjectId] = scala.collection.mutable.Set() - private def reportIfLivenessChanged(worker: Worker): Unit = { val heartBeatIsRecent = workerService.lastHeartBeatIsRecent(worker) - if (!heartBeatIsRecent && !reportedAsDead.contains(worker._id)) { + if (!heartBeatIsRecent && !reportedAsDeadTemporaryStore.contains(worker._id)) { reportAsDead(worker) - reportedAsDead.add(worker._id) + reportedAsDeadTemporaryStore.insert(worker._id, (), to = Some(conf.Jobs.workerLivenessReReportInterval)) } - if (heartBeatIsRecent && reportedAsDead.contains(worker._id)) { + if (heartBeatIsRecent && reportedAsDeadTemporaryStore.contains(worker._id)) { reportAsResurrected(worker) - reportedAsDead.remove(worker._id) + reportedAsDeadTemporaryStore.remove(worker._id) } } private def 
reportAsDead(worker: Worker): Unit = { val msg = - s"Worker ${worker.name} (${worker._id}) is not reporting. Last heartbeat was at ${worker.lastHeartBeat}" + s"Worker ${worker.name} (${worker._id}) is not reporting. ${formatHeartbeat(worker)}" slackNotificationService.warn("Worker missing", msg) logger.warn(msg) } private def reportAsResurrected(worker: Worker): Unit = { val msg = - s"Worker ${worker.name} (${worker._id}) is reporting again. Last heartbeat was at ${worker.lastHeartBeat}" + s"Worker ${worker.name} (${worker._id}) is reporting again. ${formatHeartbeat(worker)}" slackNotificationService.success("Worker return", msg) logger.info(msg) } + private def formatHeartbeat(worker: Worker): String = + f"Last heartbeat was at ${worker.lastHeartBeat} (${formatDuration(Instant.since(worker.lastHeartBeat))} ago)" + } diff --git a/app/utils/WkConf.scala b/app/utils/WkConf.scala index 544aa1870c7..92a09206f58 100644 --- a/app/utils/WkConf.scala +++ b/app/utils/WkConf.scala @@ -224,6 +224,7 @@ class WkConf @Inject()(configuration: Configuration, certificateValidationServic object Jobs { val workerLivenessTimeout: FiniteDuration = get[FiniteDuration]("jobs.workerLivenessTimeout") + val workerLivenessReReportInterval: FiniteDuration = get[FiniteDuration]("jobs.workerLivenessReReportInterval") val monthlyFreeCredits: Int = get[Int]("jobs.monthlyFreeCredits") } diff --git a/conf/application.conf b/conf/application.conf index b397c04350e..0e5b2074178 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -321,6 +321,7 @@ silhouette { # Execute long-running jobs jobs { workerLivenessTimeout = 1 minute + workerLivenessReReportInterval = 1 hour monthlyFreeCredits = 2 }