Skip to content

Commit 704470a

Browse files
committed
Read/write cache in evaluator.
1 parent da2e980 commit 704470a

File tree

4 files changed

+80
-19
lines changed

4 files changed

+80
-19
lines changed

src/builder/analyzer.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::setup::{
88
self, DesiredMode, FlowSetupMetadata, FlowSetupState, ResourceIdentifier, SourceSetupState,
99
TargetSetupState, TargetSetupStateCommon,
1010
};
11+
use crate::utils::fingerprint::Fingerprinter;
1112
use crate::{
1213
api_bail, api_error,
1314
base::{
@@ -698,24 +699,32 @@ impl<'a> AnalyzerContext<'a> {
698699
let output = scope
699700
.data
700701
.add_field(reactive_op.name.clone(), &output_type)?;
701-
let op_name = reactive_op.name.clone();
702+
let reactive_op = reactive_op.clone();
702703
async move {
703704
let executor = executor.await.with_context(|| {
704-
format!("Failed to build executor for transform op: {op_name}")
705+
format!("Failed to build executor for transform op: {}", reactive_op.name)
705706
})?;
707+
let behavior_version = executor.behavior_version();
706708
let function_exec_info = AnalyzedFunctionExecInfo {
707709
enable_caching: executor.enable_caching(),
708-
behavior_version: executor.behavior_version(),
710+
behavior_version,
711+
fingerprinter: Fingerprinter::default()
712+
.with(&reactive_op.name)?
713+
.with(&reactive_op.spec)?
714+
.with(&behavior_version)?
715+
.with(&output_type.without_attrs())?,
716+
output_type: output_type.typ.clone(),
709717
};
710718
if function_exec_info.enable_caching
711719
&& function_exec_info.behavior_version.is_some()
712720
{
713721
api_bail!(
714-
"When caching is enabled, behavior version must be specified for transform op: {op_name}",
722+
"When caching is enabled, behavior version must be specified for transform op: {}",
723+
reactive_op.name
715724
);
716725
}
717726
Ok(AnalyzedReactiveOp::Transform(AnalyzedTransformOp {
718-
name: op_name,
727+
name: reactive_op.name,
719728
inputs: input_value_mappings,
720729
function_exec_info,
721730
executor,

src/builder/plan.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::base::schema::ValueType;
66
use crate::base::value;
77
use crate::execution::db_tracking_setup;
88
use crate::ops::interface::*;
9+
use crate::utils::fingerprint::Fingerprinter;
910

1011
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
1112
pub struct AnalyzedLocalFieldReference {
@@ -65,6 +66,10 @@ pub struct AnalyzedSourceOp {
6566
pub struct AnalyzedFunctionExecInfo {
6667
pub enable_caching: bool,
6768
pub behavior_version: Option<u32>,
69+
70+
/// Fingerprinter of the function's behavior.
71+
pub fingerprinter: Fingerprinter,
72+
pub output_type: ValueType,
6873
}
6974

7075
pub struct AnalyzedTransformOp {

src/execution/evaluator.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,29 @@ async fn evaluate_op_scope(
304304
match reactive_op {
305305
AnalyzedReactiveOp::Transform(op) => {
306306
let input_values = assemble_input_values(&op.inputs, scoped_entries);
307-
let output_value = op.executor.evaluate(input_values).await?;
307+
let output_value = if let Some(cache) = op
308+
.function_exec_info
309+
.enable_caching
310+
.then_some(cache)
311+
.flatten()
312+
{
313+
let key = op
314+
.function_exec_info
315+
.fingerprinter
316+
.clone()
317+
.with(&input_values)?
318+
.to_fingerprint();
319+
cache
320+
.evaluate(
321+
key,
322+
&op.function_exec_info.output_type,
323+
/*ttl=*/ None,
324+
move || op.executor.evaluate(input_values),
325+
)
326+
.await?
327+
} else {
328+
op.executor.evaluate(input_values).await?
329+
};
308330
head_scope.define_field(&op.output, output_value)?;
309331
}
310332

src/execution/memoization.rs

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ use anyhow::Result;
22
use serde::{Deserialize, Serialize};
33
use std::{
44
collections::HashMap,
5+
future::Future,
56
sync::{Arc, Mutex},
67
};
78

89
use crate::{
910
base::{schema, value},
11+
service::error::{SharedError, SharedResultExt},
1012
utils::fingerprint::Fingerprint,
1113
};
1214

@@ -37,7 +39,7 @@ enum EvaluationCacheData {
3739
/// Existing entry in previous runs, but not in current run yet.
3840
Previous(serde_json::Value),
3941
/// Value appeared in current run.
40-
Current(Arc<async_lock::OnceCell<value::Value>>),
42+
Current(Arc<async_lock::OnceCell<Result<value::Value, SharedError>>>),
4143
}
4244

4345
pub struct EvaluationCache {
@@ -79,25 +81,28 @@ impl EvaluationCache {
7981
.into_iter()
8082
.filter_map(|(k, e)| match e.data {
8183
EvaluationCacheData::Previous(_) => None,
82-
EvaluationCacheData::Current(entry) => entry.get().map(|v| {
83-
Ok((
84-
k,
85-
CacheEntry {
86-
time_sec: e.time.timestamp(),
87-
value: serde_json::to_value(v)?,
88-
},
89-
))
90-
}),
84+
EvaluationCacheData::Current(entry) => match entry.get() {
85+
Some(Ok(v)) => Some(serde_json::to_value(v).map(|value| {
86+
(
87+
k,
88+
CacheEntry {
89+
time_sec: e.time.timestamp(),
90+
value,
91+
},
92+
)
93+
})),
94+
_ => None,
95+
},
9196
})
92-
.collect::<Result<_>>()?)
97+
.collect::<Result<_, _>>()?)
9398
}
9499

95100
pub fn get(
96101
&self,
97102
key: Fingerprint,
98103
typ: &schema::ValueType,
99104
ttl: Option<chrono::Duration>,
100-
) -> Result<Arc<async_lock::OnceCell<value::Value>>> {
105+
) -> Result<Arc<async_lock::OnceCell<Result<value::Value, SharedError>>>> {
101106
let mut cache = self.cache.lock().unwrap();
102107
let result = {
103108
match cache.entry(key) {
@@ -110,7 +115,7 @@ impl EvaluationCache {
110115
match &mut entry_mut.data {
111116
EvaluationCacheData::Previous(value) => {
112117
let value = value::Value::from_json(std::mem::take(value), typ)?;
113-
let cell = Arc::new(async_lock::OnceCell::from(value));
118+
let cell = Arc::new(async_lock::OnceCell::from(Ok(value)));
114119
let time = entry_mut.time;
115120
entry.insert(EvaluationCacheEntry {
116121
time,
@@ -133,4 +138,24 @@ impl EvaluationCache {
133138
};
134139
Ok(result)
135140
}
141+
142+
pub async fn evaluate<Fut>(
143+
&self,
144+
key: Fingerprint,
145+
typ: &schema::ValueType,
146+
ttl: Option<chrono::Duration>,
147+
compute: impl FnOnce() -> Fut,
148+
) -> Result<value::Value>
149+
where
150+
Fut: Future<Output = Result<value::Value>>,
151+
{
152+
let cell = self.get(key, typ, ttl)?;
153+
let result = cell
154+
.get_or_init(|| {
155+
let fut = compute();
156+
async move { fut.await.map_err(SharedError::new) }
157+
})
158+
.await;
159+
Ok(result.clone().std_result()?)
160+
}
136161
}

0 commit comments

Comments
 (0)