Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions rust/cocoindex/src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -797,25 +797,28 @@ impl AnalyzerContext {
.iter()
.map(|field| field.analyzed_value.clone())
.collect();
let (output_enriched_type, executor) = fn_executor
let build_output = fn_executor
.build(spec, input_field_schemas, self.flow_ctx.clone())
.await?;
let output_type = build_output.output_type.typ.clone();
let logic_fingerprinter = Fingerprinter::default()
.with(&op.op)?
.with(&output_enriched_type.without_attrs())?;
let output_type = output_enriched_type.typ.clone();
.with(&build_output.output_type.without_attrs())?
.with(&build_output.behavior_version)?;
let output =
op_scope.add_op_output(reactive_op_name.clone(), output_enriched_type)?;
op_scope.add_op_output(reactive_op_name.clone(), build_output.output_type)?;
let op_name = reactive_op_name.clone();
let op_kind = op.op.kind.clone();

let execution_options_timeout = op.execution_options.timeout;

let behavior_version = build_output.behavior_version;
async move {
trace!("Start building executor for transform op `{op_name}`");
let executor = executor.await.with_context(|| {
let executor = build_output.executor.await.with_context(|| {
format!("Preparing for transform op: {op_name}")
})?;
let enable_cache = executor.enable_cache();
let behavior_version = executor.behavior_version();
let timeout = executor.timeout()
.or(execution_options_timeout)
.or(Some(TIMEOUT_THRESHOLD));
Expand All @@ -824,8 +827,7 @@ impl AnalyzerContext {
enable_cache,
timeout,
behavior_version,
fingerprinter: logic_fingerprinter
.with(&behavior_version)?,
fingerprinter: logic_fingerprinter,
output_type
};
if function_exec_info.enable_cache
Expand Down
49 changes: 21 additions & 28 deletions rust/cocoindex/src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,24 +266,30 @@ impl<T: SourceFactoryBase> SourceFactory for T {
// Function
////////////////////////////////////////////////////////

/// Result of analyzing a simple function, as returned by
/// `SimpleFunctionFactoryBase::analyze`.
///
/// `T` is the factory's `ResolvedArgs` type.
pub struct SimpleFunctionAnalysisOutput<T: Send + Sync> {
/// Arguments resolved during analysis; later handed to `build_executor`.
pub resolved_args: T,
/// Schema of the value the function produces.
pub output_schema: EnrichedValueType,
/// Optional behavior version declared by the function; `None` when it declares none.
pub behavior_version: Option<u32>,
}

#[async_trait]
pub trait SimpleFunctionFactoryBase: SimpleFunctionFactory + Send + Sync + 'static {
type Spec: DeserializeOwned + Send + Sync;
type ResolvedArgs: Send + Sync;

fn name(&self) -> &str;

async fn resolve_schema<'a>(
async fn analyze<'a>(
&'a self,
spec: &'a Self::Spec,
args_resolver: &mut OpArgsResolver<'a>,
context: &FlowInstanceContext,
) -> Result<(Self::ResolvedArgs, EnrichedValueType)>;
) -> Result<SimpleFunctionAnalysisOutput<Self::ResolvedArgs>>;

async fn build_executor(
self: Arc<Self>,
spec: Self::Spec,
resolved_input_schema: Self::ResolvedArgs,
resolved_args: Self::ResolvedArgs,
context: Arc<FlowInstanceContext>,
) -> Result<impl SimpleFunctionExecutor>;

Expand Down Expand Up @@ -317,10 +323,6 @@ impl<E: SimpleFunctionExecutor> SimpleFunctionExecutor for FunctionExecutorWrapp
/// Reports the wrapped executor's cache setting unchanged.
fn enable_cache(&self) -> bool {
self.executor.enable_cache()
}

fn behavior_version(&self) -> Option<u32> {
self.executor.behavior_version()
}
}

#[async_trait]
Expand All @@ -330,10 +332,7 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
spec: serde_json::Value,
input_schema: Vec<OpArgSchema>,
context: Arc<FlowInstanceContext>,
) -> Result<(
EnrichedValueType,
BoxFuture<'static, Result<Box<dyn SimpleFunctionExecutor>>>,
)> {
) -> Result<SimpleFunctionBuildOutput> {
let spec: T::Spec = utils::deser::from_json_value(spec)
.with_context(|| format!("Failed in parsing spec for function `{}`", self.name()))?;
let mut nonnull_args_idx = vec![];
Expand All @@ -343,9 +342,11 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
&mut nonnull_args_idx,
&mut may_nullify_output,
)?;
let (resolved_input_schema, mut output_schema) = self
.resolve_schema(&spec, &mut args_resolver, &context)
.await?;
let SimpleFunctionAnalysisOutput {
resolved_args,
mut output_schema,
behavior_version,
} = self.analyze(&spec, &mut args_resolver, &context).await?;
args_resolver.done()?;

// If any required argument is nullable, the output schema should be nullable.
Expand All @@ -355,13 +356,15 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {

let executor = async move {
Ok(Box::new(FunctionExecutorWrapper {
executor: self
.build_executor(spec, resolved_input_schema, context)
.await?,
executor: self.build_executor(spec, resolved_args, context).await?,
nonnull_args_idx,
}) as Box<dyn SimpleFunctionExecutor>)
};
Ok((output_schema, Box::pin(executor)))
Ok(SimpleFunctionBuildOutput {
output_type: output_schema,
behavior_version,
executor: Box::pin(executor),
})
}
}

