Skip to content

Commit 5050026

Browse files
authored
refactor: simplify the interface for behavior version (#1291)
1 parent ad10f8f commit 5050026

File tree

11 files changed: +99 -105 lines changed

rust/cocoindex/src/builder/analyzer.rs

Lines changed: 10 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -797,25 +797,28 @@ impl AnalyzerContext {
797797
.iter()
798798
.map(|field| field.analyzed_value.clone())
799799
.collect();
800-
let (output_enriched_type, executor) = fn_executor
800+
let build_output = fn_executor
801801
.build(spec, input_field_schemas, self.flow_ctx.clone())
802802
.await?;
803+
let output_type = build_output.output_type.typ.clone();
803804
let logic_fingerprinter = Fingerprinter::default()
804805
.with(&op.op)?
805-
.with(&output_enriched_type.without_attrs())?;
806-
let output_type = output_enriched_type.typ.clone();
806+
.with(&build_output.output_type.without_attrs())?
807+
.with(&build_output.behavior_version)?;
807808
let output =
808-
op_scope.add_op_output(reactive_op_name.clone(), output_enriched_type)?;
809+
op_scope.add_op_output(reactive_op_name.clone(), build_output.output_type)?;
809810
let op_name = reactive_op_name.clone();
810811
let op_kind = op.op.kind.clone();
812+
811813
let execution_options_timeout = op.execution_options.timeout;
814+
815+
let behavior_version = build_output.behavior_version;
812816
async move {
813817
trace!("Start building executor for transform op `{op_name}`");
814-
let executor = executor.await.with_context(|| {
818+
let executor = build_output.executor.await.with_context(|| {
815819
format!("Preparing for transform op: {op_name}")
816820
})?;
817821
let enable_cache = executor.enable_cache();
818-
let behavior_version = executor.behavior_version();
819822
let timeout = executor.timeout()
820823
.or(execution_options_timeout)
821824
.or(Some(TIMEOUT_THRESHOLD));
@@ -824,8 +827,7 @@ impl AnalyzerContext {
824827
enable_cache,
825828
timeout,
826829
behavior_version,
827-
fingerprinter: logic_fingerprinter
828-
.with(&behavior_version)?,
830+
fingerprinter: logic_fingerprinter,
829831
output_type
830832
};
831833
if function_exec_info.enable_cache

rust/cocoindex/src/ops/factory_bases.rs

Lines changed: 21 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -266,24 +266,30 @@ impl<T: SourceFactoryBase> SourceFactory for T {
266266
// Function
267267
////////////////////////////////////////////////////////
268268

269+
pub struct SimpleFunctionAnalysisOutput<T: Send + Sync> {
270+
pub resolved_args: T,
271+
pub output_schema: EnrichedValueType,
272+
pub behavior_version: Option<u32>,
273+
}
274+
269275
#[async_trait]
270276
pub trait SimpleFunctionFactoryBase: SimpleFunctionFactory + Send + Sync + 'static {
271277
type Spec: DeserializeOwned + Send + Sync;
272278
type ResolvedArgs: Send + Sync;
273279

274280
fn name(&self) -> &str;
275281

276-
async fn resolve_schema<'a>(
282+
async fn analyze<'a>(
277283
&'a self,
278284
spec: &'a Self::Spec,
279285
args_resolver: &mut OpArgsResolver<'a>,
280286
context: &FlowInstanceContext,
281-
) -> Result<(Self::ResolvedArgs, EnrichedValueType)>;
287+
) -> Result<SimpleFunctionAnalysisOutput<Self::ResolvedArgs>>;
282288

283289
async fn build_executor(
284290
self: Arc<Self>,
285291
spec: Self::Spec,
286-
resolved_input_schema: Self::ResolvedArgs,
292+
resolved_args: Self::ResolvedArgs,
287293
context: Arc<FlowInstanceContext>,
288294
) -> Result<impl SimpleFunctionExecutor>;
289295

@@ -317,10 +323,6 @@ impl<E: SimpleFunctionExecutor> SimpleFunctionExecutor for FunctionExecutorWrapp
317323
fn enable_cache(&self) -> bool {
318324
self.executor.enable_cache()
319325
}
320-
321-
fn behavior_version(&self) -> Option<u32> {
322-
self.executor.behavior_version()
323-
}
324326
}
325327

326328
#[async_trait]
@@ -330,10 +332,7 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
330332
spec: serde_json::Value,
331333
input_schema: Vec<OpArgSchema>,
332334
context: Arc<FlowInstanceContext>,
333-
) -> Result<(
334-
EnrichedValueType,
335-
BoxFuture<'static, Result<Box<dyn SimpleFunctionExecutor>>>,
336-
)> {
335+
) -> Result<SimpleFunctionBuildOutput> {
337336
let spec: T::Spec = utils::deser::from_json_value(spec)
338337
.with_context(|| format!("Failed in parsing spec for function `{}`", self.name()))?;
339338
let mut nonnull_args_idx = vec![];
@@ -343,9 +342,11 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
343342
&mut nonnull_args_idx,
344343
&mut may_nullify_output,
345344
)?;
346-
let (resolved_input_schema, mut output_schema) = self
347-
.resolve_schema(&spec, &mut args_resolver, &context)
348-
.await?;
345+
let SimpleFunctionAnalysisOutput {
346+
resolved_args,
347+
mut output_schema,
348+
behavior_version,
349+
} = self.analyze(&spec, &mut args_resolver, &context).await?;
349350
args_resolver.done()?;
350351

351352
// If any required argument is nullable, the output schema should be nullable.
@@ -355,13 +356,15 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
355356

356357
let executor = async move {
357358
Ok(Box::new(FunctionExecutorWrapper {
358-
executor: self
359-
.build_executor(spec, resolved_input_schema, context)
360-
.await?,
359+
executor: self.build_executor(spec, resolved_args, context).await?,
361360
nonnull_args_idx,
362361
}) as Box<dyn SimpleFunctionExecutor>)
363362
};
364-
Ok((output_schema, Box::pin(executor)))
363+
Ok(SimpleFunctionBuildOutput {
364+
output_type: output_schema,
365+
behavior_version,
366+
executor: Box::pin(executor),
367+
})
365368
}
366369
}
367370

