Skip to content

Commit b1bbae5

Browse files
authored
Merge pull request #282 from module-federation/release-0-1-0
thread limit and scaling fixed and optimal
2 parents 8f23d2f + 645f05c commit b1bbae5

File tree

2 files changed

+25
-67
lines changed

2 files changed

+25
-67
lines changed

src/domain/event-router.js

Lines changed: 18 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
'use strict'
22

3-
import { workerData, BroadcastChannel } from 'worker_threads'
3+
import { workerData } from 'worker_threads'
44
import domainEvents from './domain-events'
5+
56
const { portEvent } = domainEvents
67

78
/**
@@ -18,8 +19,10 @@ export class PortEventRouter {
1819
this.models = models
1920
this.broker = broker
2021
this._localSpec = this.models.getModelSpec(workerData.poolName)
22+
2123
if (!this._localSpec)
2224
this.models.getModelSpecs().find(s => s.domain === workerData.poolName)
25+
2326
this._threadRemotePorts = this.threadRemotePorts()
2427
this._threadLocalPorts = this.threadLocalPorts()
2528
}
@@ -106,35 +109,17 @@ export class PortEventRouter {
106109
)
107110
}
108111

109-
handleBroadcastEvent (msg) {
110-
if (msg?.data?.eventName)
111-
this.broker.notify(this.internalizeEvent(msg.data.eventName), msg.data)
112-
else {
113-
console.log('missing eventName', msg.data)
114-
this.broker.notify('missingEventName', msg.data)
115-
}
116-
}
117-
118-
externalizeEvent (event) {
112+
externalizeEvent (event, route = 'broadcast') {
119113
if (typeof event === 'object') {
120114
return {
121115
...event,
122-
eventName: portEvent(event.eventName)
116+
eventName: portEvent(event.eventName),
117+
route
123118
}
124119
}
125120
if (typeof event === 'string') return portEvent(event)
126121
}
127122

128-
internalizeEvent (event) {
129-
if (typeof event === 'object') {
130-
return {
131-
...event,
132-
eventName: event.eventName.replace(portEvent(), '')
133-
}
134-
}
135-
if (typeof event === 'string') return event.replace(portEvent(), '')
136-
}
137-
138123
/**
139124
* Listen for producer events coming from other thread pools and
140125
* fire the events in the local pool for ports to consume them.
@@ -143,56 +128,33 @@ export class PortEventRouter {
143128
* dispatch them to pools that consume them.
144129
*/
145130
listen () {
146-
const services = new Set()
147-
const channels = new Map()
148131
const subscriberPorts = this.subscriberPorts()
149132
const publisherPorts = this.publisherPorts()
150133
const unhandledPorts = this.unhandledPorts()
151134

152-
publisherPorts.forEach(port => services.add(port.domain))
153-
subscriberPorts.forEach(port => services.add(port.domain))
154-
155-
services.forEach(service =>
156-
channels.set(service, new BroadcastChannel(service))
157-
)
158-
159135
console.log('publisherPorts', publisherPorts)
160136
console.log('subscriberPorts', subscriberPorts)
161137
console.log('unhandledPorts', unhandledPorts)
162-
console.log('channels', channels)
163138

164139
// listen for producer events and dispatch them to local pools
165140
publisherPorts.forEach(port =>
166141
this.broker.on(port.producesEvent, event =>
167-
channels
168-
.get(port.domain)
169-
.postMessage(JSON.parse(JSON.stringify(this.externalizeEvent(event))))
142+
this.broker.notify('broadcast', this.externalizeEvent(event))
170143
)
171144
)
172145

173146
// listen for incoming consumer events from local pools
174-
subscriberPorts.forEach(port => {
175-
channels.get(port.domain).onmessage = msg =>
176-
this.handleBroadcastEvent(msg)
177-
})
147+
subscriberPorts.forEach(port =>
148+
this.broker.on(this.externalizeEvent(port.consumesEvent), event =>
149+
this.notify(port.consumesEvent, event)
150+
)
151+
)
178152

179153
// dispatch events not handled by local pools to the service mesh
180-
unhandledPorts.forEach(port => {
181-
this.broker.on(port.producesEvent, event => {
182-
this.broker.notify('to_main', {
183-
...event,
184-
eventName: this.externalizeEvent(port.producesEvent),
185-
route: 'balanceEventConsumer' // mesh routing algo
186-
})
187-
})
188-
})
189-
190-
if (!channels.has(this.localSpec.domain)) {
191-
// listen to this model's channel
192-
new BroadcastChannel(this.localSpec.domain).onmessage = msg => {
193-
console.log('onmessage', msg.data)
194-
this.handleBroadcastEvent(msg)
195-
}
196-
}
154+
unhandledPorts.forEach(port =>
155+
this.broker.on(port.producesEvent, event =>
156+
this.broker.notify('to_main', this.externalizeEvent(event))
157+
)
158+
)
197159
}
198160
}

src/domain/thread-pool.js

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -553,14 +553,13 @@ export class ThreadPool extends EventEmitter {
553553
/**
554554
* Spin up a new thread if needed and available.
555555
*/
556-
async allocate (cb = null) {
556+
async allocate () {
557557
if (this.poolCanGrow()) {
558558
console.debug('allocating thread')
559559
const thread = await this.startThread()
560560
if (!thread) {
561561
throw new Error('cannot allocate thread')
562562
}
563-
if (cb) cb(thread)
564563
return this.freeThreads.shift()
565564
}
566565
}
@@ -572,25 +571,22 @@ export class ThreadPool extends EventEmitter {
572571
}
573572

574573
async checkout () {
574+
if (this.threads.length >= this.maxThreads) {
575+
console.warn('no threads available')
576+
return
577+
}
575578
const thread = this.freeThreads.shift()
576579
if (thread) {
577-
console.debug(
578-
`thread checked out, total in use now ${this.threadsInUse()}`
579-
)
580+
console.debug(`checkout: threads in use now ${this.threadsInUse()}`)
580581
return thread
581582
}
582-
if (this.threads.length == this.maxThreads) return
583-
if (this.threads.length > this.maxThreads)
584-
throw new Error('too many threads')
585583
return this.allocate()
586584
}
587585

588586
checkin (thread) {
589587
if (thread) {
590588
this.freeThreads.push(thread)
591-
console.debug(
592-
`thread checked in, total in use now ${this.threadsInUse()}`
593-
)
589+
console.debug(`checkin: threads in use now ${this.threadsInUse()}`)
594590
}
595591
}
596592

0 commit comments

Comments
 (0)