From 6b6788a725e00b084a25ddf58ba12e3a9262628d Mon Sep 17 00:00:00 2001 From: LJ Date: Tue, 4 Mar 2025 12:26:34 -0800 Subject: [PATCH 1/3] `s/enable_caching/enable_cache/` --- src/builder/analyzer.rs | 4 ++-- src/builder/plan.rs | 2 +- src/execution/evaluator.rs | 2 +- src/ops/interface.rs | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index aaea9d72a..7ffa1e2a5 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -706,7 +706,7 @@ impl<'a> AnalyzerContext<'a> { })?; let behavior_version = executor.behavior_version(); let function_exec_info = AnalyzedFunctionExecInfo { - enable_caching: executor.enable_caching(), + enable_cache: executor.enable_cache(), behavior_version, fingerprinter: Fingerprinter::default() .with(&reactive_op.name)? @@ -715,7 +715,7 @@ impl<'a> AnalyzerContext<'a> { .with(&output_type.without_attrs())?, output_type: output_type.typ.clone(), }; - if function_exec_info.enable_caching + if function_exec_info.enable_cache && function_exec_info.behavior_version.is_some() { api_bail!( diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 526581b1a..8dc94a568 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -64,7 +64,7 @@ pub struct AnalyzedSourceOp { } pub struct AnalyzedFunctionExecInfo { - pub enable_caching: bool, + pub enable_cache: bool, pub behavior_version: Option, /// Fingerprinter of the function's behavior. diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs index adcb6d548..df0dda04b 100644 --- a/src/execution/evaluator.rs +++ b/src/execution/evaluator.rs @@ -306,7 +306,7 @@ async fn evaluate_op_scope( let input_values = assemble_input_values(&op.inputs, scoped_entries); let output_value = if let Some(cache) = op .function_exec_info - .enable_caching + .enable_cache .then_some(cache) .flatten() { diff --git a/src/ops/interface.rs b/src/ops/interface.rs index a49ccb968..e040b6cce 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -40,11 +40,11 @@ pub trait SimpleFunctionExecutor: Send + Sync { /// Evaluate the operation. async fn evaluate(&self, args: Vec) -> Result; - fn enable_caching(&self) -> bool { + fn enable_cache(&self) -> bool { false } - /// Must be Some if `enable_caching` is true. + /// Must be Some if `enable_cache` is true. /// If it changes, the cache will be invalidated. fn behavior_version(&self) -> Option { None From a374e91b48d3c80b93eeb597005108ff566cf133 Mon Sep 17 00:00:00 2001 From: LJ Date: Tue, 4 Mar 2025 12:51:37 -0800 Subject: [PATCH 2/3] Support eneabling cache in Python SDK. --- python/cocoindex/op.py | 14 ++++++++++++-- src/ops/py_factory.rs | 29 ++++++++++++++++++++++++++--- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/python/cocoindex/op.py b/python/cocoindex/op.py index 650ca3f49..9fe11b5de 100644 --- a/python/cocoindex/op.py +++ b/python/cocoindex/op.py @@ -62,12 +62,14 @@ def __call__(self, spec_json: str, *args, **kwargs): _gpu_dispatch_lock = Lock() -def executor_class(gpu: bool = False) -> Callable[[type], type]: +def executor_class(gpu: bool = False, cache: bool = False, behavior_version: int | None = None) -> Callable[[type], type]: """ Decorate a class to provide an executor for an op. Args: gpu: Whether the executor will be executed on GPU. + cache: Whether the executor will be cached. + behavior_version: The behavior version of the executor. Cache will be invalidated if it changes. Must be provided if `cache` is True. """ def _inner(cls: type[Executor]) -> type: @@ -87,7 +89,15 @@ def _inner(cls: type[Executor]) -> type: expected_return = sig.return_annotation cls_type: type = cls - class _WrappedClass(cls_type): + + class _Fallback: + def enable_cache(self): + return cache + + def behavior_version(self): + return behavior_version + + class _WrappedClass(cls_type, _Fallback): def __init__(self, spec): super().__init__() self.spec = spec diff --git a/src/ops/py_factory.rs b/src/ops/py_factory.rs index e2ae24e92..c343f9895 100644 --- a/src/ops/py_factory.rs +++ b/src/ops/py_factory.rs @@ -149,6 +149,9 @@ struct PyFunctionExecutor { num_positional_args: usize, kw_args_names: Vec>, result_type: schema::EnrichedValueType, + + enable_cache: bool, + behavior_version: Option, } #[async_trait] @@ -193,6 +196,14 @@ impl SimpleFunctionExecutor for Arc { }) .await } + + fn enable_cache(&self) -> bool { + self.enable_cache + } + + fn behavior_version(&self) -> Option { + self.behavior_version + } } pub(crate) struct PyFunctionFactory { @@ -251,15 +262,27 @@ impl SimpleFunctionFactory for PyFunctionFactory { let executor_fut = { let result_type = result_type.clone(); - async move { - Python::with_gil(|py| executor.call_method(py, "prepare", (), None))?; + unblock(move || { + let (enable_cache, behavior_version) = + Python::with_gil(|py| -> anyhow::Result<_> { + executor.call_method(py, "prepare", (), None)?; + let enable_cache = executor.call_method(py, "enable_cache", (), None)?; + let behavior_version = + executor.call_method(py, "behavior_version", (), None)?; + Ok(( + enable_cache.extract::(py)?, + behavior_version.extract::>(py)?, + )) + })?; Ok(Box::new(Arc::new(PyFunctionExecutor { py_function_executor: executor, num_positional_args, kw_args_names, result_type, + enable_cache, + behavior_version, })) as Box) - } + }) }; Ok((result_type, executor_fut.boxed())) From 72aa5c1a6c69624742c20d7dabecc299166e5e44 Mon Sep 17 00:00:00 2001 From: LJ Date: Tue, 4 Mar 2025 14:29:25 -0800 Subject: [PATCH 3/3] Fix validation logic for `behavior_version` --- src/builder/analyzer.rs | 2 +- src/ops/py_factory.rs | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index 7ffa1e2a5..dc576f3b9 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -716,7 +716,7 @@ impl<'a> AnalyzerContext<'a> { output_type: output_type.typ.clone(), }; if function_exec_info.enable_cache - && function_exec_info.behavior_version.is_some() + && function_exec_info.behavior_version.is_none() { api_bail!( "When caching is enabled, behavior version must be specified for transform op: {}", diff --git a/src/ops/py_factory.rs b/src/ops/py_factory.rs index c343f9895..6694da7fe 100644 --- a/src/ops/py_factory.rs +++ b/src/ops/py_factory.rs @@ -266,13 +266,13 @@ impl SimpleFunctionFactory for PyFunctionFactory { let (enable_cache, behavior_version) = Python::with_gil(|py| -> anyhow::Result<_> { executor.call_method(py, "prepare", (), None)?; - let enable_cache = executor.call_method(py, "enable_cache", (), None)?; - let behavior_version = - executor.call_method(py, "behavior_version", (), None)?; - Ok(( - enable_cache.extract::(py)?, - behavior_version.extract::>(py)?, - )) + let enable_cache = executor + .call_method(py, "enable_cache", (), None)? + .extract::(py)?; + let behavior_version = executor + .call_method(py, "behavior_version", (), None)? + .extract::>(py)?; + Ok((enable_cache, behavior_version)) })?; Ok(Box::new(Arc::new(PyFunctionExecutor { py_function_executor: executor,