@@ -373,10 +376,6 @@ pub trait BatchedFunctionExecutor: Send + Sync + Sized + 'static {
373376
false
374377
}
375378

376-
fn behavior_version(&self) -> Option<u32> {
377-
None
378-
}
379-
380379
fn timeout(&self) -> Option<std::time::Duration> {
381380
None
382381
}
@@ -406,19 +405,16 @@ impl<E: BatchedFunctionExecutor> batching::Runner for BatchedFunctionExecutorRun
406405
struct BatchedFunctionExecutorWrapper<E: BatchedFunctionExecutor> {
407406
batcher: batching::Batcher<BatchedFunctionExecutorRunner<E>>,
408407
enable_cache: bool,
409-
behavior_version: Option<u32>,
410408
timeout: Option<std::time::Duration>,
411409
}
412410

413411
impl<E: BatchedFunctionExecutor> BatchedFunctionExecutorWrapper<E> {
414412
fn new(executor: E) -> Self {
415413
let batching_options = executor.batching_options();
416414
let enable_cache = executor.enable_cache();
417-
let behavior_version = executor.behavior_version();
418415
let timeout = executor.timeout();
419416
Self {
420417
enable_cache,
421-
behavior_version,
422418
timeout,
423419
batcher: batching::Batcher::new(
424420
BatchedFunctionExecutorRunner(executor),
@@ -437,9 +433,6 @@ impl<E: BatchedFunctionExecutor> SimpleFunctionExecutor for BatchedFunctionExecu
437433
fn enable_cache(&self) -> bool {
438434
self.enable_cache
439435
}
440-
fn behavior_version(&self) -> Option<u32> {
441-
self.behavior_version
442-
}
443436
fn timeout(&self) -> Option<std::time::Duration> {
444437
self.timeout
445438
}

rust/cocoindex/src/ops/functions/detect_program_lang.rs

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -35,12 +35,12 @@ impl SimpleFunctionFactoryBase for Factory {
3535
"DetectProgrammingLanguage"
3636
}
3737

38-
async fn resolve_schema<'a>(
38+
async fn analyze<'a>(
3939
&'a self,
4040
_spec: &'a EmptySpec,
4141
args_resolver: &mut OpArgsResolver<'a>,
4242
_context: &FlowInstanceContext,
43-
) -> Result<(Args, EnrichedValueType)> {
43+
) -> Result<SimpleFunctionAnalysisOutput<Args>> {
4444
let args = Args {
4545
filename: args_resolver
4646
.next_arg("filename")?
@@ -49,7 +49,11 @@ impl SimpleFunctionFactoryBase for Factory {
4949
};
5050

5151
let output_schema = make_output_type(BasicValueType::Str);
52-
Ok((args, output_schema))
52+
Ok(SimpleFunctionAnalysisOutput {
53+
resolved_args: args,
54+
output_schema,
55+
behavior_version: None,
56+
})
5357
}
5458

5559
async fn build_executor(

rust/cocoindex/src/ops/functions/embed_text.rs

Lines changed: 6 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -28,10 +28,6 @@ struct Executor {
2828

2929
#[async_trait]
3030
impl BatchedFunctionExecutor for Executor {
31-
fn behavior_version(&self) -> Option<u32> {
32-
self.args.client.behavior_version()
33-
}
34-
3531
fn enable_cache(&self) -> bool {
3632
true
3733
}
@@ -108,12 +104,12 @@ impl SimpleFunctionFactoryBase for Factory {
108104
"EmbedText"
109105
}
110106

111-
async fn resolve_schema<'a>(
107+
async fn analyze<'a>(
112108
&'a self,
113109
spec: &'a Spec,
114110
args_resolver: &mut OpArgsResolver<'a>,
115111
_context: &FlowInstanceContext,
116-
) -> Result<(Self::ResolvedArgs, EnrichedValueType)> {
112+
) -> Result<SimpleFunctionAnalysisOutput<Self::ResolvedArgs>> {
117113
let text = args_resolver
118114
.next_arg("text")?
119115
.expect_type(&ValueType::Basic(BasicValueType::Str))?
@@ -132,14 +128,15 @@ impl SimpleFunctionFactoryBase for Factory {
132128
dimension: Some(output_dimension as usize),
133129
element_type: Box::new(BasicValueType::Float32),
134130
}));
135-
Ok((
136-
Args {
131+
Ok(SimpleFunctionAnalysisOutput {
132+
behavior_version: client.behavior_version(),
133+
resolved_args: Args {
137134
client,
138135
text,
139136
expected_output_dimension: output_dimension as usize,
140137
},
141138
output_schema,
142-
))
139+
})
143140
}
144141

145142
async fn build_executor(

rust/cocoindex/src/ops/functions/extract_by_llm.rs

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -72,10 +72,6 @@ impl Executor {
7272

7373
#[async_trait]
7474
impl SimpleFunctionExecutor for Executor {
75-
fn behavior_version(&self) -> Option<u32> {
76-
Some(1)
77-
}
78-
7975
fn enable_cache(&self) -> bool {
8076
true
8177
}
@@ -130,12 +126,12 @@ impl SimpleFunctionFactoryBase for Factory {
130126
"ExtractByLlm"
131127
}
132128

133-
async fn resolve_schema<'a>(
129+
async fn analyze<'a>(
134130
&'a self,
135131
spec: &'a Spec,
136132
args_resolver: &mut OpArgsResolver<'a>,
137133
_context: &FlowInstanceContext,
138-
) -> Result<(Args, EnrichedValueType)> {
134+
) -> Result<SimpleFunctionAnalysisOutput<Args>> {
139135
let args = Args {
140136
text: args_resolver
141137
.next_arg("text")?
@@ -157,7 +153,11 @@ impl SimpleFunctionFactoryBase for Factory {
157153
{
158154
output_type.nullable = true;
159155
}
160-
Ok((args, output_type))
156+
Ok(SimpleFunctionAnalysisOutput {
157+
resolved_args: args,
158+
output_schema: output_type,
159+
behavior_version: Some(1),
160+
})
161161
}
162162

163163
async fn build_executor(

rust/cocoindex/src/ops/functions/parse_json.rs

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -74,12 +74,12 @@ impl SimpleFunctionFactoryBase for Factory {
7474
"ParseJson"
7575
}
7676

77-
async fn resolve_schema<'a>(
77+
async fn analyze<'a>(
7878
&'a self,
7979
_spec: &'a EmptySpec,
8080
args_resolver: &mut OpArgsResolver<'a>,
8181
_context: &FlowInstanceContext,
82-
) -> Result<(Args, EnrichedValueType)> {
82+
) -> Result<SimpleFunctionAnalysisOutput<Args>> {
8383
let args = Args {
8484
text: args_resolver
8585
.next_arg("text")?
@@ -92,7 +92,11 @@ impl SimpleFunctionFactoryBase for Factory {
9292
};
9393

9494
let output_schema = make_output_type(BasicValueType::Json);
95-
Ok((args, output_schema))
95+
Ok(SimpleFunctionAnalysisOutput {
96+
resolved_args: args,
97+
output_schema,
98+
behavior_version: None,
99+
})
96100
}
97101

98102
async fn build_executor(

rust/cocoindex/src/ops/functions/split_by_separators.rs

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -138,12 +138,12 @@ impl SimpleFunctionFactoryBase for Factory {
138138
"SplitBySeparators"
139139
}
140140

141-
async fn resolve_schema<'a>(
141+
async fn analyze<'a>(
142142
&'a self,
143143
_spec: &'a Spec,
144144
args_resolver: &mut OpArgsResolver<'a>,
145145
_context: &FlowInstanceContext,
146-
) -> Result<(Args, EnrichedValueType)> {
146+
) -> Result<SimpleFunctionAnalysisOutput<Args>> {
147147
// one required arg: text: Str
148148
let args = Args {
149149
text: args_resolver
@@ -153,7 +153,11 @@ impl SimpleFunctionFactoryBase for Factory {
153153
};
154154

155155
let output_schema = make_common_chunk_schema(args_resolver, &args.text)?;
156-
Ok((args, output_schema))
156+
Ok(SimpleFunctionAnalysisOutput {
157+
resolved_args: args,
158+
output_schema,
159+
behavior_version: None,
160+
})
157161
}
158162

159163
async fn build_executor(

rust/cocoindex/src/ops/functions/split_recursively.rs

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -732,12 +732,12 @@ impl SimpleFunctionFactoryBase for Factory {
732732
"SplitRecursively"
733733
}
734734

735-
async fn resolve_schema<'a>(
735+
async fn analyze<'a>(
736736
&'a self,
737737
_spec: &'a Spec,
738738
args_resolver: &mut OpArgsResolver<'a>,
739739
_context: &FlowInstanceContext,
740-
) -> Result<(Args, EnrichedValueType)> {
740+
) -> Result<SimpleFunctionAnalysisOutput<Args>> {
741741
let args = Args {
742742
text: args_resolver
743743
.next_arg("text")?
@@ -763,7 +763,11 @@ impl SimpleFunctionFactoryBase for Factory {
763763

764764
let output_schema =
765765
crate::ops::shared::split::make_common_chunk_schema(args_resolver, &args.text)?;
766-
Ok((args, output_schema))
766+
Ok(SimpleFunctionAnalysisOutput {
767+
resolved_args: args,
768+
output_schema,
769+
behavior_version: None,
770+
})
767771
}
768772

769773
async fn build_executor(

rust/cocoindex/src/ops/functions/test_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -48,11 +48,11 @@ pub async fn test_flow_function(
4848
auth_registry: Arc::new(AuthRegistry::default()),
4949
py_exec_ctx: None,
5050
});
51-
let (_, exec_fut) = factory
51+
let build_output = factory
5252
.clone()
5353
.build(serde_json::to_value(spec)?, op_arg_schemas, context)
5454
.await?;
55-
let executor = exec_fut.await?;
55+
let executor = build_output.executor.await?;
5656

5757
// 3. Evaluate
5858
let result = executor.evaluate(input_arg_values).await?;

0 commit comments

Comments
 (0)