Skip to content

Commit 50705ec

Browse files
committed
Change EventBroker to call subscriber callbacks syncrhonously.
Don't use JSON to serialize Error - comes back empty. Listen for thread exit in case error event isn't fired and worker crashes. Remove thread from pool.
1 parent f491d25 commit 50705ec

File tree

14 files changed

+83
-94
lines changed

14 files changed

+83
-94
lines changed

__test__/use-cases/make-relations.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ function requireRemoteObject (model, relation, broker, ...args) {
3232
return new Promise(async function (resolve) {
3333
setTimeout(resolve, maxwait)
3434
broker.on(response, resolve)
35-
await broker.emit(request, requestData)
35+
broker.emit(request, requestData)
3636
})
3737
}
3838

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@module-federation/aegis",
3-
"version": "1.3.0-beta",
3+
"version": "1.3.1-beta",
44
"module": "lib/index.js",
55
"main": "lib/index.js",
66
"repository": "[email protected]:module-federation/aegis.git",

src/adapters/datasources/datasource-mongodb.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ export class DataSourceMongoDb extends DataSourceMemory {
6464
return (await this.connection()).db(this.name).collection(this.name)
6565
} catch (error) {
6666
console.error({ fn: this.collection.name, error })
67+
throw error
6768
}
6869
}
6970

@@ -250,7 +251,7 @@ export class DataSourceMongoDb extends DataSourceMemory {
250251
let cursor = (await this.collection()).find(filter)
251252
if (sort) cursor = cursor.sort(sort)
252253
if (limit) cursor = cursor.limit(parseInt(limit))
253-
if (aggregate) cursor = cursor.limit(aggregate)
254+
if (aggregate) cursor = cursor.aggregate(aggregate)
254255
return cursor
255256
}
256257

@@ -260,7 +261,6 @@ export class DataSourceMongoDb extends DataSourceMemory {
260261
*
261262
* @param {{
262263
* filter:*
263-
* writable:Writable
264264
* transform:Transform
265265
* serialize:boolean
266266
* }} param0

src/adapters/webassembly/wasm-import.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ export async function importWebAssembly (remoteEntry) {
6161
const event = wasm.exports.__getString(eventName)
6262
const data = interop.constructObject(eventData)
6363
console.debug('fireEvent', data)
64-
await broker.notify(event, data)
64+
broker.notify(event, data)
6565
},
6666

6767
/**

src/domain/distributed-cache.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,7 @@ export default function DistributedCache ({
366366
publish({ ...event, eventName: externalCrudEvent(eventName) })
367367
)
368368

369+
function handlecrudeEvent (modelSpecs) {}
369370
/**
370371
* Subcribe to external CRUD events for related models.
371372
* Also listen for request and response events for locally

src/domain/event-broker.js

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ export class EventBroker {
6464
* @param {Event} eventData - the import of the event
6565
* @param {{forward:boolean}} options - forward this event externally
6666
*/
67-
async notify (eventName, eventData, options) {
67+
notify (eventName, eventData, options) {
6868
throw new Error('unimplemented abstract method')
6969
}
7070

@@ -93,7 +93,7 @@ const handleError = error => {
9393
* @param {eventHandler} handle
9494
* @param {boolean} forward
9595
*/
96-
async function runHandler (eventName, eventData = {}, handle) {
96+
function runHandler (eventName, eventData = {}, handle) {
9797
const abort = eventData ? false : true
9898

9999
if (abort) {
@@ -111,7 +111,7 @@ async function runHandler (eventName, eventData = {}, handle) {
111111
/**
112112
* @type {eventHandler}
113113
*/
114-
await handle(eventData)
114+
handle(eventData)
115115
}
116116

117117
/**
@@ -121,7 +121,7 @@ async function runHandler (eventName, eventData = {}, handle) {
121121
* @param {brokerOptions} options
122122
* @fires eventName
123123
*/
124-
async function notify (eventName, eventData = {}, options = {}) {
124+
function notify (eventName, eventData = {}, options = {}) {
125125
const run = runHandler.bind(this)
126126
const data =
127127
typeof eventData === 'object'
@@ -134,20 +134,21 @@ async function notify (eventName, eventData = {}, options = {}) {
134134
try {
135135
if (handlers.has(eventName)) {
136136
match = true
137-
await Promise.allSettled(
138-
handlers.get(eventName).map(async fn => {
139-
await run(eventName, formattedEvent, fn)
140-
})
141-
)
137+
// await Promise.allSettled(
138+
// handlers.get(eventName).map(async fn => {
139+
// await run(eventName, formattedEvent, fn)
140+
// })
141+
// )
142+
handlers.get(eventName).forEach(fn => run(eventName, formattedEvent, fn))
142143
}
143144

144145
if (options.regexOff && match) return
145146

146-
await Promise.allSettled(
147-
[...handlers]
148-
.filter(([k]) => k instanceof RegExp && k.test(eventName))
149-
.map(([, v]) => v.map(fn => run(eventName, formattedEvent, fn)))
150-
)
147+
//await Promise.allSettled(
148+
;[...handlers]
149+
.filter(([k]) => k instanceof RegExp && k.test(eventName))
150+
.forEach(([, v]) => v.f(fn => run(eventName, formattedEvent, fn)))
151+
//)
151152
} catch (error) {
152153
handleError(notify.name, error)
153154
}

src/domain/index.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,6 @@ export { default as ThreadPoolFactory } from './thread-pool'
407407

408408
export { default as DomainEvents } from './domain-events'
409409

410-
export { AppError } from './util/app-error'
410+
export { AppError } from '../domain/util/app-error'
411411

412412
export default ModelFactory

src/domain/make-relations.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ export function requireRemoteObject (model, relation, broker, ...args) {
152152
return new Promise(async function (resolve) {
153153
setTimeout(resolve, maxwait)
154154
broker.on(response, resolve)
155-
await broker.notify(request, requestData)
155+
broker.notify(request, requestData)
156156
})
157157
}
158158

src/domain/model.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ const Model = (() => {
240240
* defaults to `false`
241241
*/
242242
async emit (eventName, eventData, options) {
243-
await broker.notify(
243+
broker.notify(
244244
eventName,
245245
{
246246
eventName,

src/domain/thread-pool.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ export class ThreadPool extends EventEmitter {
126126
broker.on('to_worker', async event => port1.postMessage(event))
127127
// on receipt of event from worker thread fire 'from_worker'
128128
port1.onmessage = async event =>
129-
event.data.eventName && (await broker.notify('from_worker', event.data))
129+
event.data.eventName && (broker.notify('from_worker', event.data))
130130
}
131131

132132
/**

0 commit comments

Comments
 (0)