Skip to content

Commit 88389d6

Browse files
committed
Rewrite defer to mimic dynamic implementation. Fixed benchmark hanging but discovered new bug
1 parent 6f8e65e commit 88389d6

File tree

5 files changed

+270
-115
lines changed

5 files changed

+270
-115
lines changed

src/runtime/semi_sync.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,4 +1017,84 @@ mod tests {
10171017
],
10181018
);
10191019
}
1020+
1021+
#[apply(async_test)]
1022+
async fn test_defer_single(executor: Rc<LocalExecutor<'static>>) {
1023+
let spec = lola_specification(&mut spec_defer()).unwrap();
1024+
1025+
let x = vec![0.into(), 1.into(), 2.into()];
1026+
let e = vec!["x + 1".into(), Value::Deferred, Value::Deferred];
1027+
let input_streams =
1028+
MapInputProvider::new(BTreeMap::from([("x".into(), x), ("e".into(), e)]));
1029+
let mut output_handler = Box::new(ManualOutputHandler::new(
1030+
executor.clone(),
1031+
spec.output_vars.clone(),
1032+
));
1033+
let outputs = output_handler.get_output();
1034+
1035+
let monitor = TestMonitor {
1036+
_executor: executor.clone(),
1037+
model: spec.clone(),
1038+
input_provider: Box::new(input_streams),
1039+
output_handler,
1040+
_marker: std::marker::PhantomData,
1041+
};
1042+
1043+
executor.spawn(monitor.run()).detach();
1044+
1045+
let outputs: Vec<(usize, Vec<Value>)> =
1046+
with_timeout(outputs.enumerate().collect(), 1, "outputs")
1047+
.await
1048+
.unwrap();
1049+
1050+
assert_eq!(outputs.len(), 3,);
1051+
assert_eq!(
1052+
outputs,
1053+
vec![
1054+
(0, vec![1.into()]),
1055+
(1, vec![2.into()]),
1056+
(2, vec![3.into()]),
1057+
],
1058+
);
1059+
}
1060+
1061+
#[apply(async_test)]
1062+
async fn test_defer_multiple(executor: Rc<LocalExecutor<'static>>) {
1063+
let spec = lola_specification(&mut spec_defer()).unwrap();
1064+
1065+
let x = vec![0.into(), 1.into(), 2.into()];
1066+
let e = vec!["x + 1".into(), "x + 2".into(), "x + 3".into()];
1067+
let input_streams =
1068+
MapInputProvider::new(BTreeMap::from([("x".into(), x), ("e".into(), e)]));
1069+
let mut output_handler = Box::new(ManualOutputHandler::new(
1070+
executor.clone(),
1071+
spec.output_vars.clone(),
1072+
));
1073+
let outputs = output_handler.get_output();
1074+
1075+
let monitor = TestMonitor {
1076+
_executor: executor.clone(),
1077+
model: spec.clone(),
1078+
input_provider: Box::new(input_streams),
1079+
output_handler,
1080+
_marker: std::marker::PhantomData,
1081+
};
1082+
1083+
executor.spawn(monitor.run()).detach();
1084+
1085+
let outputs: Vec<(usize, Vec<Value>)> =
1086+
with_timeout(outputs.enumerate().collect(), 1, "outputs")
1087+
.await
1088+
.unwrap();
1089+
1090+
assert_eq!(outputs.len(), 3,);
1091+
assert_eq!(
1092+
outputs,
1093+
vec![
1094+
(0, vec![1.into()]),
1095+
(1, vec![2.into()]),
1096+
(2, vec![3.into()]),
1097+
],
1098+
);
1099+
}
10201100
}

src/semantics/untimed_typed_lola/combinators.rs

