1+ import { InMultiPort , InPort , MutationSandbox , OutMultiPort , OutPort , Parameter , Reactor , State , Variable , } from "../../core/internal" ;
2+ import { SORActor } from "./actor" ;
3+ import { SORPeer } from "./peer" ;
4+ import { Message , MessageTypes , SORBootMessage , SORResultMessage , SorBorder , jacobi , omega } from "./sorutils" ;
5+
6+ export class SORRunner extends Reactor {
7+ protected s : State < number > ;
8+ protected part : State < number > ;
9+
10+ protected sorActors : State < SORActor [ ] > ;
11+ protected sorPeer : State < SORPeer | undefined > ;
12+
13+ protected portToSORActors : OutPort < Message > ;
14+ protected portToSORPeer : OutPort < Message > ;
15+ // Unsure if this would work, let's just try......
16+ protected portFromSORActor : InPort < Message > ;
17+ protected portFromSORPeer : InPort < Message > ;
18+
19+ protected gTotal = new State ( 0.0 ) ;
20+ protected returned = new State ( 0 ) ;
21+ protected totalMsgRcv = new State ( 0 ) ;
22+ protected expectingBoot = new State ( true ) ;
23+
24+ // In Savina, randoms is accessed directly from SucOverRelaxConfig.
25+ // Here we pass it in to the closure.
26+ constructor ( parent : Reactor , size : number , _randoms : number [ ] [ ] ) {
27+ super ( parent , "SORRunner" ) ;
28+ // These are in SorRunner;
29+ this . s = new State ( size ) ;
30+ // In the scala implementation a simple /2 was used.
31+ // In JS we might need to enforce some sort of guarantee as it was used to calculate position
32+ this . part = new State ( Math . floor ( size / 2 ) ) ;
33+ /** These are from Savina. They should be rather irrelevant, actually. */
34+ this . sorActors = new State ( [ ] ) ;
35+ this . sorPeer = new State ( undefined ) ;
36+
37+ /** These are the actual messaging passing mechanism that are synonomous to that of Savina. */
38+ // This creates a bunch of ports.
39+ this . portToSORActors = new OutPort ( this ) ;
40+ this . portToSORPeer = new OutPort ( this ) ;
41+ this . portFromSORActor = new InPort ( this ) ;
42+ this . portFromSORPeer = new InPort ( this ) ;
43+
44+ this . addMutation (
45+ [ this . startup ] ,
46+ [ this . sorActors , this . sorPeer , this . portToSORActors , this . portToSORPeer ] ,
47+ function ( this , sorActors , sorPeer , portToSORActors , portToSORPeer ) {
48+ // TODO: Add actual stuff
49+ ;
50+ }
51+ ) ;
52+ }
53+
54+ // This is to be used WITHIN mutation react functions.
55+ static process ( this : MutationSandbox , message : Message , args : ProcessingArgs ) : void {
56+ switch ( message . messageType ) {
57+ case MessageTypes . sorBootMessage : {
58+ if ( args . type !== MessageTypes . sorBootMessage ) {
59+ throw new Error ( "Wrong type of arguments passed." ) ;
60+ }
61+ // expectingBoot is args[0]
62+ args . expectingBoot . set ( false ) ;
63+ SORRunner . boot . apply ( this , [ args ] ) ;
64+ break ;
65+ }
66+ case MessageTypes . sorResultMessage : {
67+ if ( args . type !== MessageTypes . sorResultMessage ) {
68+ throw new Error ( "Wrong type of arguments passed." ) ;
69+ }
70+
71+ const { mv, msgRcv} = message ;
72+ const { expectingBoot, totalMsgRcv, returned, gTotal, s, part} = args ;
73+
74+ if ( expectingBoot . get ( ) ) {
75+ throw new Error ( "SORRunner not booted yet!" ) ;
76+ }
77+
78+ totalMsgRcv . set ( totalMsgRcv . get ( ) + msgRcv ) ;
79+ returned . set ( returned . get ( ) + 1 ) ;
80+ gTotal . set ( gTotal . get ( ) + mv ) ;
81+
82+ if ( returned . get ( ) === ( s . get ( ) * part . get ( ) ) + 1 ) {
83+ // TODO: validate
84+ // TODO: exit
85+ ;
86+ }
87+ break ;
88+ }
89+ case MessageTypes . sorBorderMessage : {
90+ if ( args . type !== MessageTypes . sorBorderMessage ) {
91+ throw new Error ( "Wrong type of arguments passed." ) ;
92+ }
93+
94+ const { mBorder} = message ;
95+ const { expectingBoot, s, part, sorActors, portToSORActors} = args ;
96+
97+ if ( expectingBoot . get ( ) ) {
98+ throw new Error ( "SORRunner not booted yet!" ) ;
99+ }
100+ const sorActorsValue = sorActors . get ( ) ;
101+ for ( let i = 0 ; i <= s . get ( ) ; ++ i ) {
102+ sorActorsValue [ ( i + 1 ) * ( part . get ( ) + 1 ) - 1 ] = mBorder . borderActors [ i ] ;
103+ }
104+ sorActors . set ( sorActorsValue ) ;
105+ for ( let i = 0 ; i <= s . get ( ) ; ++ i ) {
106+ for ( let j = 0 ; j <= part . get ( ) ; ++ j ) {
107+ const pos = ( i * ( part . get ( ) + 1 ) ) + j ;
108+ // Ibidem, connect then disconnect to simulate
109+ // "fire and forget" in scala.
110+ this . connect ( portToSORActors , sorActorsValue [ pos ] . portFromRunner ) ;
111+ this . getReactor ( ) . writable ( portToSORActors ) . set (
112+ {
113+ messageType : MessageTypes . sorStartMessage ,
114+ mi : jacobi ,
115+ mActors : sorActorsValue
116+ }
117+ ) ;
118+ this . disconnect ( portToSORActors , sorActorsValue [ pos ] . portFromRunner ) ;
119+ }
120+ }
121+ break ;
122+ }
123+ default : {
124+ throw new Error ( "Received wrong message from port" ) ;
125+ }
126+ }
127+ }
128+
129+ // SorRunner::boot()
130+ static boot ( this : MutationSandbox ,
131+ args : BootProcessingArgs
132+ ) : void {
133+ const { _randoms, _s, _part, sorActors, sorPeer, portToSORPeer} = args ;
134+
135+ const myBorder : SORActor [ ] = [ ] ;
136+ const randoms = _randoms ;
137+ const s = _s . get ( ) ;
138+ const part = _part . get ( ) ;
139+ // In scala, (i <- 0 until s) is a loop excluding s.
140+ const sorActorsValue = sorActors . get ( ) ;
141+ for ( let i = 0 ; i < s ; ++ i ) {
142+ let c = i % 2 ;
143+ for ( let j = 0 ; j < part ; ++ j ) {
144+ const pos = i * ( part + 1 ) + j ;
145+ c = 1 - c ;
146+ // We modify them in bulk, then update the state.
147+ // Unlike in Scala we do not need to initialise the array here, JS supports sparse array.
148+ // I have absolutely no idea why these parametres are called as such......
149+ sorActorsValue [ pos ] = this . getReactor ( ) . _uncheckedAddSibling (
150+ SORActor ,
151+ pos , randoms [ i ] [ j ] , c , s , part + 1 , omega , this . getReactor ( ) , false
152+ ) ;
153+
154+ if ( j === ( part - 1 ) ) {
155+ myBorder [ i ] = sorActorsValue [ pos ] ;
156+ }
157+
158+ }
159+ }
160+ sorActors . set ( sorActorsValue ) ;
161+
162+ const partialMatrix : number [ ] [ ] = [ ] ;
163+ for ( let i = 0 ; i < s ; ++ i ) {
164+ for ( let j = 0 ; j < s - part ; ++ j ) {
165+ partialMatrix [ i ] [ j ] = randoms [ i ] [ j + part ] ;
166+ }
167+ }
168+
169+ const sorPeerValue = this . getReactor ( ) . _uncheckedAddSibling (
170+ SORPeer ,
171+ s , part , partialMatrix , new SorBorder ( myBorder ) ,
172+ // A dirty hack. Maybe this will be removed as ports get added.
173+ this . getReactor ( ) as SORRunner
174+ ) ;
175+ sorPeer . set ( sorPeerValue ) ;
176+ // Pass message.
177+ // This is similar to Scala's !; but it looks pretty...... interesting in LF.
178+ // If node is concurrent or parallel, this might be a problem, so direct copy-pastaing to C++ runtime might not work.
179+ this . connect ( portToSORPeer , sorPeerValue . portFromSORRunner ) ;
180+ this . getReactor ( ) . writable ( portToSORPeer ) . set ( { messageType : MessageTypes . sorBootMessage } ) ;
181+ // Disconnect immediately.
182+ this . disconnect ( portToSORPeer , sorPeerValue . portFromSORRunner ) ;
183+ }
184+ }
185+
186+
187+ interface BootProcessingArgs {
188+ type : MessageTypes . sorBootMessage ,
189+ expectingBoot : State < boolean > ,
190+ _randoms : number [ ] [ ] ,
191+ _s : State < number > ,
192+ _part : State < number > ,
193+ sorActors : State < SORActor [ ] > ,
194+ sorPeer : State < SORPeer > ,
195+ portToSORPeer : OutPort < Message >
196+ }
197+
198+ interface ResultProcessingArgs {
199+ type : MessageTypes . sorResultMessage ,
200+ expectingBoot : State < boolean > ,
201+ totalMsgRcv : State < number > ,
202+ returned : State < number > ,
203+ gTotal : State < number > ,
204+ s : State < number > ,
205+ part : State < number >
206+ }
207+
208+ interface BorderProcessingArgs {
209+ type : MessageTypes . sorBorderMessage ,
210+ expectingBoot : State < boolean > ,
211+ s : State < number > ,
212+ part : State < number > ,
213+ sorActors : State < SORActor [ ] > ,
214+ portToSORActors : OutPort < Message >
215+ }
216+
217+ type ProcessingArgs = BootProcessingArgs | ResultProcessingArgs | BorderProcessingArgs ;
0 commit comments