Skip to content

Commit 235f932

Browse files
committed
fix thread limit
1 parent 6b2e11b commit 235f932

File tree

4 files changed

+60
-70
lines changed

4 files changed

+60
-70
lines changed

src/domain/make-relations.js

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@ const { internalCacheRequest, internalCacheResponse, externalCacheRequest } =
1212

1313
const maxwait = process.env.REMOTE_OBJECT_MAXWAIT || 1000
1414

15+
class RelError extends Error {
16+
constructor (error, code, fn) {
17+
super(error)
18+
this.code = code
19+
console.log({ fn, code, error })
20+
}
21+
}
22+
1523
export const relationType = {
1624
/**
1725
* Search memory and external storage
@@ -108,7 +116,7 @@ const updateForeignKeys = {
108116
* @param {import('./index').relations[x]} relation
109117
* @param {import('./model-factory').Datasource} ds
110118
*/
111-
async [relationType.manyToOne.name] (fromModel, toModels, relation, ds) {
119+
async [relationType.manyToOne.name] (fromModels, toModels, relation, ds) {
112120
return fromModel.update(
113121
{ [relation.foreignKey]: toModels[0].getId() },
114122
false
@@ -228,7 +236,7 @@ function isRelatedModelLocal (relation) {
228236
.default.getModelSpecs()
229237
.filter(spec => !spec.isCached)
230238
.map(spec => spec.modelName)
231-
.includes(relation.modelName)
239+
.includes(relation.modelName.toUpperCase())
232240
}
233241

234242
/**
@@ -259,26 +267,25 @@ export default function makeRelations (relations, datasource, broker) {
259267
return {
260268
// the relation function
261269
async [relation] (...args) {
270+
const createNew = args?.length > 0 ? true : false
262271
const local = isRelatedModelLocal(relModelName)
263272
if (!local) await importModelCache(relModelName)
264273

265-
console.log({ relModelName })
266-
267-
if (local) {
274+
if (local && createNew) {
268275
const result = createNewModels(args, rel, datasource)
269276
if (result.yes) return result.create()
277+
throw new Error('cannot create new models')
270278
}
271279

272-
const importedSpec = require('.').default.getModelSpec(relModelName)
273280
// If object is remote, we should have its code by now.
274-
// Recreate its datasource, including any customization
275-
const ds = datasource.factory.getDataSource(
276-
relModelName,
277-
importedSpec.domain
278-
)
281+
const ds = datasource.factory.getDataSource(relModelName)
282+
283+
if (!ds) throw new RelError(`missing ds for ${relModelName}`, 500)
279284

285+
// call the relation's implementation
280286
const models = await relationType[rel.type](this, ds, rel, args)
281287

288+
// still failing, try service mesh
282289
if (!models || models.length < 1) {
283290
// find remote instance(s)
284291
const event = await requireRemoteObject(
@@ -288,7 +295,8 @@ export default function makeRelations (relations, datasource, broker) {
288295
...args
289296
)
290297

291-
if (args.length > 0)
298+
if (createNew)
299+
// args mean models were created and need new fk's
292300
updateForeignKeys[rel.type](this, event.model, rel, ds)
293301

294302
return await relationType[rel.type](this, ds, rel, args)

src/domain/thread-pool.js

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import domainEvents from './domain-events'
99
import assert from 'assert'
1010
import path from 'path'
1111
import os from 'os'
12+
import { setServers } from 'dns/promises'
1213

1314
const { poolOpen, poolClose, poolDrain, poolAbort } = domainEvents
1415
const broker = EventBrokerFactory.getInstance()
@@ -159,6 +160,7 @@ export class ThreadPool extends EventEmitter {
159160
this.aborting = false
160161
this.jobsRequested = this.jobsQueued = 0
161162
this.broadcastChannel = options.broadcast
163+
this.updateLocks()
162164

163165
if (options?.preload) {
164166
console.info('preload enabled for', this.name)
@@ -220,6 +222,7 @@ export class ThreadPool extends EventEmitter {
220222
console.warn('shutdown timeout')
221223
resolve(await worker.terminate())
222224
}, 6000)
225+
223226
worker.once('exit', () => {
224227
clearTimeout(timerId)
225228
console.log('orderly shutdown')
@@ -539,33 +542,33 @@ export class ThreadPool extends EventEmitter {
539542

540543
threadsInUse () {
541544
const diff = this.threads.length - this.freeThreads.length
542-
assert.ok(
543-
diff >= 0 && diff <= this.maxThreads,
544-
'thread mgmt issue: in use: ' +
545-
diff +
546-
', total:' +
547-
this.threads.length +
548-
', free:' +
549-
this.freeThreads.length +
550-
', min:' +
551-
this.minThreads +
552-
', max:' +
553-
this.maxThreads
554-
)
545+
assert.ok(diff >= 0 && diff <= this.maxThreads, 'thread mgmt issue')
555546
return diff
556547
}
557548

549+
updateLocks () {
550+
this.locks = []
551+
this.threads.forEach(t => this.locks.push(t.id))
552+
}
553+
554+
canCheckout () {
555+
return true
556+
// return this.threads.some(t => t.id === this.locks.pop())
557+
}
558+
558559
checkout () {
559-
if (this.freeThreads.length > 0) {
560-
const thread = this.freeThreads.shift()
561-
console.debug(
562-
`thread checked out, total in use now ${this.threadsInUse()}`
563-
)
564-
return thread
565-
}
566-
// none free, try to allocate
567-
this.allocate()
568-
// dont return thread, let queued jobs go
560+
if (!this.canCheckout()) return
561+
562+
try {
563+
if (this.freeThreads.length > 0) {
564+
const thread = this.freeThreads.shift()
565+
console.debug(
566+
`thread checked out, total in use now ${this.threadsInUse()}`
567+
)
568+
return thread
569+
}
570+
this.allocate()
571+
} catch (err) {}
569572
}
570573

571574
checkin (thread) {
@@ -575,7 +578,6 @@ export class ThreadPool extends EventEmitter {
575578
`thread checked in, total in use now ${this.threadsInUse()}`
576579
)
577580
}
578-
return this
579581
}
580582

581583
enqueue (job) {
Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
'use strict'
22

33
import { isMainThread } from 'worker_threads'
4-
import { Writable, Transform } from 'node:stream'
4+
import { Writable } from 'node:stream'
55
import { hostConfig } from '../../config'
66

7-
function nextModelFn (port, mf) {
8-
mf
7+
function nextModelFn (port, modelFactory) {
8+
modelFactory
99
.getModelSpecs()
1010
.filter(spec => spec.ports)
1111
.map(spec =>
@@ -15,20 +15,21 @@ function nextModelFn (port, mf) {
1515
)[0]
1616
}
1717

18-
function startWorkflow (model, mf) {
18+
function startWorkflow (model, modelFactory) {
1919
const history = model.getPortFlow()
2020
const ports = model.getPorts()
2121

2222
if (history?.length > 0 && !model.compensate) {
2323
const lastPort = history.length - 1
2424
const nextPort = ports[history[lastPort]]?.producesEvent
25-
const nextModel = nextModelFn(nextPort, mf)
25+
const nextModel = nextModelFn(nextPort, modelFactory)
2626
if (nextPort) model.emit(nextPort, nextModel)
2727
}
2828
}
2929

3030
/**
31-
* deserialize and unmarshall from stream.
31+
* Add models to cache and run workflow.
32+
*
3233
* @typedef {import('..').Model}
3334
* @param {{
3435
* models:import('../model-factory').ModelFactory,
@@ -38,7 +39,7 @@ function startWorkflow (model, mf) {
3839
* }} options
3940
* @returns {function():Promise<void>}
4041
*/
41-
export default async function ({ modelName, repository, models }) {
42+
export default async function ({ repository, models }) {
4243
if (isMainThread) return
4344

4445
const cacheLoadDisabled =
@@ -47,39 +48,17 @@ export default async function ({ modelName, repository, models }) {
4748

4849
if (cacheLoadDisabled) return
4950

50-
const resumeWorkflowDisabled =
51-
/false/i.test(process.env.RESUMEWORKFLOW) ||
52-
hostConfig.adapters.resumeWorkflow === false
53-
54-
console.log('loading cache ', modelName)
55-
56-
// const serializers = models.getModelSpec(modelName).serializers
57-
58-
// const deserializers = serializers
59-
// ? serializers.filter(s => !s.disabled && s.on === 'deserialize')
60-
// : null
61-
62-
// if (deserializers) deserializers.forEach(des => Serializer.addSerializer(des))
63-
6451
const loadCache = new Writable({
6552
objectMode: true,
6653

6754
write (chunk, _, next) {
68-
if (chunk?.id) repository.saveSync(chunk.id, chunk)
69-
next()
70-
}
71-
})
72-
73-
const resumeWorkflow = new Transform({
74-
objectMode: true,
75-
76-
transform (chunk, _, next) {
77-
startWorkflow(chunk, models)
55+
if (chunk?.id) {
56+
repository.saveSync(chunk.id, chunk)
57+
startWorkflow(chunk, models)
58+
}
7859
next()
7960
}
8061
})
8162

82-
if (resumeWorkflowDisabled)
83-
repository.list({ writable: loadCache, transform: resumeWorkflow })
84-
else repository.list({ writable: loadCache })
63+
repository.list({ writable: loadCache })
8564
}

start.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
export LOADCACHE=false
12
cd ../aegis-app
23
nohup node repo.js &
34
nohup node repo.js 8001 cache &

0 commit comments

Comments
 (0)