Lines changed: 119 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -510,11 +510,15 @@ where
510510
// Defer for an UntimedLolaExpression using the lola_expression parser
511511
// TODO: this currently has unnecessary potentially-panicing casts since the types in the untyped
512512
// semantics are not granular enough
513+
// TODO: TW: I don't think the combinators in this file are implementing NoVal/Defer semantics
514+
// correctly for typed streams. I did not change this when updating the defer combinator as it
515+
// would be a lot of work (and might conflict with yours). Would also remove the need for
516+
// `prev_received_deferred`. See outcommented stream_lift_base calls
513517
// #[instrument(skip(ctx, prop_stream))]
514518
pub fn defer<AC, Parser, T>(
515519
ctx: &AC::Ctx,
516-
mut prop_stream: OutputStream<PartialStreamValue<String>>,
517-
_: EcoVec<VarName>,
520+
mut eval_stream: OutputStream<PartialStreamValue<String>>,
521+
vs: EcoVec<VarName>,
518522
history_length: usize,
519523
type_info: &TypeInfo,
520524
) -> OutputStream<PartialStreamValue<T>>
@@ -523,51 +527,26 @@ where
523527
AC: AsyncConfig<Val = Value, Expr = SExprTE>,
524528
T: TypedStreamData + TryFrom<Value, Error = ()>,
525529
{
526-
let mut subcontext = ctx.subcontext(history_length);
530+
// Create a subcontext with a history window length
531+
let mut subcontext = ctx.restricted_subcontext(vs, history_length);
532+
// let mut eval_stream = stream_lift_base(eval_stream);
533+
let mut eval_output_stream: Option<OutputStream<PartialStreamValue<T>>> = None;
527534
let type_info = type_info.clone();
528-
Box::pin(stream! {
529-
let mut eval_output_stream: Option<OutputStream<PartialStreamValue<T>>> = None;
530-
let mut i = 0;
531-
let mut prev_received_deferred = false;
535+
let mut prev_received_deferred = false;
532536

533-
// Yield Deferred until we have a value to evaluate, then evaluate it
534-
while let Some(current) = prop_stream.next().await {
535-
debug!(?i, ?current, "Defer");
537+
// Build an output stream for dynamic of x over the subcontext
538+
Box::pin(stream! {
539+
while let Some(current) = eval_stream.next().await {
536540
match current {
537-
PartialStreamValue::Known(defer_s) => {
538-
// We have a string to evaluate so do so
539-
let expr = Parser::parse(&mut defer_s.as_ref())
540-
.expect("Invalid dynamic str");
541-
// Create a typed version of the expression
542-
let mut type_info_local = type_info.clone();
543-
let expr = (expr, StreamTypeAscription::Ascribed(T::stream_data_type())).type_check(&mut type_info_local)
544-
.expect("Type error");
545-
let untyped_eval_output_stream: OutputStream<Value> = <TypedUntimedLolaSemantics::<Parser> as MonitoringSemantics<AC>>::to_async_stream(expr, &subcontext);
546-
eval_output_stream = Some(to_typed_partial_stream::<T>(untyped_eval_output_stream));
547-
// debug!(s = ?defer_s.as_ref(), "Evaluated defer string");
548-
subcontext.run().await;
549-
break;
550-
}
551541
PartialStreamValue::Deferred => {
552542
// Consume a sample from the subcontext but return Deferred
553-
debug!("defer combinator received Deferred");
554-
if i >= history_length {
555-
debug!(?i, ?history_length, "Advancing subcontext to clean history");
556-
subcontext.tick().await;
557-
}
558-
i += 1;
543+
subcontext.tick().await;
559544
prev_received_deferred = true;
560545
yield PartialStreamValue::Deferred;
561546
}
562547
PartialStreamValue::NoVal => {
563548
// Consume a sample from the subcontext but return NoVal
564-
debug!("defer combinator received NoVal");
565-
if i >= history_length {
566-
debug!(?i, ?history_length, "Advancing subcontext to clean history");
567-
subcontext.tick().await;
568-
}
569-
i += 1;
570-
549+
subcontext.tick().await;
571550
// Deferred is sticky compared to NoVal, since Deferred indicates that we have
572551
// a pending property that cannot be evaluated yet with the given context.
573552
if prev_received_deferred {
@@ -576,23 +555,116 @@ where
576555
yield PartialStreamValue::NoVal;
577556
}
578557
}
579-
558+
PartialStreamValue::Known(defer_s) => {
559+
let expr = Parser::parse(&mut defer_s.as_ref())
560+
.expect("Invalid defer str");
561+
debug!("Defer evaluated to expression {:?}", expr);
562+
// Create a typed version of the expression
563+
let mut type_info_local = type_info.clone();
564+
let expr = (expr, StreamTypeAscription::Ascribed(T::stream_data_type())).type_check(&mut type_info_local)
565+
.expect("Type error");
566+
let tmp_stream = <TypedUntimedLolaSemantics::<Parser> as MonitoringSemantics<AC>>::to_async_stream(expr, &subcontext);
567+
// let tmp_stream = stream_lift_base(tmp_stream);
568+
let mut tmp_stream = to_typed_partial_stream::<T>(tmp_stream);
569+
// Advance the subcontext to make a new set of input values
570+
// available for the dynamic stream
571+
subcontext.tick().await;
572+
if let Some(eval_res) = tmp_stream.next().await {
573+
eval_output_stream = Some(tmp_stream);
574+
yield eval_res;
575+
} else {
576+
return;
577+
}
578+
break;
579+
}
580580
}
581581
}
582+
if eval_output_stream.is_none() {
583+
return;
584+
}
585+
let mut eval_output_stream = eval_output_stream.unwrap();
582586

583-
// This is None if the prop_stream is done but we never received a property
584-
if let Some(eval_output_stream) = eval_output_stream {
585-
// Wind forward the stream to the current time
586-
let time_progressed = i.min(history_length);
587-
debug!(?i, ?time_progressed, ?history_length, "Time progressed");
588-
let mut eval_output_stream = eval_output_stream.skip(time_progressed);
589-
590-
// Yield the saved value until the inner stream is done
591-
while let Some(eval_res) = eval_output_stream.next().await {
587+
// Use eval_stream as controller for when to tick subcontext. Yield from
588+
// eval_output_stream.
589+
while let Some(_) = eval_stream.next().await {
590+
subcontext.tick().await;
591+
if let Some(eval_res) = eval_output_stream.next().await {
592592
yield eval_res;
593+
} else {
594+
return;
593595
}
594596
}
595597
})
598+
// let mut subcontext = ctx.subcontext(history_length);
599+
// let type_info = type_info.clone();
600+
// Box::pin(stream! {
601+
// let mut eval_output_stream: Option<OutputStream<PartialStreamValue<T>>> = None;
602+
// let mut i = 0;
603+
// let mut prev_received_deferred = false;
604+
//
605+
// // Yield Deferred until we have a value to evaluate, then evaluate it
606+
// while let Some(current) = prop_stream.next().await {
607+
// debug!(?i, ?current, "Defer");
608+
// match current {
609+
// PartialStreamValue::Known(defer_s) => {
610+
// // We have a string to evaluate so do so
611+
// let expr = Parser::parse(&mut defer_s.as_ref())
612+
// .expect("Invalid dynamic str");
613+
// // Create a typed version of the expression
614+
// let mut type_info_local = type_info.clone();
615+
// let expr = (expr, StreamTypeAscription::Ascribed(T::stream_data_type())).type_check(&mut type_info_local)
616+
// .expect("Type error");
617+
// let untyped_eval_output_stream: OutputStream<Value> = <TypedUntimedLolaSemantics::<Parser> as MonitoringSemantics<AC>>::to_async_stream(expr, &subcontext);
618+
// eval_output_stream = Some(to_typed_partial_stream::<T>(untyped_eval_output_stream));
619+
// // debug!(s = ?defer_s.as_ref(), "Evaluated defer string");
620+
// subcontext.run().await;
621+
// break;
622+
// }
623+
// PartialStreamValue::Deferred => {
624+
// // Consume a sample from the subcontext but return Deferred
625+
// debug!("defer combinator received Deferred");
626+
// if i >= history_length {
627+
// debug!(?i, ?history_length, "Advancing subcontext to clean history");
628+
// subcontext.tick().await;
629+
// }
630+
// i += 1;
631+
// prev_received_deferred = true;
632+
// yield PartialStreamValue::Deferred;
633+
// }
634+
// PartialStreamValue::NoVal => {
635+
// // Consume a sample from the subcontext but return NoVal
636+
// debug!("defer combinator received NoVal");
637+
// if i >= history_length {
638+
// debug!(?i, ?history_length, "Advancing subcontext to clean history");
639+
// subcontext.tick().await;
640+
// }
641+
// i += 1;
642+
//
643+
// // Deferred is sticky compared to NoVal, since Deferred indicates that we have
644+
// // a pending property that cannot be evaluated yet with the given context.
645+
// if prev_received_deferred {
646+
// yield PartialStreamValue::Deferred;
647+
// } else {
648+
// yield PartialStreamValue::NoVal;
649+
// }
650+
// }
651+
//
652+
// }
653+
// }
654+
//
655+
// // This is None if the prop_stream is done but we never received a property
656+
// if let Some(eval_output_stream) = eval_output_stream {
657+
// // Wind forward the stream to the current time
658+
// let time_progressed = i.min(history_length);
659+
// debug!(?i, ?time_progressed, ?history_length, "Time progressed");
660+
// let mut eval_output_stream = eval_output_stream.skip(time_progressed);
661+
//
662+
// // Yield the saved value until the inner stream is done
663+
// while let Some(eval_res) = eval_output_stream.next().await {
664+
// yield eval_res;
665+
// }
666+
// }
667+
// })
596668
}
597669

598670
#[cfg(test)]

src/semantics/untimed_typed_lola/semantics.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -442,17 +442,17 @@ where
442442
let e2 = to_async_stream_unit::<AC, Parser>(*e2, ctx);
443443
mc::default(e1, e2)
444444
}
445-
SExprUnit::Defer(e, type_ctx, vs) => {
445+
SExprUnit::Defer(e, type_info, vs) => {
446446
let e = to_async_stream_str::<AC, Parser>(*e, ctx);
447-
mc::defer::<AC, Parser, ()>(ctx, e, vs, 1, &type_ctx)
447+
mc::defer::<AC, Parser, ()>(ctx, e, vs, 1, &type_info)
448448
}
449-
SExprUnit::Dynamic(e, type_ctx) => {
449+
SExprUnit::Dynamic(e, type_info) => {
450450
let e = to_async_stream_str::<AC, Parser>(*e, ctx);
451-
mc::dynamic::<AC, Parser, ()>(ctx, e, None, 1, &type_ctx)
451+
mc::dynamic::<AC, Parser, ()>(ctx, e, None, 1, &type_info)
452452
}
453-
SExprUnit::RestrictedDynamic(e, vs, type_ctx) => {
453+
SExprUnit::RestrictedDynamic(e, vs, type_info) => {
454454
let e = to_async_stream_str::<AC, Parser>(*e, ctx);
455-
mc::dynamic::<AC, Parser, ()>(ctx, e, Some(vs), 1, &type_ctx)
455+
mc::dynamic::<AC, Parser, ()>(ctx, e, Some(vs), 1, &type_info)
456456
}
457457
SExprUnit::Init(e1, e2) => {
458458
let e1 = to_async_stream_unit::<AC, Parser>(*e1, ctx);
@@ -747,7 +747,7 @@ mod tests {
747747
let defer_expr = SExprFloat::Defer(
748748
e_str,
749749
type_info(&[("x", StreamType::Float)]),
750-
eco_vec!["x".into(), "e".into()],
750+
eco_vec!["x".into()],
751751
);
752752

753753
let x = Box::pin(stream::iter(vec![Value::Float(1.0), Value::Float(2.0)]));

0 commit comments

Comments
 (0)