Skip to content

Commit fb447ae

Browse files
authored
Add ability to skip scheduling unique jobs (#14838)
Debugging showed that cancelling jobs that started executing but i.e. waiting for a lock is a costly operation. This PR adds optimization that adds ability to skip scheduling some jobs instead of cancelling and rescheduling them. Drastically improves the startup performance of large projects.
1 parent 0136021 commit fb447ae

File tree

3 files changed

+72
-7
lines changed

3 files changed

+72
-7
lines changed

engine/runtime-instrument-common/src/main/java/org/enso/interpreter/instrument/job/ExecuteJob.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* @see UniqueJob
2727
*/
2828
@SuppressWarnings("unchecked")
29-
public class ExecuteJob extends Job<Void> implements UniqueJob<Void> {
29+
public class ExecuteJob extends Job<Void> implements UniqueJob<Void>, SkipSchedulingUniqueJob {
3030

3131
private static final Logger logger = LoggerFactory.getLogger(ExecuteJob.class);
3232
private static final String PENDING_VISUALIZATIONS_TRIGGER_CONTEXT = "pending visualizations";

engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/JobExecutionEngine.scala

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import org.enso.interpreter.instrument.job.{
66
BackgroundJob,
77
ExecuteJob,
88
Job,
9+
SkipSchedulingUniqueJob,
910
UniqueJob
1011
}
1112
import org.enso.text.Sha3_224VersionCalculator
@@ -164,10 +165,21 @@ final class JobExecutionEngine(
164165
override def runBackground[A](job: BackgroundJob[A]): Unit =
165166
synchronized {
166167
if (isBackgroundJobsStarted) {
167-
cancelDuplicateJobs(job, backgroundJobsRef)
168+
if (handleDuplicateJobs(job, backgroundJobsRef)) {
169+
logger.trace("Skipping duplicate background job [{}].", job)
170+
return
171+
}
168172
runInternal(job, backgroundJobExecutor, backgroundJobsRef, "background")
169173
} else {
170174
job match {
175+
case _: SkipSchedulingUniqueJob =>
176+
if (hasDuplicateInDelayedQueue(job.asInstanceOf[UniqueJob[_]])) {
177+
logger.trace(
178+
"Skipping duplicate delayed background job [{}].",
179+
job
180+
)
181+
return
182+
}
171183
case job: UniqueJob[_] =>
172184
delayedBackgroundJobsQueue.removeIf {
173185
case that: UniqueJob[_] => that.equalsTo(job)
@@ -180,11 +192,32 @@ final class JobExecutionEngine(
180192
}
181193

182194
/** @inheritdoc */
183-
override def run[A](job: Job[A]): Future[A] = {
184-
cancelDuplicateJobs(job, runningJobsRef)
185-
val executor =
186-
if (job.highPriority) highPriorityJobExecutor else jobExecutor
187-
runInternal(job, executor, runningJobsRef, "regular")
195+
override def run[A](job: Job[A]): Future[A] =
196+
synchronized {
197+
if (handleDuplicateJobs(job, runningJobsRef)) {
198+
logger.trace("Skipping duplicate job [{}].", job)
199+
return Future.successful(null.asInstanceOf[A])
200+
}
201+
val executor =
202+
if (job.highPriority) highPriorityJobExecutor else jobExecutor
203+
runInternal(job, executor, runningJobsRef, "regular")
204+
}
205+
206+
/** Returns `true` if the job should be skipped (not scheduled).
207+
* For [[SkipSchedulingUniqueJob]], checks if a duplicate exists.
208+
* For regular [[UniqueJob]], cancels existing duplicates.
209+
*/
210+
private def handleDuplicateJobs[A](
211+
job: Job[A],
212+
runningJobsRef: AtomicReference[Vector[RunningJob]]
213+
): Boolean = {
214+
job match {
215+
case _: SkipSchedulingUniqueJob =>
216+
hasDuplicateJob(job.asInstanceOf[UniqueJob[_]], runningJobsRef)
217+
case _ =>
218+
cancelDuplicateJobs(job, runningJobsRef)
219+
false
220+
}
188221
}
189222

190223
private def cancelDuplicateJobs[A](
@@ -212,6 +245,31 @@ final class JobExecutionEngine(
212245
}
213246
}
214247

248+
private def hasDuplicateJob(
249+
job: UniqueJob[_],
250+
runningJobsRef: AtomicReference[Vector[RunningJob]]
251+
): Boolean = {
252+
val allJobs =
253+
runningJobsRef.updateAndGet(_.filterNot(_.future.isCancelled))
254+
allJobs.exists { runningJob =>
255+
runningJob.job match {
256+
case jobRef: UniqueJob[_] => jobRef.equalsTo(job)
257+
case _ => false
258+
}
259+
}
260+
}
261+
262+
private def hasDuplicateInDelayedQueue(job: UniqueJob[_]): Boolean = {
263+
val iter = delayedBackgroundJobsQueue.iterator()
264+
while (iter.hasNext) {
265+
iter.next() match {
266+
case that: UniqueJob[_] if that.equalsTo(job) => return true
267+
case _ =>
268+
}
269+
}
270+
false
271+
}
272+
215273
private def updatePendingCancellations(
216274
jobsToCancel: Seq[RunningJob]
217275
): Unit = {

engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/Job.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,10 @@ trait UniqueJob[A] { self: Job[A] =>
7171
*/
7272
def equalsTo(that: UniqueJob[_]): Boolean
7373
}
74+
75+
/** A [[UniqueJob]] that skips scheduling if a duplicate job already exists
76+
* in the queue, as decided by the `equalsTo` method. Unlike regular
77+
* [[UniqueJob]] which cancels previous duplicates, this trait prevents
78+
* the new job from being scheduled at all.
79+
*/
80+
trait SkipSchedulingUniqueJob { self: UniqueJob[java.lang.Void] => }

0 commit comments

Comments
 (0)