Skip to content

Commit 8fc3c7b

Browse files
authored
Avoid clone when reading data from cache. (#31)
* Avoid clone when reading data from cache. * Cleanup unused import.
1 parent 88951a3 commit 8fc3c7b

File tree

5 files changed

+66
-52
lines changed

5 files changed

+66
-52
lines changed

src/execution/evaluator.rs

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
utils::immutable::RefList,
1212
};
1313

14-
use super::memoization::EvaluationCache;
14+
use super::memoization::{evaluate_with_cell, EvaluationCache};
1515

1616
#[derive(Debug)]
1717
pub struct ScopeValueBuilder {
@@ -59,7 +59,7 @@ impl ScopeValueBuilder {
5959
}
6060

6161
fn augmented_from(
62-
source: value::ScopeValue,
62+
source: &value::ScopeValue,
6363
schema: &schema::CollectionSchema,
6464
) -> Result<Self> {
6565
let val_index_base = if schema.has_key() { 1 } else { 0 };
@@ -70,7 +70,7 @@ impl ScopeValueBuilder {
7070
let value::ScopeValue(source_fields) = source;
7171
for ((v, t), r) in source_fields
7272
.fields
73-
.into_iter()
73+
.iter()
7474
.zip(schema.row.fields[val_index_base..(val_index_base + len)].iter())
7575
.zip(&mut builder.fields)
7676
{
@@ -82,17 +82,17 @@ impl ScopeValueBuilder {
8282
}
8383

8484
fn augmented_value(
85-
val: value::Value,
85+
val: &value::Value,
8686
val_type: &schema::ValueType,
8787
) -> Result<value::Value<ScopeValueBuilder>> {
8888
let value = match (val, val_type) {
8989
(value::Value::Null, _) => value::Value::Null,
90-
(value::Value::Basic(v), _) => value::Value::Basic(v),
90+
(value::Value::Basic(v), _) => value::Value::Basic(v.clone()),
9191
(value::Value::Struct(v), schema::ValueType::Struct(t)) => {
9292
value::Value::Struct(value::FieldValues {
9393
fields: v
9494
.fields
95-
.into_iter()
95+
.iter()
9696
.enumerate()
9797
.map(|(i, v)| augmented_value(v, &t.fields[i].value_type.typ))
9898
.collect::<Result<Vec<_>>>()?,
@@ -106,8 +106,8 @@ fn augmented_value(
106106
)
107107
}
108108
(value::Value::Table(v), schema::ValueType::Collection(t)) => value::Value::Table(
109-
v.into_iter()
110-
.map(|(k, v)| Ok((k, ScopeValueBuilder::augmented_from(v, t)?)))
109+
v.iter()
110+
.map(|(k, v)| Ok((k.clone(), ScopeValueBuilder::augmented_from(v, t)?)))
111111
.collect::<Result<BTreeMap<_, _>>>()?,
112112
),
113113
(value::Value::List(v), schema::ValueType::Collection(t)) => value::Value::List(
@@ -245,7 +245,7 @@ impl<'a> ScopeEntry<'a> {
245245
.expect("Field is already set, violating single-definition rule");
246246
}
247247

248-
fn define_field(&self, output_field: &AnalyzedOpOutput, val: value::Value) -> Result<()> {
248+
fn define_field(&self, output_field: &AnalyzedOpOutput, val: &value::Value) -> Result<()> {
249249
let field_index = output_field.field_idx as usize;
250250
let field_schema = &self.schema.fields[field_index];
251251
let val = augmented_value(val, &field_schema.value_type.typ)?;
@@ -304,30 +304,28 @@ async fn evaluate_op_scope(
304304
match reactive_op {
305305
AnalyzedReactiveOp::Transform(op) => {
306306
let input_values = assemble_input_values(&op.inputs, scoped_entries);
307-
let output_value = if let Some(cache) = op
308-
.function_exec_info
309-
.enable_cache
310-
.then_some(cache)
311-
.flatten()
312-
{
313-
let key = op
314-
.function_exec_info
315-
.fingerprinter
316-
.clone()
317-
.with(&input_values)?
318-
.to_fingerprint();
319-
cache
320-
.evaluate(
307+
308+
let output_value_cell = match (op.function_exec_info.enable_cache, cache) {
309+
(true, Some(cache)) => {
310+
let key = op
311+
.function_exec_info
312+
.fingerprinter
313+
.clone()
314+
.with(&input_values)?
315+
.to_fingerprint();
316+
Some(cache.get(
321317
key,
322318
&op.function_exec_info.output_type,
323319
/*ttl=*/ None,
324-
move || op.executor.evaluate(input_values),
325-
)
326-
.await?
327-
} else {
328-
op.executor.evaluate(input_values).await?
320+
)?)
321+
}
322+
_ => None,
329323
};
330-
head_scope.define_field(&op.output, output_value)?;
324+
let output_value = evaluate_with_cell(output_value_cell.as_ref(), move || {
325+
op.executor.evaluate(input_values)
326+
})
327+
.await?;
328+
head_scope.define_field(&op.output, &output_value)?;
331329
}
332330

333331
AnalyzedReactiveOp::ForEach(op) => {
@@ -442,7 +440,7 @@ pub async fn evaluate_source_entry<'a>(
442440
let result = match source_op.executor.get_value(&key).await? {
443441
Some(val) => {
444442
let scope_value =
445-
ScopeValueBuilder::augmented_from(value::ScopeValue(val), &collection_schema)?;
443+
ScopeValueBuilder::augmented_from(&value::ScopeValue(val), &collection_schema)?;
446444
root_scope_entry.define_field_w_builder(
447445
&source_op.output,
448446
value::Value::Table(BTreeMap::from([(key.clone(), scope_value)])),
@@ -463,7 +461,7 @@ pub async fn evaluate_source_entry<'a>(
463461

464462
pub async fn evaluate_transient_flow(
465463
flow: &AnalyzedTransientFlow,
466-
input_values: Vec<value::Value>,
464+
input_values: &Vec<value::Value>,
467465
) -> Result<value::Value> {
468466
let root_schema = &flow.data_schema.schema;
469467
let root_scope_value =

src/execution/memoization.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
use anyhow::Result;
22
use serde::{Deserialize, Serialize};
33
use std::{
4+
borrow::Cow,
45
collections::HashMap,
56
future::Future,
67
sync::{Arc, Mutex},
78
};
89

910
use crate::{
1011
base::{schema, value},
11-
service::error::{SharedError, SharedResultExt},
12+
service::error::{SharedError, SharedResultExtRef},
1213
utils::fingerprint::Fingerprint,
1314
};
1415

@@ -35,11 +36,12 @@ struct EvaluationCacheEntry {
3536
data: EvaluationCacheData,
3637
}
3738

39+
pub type CacheEntryCell = Arc<async_lock::OnceCell<Result<value::Value, SharedError>>>;
3840
enum EvaluationCacheData {
3941
/// Existing entry in previous runs, but not in current run yet.
4042
Previous(serde_json::Value),
4143
/// Value appeared in current run.
42-
Current(Arc<async_lock::OnceCell<Result<value::Value, SharedError>>>),
44+
Current(CacheEntryCell),
4345
}
4446

4547
pub struct EvaluationCache {
@@ -102,7 +104,7 @@ impl EvaluationCache {
102104
key: Fingerprint,
103105
typ: &schema::ValueType,
104106
ttl: Option<chrono::Duration>,
105-
) -> Result<Arc<async_lock::OnceCell<Result<value::Value, SharedError>>>> {
107+
) -> Result<CacheEntryCell> {
106108
let mut cache = self.cache.lock().unwrap();
107109
let result = {
108110
match cache.entry(key) {
@@ -138,24 +140,25 @@ impl EvaluationCache {
138140
};
139141
Ok(result)
140142
}
143+
}
141144

142-
pub async fn evaluate<Fut>(
143-
&self,
144-
key: Fingerprint,
145-
typ: &schema::ValueType,
146-
ttl: Option<chrono::Duration>,
147-
compute: impl FnOnce() -> Fut,
148-
) -> Result<value::Value>
149-
where
150-
Fut: Future<Output = Result<value::Value>>,
151-
{
152-
let cell = self.get(key, typ, ttl)?;
153-
let result = cell
154-
.get_or_init(|| {
145+
pub async fn evaluate_with_cell<'a, Fut>(
146+
cell: Option<&'a CacheEntryCell>,
147+
compute: impl FnOnce() -> Fut,
148+
) -> Result<Cow<'a, value::Value>>
149+
where
150+
Fut: Future<Output = Result<value::Value>>,
151+
{
152+
let result = match cell {
153+
Some(cell) => Cow::Borrowed(
154+
cell.get_or_init(|| {
155155
let fut = compute();
156156
async move { fut.await.map_err(SharedError::new) }
157157
})
158-
.await;
159-
Ok(result.clone().std_result()?)
160-
}
158+
.await
159+
.std_result()?,
160+
),
161+
None => Cow::Owned(compute().await?),
162+
};
163+
Ok(result)
161164
}

src/execution/query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl SimpleSemanticsQueryHandler {
7272
) -> Result<(QueryResults, SimpleSemanticsQueryInfo)> {
7373
let query_results = evaluate_transient_flow(
7474
&self.query_transform_flow,
75-
vec![value::BasicValue::Str(Arc::from(query)).into()],
75+
&vec![value::BasicValue::Str(Arc::from(query)).into()],
7676
)
7777
.await?;
7878
let vector = match query_results {

src/service/error.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,19 @@ impl<T> SharedResultExt<T> for Result<T, SharedError> {
142142
}
143143
}
144144

145+
pub trait SharedResultExtRef<'a, T> {
146+
fn std_result(self) -> Result<&'a T, SharedErrorWrapper>;
147+
}
148+
149+
impl<'a, T> SharedResultExtRef<'a, T> for &'a Result<T, SharedError> {
150+
fn std_result(self) -> Result<&'a T, SharedErrorWrapper> {
151+
match self {
152+
Ok(value) => Ok(value),
153+
Err(err) => Err(SharedErrorWrapper(err.clone())),
154+
}
155+
}
156+
}
157+
145158
#[macro_export]
146159
macro_rules! api_bail {
147160
( $fmt:literal $(, $($arg:expr) , *)?) => {

src/utils/fingerprint.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use anyhow::{anyhow, bail};
1+
use anyhow::bail;
22
use base64::prelude::*;
33
use blake2::digest::typenum;
44
use blake2::{Blake2b, Digest};

0 commit comments

Comments
 (0)