1
1
'use strict'
2
2
3
- import { workerData , BroadcastChannel , isMainThread } from 'worker_threads'
4
- import { modelsInDomain } from './use-cases'
3
+ import { workerData , BroadcastChannel } from 'worker_threads'
5
4
6
5
export class PortEventRouter {
7
6
constructor ( models , broker ) {
8
7
this . models = models
9
8
this . broker = broker
10
9
}
11
10
12
- getThreadLocalPorts ( ) {
13
- const localSpec = this . models . getModelSpec (
14
- workerData . poolName . toUpperCase ( )
15
- )
16
- return this . models
11
+ get localSpec ( ) {
12
+ if ( this . __localSpec ) return this . __localSpec
13
+ this . __localSpec = this . models . getModelSpec ( workerData . poolName )
14
+ return this . __localSpec
15
+ }
16
+
17
+ get threadLocalPorts ( ) {
18
+ if ( this . __threadLocalPorts ) return this . __threadLocalPorts
19
+ this . __threadLocalPorts = this . models
17
20
. getModelSpecs ( )
18
21
. filter (
19
22
spec =>
20
23
spec . ports &&
21
- ( spec . domain . toUpperCase ( ) === localSpec . domain . toUpperCase ( ) ||
22
- spec . modelName . toUpperCase ( ) === localSpec . modelName . toUpperCase ( ) )
24
+ ( spec . domain === this . localSpec . domain ||
25
+ spec . modelName === this . localSpec . modelName )
23
26
)
24
27
. flatMap ( spec =>
25
28
Object . values ( spec . ports )
26
29
. filter ( port => port . consumesEvent || port . producesEvent )
27
- . map ( port => ( { ...port , modelName : spec . modelName } ) )
30
+ . map ( port => ( {
31
+ ...port ,
32
+ modelName : spec . modelName ,
33
+ domain : spec . domain
34
+ } ) )
28
35
)
36
+ return this . __threadLocalPorts
29
37
}
30
38
31
- getThreadRemotePorts ( ) {
32
- return this . models
39
+ get threadRemotePorts ( ) {
40
+ if ( this . __threadRemotePorts ) return this . __threadRemotePorts
41
+ this . __threadRemotePorts = this . models
33
42
. getModelSpecs ( )
34
43
. filter (
35
44
spec =>
36
45
spec . ports &&
37
- ! this . getThreadLocalPorts ( ) . find ( l => l . modelName === spec . modelName )
46
+ ! this . threadLocalPorts . find ( l => l . modelName === spec . modelName )
38
47
)
39
48
. flatMap ( spec =>
40
49
Object . values ( spec . ports )
41
50
. filter ( port => port . consumesEvent || port . producesEvent )
42
- . map ( port => ( { ...port , modelName : spec . modelName } ) )
51
+ . map ( port => ( {
52
+ ...port ,
53
+ modelName : spec . modelName ,
54
+ domain : spec . domain
55
+ } ) )
43
56
)
57
+ return this . __threadRemotePorts
58
+ }
59
+
60
+ get publisherPorts ( ) {
61
+ if ( this . __publisherPorts ) this . __publisherPorts
62
+ this . __publisherPorts = this . threadRemotePorts . filter ( remote =>
63
+ this . threadLocalPorts . find (
64
+ local => local . producesEvent === remote . consumesEvent
65
+ )
66
+ )
67
+ return this . __publisherPorts
44
68
}
45
69
46
- handleChannelEvent ( msg ) {
47
- if ( msg . data . eventName ) this . broker . notify ( msg . data . eventName , msg . data )
70
+ get subscriberPorts ( ) {
71
+ if ( this . __subscriberPorts ) return this . __subscriberPorts
72
+ this . __subscriberPorts = this . threadRemotePorts . filter ( remote =>
73
+ this . threadLocalPorts . find (
74
+ local => local . consumesEvent === remote . producesEvent
75
+ )
76
+ )
77
+ return this . __subscriberPorts
78
+ }
79
+
80
+ get unhandledPorts ( ) {
81
+ if ( this . __unhandledPorts ) return this . __unhandledPorts
82
+ this . __unhandledPorts = this . threadLocalPorts . filter (
83
+ local =>
84
+ ! this . threadRemotePorts . find (
85
+ remote => local . producesEvent === remote . consumesEvent
86
+ ) && ! this . localPorts . find ( l => local . producesEvent === l . consumesEvent )
87
+ )
88
+ return this . __unhandledPorts
89
+ }
90
+
91
+ handleBroadcastEvent ( msg ) {
92
+ if ( msg ?. data ?. eventName ) this . broker . notify ( msg . data . eventName , msg . data )
48
93
else {
49
94
console . log ( 'missing eventName' , msg . data )
50
95
this . broker . notify ( 'missingEventName' , msg . data )
@@ -55,78 +100,56 @@ export class PortEventRouter {
55
100
* Listen for producer events from other thread pools and invoke
56
101
* local ports that consume them. Listen for local producer events
57
102
* and forward to pools that consume them. If a producer event is
58
- * not consumed by any local thread, foward to service mesh.
103
+ * not consumed by any local thread, foward to the service mesh.
59
104
*/
60
105
listen ( ) {
61
- const localPorts = this . getThreadLocalPorts ( )
62
- const remotePorts = this . getThreadRemotePorts ( )
63
-
64
- console . debug ( { localPorts } )
65
- console . debug ( { remotePorts } )
66
-
67
- const publishPorts = remotePorts . filter ( remote =>
68
- localPorts . find ( local => local . producesEvent === remote . consumesEvent )
69
- )
70
- const subscribePorts = remotePorts . filter ( remote =>
71
- localPorts . find ( local => local . consumesEvent === remote . producesEvent )
72
- )
73
- const unhandledPorts = localPorts . filter (
74
- local =>
75
- ! remotePorts . find (
76
- remote => local . producesEvent === remote . consumesEvent
77
- ) && ! localPorts . find ( l => local . producesEvent === l . consumesEvent )
78
- )
79
-
80
106
const services = new Set ( )
81
107
const channels = new Map ( )
82
108
83
- publishPorts . forEach ( port => services . add ( port . modelName ) )
84
- subscribePorts . forEach ( port => services . add ( port . modelName ) )
109
+ this . publisherPorts . forEach ( port => services . add ( port . modelName ) )
110
+ this . subscriberPorts . forEach ( port => services . add ( port . modelName ) )
85
111
86
112
services . forEach ( service =>
87
113
channels . set ( service , new BroadcastChannel ( service ) )
88
114
)
89
115
90
- console . log ( 'publishPorts ' , publishPorts )
91
- console . log ( 'subscribePorts ' , subscribePorts )
92
- console . log ( 'unhandledPorts' , unhandledPorts )
116
+ console . log ( 'publisherPorts ' , this . publisherPorts )
117
+ console . log ( 'subscriberPorts ' , this . subscriberPorts )
118
+ console . log ( 'unhandledPorts' , this . unhandledPorts )
93
119
console . log ( 'channels' , channels )
94
120
95
- // dispatch outgoing events
96
- publishPorts . forEach ( port =>
121
+ // dispatch outgoing events to local pools
122
+ this . publisherPorts . forEach ( port =>
97
123
this . broker . on ( port . consumesEvent , event => {
98
124
console . log ( 'broadcasting...' , { port, event } )
99
125
channels
100
126
. get ( port . modelName )
101
- . postMessage (
102
- JSON . parse (
103
- JSON . stringify ( { ...event , route : 'balanceEventConsumer' } )
104
- )
105
- )
127
+ . postMessage ( JSON . parse ( JSON . stringify ( event ) ) )
106
128
} )
107
129
)
108
130
109
- // listen for incoming events
110
- subscribePorts . forEach ( port => {
131
+ // listen for incoming events from local pools
132
+ this . subscriberPorts . forEach ( port => {
111
133
channels . get ( port . modelName ) . onmessage = msg => {
112
134
console . log ( 'subscribePorts.onmessage' , msg . data )
113
- this . handleChannelEvent ( msg )
135
+ this . handleBroadcastEvent ( msg )
114
136
}
115
137
} )
116
-
117
- unhandledPorts . forEach ( port => {
138
+
139
+ // send ports not handled by local pool to mesh
140
+ this . unhandledPorts . forEach ( port => {
118
141
this . broker . on ( port . producesEvent , event => {
119
142
this . broker . notify ( 'to_main' , {
120
143
...event ,
121
- route : 'balanceEventConsumer'
144
+ route : 'balanceEventConsumer' // mesh routing algo
122
145
} )
123
146
} )
124
147
} )
125
148
126
149
// listen to this model's channel
127
- new BroadcastChannel ( workerData . poolName . toUpperCase ( ) ) . onmessage = msg => {
150
+ new BroadcastChannel ( workerData . poolName ) . onmessage = msg => {
128
151
console . log ( 'onmessage' , msg . data )
129
- this . handleChannelEvent ( msg )
152
+ this . handleBroadcastEvent ( msg )
130
153
}
131
154
}
132
155
}
0 commit comments