Expand All @@ -373,10 +376,6 @@ pub trait BatchedFunctionExecutor: Send + Sync + Sized + 'static {
false
}

fn behavior_version(&self) -> Option<u32> {
None
}

/// Timeout for this executor; this default implementation returns `None`.
fn timeout(&self) -> Option<std::time::Duration> {
None
}
Expand Down Expand Up @@ -406,19 +405,16 @@ impl<E: BatchedFunctionExecutor> batching::Runner for BatchedFunctionExecutorRun
struct BatchedFunctionExecutorWrapper<E: BatchedFunctionExecutor> {
batcher: batching::Batcher<BatchedFunctionExecutorRunner<E>>,
enable_cache: bool,
behavior_version: Option<u32>,
timeout: Option<std::time::Duration>,
}

impl<E: BatchedFunctionExecutor> BatchedFunctionExecutorWrapper<E> {
fn new(executor: E) -> Self {
let batching_options = executor.batching_options();
let enable_cache = executor.enable_cache();
let behavior_version = executor.behavior_version();
let timeout = executor.timeout();
Self {
enable_cache,
behavior_version,
timeout,
batcher: batching::Batcher::new(
BatchedFunctionExecutorRunner(executor),
Expand All @@ -437,9 +433,6 @@ impl<E: BatchedFunctionExecutor> SimpleFunctionExecutor for BatchedFunctionExecu
/// Returns the `enable_cache` value stored on this wrapper.
fn enable_cache(&self) -> bool {
self.enable_cache
}
fn behavior_version(&self) -> Option<u32> {
self.behavior_version
}
/// Returns the `timeout` value stored on this wrapper.
fn timeout(&self) -> Option<std::time::Duration> {
self.timeout
}
Expand Down
10 changes: 7 additions & 3 deletions rust/cocoindex/src/ops/functions/detect_program_lang.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ impl SimpleFunctionFactoryBase for Factory {
"DetectProgrammingLanguage"
}

async fn resolve_schema<'a>(
async fn analyze<'a>(
&'a self,
_spec: &'a EmptySpec,
args_resolver: &mut OpArgsResolver<'a>,
_context: &FlowInstanceContext,
) -> Result<(Args, EnrichedValueType)> {
) -> Result<SimpleFunctionAnalysisOutput<Args>> {
let args = Args {
filename: args_resolver
.next_arg("filename")?
Expand All @@ -49,7 +49,11 @@ impl SimpleFunctionFactoryBase for Factory {
};

let output_schema = make_output_type(BasicValueType::Str);
Ok((args, output_schema))
Ok(SimpleFunctionAnalysisOutput {
resolved_args: args,
output_schema,
behavior_version: None,
})
}

async fn build_executor(
Expand Down
15 changes: 6 additions & 9 deletions rust/cocoindex/src/ops/functions/embed_text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ struct Executor {

#[async_trait]
impl BatchedFunctionExecutor for Executor {
fn behavior_version(&self) -> Option<u32> {
self.args.client.behavior_version()
}

fn enable_cache(&self) -> bool {
true
}
Expand Down Expand Up @@ -108,12 +104,12 @@ impl SimpleFunctionFactoryBase for Factory {
"EmbedText"
}

async fn resolve_schema<'a>(
async fn analyze<'a>(
&'a self,
spec: &'a Spec,
args_resolver: &mut OpArgsResolver<'a>,
_context: &FlowInstanceContext,
) -> Result<(Self::ResolvedArgs, EnrichedValueType)> {
) -> Result<SimpleFunctionAnalysisOutput<Self::ResolvedArgs>> {
let text = args_resolver
.next_arg("text")?
.expect_type(&ValueType::Basic(BasicValueType::Str))?
Expand All @@ -132,14 +128,15 @@ impl SimpleFunctionFactoryBase for Factory {
dimension: Some(output_dimension as usize),
element_type: Box::new(BasicValueType::Float32),
}));
Ok((
Args {
Ok(SimpleFunctionAnalysisOutput {
behavior_version: client.behavior_version(),
resolved_args: Args {
client,
text,
expected_output_dimension: output_dimension as usize,
},
output_schema,
))
})
}

async fn build_executor(
Expand Down
14 changes: 7 additions & 7 deletions rust/cocoindex/src/ops/functions/extract_by_llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ impl Executor {

#[async_trait]
impl SimpleFunctionExecutor for Executor {
fn behavior_version(&self) -> Option<u32> {
Some(1)
}

fn enable_cache(&self) -> bool {
true
}
Expand Down Expand Up @@ -130,12 +126,12 @@ impl SimpleFunctionFactoryBase for Factory {
"ExtractByLlm"
}

async fn resolve_schema<'a>(
async fn analyze<'a>(
&'a self,
spec: &'a Spec,
args_resolver: &mut OpArgsResolver<'a>,
_context: &FlowInstanceContext,
) -> Result<(Args, EnrichedValueType)> {
) -> Result<SimpleFunctionAnalysisOutput<Args>> {
let args = Args {
text: args_resolver
.next_arg("text")?
Expand All @@ -157,7 +153,11 @@ impl SimpleFunctionFactoryBase for Factory {
{
output_type.nullable = true;
}
Ok((args, output_type))
Ok(SimpleFunctionAnalysisOutput {
resolved_args: args,
output_schema: output_type,
behavior_version: Some(1),
})
}

async fn build_executor(
Expand Down
10 changes: 7 additions & 3 deletions rust/cocoindex/src/ops/functions/parse_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ impl SimpleFunctionFactoryBase for Factory {
"ParseJson"
}

async fn resolve_schema<'a>(
async fn analyze<'a>(
&'a self,
_spec: &'a EmptySpec,
args_resolver: &mut OpArgsResolver<'a>,
_context: &FlowInstanceContext,
) -> Result<(Args, EnrichedValueType)> {
) -> Result<SimpleFunctionAnalysisOutput<Args>> {
let args = Args {
text: args_resolver
.next_arg("text")?
Expand All @@ -92,7 +92,11 @@ impl SimpleFunctionFactoryBase for Factory {
};

let output_schema = make_output_type(BasicValueType::Json);
Ok((args, output_schema))
Ok(SimpleFunctionAnalysisOutput {
resolved_args: args,
output_schema,
behavior_version: None,
})
}

async fn build_executor(
Expand Down
10 changes: 7 additions & 3 deletions rust/cocoindex/src/ops/functions/split_by_separators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,12 @@ impl SimpleFunctionFactoryBase for Factory {
"SplitBySeparators"
}

async fn resolve_schema<'a>(
async fn analyze<'a>(
&'a self,
_spec: &'a Spec,
args_resolver: &mut OpArgsResolver<'a>,
_context: &FlowInstanceContext,
) -> Result<(Args, EnrichedValueType)> {
) -> Result<SimpleFunctionAnalysisOutput<Args>> {
// one required arg: text: Str
let args = Args {
text: args_resolver
Expand All @@ -153,7 +153,11 @@ impl SimpleFunctionFactoryBase for Factory {
};

let output_schema = make_common_chunk_schema(args_resolver, &args.text)?;
Ok((args, output_schema))
Ok(SimpleFunctionAnalysisOutput {
resolved_args: args,
output_schema,
behavior_version: None,
})
}

async fn build_executor(
Expand Down
10 changes: 7 additions & 3 deletions rust/cocoindex/src/ops/functions/split_recursively.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,12 +732,12 @@ impl SimpleFunctionFactoryBase for Factory {
"SplitRecursively"
}

async fn resolve_schema<'a>(
async fn analyze<'a>(
&'a self,
_spec: &'a Spec,
args_resolver: &mut OpArgsResolver<'a>,
_context: &FlowInstanceContext,
) -> Result<(Args, EnrichedValueType)> {
) -> Result<SimpleFunctionAnalysisOutput<Args>> {
let args = Args {
text: args_resolver
.next_arg("text")?
Expand All @@ -763,7 +763,11 @@ impl SimpleFunctionFactoryBase for Factory {

let output_schema =
crate::ops::shared::split::make_common_chunk_schema(args_resolver, &args.text)?;
Ok((args, output_schema))
Ok(SimpleFunctionAnalysisOutput {
resolved_args: args,
output_schema,
behavior_version: None,
})
}

async fn build_executor(
Expand Down
4 changes: 2 additions & 2 deletions rust/cocoindex/src/ops/functions/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ pub async fn test_flow_function(
auth_registry: Arc::new(AuthRegistry::default()),
py_exec_ctx: None,
});
let (_, exec_fut) = factory
let build_output = factory
.clone()
.build(serde_json::to_value(spec)?, op_arg_schemas, context)
.await?;
let executor = exec_fut.await?;
let executor = build_output.executor.await?;

// 3. Evaluate
let result = executor.evaluate(input_arg_values).await?;
Expand Down
Loading
Loading