Skip to content

Commit 0144f58

Browse files
committed
predictable scalability
1 parent b16f6e2 commit 0144f58

File tree

1 file changed

+26
-39
lines changed

1 file changed

+26
-39
lines changed

src/domain/thread-pool.js

Lines changed: 26 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ export class ThreadPool extends EventEmitter {
166166
this.aborting = false
167167
this.jobsRequested = this.jobsQueued = 0
168168
this.broadcastChannel = options.broadcast
169-
this.checkoutOpen = true
170169

171170
if (options?.preload) {
172171
console.info('preload enabled for', this.name)
@@ -458,14 +457,11 @@ export class ThreadPool extends EventEmitter {
458457

459458
incrementJobsQueued () {
460459
this.jobsQueued++
461-
//if (this.jobsQueued % 10 === 0) this.jobQueueRate()
462460
return this
463461
}
464462

465463
jobQueueRate () {
466-
const rate = Math.round((this.jobsQueued / this.jobsRequested) * 100)
467-
//if (rate > this.jobQueueThreshold()) this.updateLocks()
468-
return rate
464+
return Math.round((this.jobsQueued / this.jobsRequested) * 100)
469465
}
470466

471467
jobQueueThreshold () {
@@ -557,12 +553,15 @@ export class ThreadPool extends EventEmitter {
557553
/**
558554
* Spin up a new thread if needed and available.
559555
*/
560-
async allocate (cb) {
561-
const yes = this.poolCanGrow()
562-
if (yes) {
563-
const thread = this.startThread()
564-
cb(thread)
565-
return this.startThread()
556+
async allocate (cb = null) {
557+
if (this.poolCanGrow()) {
558+
console.debug('allocating thread')
559+
const thread = await this.startThread()
560+
if (!thread) {
561+
throw new Error('cannot allocate thread')
562+
}
563+
if (cb) cb(thread)
564+
return this.freeThreads.shift()
566565
}
567566
}
568567

@@ -572,35 +571,23 @@ export class ThreadPool extends EventEmitter {
572571
return diff
573572
}
574573

575-
checkout () {
576-
try {
577-
if (this.checkoutOpen) {
578-
const thread = this.freeThreads.shift()
579-
if (!thread) {
580-
if (this.workers.length > 1) this.checkoutOpen = false
581-
ThreadPoolFactory.pauseMonitoring(this)
582-
this.allocate(() => {
583-
if (this.poolCanGrow())
584-
setTimeout(() => {
585-
if (this.poolCanGrow()) {
586-
this.checkoutOpen = true
587-
ThreadPoolFactory.resumeMonitoring(this)
588-
}
589-
}, 1000)
590-
})
591-
}
592-
console.debug(
593-
`thread checked out, total in use now ${this.threadsInUse()}`
594-
)
595-
return thread
596-
}
597-
} catch (err) {}
574+
async checkout () {
575+
const thread = this.freeThreads.shift()
576+
if (thread) {
577+
console.debug(
578+
`thread checked out, total in use now ${this.threadsInUse()}`
579+
)
580+
return thread
581+
}
582+
if (this.threads.length == this.maxThreads) return
583+
if (this.threads.length > this.maxThreads)
584+
throw new Error('too many threads')
585+
return this.allocate()
598586
}
599587

600588
checkin (thread) {
601589
if (thread) {
602590
this.freeThreads.push(thread)
603-
this.checkoutOpen = true
604591
console.debug(
605592
`thread checked in, total in use now ${this.threadsInUse()}`
606593
)
@@ -638,7 +625,7 @@ export class ThreadPool extends EventEmitter {
638625
...options
639626
})
640627

641-
const thread = this.checkout()
628+
const thread = await this.checkout()
642629

643630
if (thread) {
644631
thread.run(job)
@@ -1030,7 +1017,7 @@ const ThreadPoolFactory = (() => {
10301017
if (monitorIntervalId) return
10311018

10321019
monitorIntervalId = setInterval(() => {
1033-
monitorPools.forEach(pool => {
1020+
monitoredPools.forEach(pool => {
10341021
if (pool.aborting) return
10351022

10361023
const workRequested = pool.totalJobsRequested()
@@ -1086,8 +1073,8 @@ const ThreadPoolFactory = (() => {
10861073
monitoredPools.delete(pool.name)
10871074
}
10881075

1089-
function resumeMonitoring () {
1090-
monitorPools()
1076+
function resumeMonitoring (pool = null) {
1077+
monitorPools(pool)
10911078
}
10921079

10931080
monitorPools()

0 commit comments

Comments
 (0)