diff --git a/rust/cocoindex/src/builder/analyzer.rs b/rust/cocoindex/src/builder/analyzer.rs index a244a301..a7d57a1d 100644 --- a/rust/cocoindex/src/builder/analyzer.rs +++ b/rust/cocoindex/src/builder/analyzer.rs @@ -797,25 +797,28 @@ impl AnalyzerContext { .iter() .map(|field| field.analyzed_value.clone()) .collect(); - let (output_enriched_type, executor) = fn_executor + let build_output = fn_executor .build(spec, input_field_schemas, self.flow_ctx.clone()) .await?; + let output_type = build_output.output_type.typ.clone(); let logic_fingerprinter = Fingerprinter::default() .with(&op.op)? - .with(&output_enriched_type.without_attrs())?; - let output_type = output_enriched_type.typ.clone(); + .with(&build_output.output_type.without_attrs())? + .with(&build_output.behavior_version)?; let output = - op_scope.add_op_output(reactive_op_name.clone(), output_enriched_type)?; + op_scope.add_op_output(reactive_op_name.clone(), build_output.output_type)?; let op_name = reactive_op_name.clone(); let op_kind = op.op.kind.clone(); + let execution_options_timeout = op.execution_options.timeout; + + let behavior_version = build_output.behavior_version; async move { trace!("Start building executor for transform op `{op_name}`"); - let executor = executor.await.with_context(|| { + let executor = build_output.executor.await.with_context(|| { format!("Preparing for transform op: {op_name}") })?; let enable_cache = executor.enable_cache(); - let behavior_version = executor.behavior_version(); let timeout = executor.timeout() .or(execution_options_timeout) .or(Some(TIMEOUT_THRESHOLD)); @@ -824,8 +827,7 @@ impl AnalyzerContext { enable_cache, timeout, behavior_version, - fingerprinter: logic_fingerprinter - .with(&behavior_version)?, + fingerprinter: logic_fingerprinter, output_type }; if function_exec_info.enable_cache diff --git a/rust/cocoindex/src/ops/factory_bases.rs b/rust/cocoindex/src/ops/factory_bases.rs index 3b028a53..f3603113 100644 --- a/rust/cocoindex/src/ops/factory_bases.rs +++ b/rust/cocoindex/src/ops/factory_bases.rs @@ -266,6 +266,12 @@ impl SourceFactory for T { // Function //////////////////////////////////////////////////////// +pub struct SimpleFunctionAnalysisOutput { + pub resolved_args: T, + pub output_schema: EnrichedValueType, + pub behavior_version: Option, +} + #[async_trait] pub trait SimpleFunctionFactoryBase: SimpleFunctionFactory + Send + Sync + 'static { type Spec: DeserializeOwned + Send + Sync; @@ -273,17 +279,17 @@ pub trait SimpleFunctionFactoryBase: SimpleFunctionFactory + Send + Sync + 'stat fn name(&self) -> &str; - async fn resolve_schema<'a>( + async fn analyze<'a>( &'a self, spec: &'a Self::Spec, args_resolver: &mut OpArgsResolver<'a>, context: &FlowInstanceContext, - ) -> Result<(Self::ResolvedArgs, EnrichedValueType)>; + ) -> Result>; async fn build_executor( self: Arc, spec: Self::Spec, - resolved_input_schema: Self::ResolvedArgs, + resolved_args: Self::ResolvedArgs, context: Arc, ) -> Result; @@ -317,10 +323,6 @@ impl SimpleFunctionExecutor for FunctionExecutorWrapp fn enable_cache(&self) -> bool { self.executor.enable_cache() } - - fn behavior_version(&self) -> Option { - self.executor.behavior_version() - } } #[async_trait] @@ -330,10 +332,7 @@ impl SimpleFunctionFactory for T { spec: serde_json::Value, input_schema: Vec, context: Arc, - ) -> Result<( - EnrichedValueType, - BoxFuture<'static, Result>>, - )> { + ) -> Result { let spec: T::Spec = utils::deser::from_json_value(spec) .with_context(|| format!("Failed in parsing spec for function `{}`", self.name()))?; let mut nonnull_args_idx = vec![]; @@ -343,9 +342,11 @@ impl SimpleFunctionFactory for T { &mut nonnull_args_idx, &mut may_nullify_output, )?; - let (resolved_input_schema, mut output_schema) = self - .resolve_schema(&spec, &mut args_resolver, &context) - .await?; + let SimpleFunctionAnalysisOutput { + resolved_args, + mut output_schema, + behavior_version, + } = self.analyze(&spec, &mut args_resolver, &context).await?; args_resolver.done()?; // If any required argument is nullable, the output schema should be nullable. @@ -355,13 +356,15 @@ impl SimpleFunctionFactory for T { let executor = async move { Ok(Box::new(FunctionExecutorWrapper { - executor: self - .build_executor(spec, resolved_input_schema, context) - .await?, + executor: self.build_executor(spec, resolved_args, context).await?, nonnull_args_idx, }) as Box) }; - Ok((output_schema, Box::pin(executor))) + Ok(SimpleFunctionBuildOutput { + output_type: output_schema, + behavior_version, + executor: Box::pin(executor), + }) } } @@ -373,10 +376,6 @@ pub trait BatchedFunctionExecutor: Send + Sync + Sized + 'static { false } - fn behavior_version(&self) -> Option { - None - } - fn timeout(&self) -> Option { None } @@ -406,7 +405,6 @@ impl batching::Runner for BatchedFunctionExecutorRun struct BatchedFunctionExecutorWrapper { batcher: batching::Batcher>, enable_cache: bool, - behavior_version: Option, timeout: Option, } @@ -414,11 +412,9 @@ impl BatchedFunctionExecutorWrapper { fn new(executor: E) -> Self { let batching_options = executor.batching_options(); let enable_cache = executor.enable_cache(); - let behavior_version = executor.behavior_version(); let timeout = executor.timeout(); Self { enable_cache, - behavior_version, timeout, batcher: batching::Batcher::new( BatchedFunctionExecutorRunner(executor), @@ -437,9 +433,6 @@ impl SimpleFunctionExecutor for BatchedFunctionExecu fn enable_cache(&self) -> bool { self.enable_cache } - fn behavior_version(&self) -> Option { - self.behavior_version - } fn timeout(&self) -> Option { self.timeout } diff --git a/rust/cocoindex/src/ops/functions/detect_program_lang.rs b/rust/cocoindex/src/ops/functions/detect_program_lang.rs index 5bf3f238..0b554431 100644 --- a/rust/cocoindex/src/ops/functions/detect_program_lang.rs +++ b/rust/cocoindex/src/ops/functions/detect_program_lang.rs @@ -35,12 +35,12 @@ impl SimpleFunctionFactoryBase for Factory { "DetectProgrammingLanguage" } - async fn resolve_schema<'a>( + async fn analyze<'a>( &'a self, _spec: &'a EmptySpec, args_resolver: &mut OpArgsResolver<'a>, _context: &FlowInstanceContext, - ) -> Result<(Args, EnrichedValueType)> { + ) -> Result> { let args = Args { filename: args_resolver .next_arg("filename")? @@ -49,7 +49,11 @@ impl SimpleFunctionFactoryBase for Factory { }; let output_schema = make_output_type(BasicValueType::Str); - Ok((args, output_schema)) + Ok(SimpleFunctionAnalysisOutput { + resolved_args: args, + output_schema, + behavior_version: None, + }) } async fn build_executor( diff --git a/rust/cocoindex/src/ops/functions/embed_text.rs b/rust/cocoindex/src/ops/functions/embed_text.rs index 1a1ce735..f9940018 100644 --- a/rust/cocoindex/src/ops/functions/embed_text.rs +++ b/rust/cocoindex/src/ops/functions/embed_text.rs @@ -28,10 +28,6 @@ struct Executor { #[async_trait] impl BatchedFunctionExecutor for Executor { - fn behavior_version(&self) -> Option { - self.args.client.behavior_version() - } - fn enable_cache(&self) -> bool { true } @@ -108,12 +104,12 @@ impl SimpleFunctionFactoryBase for Factory { "EmbedText" } - async fn resolve_schema<'a>( + async fn analyze<'a>( &'a self, spec: &'a Spec, args_resolver: &mut OpArgsResolver<'a>, _context: &FlowInstanceContext, - ) -> Result<(Self::ResolvedArgs, EnrichedValueType)> { + ) -> Result> { let text = args_resolver .next_arg("text")? .expect_type(&ValueType::Basic(BasicValueType::Str))? @@ -132,14 +128,15 @@ impl SimpleFunctionFactoryBase for Factory { dimension: Some(output_dimension as usize), element_type: Box::new(BasicValueType::Float32), })); - Ok(( - Args { + Ok(SimpleFunctionAnalysisOutput { + behavior_version: client.behavior_version(), + resolved_args: Args { client, text, expected_output_dimension: output_dimension as usize, }, output_schema, - )) + }) } async fn build_executor( diff --git a/rust/cocoindex/src/ops/functions/extract_by_llm.rs b/rust/cocoindex/src/ops/functions/extract_by_llm.rs index 4dfe9d4d..21dd7ac1 100644 --- a/rust/cocoindex/src/ops/functions/extract_by_llm.rs +++ b/rust/cocoindex/src/ops/functions/extract_by_llm.rs @@ -72,10 +72,6 @@ impl Executor { #[async_trait] impl SimpleFunctionExecutor for Executor { - fn behavior_version(&self) -> Option { - Some(1) - } - fn enable_cache(&self) -> bool { true } @@ -130,12 +126,12 @@ impl SimpleFunctionFactoryBase for Factory { "ExtractByLlm" } - async fn resolve_schema<'a>( + async fn analyze<'a>( &'a self, spec: &'a Spec, args_resolver: &mut OpArgsResolver<'a>, _context: &FlowInstanceContext, - ) -> Result<(Args, EnrichedValueType)> { + ) -> Result> { let args = Args { text: args_resolver .next_arg("text")? @@ -157,7 +153,11 @@ impl SimpleFunctionFactoryBase for Factory { { output_type.nullable = true; } - Ok((args, output_type)) + Ok(SimpleFunctionAnalysisOutput { + resolved_args: args, + output_schema: output_type, + behavior_version: Some(1), + }) } async fn build_executor( diff --git a/rust/cocoindex/src/ops/functions/parse_json.rs b/rust/cocoindex/src/ops/functions/parse_json.rs index d1052439..8e256287 100644 --- a/rust/cocoindex/src/ops/functions/parse_json.rs +++ b/rust/cocoindex/src/ops/functions/parse_json.rs @@ -74,12 +74,12 @@ impl SimpleFunctionFactoryBase for Factory { "ParseJson" } - async fn resolve_schema<'a>( + async fn analyze<'a>( &'a self, _spec: &'a EmptySpec, args_resolver: &mut OpArgsResolver<'a>, _context: &FlowInstanceContext, - ) -> Result<(Args, EnrichedValueType)> { + ) -> Result> { let args = Args { text: args_resolver .next_arg("text")? @@ -92,7 +92,11 @@ impl SimpleFunctionFactoryBase for Factory { }; let output_schema = make_output_type(BasicValueType::Json); - Ok((args, output_schema)) + Ok(SimpleFunctionAnalysisOutput { + resolved_args: args, + output_schema, + behavior_version: None, + }) } async fn build_executor( diff --git a/rust/cocoindex/src/ops/functions/split_by_separators.rs b/rust/cocoindex/src/ops/functions/split_by_separators.rs index 280d70cd..1e156b94 100644 --- a/rust/cocoindex/src/ops/functions/split_by_separators.rs +++ b/rust/cocoindex/src/ops/functions/split_by_separators.rs @@ -138,12 +138,12 @@ impl SimpleFunctionFactoryBase for Factory { "SplitBySeparators" } - async fn resolve_schema<'a>( + async fn analyze<'a>( &'a self, _spec: &'a Spec, args_resolver: &mut OpArgsResolver<'a>, _context: &FlowInstanceContext, - ) -> Result<(Args, EnrichedValueType)> { + ) -> Result> { // one required arg: text: Str let args = Args { text: args_resolver @@ -153,7 +153,11 @@ impl SimpleFunctionFactoryBase for Factory { }; let output_schema = make_common_chunk_schema(args_resolver, &args.text)?; - Ok((args, output_schema)) + Ok(SimpleFunctionAnalysisOutput { + resolved_args: args, + output_schema, + behavior_version: None, + }) } async fn build_executor( diff --git a/rust/cocoindex/src/ops/functions/split_recursively.rs b/rust/cocoindex/src/ops/functions/split_recursively.rs index 9e285a65..381218a8 100644 --- a/rust/cocoindex/src/ops/functions/split_recursively.rs +++ b/rust/cocoindex/src/ops/functions/split_recursively.rs @@ -732,12 +732,12 @@ impl SimpleFunctionFactoryBase for Factory { "SplitRecursively" } - async fn resolve_schema<'a>( + async fn analyze<'a>( &'a self, _spec: &'a Spec, args_resolver: &mut OpArgsResolver<'a>, _context: &FlowInstanceContext, - ) -> Result<(Args, EnrichedValueType)> { + ) -> Result> { let args = Args { text: args_resolver .next_arg("text")? @@ -763,7 +763,11 @@ impl SimpleFunctionFactoryBase for Factory { let output_schema = crate::ops::shared::split::make_common_chunk_schema(args_resolver, &args.text)?; - Ok((args, output_schema)) + Ok(SimpleFunctionAnalysisOutput { + resolved_args: args, + output_schema, + behavior_version: None, + }) } async fn build_executor( diff --git a/rust/cocoindex/src/ops/functions/test_utils.rs b/rust/cocoindex/src/ops/functions/test_utils.rs index b12cc981..79253e5f 100644 --- a/rust/cocoindex/src/ops/functions/test_utils.rs +++ b/rust/cocoindex/src/ops/functions/test_utils.rs @@ -48,11 +48,11 @@ pub async fn test_flow_function( auth_registry: Arc::new(AuthRegistry::default()), py_exec_ctx: None, }); - let (_, exec_fut) = factory + let build_output = factory .clone() .build(serde_json::to_value(spec)?, op_arg_schemas, context) .await?; - let executor = exec_fut.await?; + let executor = build_output.executor.await?; // 3. Evaluate let result = executor.evaluate(input_arg_values).await?; diff --git a/rust/cocoindex/src/ops/interface.rs b/rust/cocoindex/src/ops/interface.rs index 7cef1eda..94a7c5c1 100644 --- a/rust/cocoindex/src/ops/interface.rs +++ b/rust/cocoindex/src/ops/interface.rs @@ -180,18 +180,22 @@ pub trait SimpleFunctionExecutor: Send + Sync { false } - /// Must be Some if `enable_cache` is true. - /// If it changes, the cache will be invalidated. - fn behavior_version(&self) -> Option { - None - } - /// Returns None to use the default timeout (1800s) fn timeout(&self) -> Option { None } } +pub struct SimpleFunctionBuildOutput { + pub output_type: EnrichedValueType, + + /// Must be Some if `enable_cache` is true. + /// If it changes, the cache will be invalidated. + pub behavior_version: Option, + + pub executor: BoxFuture<'static, Result>>, +} + #[async_trait] pub trait SimpleFunctionFactory { async fn build( @@ -199,10 +203,7 @@ pub trait SimpleFunctionFactory { spec: serde_json::Value, input_schema: Vec, context: Arc, - ) -> Result<( - EnrichedValueType, - BoxFuture<'static, Result>>, - )>; + ) -> Result; } #[derive(Debug)] diff --git a/rust/cocoindex/src/ops/py_factory.rs b/rust/cocoindex/src/ops/py_factory.rs index 94ad4c88..aa7eb2bb 100644 --- a/rust/cocoindex/src/ops/py_factory.rs +++ b/rust/cocoindex/src/ops/py_factory.rs @@ -42,7 +42,6 @@ struct PyFunctionExecutor { result_type: schema::EnrichedValueType, enable_cache: bool, - behavior_version: Option, timeout: Option, } @@ -110,10 +109,6 @@ impl interface::SimpleFunctionExecutor for Arc { self.enable_cache } - fn behavior_version(&self) -> Option { - self.behavior_version - } - fn timeout(&self) -> Option { self.timeout } @@ -125,7 +120,6 @@ struct PyBatchedFunctionExecutor { result_type: schema::EnrichedValueType, enable_cache: bool, - behavior_version: Option, timeout: Option, batching_options: batching::BatchingOptions, } @@ -172,9 +166,6 @@ impl BatchedFunctionExecutor for PyBatchedFunctionExecutor { fn enable_cache(&self) -> bool { self.enable_cache } - fn behavior_version(&self) -> Option { - self.behavior_version - } fn timeout(&self) -> Option { self.timeout } @@ -194,11 +185,8 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { spec: serde_json::Value, input_schema: Vec, context: Arc, - ) -> Result<( - schema::EnrichedValueType, - BoxFuture<'static, Result>>, - )> { - let (result_type, executor, kw_args_names, num_positional_args) = + ) -> Result { + let (result_type, executor, kw_args_names, num_positional_args, behavior_version) = Python::with_gil(|py| -> anyhow::Result<_> { let mut args = vec![pythonize(py, &spec)?]; let mut kwargs = vec![]; @@ -233,11 +221,16 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { .to_result_with_py_trace(py)?; let (result_type, executor) = result .extract::<(crate::py::Pythonized, Py)>(py)?; + let behavior_version = executor + .call_method(py, "behavior_version", (), None) + .to_result_with_py_trace(py)? + .extract::>(py)?; Ok(( result_type.into_inner(), executor, kw_args_names, num_positional_args, + behavior_version, )) })?; @@ -249,7 +242,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { .as_ref() .ok_or_else(|| anyhow!("Python execution context is missing"))? .clone(); - let (prepare_fut, enable_cache, behavior_version, timeout, batching_options) = + let (prepare_fut, enable_cache, timeout, batching_options) = Python::with_gil(|py| -> anyhow::Result<_> { let prepare_coro = executor .call_method(py, "prepare", (), None) @@ -265,10 +258,6 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { .call_method(py, "enable_cache", (), None) .to_result_with_py_trace(py)? .extract::(py)?; - let behavior_version = executor - .call_method(py, "behavior_version", (), None) - .to_result_with_py_trace(py)? - .extract::>(py)?; let timeout = executor .call_method(py, "timeout", (), None) .to_result_with_py_trace(py)?; @@ -287,13 +276,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { py, )? .into_inner(); - Ok(( - prepare_fut, - enable_cache, - behavior_version, - timeout, - batching_options, - )) + Ok((prepare_fut, enable_cache, timeout, batching_options)) })?; prepare_fut.await?; let executor: Box = @@ -304,7 +287,6 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { py_exec_ctx, result_type, enable_cache, - behavior_version, timeout, batching_options, } @@ -318,7 +300,6 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { kw_args_names, result_type, enable_cache, - behavior_version, timeout, })) }; @@ -326,7 +307,11 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory { } }; - Ok((result_type, executor_fut.boxed())) + Ok(interface::SimpleFunctionBuildOutput { + output_type: result_type, + behavior_version, + executor: executor_fut.boxed(), + }) } }