diff --git a/examples/pdf_embedding/main.py b/examples/pdf_embedding/main.py index 0f7994ba1..a87ea9ab3 100644 --- a/examples/pdf_embedding/main.py +++ b/examples/pdf_embedding/main.py @@ -55,13 +55,14 @@ def pdf_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoinde with doc["chunks"].row() as chunk: chunk["embedding"] = chunk["text"].call(text_to_embedding) - doc_embeddings.collect(filename=doc["filename"], location=chunk["location"], + doc_embeddings.collect(id=cocoindex.GeneratedField.UUID, + filename=doc["filename"], location=chunk["location"], text=chunk["text"], embedding=chunk["embedding"]) doc_embeddings.export( "doc_embeddings", cocoindex.storages.Postgres(), - primary_key_fields=["filename", "location"], + primary_key_fields=["id"], vector_index=[("embedding", cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)]) query_handler = cocoindex.query.SimpleSemanticsQueryHandler( diff --git a/python/cocoindex/__init__.py b/python/cocoindex/__init__.py index 8c9916777..81397f404 100644 --- a/python/cocoindex/__init__.py +++ b/python/cocoindex/__init__.py @@ -2,7 +2,8 @@ Cocoindex is a framework for building and running indexing pipelines. """ from . import flow, functions, query, sources, storages, cli -from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def, EvaluateAndDumpOptions +from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def +from .flow import EvaluateAndDumpOptions, GeneratedField from .llm import LlmSpec, LlmApiType from .vector import VectorSimilarityMetric from .lib import * diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 23fc88721..2ace32df2 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -238,6 +238,12 @@ def add_collector(self, name: str | None = None) -> DataCollector: ) ) +class GeneratedField(Enum): + """ + A generated field is automatically set by the engine. + """ + UUID = "Uuid" + class DataCollector: """A data collector is used to collect data into a collector.""" _flow_builder_state: _FlowBuilderState @@ -248,12 +254,25 @@ def __init__(self, flow_builder_state: _FlowBuilderState, self._flow_builder_state = flow_builder_state self._engine_data_collector = data_collector - def collect(self, **kwargs: DataSlice): + def collect(self, **kwargs: DataSlice | GeneratedField): """ Collect data into the collector. """ + regular_kwargs = [] + auto_uuid_field = None + for k, v in kwargs.items(): + if isinstance(v, GeneratedField): + if v == GeneratedField.UUID: + if auto_uuid_field is not None: + raise ValueError("Only one generated UUID field is allowed") + auto_uuid_field = k + else: + raise ValueError(f"Unexpected generated field: {v}") + else: + regular_kwargs.append((k, _data_slice_state(v).engine_data_slice)) + self._flow_builder_state.engine_flow_builder.collect( - self._engine_data_collector, [(k, _data_slice_state(v).engine_data_slice) for k, v in kwargs.items()]) + self._engine_data_collector, regular_kwargs, auto_uuid_field) def export(self, name: str, target_spec: op.StorageSpec, /, *, primary_key_fields: Sequence[str] | None = None, diff --git a/src/base/schema.rs b/src/base/schema.rs index 2a4f45c08..24537a19e 100644 --- a/src/base/schema.rs +++ b/src/base/schema.rs @@ -360,13 +360,13 @@ impl std::fmt::Display for CollectorSchema { } impl CollectorSchema { - pub fn from_fields(fields: Vec, has_auto_uuid_field: bool) -> Self { + pub fn from_fields(fields: Vec, auto_uuid_field: Option) -> Self { let mut fields = fields; - let auto_uuid_field_idx = if has_auto_uuid_field { + let auto_uuid_field_idx = if let Some(auto_uuid_field) = auto_uuid_field { fields.insert( 0, FieldSchema::new( - "uuid".to_string(), + auto_uuid_field, EnrichedValueType { typ: ValueType::Basic(BasicValueType::Uuid), nullable: false, diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index 8095d1112..b50ba7b01 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -799,7 +799,7 @@ impl AnalyzerContext<'_> { collector_ref: add_collector( &op.scope_name, op.collector_name.clone(), - CollectorSchema::from_fields(fields_schema, has_auto_uuid_field), + CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()), scopes, )?, fingerprinter, diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index 3e4361920..53f966316 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -543,7 +543,7 @@ impl FlowBuilder { }, scope_name: collector.scope.scope_name.clone(), collector_name: collector.name.clone(), - auto_uuid_field, + auto_uuid_field: auto_uuid_field.clone(), }), }; @@ -565,7 +565,7 @@ impl FlowBuilder { value_type: ds.data_type.schema, }) .collect(), - has_auto_uuid_field, + auto_uuid_field, ); { let mut collector = collector.collector.lock().unwrap();