From 3cae1ebfc68d0d8738c6f0d2b84005631972bcc8 Mon Sep 17 00:00:00 2001 From: Florian M Date: Wed, 15 Oct 2025 10:51:24 +0200 Subject: [PATCH 1/2] =?UTF-8?q?Improve=20=E2=80=9CWorker=20missing?= =?UTF-8?q?=E2=80=9D=20notifications?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/models/job/Worker.scala | 21 +++++++++++++-------- app/utils/WkConf.scala | 1 + conf/application.conf | 1 + 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/app/models/job/Worker.scala b/app/models/job/Worker.scala index 174e02fb6c6..ef3098ec49b 100644 --- a/app/models/job/Worker.scala +++ b/app/models/job/Worker.scala @@ -7,8 +7,10 @@ import com.scalableminds.util.objectid.ObjectId import com.scalableminds.util.time.Instant import com.scalableminds.util.tools.Fox import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler +import com.scalableminds.webknossos.datastore.storage.TemporaryStore import com.scalableminds.webknossos.schema.Tables._ import com.typesafe.scalalogging.LazyLogging +import models.annotation.Annotation import models.job.JobCommand.JobCommand import play.api.inject.ApplicationLifecycle import play.api.libs.json.{JsObject, Json} @@ -101,6 +103,8 @@ class WorkerService @Inject()(conf: WkConf) { class WorkerLivenessService @Inject()(workerService: WorkerService, workerDAO: WorkerDAO, slackNotificationService: SlackNotificationService, + reportedAsDeadTemporaryStore: TemporaryStore[ObjectId, Unit], + conf: WkConf, val lifecycle: ApplicationLifecycle, val actorSystem: ActorSystem)(implicit val ec: ExecutionContext) extends IntervalScheduler @@ -117,32 +121,33 @@ class WorkerLivenessService @Inject()(workerService: WorkerService, _ = workers.foreach(reportIfLivenessChanged) } yield () - private val reportedAsDead: scala.collection.mutable.Set[ObjectId] = scala.collection.mutable.Set() - private def reportIfLivenessChanged(worker: Worker): Unit = { val heartBeatIsRecent = workerService.lastHeartBeatIsRecent(worker) - if (!heartBeatIsRecent && !reportedAsDead.contains(worker._id)) { + if (!heartBeatIsRecent && !reportedAsDeadTemporaryStore.contains(worker._id)) { reportAsDead(worker) - reportedAsDead.add(worker._id) + reportedAsDeadTemporaryStore.insert(worker._id, (), to = Some(conf.Jobs.workerLivenessReReportInterval)) } - if (heartBeatIsRecent && reportedAsDead.contains(worker._id)) { + if (heartBeatIsRecent && reportedAsDeadTemporaryStore.contains(worker._id)) { reportAsResurrected(worker) - reportedAsDead.remove(worker._id) + reportedAsDeadTemporaryStore.remove(worker._id) } } private def reportAsDead(worker: Worker): Unit = { val msg = - s"Worker ${worker.name} (${worker._id}) is not reporting. Last heartbeat was at ${worker.lastHeartBeat}" + s"Worker ${worker.name} (${worker._id}) is not reporting. ${formatHeartbeat(worker)}" slackNotificationService.warn("Worker missing", msg) logger.warn(msg) } private def reportAsResurrected(worker: Worker): Unit = { val msg = - s"Worker ${worker.name} (${worker._id}) is reporting again. Last heartbeat was at ${worker.lastHeartBeat}" + s"Worker ${worker.name} (${worker._id}) is reporting again. ${formatHeartbeat(worker)}" slackNotificationService.success("Worker return", msg) logger.info(msg) } + private def formatHeartbeat(worker: Worker): String = + f"Last heartbeat was at ${worker.lastHeartBeat} (${formatDuration(Instant.since(worker.lastHeartBeat))} ago)" + } diff --git a/app/utils/WkConf.scala b/app/utils/WkConf.scala index 3e6171bf031..b7bf730f76d 100644 --- a/app/utils/WkConf.scala +++ b/app/utils/WkConf.scala @@ -223,6 +223,7 @@ class WkConf @Inject()(configuration: Configuration, certificateValidationServic object Jobs { val workerLivenessTimeout: FiniteDuration = get[FiniteDuration]("jobs.workerLivenessTimeout") + val workerLivenessReReportInterval: FiniteDuration = get[FiniteDuration]("jobs.workerLivenessReReportInterval") val monthlyFreeCredits: Int = get[Int]("jobs.monthlyFreeCredits") } diff --git a/conf/application.conf b/conf/application.conf index 9615f3c3da9..6474fe3bbda 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -319,6 +319,7 @@ silhouette { # Execute long-running jobs jobs { workerLivenessTimeout = 1 minute + workerLivenessReReportInterval = 1 hour monthlyFreeCredits = 2 } From 3cdc31071f1113664e4e38ff88d0674f76259d86 Mon Sep 17 00:00:00 2001 From: Florian M Date: Wed, 15 Oct 2025 10:58:32 +0200 Subject: [PATCH 2/2] unused import --- app/models/job/Worker.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/app/models/job/Worker.scala b/app/models/job/Worker.scala index ef3098ec49b..ded33b2ec28 100644 --- a/app/models/job/Worker.scala +++ b/app/models/job/Worker.scala @@ -10,7 +10,6 @@ import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler import com.scalableminds.webknossos.datastore.storage.TemporaryStore import com.scalableminds.webknossos.schema.Tables._ import com.typesafe.scalalogging.LazyLogging -import models.annotation.Annotation import models.job.JobCommand.JobCommand import play.api.inject.ApplicationLifecycle import play.api.libs.json.{JsObject, Json}