Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions app/models/job/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.scalableminds.util.objectid.ObjectId
import com.scalableminds.util.time.Instant
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler
import com.scalableminds.webknossos.datastore.storage.TemporaryStore
import com.scalableminds.webknossos.schema.Tables._
import com.typesafe.scalalogging.LazyLogging
import models.job.JobCommand.JobCommand
Expand Down Expand Up @@ -101,6 +102,8 @@ class WorkerService @Inject()(conf: WkConf) {
class WorkerLivenessService @Inject()(workerService: WorkerService,
workerDAO: WorkerDAO,
slackNotificationService: SlackNotificationService,
reportedAsDeadTemporaryStore: TemporaryStore[ObjectId, Unit],
conf: WkConf,
val lifecycle: ApplicationLifecycle,
val actorSystem: ActorSystem)(implicit val ec: ExecutionContext)
extends IntervalScheduler
Expand All @@ -117,32 +120,33 @@ class WorkerLivenessService @Inject()(workerService: WorkerService,
_ = workers.foreach(reportIfLivenessChanged)
} yield ()

private val reportedAsDead: scala.collection.mutable.Set[ObjectId] = scala.collection.mutable.Set()

private def reportIfLivenessChanged(worker: Worker): Unit = {
val heartBeatIsRecent = workerService.lastHeartBeatIsRecent(worker)
if (!heartBeatIsRecent && !reportedAsDead.contains(worker._id)) {
if (!heartBeatIsRecent && !reportedAsDeadTemporaryStore.contains(worker._id)) {
reportAsDead(worker)
reportedAsDead.add(worker._id)
reportedAsDeadTemporaryStore.insert(worker._id, (), to = Some(conf.Jobs.workerLivenessReReportInterval))
}
if (heartBeatIsRecent && reportedAsDead.contains(worker._id)) {
if (heartBeatIsRecent && reportedAsDeadTemporaryStore.contains(worker._id)) {
reportAsResurrected(worker)
reportedAsDead.remove(worker._id)
reportedAsDeadTemporaryStore.remove(worker._id)
}
}

private def reportAsDead(worker: Worker): Unit = {
val msg =
s"Worker ${worker.name} (${worker._id}) is not reporting. Last heartbeat was at ${worker.lastHeartBeat}"
s"Worker ${worker.name} (${worker._id}) is not reporting. ${formatHeartbeat(worker)}"
slackNotificationService.warn("Worker missing", msg)
logger.warn(msg)
}

private def reportAsResurrected(worker: Worker): Unit = {
val msg =
s"Worker ${worker.name} (${worker._id}) is reporting again. Last heartbeat was at ${worker.lastHeartBeat}"
s"Worker ${worker.name} (${worker._id}) is reporting again. ${formatHeartbeat(worker)}"
slackNotificationService.success("Worker return", msg)
logger.info(msg)
}

private def formatHeartbeat(worker: Worker): String =
f"Last heartbeat was at ${worker.lastHeartBeat} (${formatDuration(Instant.since(worker.lastHeartBeat))} ago)"

}
1 change: 1 addition & 0 deletions app/utils/WkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ class WkConf @Inject()(configuration: Configuration, certificateValidationServic

object Jobs {
val workerLivenessTimeout: FiniteDuration = get[FiniteDuration]("jobs.workerLivenessTimeout")
val workerLivenessReReportInterval: FiniteDuration = get[FiniteDuration]("jobs.workerLivenessReReportInterval")
val monthlyFreeCredits: Int = get[Int]("jobs.monthlyFreeCredits")
}

Expand Down
1 change: 1 addition & 0 deletions conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ silhouette {
# Execute long-running jobs
jobs {
workerLivenessTimeout = 1 minute
workerLivenessReReportInterval = 1 hour
monthlyFreeCredits = 2
}

Expand Down