Skip to content

Commit 98b76cb

Browse files
committed
addtl fixes for aggregation and threadpool refctr
1 parent 9974e06 commit 98b76cb

File tree

2 files changed

+44
-40
lines changed

2 files changed

+44
-40
lines changed

src/domain/datasource-factory.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ const DataSourceFactory = (() => {
317317
const DsClass = createDataSourceClass(spec, options)
318318
const DsExtendedClass = extendDataSourceClass(DsClass, options)
319319

320-
if (spec.datasource) {
320+
if (spec?.datasource) {
321321
options = { ...options, connOpts: { ...spec.datasource } }
322322
}
323323

src/domain/thread-pool.js

Lines changed: 43 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
import { Worker, BroadcastChannel } from 'worker_threads'
44
import { EventEmitter } from 'node:stream'
55
import { AsyncResource } from 'node:async_hooks'
6-
import { assert } from 'console'
76
import ModelFactory, { totalServices, requestContext } from '.'
87
import EventBrokerFactory from './event-broker'
98
import domainEvents from './domain-events'
9+
import assert from 'assert'
1010
import path from 'path'
1111
import os from 'os'
1212

@@ -226,7 +226,7 @@ export class ThreadPool extends EventEmitter {
226226
resolve(1)
227227
})
228228
this.eventChannel.postMessage({ eventName: 'shutdown' })
229-
}).catch(console.error)
229+
})
230230
},
231231

232232
/**
@@ -335,10 +335,8 @@ export class ThreadPool extends EventEmitter {
335335
file: this.file,
336336
workerData: this.workerData
337337
})
338-
339-
if (thread) return thread
340-
341-
throw new Error('error creating thread')
338+
assert.ok(thread, 'error creating thread')
339+
return thread
342340
}
343341

344342
/**
@@ -372,8 +370,8 @@ export class ThreadPool extends EventEmitter {
372370
const status = []
373371
for (const thread of this.threads)
374372
status.push(await this.stopThread(thread, reason))
375-
assert(this.threads.length === 0, 'at least 1 thread didnt stop', ...status)
376373
this.emit(this.stopThreads.name, ...status)
374+
assert.ok(this.threads.length === 0, 'some threads didnt stop', ...status)
377375
return this
378376
}
379377

@@ -524,37 +522,40 @@ export class ThreadPool extends EventEmitter {
524522
if (this.poolCanGrow()) return this.startThread()
525523
}
526524

527-
checkThreadCount (threadsInUse) {
528-
assert(
529-
threadsInUse < 0 && threadsInUse > this.threads.length,
530-
'thread mgmt issue',
531-
'in use',
532-
threadsInUse,
533-
'total',
534-
this.threads.length,
535-
'free',
536-
this.freeThreads.length,
537-
'max',
538-
this.maxThreads
525+
threadsInUse () {
526+
const diff = this.threads.length - this.freeThreads.length
527+
assert.ok(
528+
diff >= 0 && diff <= this.maxThreads,
529+
'thread mgmt issue: in use: ' +
530+
diff +
531+
', total:' +
532+
this.threads.length +
533+
', free:' +
534+
this.freeThreads.length +
535+
', min:' +
536+
this.minThreads +
537+
', max:' +
538+
this.maxThreads
539539
)
540+
return diff
540541
}
541542

542543
checkout () {
543544
if (this.freeThreads.length > 0) {
544545
const thread = this.freeThreads.shift()
545-
const threadsInUse = this.threads.length - this.freeThreads.length
546-
console.debug(`thread checked-out, total in-use now ${threadsInUse}`)
547-
this.checkThreadCount(threadsInUse)
546+
console.debug(
547+
`thread checked out, total in use now ${this.threadsInUse()}`
548+
)
548549
return thread
549550
}
550551
}
551552

552553
checkin (thread) {
553554
if (thread) {
554555
this.freeThreads.push(thread)
555-
const threadsInUse = this.threads.length - this.freeThreads.length
556-
console.debug(`thread checked-in, total in-use now ${threadsInUse}`)
557-
this.checkThreadCount(threadsInUse)
556+
console.debug(
557+
`thread checked-in, total in-use now ${this.threadsInUse()}`
558+
)
558559
}
559560
return this
560561
}
@@ -617,7 +618,7 @@ export class ThreadPool extends EventEmitter {
617618
}
618619

619620
/**
620-
* Fire event with the given scope (host, pool, thread)
621+
* Fire event within the given scope (mesh, host, pool, thread)
621622
* @param {{
622623
* scope:'host'|'pool'|'thread'|'response',
623624
* data:string,
@@ -627,6 +628,9 @@ export class ThreadPool extends EventEmitter {
627628
*/
628629
async fireEvent (event) {
629630
const eventScopes = {
631+
mesh: event =>
632+
broker.notify(event.eventName, { ...event, eventSource: this.name }),
633+
630634
host: event =>
631635
broker.notify('to_worker', { ...event, eventSource: this.name }),
632636

@@ -746,11 +750,8 @@ const ThreadPoolFactory = (() => {
746750
* @param {string} poolName same as `modelName`
747751
*/
748752
function broadcastEvent (event, poolName) {
749-
if (poolName) return getBroadcastChannel(poolName).postMessage(event)
750-
getBroadcastChannel().postMessgae({
751-
eventName: 'threadError',
752-
...event
753-
})
753+
assert.ok(poolName, 'no poolname given')
754+
getBroadcastChannel(poolName).postMessage(event)
754755
}
755756

756757
/**
@@ -862,9 +863,10 @@ const ThreadPoolFactory = (() => {
862863
* @returns {Promise<any>} returns a response
863864
*/
864865
async function fireEvent (event) {
865-
const pool = threadPools.get(event.domain)
866+
const pool = threadPools.get(event.domain || event.modelName)
866867
if (pool) return pool.fireEvent(event)
867-
broadcastEvent(event)
868+
// no pool specified, forward to mesh
869+
broker.notify('from_worker', event)
868870
}
869871

870872
/**
@@ -874,13 +876,15 @@ const ThreadPoolFactory = (() => {
874876
* @returns {Promise<ThreadPool>}
875877
* @throws {ReloadError}
876878
*/
877-
function reload (poolName) {
878-
return new Promise(async (resolve, reject) => {
879+
async function reload (poolName) {
880+
try {
879881
const pool = threadPools.get(poolName.toUpperCase())
880-
if (!pool) reject(`no such pool ${pool}`)
882+
assert.ok(pool, `no such pool ${pool}`)
881883
await pool.reload()
882-
resolve(pool.status())
883-
}).catch(broadcastEvent)
884+
} catch (error) {
885+
console.error({ fn: reload.name, error })
886+
return fireEvent({ eventName: error.name, ...error })
887+
}
884888
}
885889

886890
async function reloadPools () {
@@ -910,7 +914,7 @@ const ThreadPoolFactory = (() => {
910914

911915
async function destroyPools () {
912916
await Promise.all([...threadPools].map(([, pool]) => destroy(pool)))
913-
threadPools.clear()
917+
assert.ok(threadPools.size === 0, 'some pools not destroyed')
914918
}
915919

916920
function status (poolName = null) {

0 commit comments

Comments
 (0)