@@ -20,16 +20,51 @@ use std::collections::BTreeMap;
2020use tracing:: debug;
2121use tracing:: info;
2222use tracing:: instrument;
23+ use tracing:: warn;
2324
2425pub trait CloneFn1 < T : StreamData , S : StreamData > : Fn ( T ) -> S + Clone + ' static { }
2526impl < T , S : StreamData , R : StreamData > CloneFn1 < S , R > for T where T : Fn ( S ) -> R + Clone + ' static { }
2627
27- pub fn lift1 < S : StreamData , R : StreamData > (
28+ // TODO: Should be applied to input streams only
29+ fn no_val_lift_base ( mut x_mon : OutputStream < Value > ) -> OutputStream < Value > {
30+ Box :: pin ( stream ! {
31+ let mut last : Option <Value > = None ;
32+ while let Some ( curr) = x_mon. next( ) . await {
33+ match curr {
34+ Value :: NoVal => {
35+ if let Some ( last) = & last {
36+ yield last. clone( ) ;
37+ } else {
38+ // Only happens when the first value is NoVal
39+ yield Value :: NoVal ;
40+ }
41+ }
42+ _ => {
43+ last = Some ( curr. clone( ) ) ;
44+ yield curr;
45+ }
46+ }
47+ }
48+ } )
49+ }
50+
51+ pub fn no_val_stream_lift1 (
52+ f : impl CloneFn1 < Value , Value > ,
53+ x_mon : OutputStream < Value > ,
54+ ) -> OutputStream < Value > {
55+ Box :: pin ( no_val_lift_base ( x_mon) . map ( move |x| {
56+ if x == Value :: NoVal {
57+ Value :: NoVal
58+ } else {
59+ f ( x)
60+ }
61+ } ) )
62+ }
63+
64+ pub fn stream_lift1 < S : StreamData , R : StreamData > (
2865 f : impl CloneFn1 < S , R > ,
2966 x_mon : OutputStream < S > ,
3067) -> OutputStream < R > {
31- let f = f. clone ( ) ;
32-
3368 Box :: pin ( x_mon. map ( f) )
3469}
3570
@@ -42,13 +77,12 @@ impl<T, S: StreamData, R: StreamData, U: StreamData> CloneFn2<S, R, U> for T whe
4277{
4378}
4479
45- pub fn lift2 < S : StreamData , R : StreamData , U : StreamData > (
80+ pub fn stream_lift2 < S : StreamData , R : StreamData , U : StreamData > (
4681 f : impl CloneFn2 < S , R , U > ,
4782 x_mon : OutputStream < S > ,
4883 y_mon : OutputStream < R > ,
4984) -> OutputStream < U > {
5085 debug ! ( "lift2: Creating combined stream" ) ;
51- let f = f. clone ( ) ;
5286 Box :: pin ( x_mon. zip ( y_mon) . map ( move |( x, y) | {
5387 debug ! ( "lift2: Processing input values" ) ;
5488 let result = f ( x, y) ;
@@ -57,6 +91,24 @@ pub fn lift2<S: StreamData, R: StreamData, U: StreamData>(
5791 } ) )
5892}
5993
94+ pub fn no_val_stream_lift2 (
95+ f : impl CloneFn2 < Value , Value , Value > ,
96+ x_mon : OutputStream < Value > ,
97+ y_mon : OutputStream < Value > ,
98+ ) -> OutputStream < Value > {
99+ Box :: pin (
100+ no_val_lift_base ( x_mon)
101+ . zip ( no_val_lift_base ( y_mon) )
102+ . map ( move |( x, y) | {
103+ if x == Value :: NoVal || y == Value :: NoVal {
104+ Value :: NoVal
105+ } else {
106+ f ( x, y)
107+ }
108+ } ) ,
109+ )
110+ }
111+
60112pub trait CloneFn3 < S : StreamData , R : StreamData , U : StreamData , V : StreamData > :
61113 Fn ( S , R , U ) -> V + Clone + ' static
62114{
@@ -66,48 +118,52 @@ impl<T, S: StreamData, R: StreamData, U: StreamData, V: StreamData> CloneFn3<S,
66118{
67119}
68120
69- pub fn lift3 < S : StreamData , R : StreamData , U : StreamData , V : StreamData > (
70- f : impl CloneFn3 < S , R , V , U > ,
71- x_mon : OutputStream < S > ,
72- y_mon : OutputStream < R > ,
73- z_mon : OutputStream < V > ,
74- ) -> OutputStream < U > {
75- let f = f. clone ( ) ;
76-
121+ pub fn no_val_stream_lift3 (
122+ f : impl CloneFn3 < Value , Value , Value , Value > ,
123+ x_mon : OutputStream < Value > ,
124+ y_mon : OutputStream < Value > ,
125+ z_mon : OutputStream < Value > ,
126+ ) -> OutputStream < Value > {
77127 Box :: pin (
78- x_mon
79- . zip ( y_mon)
80- . zip ( z_mon)
81- . map ( move |( ( x, y) , z) | f ( x, y, z) ) ,
82- ) as LocalBoxStream < ' static , U >
128+ no_val_lift_base ( x_mon)
129+ . zip ( no_val_lift_base ( y_mon) )
130+ . zip ( no_val_lift_base ( z_mon) )
131+ . map ( move |( ( x, y) , z) | {
132+ if x == Value :: NoVal || y == Value :: NoVal || z == Value :: NoVal {
133+ Value :: NoVal
134+ } else {
135+ f ( x, y, z)
136+ }
137+ } ) ,
138+ )
83139}
84140
85141pub fn and ( x : OutputStream < Value > , y : OutputStream < Value > ) -> OutputStream < Value > {
86- lift2 (
142+ no_val_stream_lift2 (
87143 |x, y| Value :: Bool ( x == Value :: Bool ( true ) && y == Value :: Bool ( true ) ) ,
88144 x,
89145 y,
90146 )
91147}
92148
93149pub fn or ( x : OutputStream < Value > , y : OutputStream < Value > ) -> OutputStream < Value > {
94- lift2 (
150+ no_val_stream_lift2 (
95151 |x, y| Value :: Bool ( x == Value :: Bool ( true ) || y == Value :: Bool ( true ) ) ,
96152 x,
97153 y,
98154 )
99155}
100156
101157pub fn not ( x : OutputStream < Value > ) -> OutputStream < Value > {
102- lift1 ( |x| Value :: Bool ( x == Value :: Bool ( false ) ) , x)
158+ no_val_stream_lift1 ( |x| Value :: Bool ( x == Value :: Bool ( false ) ) , x)
103159}
104160
105161pub fn eq ( x : OutputStream < Value > , y : OutputStream < Value > ) -> OutputStream < Value > {
106- lift2 ( |x, y| Value :: Bool ( x == y) , x, y)
162+ no_val_stream_lift2 ( |x, y| Value :: Bool ( x == y) , x, y)
107163}
108164
109165pub fn le ( x : OutputStream < Value > , y : OutputStream < Value > ) -> OutputStream < Value > {
110- lift2 (
166+ no_val_stream_lift2 (
111167 |x, y| match ( x, y) {
112168 ( Value :: Int ( x) , Value :: Int ( y) ) => Value :: Bool ( x <= y) ,
113169 ( Value :: Int ( a) , Value :: Float ( b) ) => Value :: Bool ( a as f64 <= b) ,
@@ -132,7 +188,7 @@ pub fn le(x: OutputStream<Value>, y: OutputStream<Value>) -> OutputStream<Value>
132188}
133189
134190pub fn lt ( x : OutputStream < Value > , y : OutputStream < Value > ) -> OutputStream < Value > {
135- lift2 (
191+ no_val_stream_lift2 (
136192 |x, y| match ( x, y) {
137193 ( Value :: Int ( x) , Value :: Int ( y) ) => Value :: Bool ( x < y) ,
138194 ( Value :: Int ( a) , Value :: Float ( b) ) => Value :: Bool ( ( a as f64 ) < b) ,
@@ -157,7 +213,7 @@ pub fn lt(x: OutputStream<Value>, y: OutputStream<Value>) -> OutputStream<Value>
157213}
158214
159215pub fn ge ( x : OutputStream < Value > , y : OutputStream < Value > ) -> OutputStream < Value > {
160- lift2 (
216+ no_val_stream_lift2 (
161217 |x, y| match ( x, y) {
162218 ( Value :: Int ( x) , Value :: Int ( y) ) => Value :: Bool ( x >= y) ,
163219 ( Value :: Int ( a) , Value :: Float ( b) ) => Value :: Bool ( a as f64 >= b) ,
@@ -182,7 +238,7 @@ pub fn ge(x: OutputStream<Value>, y: OutputStream<Value>) -> OutputStream<Value>
182238}
183239
184240pub fn gt ( x : OutputStream < Value > , y : OutputStream < Value > ) -> OutputStream < Value > {
185- lift2 (
241+ no_val_stream_lift2 (
186242 |x, y| match ( x, y) {
187243 ( Value :: Int ( x) , Value :: Int ( y) ) => Value :: Bool ( x > y) ,
188244 ( Value :: Int ( a) , Value :: Float ( b) ) => Value :: Bool ( ( a as f64 ) > b) ,
@@ -216,7 +272,7 @@ pub fn if_stm(
216272 y : OutputStream < Value > ,
217273 z : OutputStream < Value > ,
218274) -> OutputStream < Value > {
219- lift3 (
275+ no_val_stream_lift3 (
220276 |x, y, z| match x {
221277 Value :: Bool ( true ) => y,
222278 Value :: Bool ( false ) => z,
@@ -237,19 +293,64 @@ pub fn if_stm(
237293// last samples. This is accomplished by yielding the x[-N] sample but having the stream
238294// currently at x[0]. However, with recursive streams that puts us in a deadlock when calling
239295// x.next()
296+ //
297+ // TODO: There is a bug here introduced by async SRV.
298+ // The bug is that sindex expressions can never yield NoVal, which is usually
299+ // possible if the first value received in an expression is NoVal.
300+ // Fixing it requires a larger refactor, probably with a special case for dealing with
301+ // recursive sindex expressions. Or alternatively, disallowing recursive definitions in sindex.
302+ //
303+ // First consider the spec/trace:
304+ // in x
305+ // out y
306+ // y = x[1]
307+ // 0: x = NoVal
308+ // 1: x = 42
309+ // 2: x = 43
310+ //
311+ // The correct output here is:
312+ // 0: y = NoVal
313+ // 1: y = Deferred
314+ // 2: y = 42
315+ //
316+ // In order to implement this behavior, we need to yield Deferred for i samples,
317+ // not counting those where x is NoVal. Until x is not NoVal for the first time,
318+ // we yield NoVal. This is not too hard to implement, and the core looks something like:
319+ // let val = x.next().await;
320+ // if val != Value::NoVal {...} else {...}
321+ //
322+ // Notice that it requires looking at x in order to decide what to yield.
323+ //
324+ // Now consider this recursive spec with the same trace:
325+ // out z
326+ // z = default(z[1], 0) + x
327+ //
328+ // If we implement sindex like the pseudo-implementation above, we get a deadlock
329+ // as `z[-1]` needs to look at `z` to decide what to yield, but `z` is waiting for the
330+ // rhs of the assignment to finish.
331+ //
332+ // Potential solution:
333+ // If we knew which variable the expression is assigned to, we could have a
334+ // sindex_rec implementation that is implemented more or less like normal sindex,
335+ // and sindex which is implemented like below.
336+ // (The correct call would need to be evaluated in semantics.rs where the SExpr
337+ // is still available).
240338pub fn sindex ( x : OutputStream < Value > , i : u64 ) -> OutputStream < Value > {
241- if let Ok ( i) = usize:: try_from ( i) {
242- let cs = stream:: repeat ( Value :: Deferred ) . take ( i) ;
243- // Delay x by i defers
244- Box :: pin ( cs. chain ( x) )
245- } else {
246- panic ! ( "Index too large for sindex operation" )
339+ fn sindex_base ( x : OutputStream < Value > , i : u64 ) -> OutputStream < Value > {
340+ if let Ok ( i) = usize:: try_from ( i) {
341+ let cs = stream:: repeat ( Value :: Deferred ) . take ( i) ;
342+ // Delay x by i defers
343+ Box :: pin ( cs. chain ( x) )
344+ } else {
345+ panic ! ( "Index too large for sindex operation" )
346+ }
247347 }
348+ no_val_lift_base ( sindex_base ( x, i) )
248349}
249350
250351pub fn plus ( x : OutputStream < Value > , y : OutputStream < Value > ) -> OutputStream < Value > {
251352 debug ! ( "Creating plus operation stream" ) ;
252- lift2 (
353+ stream_lift2 (
253354 |x, y| {
254355 debug ! ( "Executing plus operation" ) ;
255356 let result = match ( x, y) {
@@ -278,7 +379,7 @@ pub fn plus(x: OutputStream<Value>, y: OutputStream<Value>) -> OutputStream<Valu
278379}
279380
280381pub fn modulo ( x : OutputStream < Value > , y : OutputStream < Value > ) -> OutputStream < Value > {
281- lift2 (
382+ stream_lift2 (
282383 |x, y| match ( x, y) {
283384 ( Value :: Int ( x) , Value :: Int ( y) ) => Value :: Int ( x % y) ,
284385 ( Value :: Int ( x) , Value :: Float ( y) ) => Value :: Float ( x as f64 % y) ,
@@ -297,7 +398,7 @@ pub fn modulo(x: OutputStream<Value>, y: OutputStream<Value>) -> OutputStream<Va
297398}
298399
299400pub fn minus ( x : OutputStream < Value > , y : OutputStream < Value > ) -> OutputStream < Value > {
300- lift2 (
401+ stream_lift2 (
301402 |x, y| match ( x, y) {
302403 ( Value :: Int ( x) , Value :: Int ( y) ) => Value :: Int ( x - y) ,
303404 ( Value :: Int ( x) , Value :: Float ( y) ) => Value :: Float ( x as f64 - y) ,
@@ -316,7 +417,7 @@ pub fn minus(x: OutputStream<Value>, y: OutputStream<Value>) -> OutputStream<Val
316417}
317418
318419pub fn mult ( x : OutputStream < Value > , y : OutputStream < Value > ) -> OutputStream < Value > {
319- lift2 (
420+ stream_lift2 (
320421 |x, y| match ( x, y) {
321422 ( Value :: Int ( x) , Value :: Int ( y) ) => Value :: Int ( x * y) ,
322423 ( Value :: Int ( x) , Value :: Float ( y) ) => Value :: Float ( x as f64 * y) ,
@@ -335,7 +436,7 @@ pub fn mult(x: OutputStream<Value>, y: OutputStream<Value>) -> OutputStream<Valu
335436}
336437
337438pub fn div ( x : OutputStream < Value > , y : OutputStream < Value > ) -> OutputStream < Value > {
338- lift2 (
439+ stream_lift2 (
339440 |x, y| match ( x, y) {
340441 ( Value :: Int ( x) , Value :: Int ( y) ) => Value :: Int ( x / y) ,
341442 ( Value :: Int ( x) , Value :: Float ( y) ) => Value :: Float ( x as f64 / y) ,
@@ -354,7 +455,7 @@ pub fn div(x: OutputStream<Value>, y: OutputStream<Value>) -> OutputStream<Value
354455}
355456
356457pub fn concat ( x : OutputStream < Value > , y : OutputStream < Value > ) -> OutputStream < Value > {
357- lift2 (
458+ stream_lift2 (
358459 |x, y| match ( x, y) {
359460 ( Value :: Str ( x) , Value :: Str ( y) ) => {
360461 // ConcreteStreamData::Str(format!("{x}{y}").into());
@@ -769,7 +870,7 @@ pub fn mhas_key(mut xs: OutputStream<Value>, k: EcoString) -> OutputStream<Value
769870}
770871
771872pub fn sin ( v : OutputStream < Value > ) -> OutputStream < Value > {
772- lift1 (
873+ stream_lift1 (
773874 |v| match v {
774875 Value :: Float ( v) => v. sin ( ) . into ( ) ,
775876 Value :: Deferred => Value :: Deferred ,
@@ -780,7 +881,7 @@ pub fn sin(v: OutputStream<Value>) -> OutputStream<Value> {
780881}
781882
782883pub fn cos ( v : OutputStream < Value > ) -> OutputStream < Value > {
783- lift1 (
884+ stream_lift1 (
784885 |v| match v {
785886 Value :: Float ( v) => v. cos ( ) . into ( ) ,
786887 Value :: Deferred => Value :: Deferred ,
@@ -791,7 +892,7 @@ pub fn cos(v: OutputStream<Value>) -> OutputStream<Value> {
791892}
792893
793894pub fn tan ( v : OutputStream < Value > ) -> OutputStream < Value > {
794- lift1 (
895+ stream_lift1 (
795896 |v| match v {
796897 Value :: Float ( v) => v. tan ( ) . into ( ) ,
797898 Value :: Deferred => Value :: Deferred ,
@@ -802,7 +903,7 @@ pub fn tan(v: OutputStream<Value>) -> OutputStream<Value> {
802903}
803904
804905pub fn abs ( v : OutputStream < Value > ) -> OutputStream < Value > {
805- lift1 (
906+ stream_lift1 (
806907 |v| match v {
807908 Value :: Int ( v) => v. abs ( ) . into ( ) ,
808909 Value :: Float ( v) => v. abs ( ) . into ( ) ,
0 commit comments