1- import { InMultiPort , InPort , OutMultiPort , OutPort , Parameter , Reactor , State , } from "../../core/internal" ;
1+ import { InMultiPort , InPort , MutationSandbox , OutMultiPort , OutPort , Parameter , Reactor , State , Variable , } from "../../core/internal" ;
22import { SORActor } from "./actor" ;
33import { SORPeer } from "./peer" ;
4- import { Message , SorBorder , omega } from "./sorutils" ;
4+ import { Message , MessageTypes , SORBootMessage , SORResultMessage , SorBorder , jacobi , omega } from "./sorutils" ;
55
66export class SORRunner extends Reactor {
7- protected sorActors : State < Reactor [ ] > ;
8- protected sorPeer : State < Reactor | undefined > ;
7+ protected s : State < number > ;
8+ protected part : State < number > ;
9+
10+ protected sorActors : State < SORActor [ ] > ;
11+ protected sorPeer : State < SORPeer | undefined > ;
912
1013 protected portToSORActors : OutPort < Message > ;
1114 protected portToSORPeer : OutPort < Message > ;
1215 // Unsure if this would work, let's just try......
1316 protected portFromSORActor : InPort < Message > ;
1417 protected portFromSORPeer : InPort < Message > ;
1518
19+ protected gTotal = new State ( 0.0 ) ;
20+ protected returned = new State ( 0 ) ;
21+ protected totalMsgRcv = new State ( 0 ) ;
22+ protected expectingBoot = new State ( true ) ;
23+
24+ // In Savina, randoms is accessed directly from SucOverRelaxConfig.
25+ // Here we pass it in to the closure.
1626 constructor ( parent : Reactor , size : number , _randoms : number [ ] [ ] ) {
1727 super ( parent , "SORRunner" ) ;
1828 // These are in SorRunner;
19- const s = size ;
29+ this . s = new State ( size ) ;
2030 // In the scala implementation a simple /2 was used.
2131 // In JS we might need to enforce some sort of guarantee as it was used to calculate position
22- const part = Math . floor ( s / 2 ) ;
32+ this . part = new State ( Math . floor ( size / 2 ) ) ;
2333 /** These are from Savina. They should be rather irrelevant, actually. */
2434 this . sorActors = new State ( [ ] ) ;
2535 this . sorPeer = new State ( undefined ) ;
@@ -28,16 +38,105 @@ export class SORRunner extends Reactor {
2838 // This creates a bunch of ports.
2939 this . portToSORActors = new OutPort ( this ) ;
3040 this . portToSORPeer = new OutPort ( this ) ;
31-
32- // SorRunner::boot()
41+ this . portFromSORActor = new InPort ( this ) ;
42+ this . portFromSORPeer = new InPort ( this ) ;
43+
3344 this . addMutation (
3445 [ this . startup ] ,
35- [ this . sorActors , this . sorPeer ] ,
36- function ( this , sorActors , sorPeer ) {
37- const myBorder : Reactor [ ] = [ ] ;
38- const randoms = _randoms ;
46+ [ this . sorActors , this . sorPeer , this . portToSORActors , this . portToSORPeer ] ,
47+ function ( this , sorActors , sorPeer , portToSORActors , portToSORPeer ) {
48+ // TODO: Add actual stuff
49+ ;
50+ }
51+ ) ;
52+ }
53+
54+ // This is to be used WITHIN mutation react functions.
55+ static process ( this : MutationSandbox , message : Message , args : ProcessingArgs ) : void {
56+ switch ( message . messageType ) {
57+ case MessageTypes . sorBootMessage : {
58+ if ( args . type !== MessageTypes . sorBootMessage ) {
59+ throw new Error ( "Wrong type of arguments passed." ) ;
60+ }
61+ // expectingBoot is args[0]
62+ args . expectingBoot . set ( false ) ;
63+ SORRunner . boot . apply ( this , [ args ] ) ;
64+ break ;
65+ }
66+ case MessageTypes . sorResultMessage : {
67+ if ( args . type !== MessageTypes . sorResultMessage ) {
68+ throw new Error ( "Wrong type of arguments passed." ) ;
69+ }
3970
40- // In scala, (i <- 0 until s) is loop excluding s.
71+ const { mv, msgRcv} = message ;
72+ const { expectingBoot, totalMsgRcv, returned, gTotal, s, part} = args ;
73+
74+ if ( expectingBoot . get ( ) ) {
75+ throw new Error ( "SORRunner not booted yet!" ) ;
76+ }
77+
78+ totalMsgRcv . set ( totalMsgRcv . get ( ) + msgRcv ) ;
79+ returned . set ( returned . get ( ) + 1 ) ;
80+ gTotal . set ( gTotal . get ( ) + mv ) ;
81+
82+ if ( returned . get ( ) === ( s . get ( ) * part . get ( ) ) + 1 ) {
83+ // TODO: validate
84+ // TODO: exit
85+ ;
86+ }
87+ break ;
88+ }
89+ case MessageTypes . sorBorderMessage : {
90+ if ( args . type !== MessageTypes . sorBorderMessage ) {
91+ throw new Error ( "Wrong type of arguments passed." ) ;
92+ }
93+
94+ const { mBorder} = message ;
95+ const { expectingBoot, s, part, sorActors, portToSORActors} = args ;
96+
97+ if ( expectingBoot . get ( ) ) {
98+ throw new Error ( "SORRunner not booted yet!" ) ;
99+ }
100+ const sorActorsValue = sorActors . get ( ) ;
101+ for ( let i = 0 ; i <= s . get ( ) ; ++ i ) {
102+ sorActorsValue [ ( i + 1 ) * ( part . get ( ) + 1 ) - 1 ] = mBorder . borderActors [ i ] ;
103+ }
104+ sorActors . set ( sorActorsValue ) ;
105+ for ( let i = 0 ; i <= s . get ( ) ; ++ i ) {
106+ for ( let j = 0 ; j <= part . get ( ) ; ++ j ) {
107+ const pos = ( i * ( part . get ( ) + 1 ) ) + j ;
108+ // Ibidem, connect then disconnect to simulate
109+ // "fire and forget" in scala.
110+ this . connect ( portToSORActors , sorActorsValue [ pos ] . portFromRunner ) ;
111+ this . getReactor ( ) . writable ( portToSORActors ) . set (
112+ {
113+ messageType : MessageTypes . sorStartMessage ,
114+ mi : jacobi ,
115+ mActors : sorActorsValue
116+ }
117+ ) ;
118+ this . disconnect ( portToSORActors , sorActorsValue [ pos ] . portFromRunner ) ;
119+ }
120+ }
121+ break ;
122+ }
123+ default : {
124+ throw new Error ( "Received wrong message from port" ) ;
125+ }
126+ }
127+ }
128+
129+ // SorRunner::boot()
130+ static boot ( this : MutationSandbox ,
131+ args : BootProcessingArgs
132+ ) : void {
133+ const { _randoms, _s, _part, sorActors, sorPeer, portToSORPeer} = args ;
134+
135+ const myBorder : SORActor [ ] = [ ] ;
136+ const randoms = _randoms ;
137+ const s = _s . get ( ) ;
138+ const part = _part . get ( ) ;
139+ // In scala, (i <- 0 until s) is a loop excluding s.
41140 const sorActorsValue = sorActors . get ( ) ;
42141 for ( let i = 0 ; i < s ; ++ i ) {
43142 let c = i % 2 ;
@@ -51,7 +150,7 @@ export class SORRunner extends Reactor {
51150 SORActor ,
52151 pos , randoms [ i ] [ j ] , c , s , part + 1 , omega , this . getReactor ( ) , false
53152 ) ;
54- // TODO: Make connections
153+
55154 if ( j === ( part - 1 ) ) {
56155 myBorder [ i ] = sorActorsValue [ pos ] ;
57156 }
@@ -69,14 +168,50 @@ export class SORRunner extends Reactor {
69168
70169 const sorPeerValue = this . getReactor ( ) . _uncheckedAddSibling (
71170 SORPeer ,
72- s , part , partialMatrix , new SorBorder ( myBorder ) ,
171+ s , part , partialMatrix , new SorBorder ( myBorder ) ,
73172 // A dirty hack. Maybe this will be removed as ports get added.
74173 this . getReactor ( ) as SORRunner
75- ) ;
174+ ) ;
76175 sorPeer . set ( sorPeerValue ) ;
77- // TODO: Add connections.
78-
79- }
80- ) ;
176+ // Pass message.
177+ // This is similar to Scala's !; but it looks pretty...... interesting in LF.
178+ // If node is concurrent or parallel, this might be a problem, so direct copy-pastaing to C++ runtime might not work.
179+ this . connect ( portToSORPeer , sorPeerValue . portFromSORRunner ) ;
180+ this . getReactor ( ) . writable ( portToSORPeer ) . set ( { messageType : MessageTypes . sorBootMessage } ) ;
181+ // Disconnect immediately.
182+ this . disconnect ( portToSORPeer , sorPeerValue . portFromSORRunner ) ;
81183 }
82184}
185+
186+
187+ interface BootProcessingArgs {
188+ type : MessageTypes . sorBootMessage ,
189+ expectingBoot : State < boolean > ,
190+ _randoms : number [ ] [ ] ,
191+ _s : State < number > ,
192+ _part : State < number > ,
193+ sorActors : State < SORActor [ ] > ,
194+ sorPeer : State < SORPeer > ,
195+ portToSORPeer : OutPort < Message >
196+ }
197+
198+ interface ResultProcessingArgs {
199+ type : MessageTypes . sorResultMessage ,
200+ expectingBoot : State < boolean > ,
201+ totalMsgRcv : State < number > ,
202+ returned : State < number > ,
203+ gTotal : State < number > ,
204+ s : State < number > ,
205+ part : State < number >
206+ }
207+
208+ interface BorderProcessingArgs {
209+ type : MessageTypes . sorBorderMessage ,
210+ expectingBoot : State < boolean > ,
211+ s : State < number > ,
212+ part : State < number > ,
213+ sorActors : State < SORActor [ ] > ,
214+ portToSORActors : OutPort < Message >
215+ }
216+
217+ type ProcessingArgs = BootProcessingArgs | ResultProcessingArgs | BorderProcessingArgs ;
0 commit comments