Skip to content

Commit 30932fc

Browse files
committed
thread pool well behaved under stress
1 parent f45e68d commit 30932fc

File tree

5 files changed

+69
-64
lines changed

5 files changed

+69
-64
lines changed

src/domain/domain-events.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
/**
44
* @param {import('./event-broker').EventBroker} broker
55
*/
6-
export function registerEvents(broker) {
6+
export function registerEvents (broker) {
77
//
88
broker.on('shutdown', signal => {
99
console.log('shutdown requested')
@@ -51,6 +51,7 @@ const domainEvents = {
5151
poolAbort: modelName => `poolAbort_${String(modelName).toUpperCase()}`,
5252
badUserRoute: error => `badUserRoute error: ${error}`,
5353
reload: 'reload',
54+
portEvent: name => `externalPortEvent_${name}`
5455
}
5556

5657
export default domainEvents

src/domain/event-router.js

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
'use strict'
22

33
import { workerData, BroadcastChannel } from 'worker_threads'
4+
import domainEvents from './domain-events'
5+
const { portEvent } = domainEvents
46

57
/**
68
* Port events controls event-driven, distributed workflow and
@@ -16,6 +18,8 @@ export class PortEventRouter {
1618
this.models = models
1719
this.broker = broker
1820
this._localSpec = this.models.getModelSpec(workerData.poolName)
21+
if (!this._localSpec)
22+
this.models.getModelSpecs().find(i => i.domain === workerData.poolName)
1923
this._threadRemotePorts = this.threadRemotePorts()
2024
this._threadLocalPorts = this.threadLocalPorts()
2125
}
@@ -103,13 +107,34 @@ export class PortEventRouter {
103107
}
104108

105109
handleBroadcastEvent (msg) {
106-
if (msg?.data?.eventName) this.broker.notify(msg.data.eventName, msg.data)
110+
if (msg?.data?.eventName)
111+
this.broker.notify(this.internalizeEvent(msg.data.eventName), msg.data)
107112
else {
108113
console.log('missing eventName', msg.data)
109114
this.broker.notify('missingEventName', msg.data)
110115
}
111116
}
112117

118+
externalizeEvent (event) {
119+
if (typeof event === 'object') {
120+
return {
121+
...event,
122+
eventName: portEvent(event.eventName)
123+
}
124+
}
125+
if (typeof event === 'string') return portEvent(event)
126+
}
127+
128+
internalizeEvent (event) {
129+
if (typeof event === 'object') {
130+
return {
131+
...event,
132+
eventName: event.eventName.replace(portEvent(), '')
133+
}
134+
}
135+
if (typeof event === 'string') return event.replace(portEvent(), '')
136+
}
137+
113138
/**
114139
* Listen for producer events coming from other thread pools and
115140
* fire the events in the local pool for ports to consume them.
@@ -139,7 +164,9 @@ export class PortEventRouter {
139164
// listen for producer events and dispatch them to local pools
140165
publisherPorts.forEach(port =>
141166
this.broker.on(port.producesEvent, event =>
142-
channels.get(port.domain).postMessage(JSON.parse(JSON.stringify(event)))
167+
channels
168+
.get(port.domain)
169+
.postMessage(JSON.parse(JSON.stringify(this.externalizeEvent(event))))
143170
)
144171
)
145172

@@ -154,16 +181,18 @@ export class PortEventRouter {
154181
this.broker.on(port.producesEvent, event => {
155182
this.broker.notify('to_main', {
156183
...event,
157-
eventName: port.producesEvent,
184+
eventName: this.externalizeEvent(port.producesEvent),
158185
route: 'balanceEventConsumer' // mesh routing algo
159186
})
160187
})
161188
})
162189

163-
// listen to this model's channel
164-
new BroadcastChannel(workerData.poolName).onmessage = msg => {
165-
console.log('onmessage', msg.data)
166-
this.handleBroadcastEvent(msg)
190+
if (!channels.has(this.localSpec.domain)) {
191+
// listen to this model's channel
192+
new BroadcastChannel(this.localSpec.domain).onmessage = msg => {
193+
console.log('onmessage', msg.data)
194+
this.handleBroadcastEvent(msg)
195+
}
167196
}
168197
}
169198
}

src/domain/make-relations.js

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ export const relationType = {
5757
* @param {*} rel
5858
* @returns
5959
*/
60-
oneToOne(model, ds, rel) {
60+
oneToOne (model, ds, rel) {
6161
return this.manyToOne(model, ds, rel)
6262
},
6363

@@ -87,14 +87,14 @@ export const relationType = {
8787
const rds = ds.factory.getRestrictedDataSource(model.modelName)
8888
const relRds = ds.factory.getRestrictedDataSource(rel.modelName, {
8989
isCached: true,
90-
ephemeral: true,
90+
ephemeral: true
9191
})
9292

9393
// if relRds is in the same domain but is remote, this fails
9494
if (rds.namespace !== relRds.namespace) return null
9595

9696
return rds[rel.name]({ args, model, ds: relRds, relation: rel })
97-
},
97+
}
9898
}
9999

100100
/**
@@ -108,39 +108,39 @@ const updateForeignKeys = {
108108
* @param {import('./index').relations[x]} relation
109109
* @param {import('./model-factory').Datasource} ds
110110
*/
111-
async [relationType.manyToOne.name](fromModel, toModels, relation, ds) {
111+
async [relationType.manyToOne.name] (fromModel, toModels, relation, ds) {
112112
return fromModel.update(
113113
{ [relation.foreignKey]: toModels[0].getId() },
114114
false
115115
)
116116
},
117117

118-
async [relationType.oneToOne.name](fromModel, toModels, relation, ds) {
118+
async [relationType.oneToOne.name] (fromModel, toModels, relation, ds) {
119119
return this[relationType.manyToOne.name](fromModel, toModels, relation, ds)
120120
},
121121

122-
async [relationType.oneToMany.name](fromModel, toModels, relation, ds) {
122+
async [relationType.oneToMany.name] (fromModel, toModels, relation, ds) {
123123
return Promise.all(
124124
toModels.map(async m => {
125125
const model = await ds.find(m.id || m.getId())
126126
return ds.save(m.id, {
127127
...model,
128-
[relation.foreignKey]: fromModel.getId(),
128+
[relation.foreignKey]: fromModel.getId()
129129
})
130130
})
131131
)
132132
},
133133

134-
async [relationType.containsMany.name](fromModel, toModels, relation, ds) {
134+
async [relationType.containsMany.name] (fromModel, toModels, relation, ds) {
135135
toModels.map(model =>
136136
model.update({ [relation.foreignKey]: fromModel.getId() })
137137
)
138138
},
139139

140-
async [relationType.custom.name](fromModel, toModels, relation, ds) {
140+
async [relationType.custom.name] (fromModel, toModels, relation, ds) {
141141
const customFn = fromModel[`${relation}UpdateForeignKeys`]
142142
if (customFn === 'function') customFn(toModels, relation, ds)
143-
},
143+
}
144144
}
145145

146146
/**
@@ -151,7 +151,7 @@ const updateForeignKeys = {
151151
* @param {import('./datasource').default} ds
152152
* @returns
153153
*/
154-
async function createModels(args, fromModel, relation, ds) {
154+
async function createModels (args, fromModel, relation, ds) {
155155
if (args.length > 0) {
156156
const { UseCaseService } = require('.')
157157
const service = UseCaseService(relation.modelName)
@@ -162,18 +162,18 @@ async function createModels(args, fromModel, relation, ds) {
162162
}
163163
}
164164

165-
async function createNewModels(args, rel, datasource) {
165+
async function createNewModels (args, rel, datasource) {
166166
if (args.length > 0 && rel.type !== 'custom') {
167167
// fetch the local ds and create the models
168168
const ds = datasource.factory.getDataSource(rel.modelName)
169169
return {
170170
yes: true,
171-
create: async () => await createModels(args, this, rel, ds),
171+
create: async () => await createModels(args, this, rel, ds)
172172
}
173173
}
174174
return {
175175
yes: false,
176-
create: () => null,
176+
create: () => null
177177
}
178178
}
179179

@@ -188,7 +188,7 @@ async function createNewModels(args, rel, datasource) {
188188
* @param {import("./event-broker").EventBroker} broker
189189
* @returns {Promise<import(".").Event>} source model
190190
*/
191-
export function requireRemoteObject(model, relation, broker, ...args) {
191+
export function requireRemoteObject (model, relation, broker, ...args) {
192192
const request = internalCacheRequest(relation.modelName)
193193
const response = internalCacheResponse(relation.modelName)
194194
const name = (model ? model.getName() : relation.modelName).toUpperCase()
@@ -207,7 +207,7 @@ export function requireRemoteObject(model, relation, broker, ...args) {
207207
modelId: id,
208208
relation,
209209
model,
210-
args,
210+
args
211211
}
212212

213213
return new Promise(async function (resolve) {
@@ -223,7 +223,7 @@ export function requireRemoteObject(model, relation, broker, ...args) {
223223
* @param {import('.').relations[x]} relation
224224
* @returns {boolean}
225225
*/
226-
function isRelatedModelLocal(relation) {
226+
function isRelatedModelLocal (relation) {
227227
return require('.')
228228
.default.getModelSpecs()
229229
.filter(spec => !spec.isCached)
@@ -236,7 +236,7 @@ function isRelatedModelLocal(relation) {
236236
* @param {import("./index").relations} relations
237237
* @param {import("./datasource").default} datasource
238238
*/
239-
export default function makeRelations(relations, datasource, broker) {
239+
export default function makeRelations (relations, datasource, broker) {
240240
if (Object.getOwnPropertyNames(relations).length < 1) return
241241

242242
return Object.keys(relations)
@@ -246,7 +246,7 @@ export default function makeRelations(relations, datasource, broker) {
246246
const rel = {
247247
...relations[relation],
248248
modelName: relModelName,
249-
name: relation,
249+
name: relation
250250
}
251251

252252
try {
@@ -258,18 +258,24 @@ export default function makeRelations(relations, datasource, broker) {
258258

259259
return {
260260
// the relation function
261-
async [relation](...args) {
261+
async [relation] (...args) {
262262
const local = isRelatedModelLocal(relModelName)
263263
if (!local) await importModelCache(relModelName)
264264

265+
console.log({ relModelName })
266+
265267
if (local) {
266268
const result = createNewModels(args, rel, datasource)
267269
if (result.yes) return result.create()
268270
}
269271

272+
const importedSpec = require('.').default.getModelSpec(relModelName)
270273
// If object is remote, we should have its code by now.
271274
// Recreate its datasource, including any customization
272-
const ds = datasource.factory.getDataSource(relModelName)
275+
const ds = datasource.factory.getDataSource(
276+
relModelName,
277+
importedSpec.domain
278+
)
273279

274280
const models = await relationType[rel.type](this, ds, rel, args)
275281

@@ -289,7 +295,7 @@ export default function makeRelations(relations, datasource, broker) {
289295
}
290296

291297
return models
292-
},
298+
}
293299
}
294300
} catch (error) {
295301
console.error({ fn: makeRelations.name, error })

src/domain/thread-pool.js

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -440,27 +440,10 @@ export class ThreadPool extends EventEmitter {
440440

441441
incrementJobsQueued () {
442442
this.jobsQueued++
443-
if (
444-
this.jobQueueRate() > this.jobQueueThreshold() &&
445-
this.jobsRequested > 20
446-
) {
447-
console.warn('job queue threshold exceeded')
448-
this.emit(this.growPool.name)
449-
}
450443
return this
451444
}
452445

453-
async growPool () {
454-
if (this.capacityAvailable()) {
455-
const thread = await this.startThread()
456-
if (this.capacityAvailable())
457-
broker.on(this.growPool.name, this.growPool, {
458-
singleton: true,
459-
once: true
460-
})
461-
return thread
462-
}
463-
}
446+
async growPool () {}
464447

465448
jobQueueRate () {
466449
return Math.round((this.jobsQueued / this.jobsRequested) * 100)
@@ -473,13 +456,6 @@ export class ThreadPool extends EventEmitter {
473456
jobTime (millisec) {
474457
this.totJobTime += millisec
475458
this.avgJobTime = Math.round(this.totJobTime / this.jobsRequested)
476-
if (
477-
this.avgJobTime > this.jobDurationThreshold() &&
478-
this.jobsRequested > 20
479-
) {
480-
this.emit(this.growPool.name)
481-
console.log('job duration threshold exceeded', this)
482-
}
483459
return this
484460
}
485461

@@ -493,10 +469,6 @@ export class ThreadPool extends EventEmitter {
493469

494470
incrementErrorCount () {
495471
this.errors++
496-
if (this.errorRate() > this.errorRateThreshold() && this.errors > 10) {
497-
this.emit(this.growPool.name)
498-
console.log('job duration threshold exceeded', this)
499-
}
500472
return this
501473
}
502474

@@ -597,10 +569,8 @@ export class ThreadPool extends EventEmitter {
597569
return thread
598570
}
599571
// none free, try to allocate
600-
const thread = this.allocate()
601-
// pop this one
602-
if (thread) this.freeThreads.pop()
603-
// dont return tyhread, let queued jobs go
572+
this.allocate()
573+
// dont return thread, let queued jobs go
604574
}
605575

606576
checkin (thread) {

src/domain/use-cases/load-models.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import { isMainThread } from 'worker_threads'
44
import { Writable, Transform } from 'node:stream'
55
import { hostConfig } from '../../config'
6-
import e from 'express'
76

87
function nextModelFn (port, mf) {
98
mf

0 commit comments

Comments
 (0)