Skip to content

Commit 716e384

Browse files
committed
Extract the logic to evaluate a source entry into a function for reuse.
1 parent 1e40db7 commit 716e384

File tree

2 files changed

+12
-18
lines changed

2 files changed

+12
-18
lines changed

src/execution/indexer.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -418,13 +418,13 @@ async fn commit_source_tracking_info(
418418
Ok(WithApplyStatus::Normal(()))
419419
}
420420

421-
/// Built an evaluation cache on the existing data.
422-
pub async fn evaluation_cache_on_existing_data(
421+
pub async fn evaluate_source_entry_with_cache(
423422
plan: &ExecutionPlan,
424423
source_op_idx: usize,
424+
schema: &schema::DataSchema,
425425
key: &value::KeyValue,
426426
pool: &PgPool,
427-
) -> Result<EvaluationCache> {
427+
) -> Result<Option<value::ScopeValue>> {
428428
let source_id = plan.source_ops[source_op_idx].source_id;
429429
let source_key_json = serde_json::to_value(key)?;
430430
let existing_tracking_info = read_source_tracking_info(
@@ -438,10 +438,11 @@ pub async fn evaluation_cache_on_existing_data(
438438
let memoization_info = existing_tracking_info
439439
.and_then(|info| info.memoization_info.map(|info| info.0))
440440
.flatten();
441-
Ok(EvaluationCache::new(
442-
process_timestamp,
443-
memoization_info.map(|info| info.cache),
444-
))
441+
let evaluation_cache =
442+
EvaluationCache::new(process_timestamp, memoization_info.map(|info| info.cache));
443+
let data_builder =
444+
evaluate_source_entry(plan, source_op_idx, schema, key, Some(&evaluation_cache)).await?;
445+
Ok(data_builder.map(|builder| builder.into()))
445446
}
446447

447448
pub async fn update_source_entry(

src/service/flows.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use super::error::ApiError;
1313
use crate::{
1414
api_bail, api_error,
1515
base::{schema, spec},
16-
execution::{evaluator, indexer},
16+
execution::indexer,
1717
};
1818
use crate::{execution::indexer::IndexUpdateInfo, lib_context::LibContext};
1919

@@ -145,26 +145,19 @@ pub async fn evaluate_data(
145145
.ok_or_else(|| api_error!("field {} does not have a key", query.field))?;
146146
let key = value::KeyValue::from_strs(query.key, &key_field.value_type.typ)?;
147147

148-
let evaluation_cache = indexer::evaluation_cache_on_existing_data(
149-
&execution_plan,
150-
source_op_idx,
151-
&key,
152-
&lib_context.pool,
153-
)
154-
.await?;
155-
let data_builder = evaluator::evaluate_source_entry(
148+
let data = indexer::evaluate_source_entry_with_cache(
156149
&execution_plan,
157150
source_op_idx,
158151
schema,
159152
&key,
160-
Some(&evaluation_cache),
153+
&lib_context.pool,
161154
)
162155
.await?
163156
.ok_or_else(|| api_error!("value not found for source at the specified key: {key:?}"))?;
164157

165158
Ok(Json(EvaluateDataResponse {
166159
schema: schema.clone(),
167-
data: data_builder.into(),
160+
data,
168161
}))
169162
}
170163

0 commit comments

Comments
 (0)