Skip to content

Commit b16f6e2

Browse files
committed
threadpool limit save
1 parent 1d81cd9 commit b16f6e2

File tree

5 files changed

+86
-37
lines changed

5 files changed

+86
-37
lines changed

src/domain/event-router.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import domainEvents from './domain-events'
55
const { portEvent } = domainEvents
66

77
/**
8-
* Port events controls event-driven, distributed workflow and
8+
* Port events control event-driven, distributed workflow and
99
* inter-process communcation between co-located and remote models.
1010
*/
1111
export class PortEventRouter {
@@ -19,7 +19,7 @@ export class PortEventRouter {
1919
this.broker = broker
2020
this._localSpec = this.models.getModelSpec(workerData.poolName)
2121
if (!this._localSpec)
22-
this.models.getModelSpecs().find(i => i.domain === workerData.poolName)
22+
this.models.getModelSpecs().find(s => s.domain === workerData.poolName)
2323
this._threadRemotePorts = this.threadRemotePorts()
2424
this._threadLocalPorts = this.threadLocalPorts()
2525
}

src/domain/make-relations.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ export default function makeRelations (relations, datasource, broker) {
274274
if (local && createNew) {
275275
const result = createNewModels(args, rel, datasource)
276276
if (result.yes) return result.create()
277-
throw new Error('cannot create new models')
277+
throw new RelError('create models failed', 500)
278278
}
279279

280280
// If object is remote, we should have its code by now.

src/domain/thread-pool.js

Lines changed: 67 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
'use strict'
22

3-
import { Worker, BroadcastChannel } from 'worker_threads'
4-
import { EventEmitter } from 'node:stream'
3+
import {
4+
Worker,
5+
BroadcastChannel,
6+
captureRejectionSymbol as workerCapture
7+
} from 'node:worker_threads'
8+
import {
9+
EventEmitter,
10+
captureRejectionSymbol as eventCapture
11+
} from 'node:events'
512
import { AsyncResource } from 'node:async_hooks'
13+
import { strict as assert } from 'node:assert'
614
import ModelFactory, { totalServices, requestContext } from '.'
715
import EventBrokerFactory from './event-broker'
816
import domainEvents from './domain-events'
9-
import assert from 'assert'
1017
import path from 'path'
1118
import os from 'os'
12-
import { setServers } from 'dns/promises'
1319

1420
const { poolOpen, poolClose, poolDrain, poolAbort } = domainEvents
1521
const broker = EventBrokerFactory.getInstance()
@@ -160,14 +166,20 @@ export class ThreadPool extends EventEmitter {
160166
this.aborting = false
161167
this.jobsRequested = this.jobsQueued = 0
162168
this.broadcastChannel = options.broadcast
163-
this.updateLocks()
169+
this.checkoutOpen = true
164170

165171
if (options?.preload) {
166172
console.info('preload enabled for', this.name)
167173
this.startThreads()
168174
}
169175
}
170176

177+
[eventCapture] (err, event, ...args) {
178+
const reason = `rejection happened for ${event} with ${err} ${args}`
179+
console.error(reason)
180+
this.abort(reason)
181+
}
182+
171183
/**
172184
* Connect event subchannel to {@link EventBroker}
173185
* @param {Worker} worker worker thread
@@ -201,6 +213,12 @@ export class ThreadPool extends EventEmitter {
201213
const eventChannel = new MessageChannel()
202214
const worker = new Worker(file, { workerData })
203215

216+
worker[workerCapture] = function (err, event, ...args) {
217+
const reason = `rejection happened for ${event} with ${err} ${args}`
218+
console.error(reason)
219+
pool.abort(reason)
220+
}
221+
204222
/**
205223
* @type {Thread}
206224
*/
@@ -440,11 +458,14 @@ export class ThreadPool extends EventEmitter {
440458

441459
incrementJobsQueued () {
442460
this.jobsQueued++
461+
//if (this.jobsQueued % 10 === 0) this.jobQueueRate()
443462
return this
444463
}
445464

446465
jobQueueRate () {
447-
return Math.round((this.jobsQueued / this.jobsRequested) * 100)
466+
const rate = Math.round((this.jobsQueued / this.jobsRequested) * 100)
467+
//if (rate > this.jobQueueThreshold()) this.updateLocks()
468+
return rate
448469
}
449470

450471
jobQueueThreshold () {
@@ -536,8 +557,13 @@ export class ThreadPool extends EventEmitter {
536557
/**
537558
* Spin up a new thread if needed and available.
538559
*/
539-
async allocate () {
540-
if (this.poolCanGrow()) return this.startThread()
560+
async allocate (cb) {
561+
const yes = this.poolCanGrow()
562+
if (yes) {
563+
const thread = this.startThread()
564+
cb(thread)
565+
return this.startThread()
566+
}
541567
}
542568

543569
threadsInUse () {
@@ -546,33 +572,35 @@ export class ThreadPool extends EventEmitter {
546572
return diff
547573
}
548574

549-
updateLocks () {
550-
this.locks = []
551-
this.threads.forEach(t => this.locks.push(t.id))
552-
}
553-
554-
canCheckout () {
555-
return this.threads.some(t => t.id === this.locks.pop())
556-
}
557-
558575
checkout () {
559-
if (!this.canCheckout()) return
560-
561576
try {
562-
if (this.freeThreads.length > 0) {
577+
if (this.checkoutOpen) {
563578
const thread = this.freeThreads.shift()
579+
if (!thread) {
580+
if (this.workers.length > 1) this.checkoutOpen = false
581+
ThreadPoolFactory.pauseMonitoring(this)
582+
this.allocate(() => {
583+
if (this.poolCanGrow())
584+
setTimeout(() => {
585+
if (this.poolCanGrow()) {
586+
this.checkoutOpen = true
587+
ThreadPoolFactory.resumeMonitoring(this)
588+
}
589+
}, 1000)
590+
})
591+
}
564592
console.debug(
565593
`thread checked out, total in use now ${this.threadsInUse()}`
566594
)
567595
return thread
568596
}
569-
this.allocate()
570597
} catch (err) {}
571598
}
572599

573600
checkin (thread) {
574601
if (thread) {
575602
this.freeThreads.push(thread)
603+
this.checkoutOpen = true
576604
console.debug(
577605
`thread checked in, total in use now ${this.threadsInUse()}`
578606
)
@@ -754,6 +782,7 @@ export class ThreadPool extends EventEmitter {
754782
const ThreadPoolFactory = (() => {
755783
/** @type {Map<string, ThreadPool>} */
756784
const threadPools = new Map()
785+
const monitoredPools = new Map()
757786

758787
/** @type {Map<string, BroadcastChannel>} */
759788
const broadcastChannels = new Map()
@@ -826,6 +855,7 @@ const ThreadPoolFactory = (() => {
826855
})
827856

828857
threadPools.set(poolName, pool)
858+
monitoredPools.set(poolName, pool)
829859
return pool
830860
} catch (error) {
831861
console.error({ fn: createThreadPool.name, error })
@@ -991,9 +1021,16 @@ const ThreadPoolFactory = (() => {
9911021
/**
9921022
* Monitor pools for stuck threads and restart them
9931023
*/
994-
function monitorPools () {
1024+
function monitorPools (pool = null) {
1025+
if (pool && !monitoredPools.has(pool)) {
1026+
monitoredPools.set(pool.name, pool)
1027+
return
1028+
}
1029+
1030+
if (monitorIntervalId) return
1031+
9951032
monitorIntervalId = setInterval(() => {
996-
threadPools.forEach(pool => {
1033+
monitorPools.forEach(pool => {
9971034
if (pool.aborting) return
9981035

9991036
const workRequested = pool.totalJobsRequested()
@@ -1040,8 +1077,13 @@ const ThreadPoolFactory = (() => {
10401077
}, poolMaxAbortTime())
10411078
}
10421079

1043-
function pauseMonitoring () {
1044-
clearInterval(monitorIntervalId)
1080+
function pauseMonitoring (pool = null) {
1081+
if (!pool) {
1082+
clearInterval(monitorIntervalId)
1083+
monitorIntervalId = null
1084+
return
1085+
}
1086+
monitoredPools.delete(pool.name)
10451087
}
10461088

10471089
function resumeMonitoring () {

src/services/service-mesh/web-switch/switch.js

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { nanoid } from 'nanoid'
44
import { hostname } from 'os'
55
import { Server, WebSocket } from 'ws'
66

7+
const HOSTNAME = 'webswitch.local'
78
const SERVICENAME = 'webswitch'
89
const CLIENT_MAX_ERRORS = 3
910
const CLIENT_MAX_RETRIES = 10
@@ -16,7 +17,6 @@ const debug = /true/i.test(config.debug) || /true/i.test(process.env.DEBUG)
1617
const isPrimary =
1718
/true/i.test(process.env.SWITCH) ||
1819
(typeof process.env.SWITCH === 'undefined' && config.isSwitch)
19-
2020
const headers = {
2121
host: 'x-webswitch-host',
2222
role: 'x-webswitch-role',
@@ -221,11 +221,11 @@ export function attachServer (httpServer, secureCtx = {}) {
221221
publish.apply(this, message)
222222
}
223223

224-
function leastRecentlyUsed (lastUsed, client) {
225-
const cli =
226-
lastUsed[info].lastUsed > client[info].lastUsed ? client : lastUsed
227-
cli[info].lastUsed = Date.now()
228-
return cli
224+
function leastRecentlyUsed (client1, client2) {
225+
const lru =
226+
client1[info].lastUsed > client2[info].lastUsed ? client2 : client1
227+
lru[info].lastUsed = Date.now()
228+
return lru
229229
}
230230

231231
const noRoute = sender => {
@@ -274,7 +274,7 @@ export function attachServer (httpServer, secureCtx = {}) {
274274
client =>
275275
client !== sender && client[info].events.includes(message.eventName)
276276
)
277-
.reduce(leastRecentlyUsed, noRoute)
277+
.reduce(leastRecentlyUsed, noRoute(sender))
278278
.publish(message),
279279
/**
280280
* Send to all clients that consume the event in `message.eventName`.
@@ -347,12 +347,14 @@ export function attachServer (httpServer, secureCtx = {}) {
347347

348348
function assignBackup (client) {
349349
if (
350+
// Is this the primaary switch?
350351
isPrimary &&
351352
// is there a backup already?
352353
!backupSwitch &&
353-
// can't be a browser
354354
client[info] &&
355+
// can't be a browser
355356
client[info].role === 'node' &&
357+
// dont run backup on same host
356358
client[info].hostname !== hostname()
357359
) {
358360
backupSwitch = client[info]?.id

watch.sh

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
cd ../aegis
22
yarn build
33
curl -s http://localhost:$1/reload
4-
curl -s http://localhost:$2/reload
4+
5+
PID=$(lsof -P -i tcp:8888 | awk '{print $2}')
6+
7+
if [[ ${PID} ]]; then
8+
curl -s http://localhost:$2/reload
9+
fi

0 commit comments

Comments
 (0)