Skip to content

Commit 3a18979

Browse files
authored
Expose UUID to Python SDK's DataCollector.collect() method. (#211)
1 parent 5be47e4 commit 3a18979

File tree

6 files changed: 32 additions (+32) and 11 deletions (-11).

examples/pdf_embedding/main.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -55,13 +55,14 @@ def pdf_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoinde
5555

5656
with doc["chunks"].row() as chunk:
5757
chunk["embedding"] = chunk["text"].call(text_to_embedding)
58-
doc_embeddings.collect(filename=doc["filename"], location=chunk["location"],
58+
doc_embeddings.collect(id=cocoindex.GeneratedField.UUID,
59+
filename=doc["filename"], location=chunk["location"],
5960
text=chunk["text"], embedding=chunk["embedding"])
6061

6162
doc_embeddings.export(
6263
"doc_embeddings",
6364
cocoindex.storages.Postgres(),
64-
primary_key_fields=["filename", "location"],
65+
primary_key_fields=["id"],
6566
vector_index=[("embedding", cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)])
6667

6768
query_handler = cocoindex.query.SimpleSemanticsQueryHandler(

python/cocoindex/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,8 @@
22
Cocoindex is a framework for building and running indexing pipelines.
33
"""
44
from . import flow, functions, query, sources, storages, cli
5-
from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def, EvaluateAndDumpOptions
5+
from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def
6+
from .flow import EvaluateAndDumpOptions, GeneratedField
67
from .llm import LlmSpec, LlmApiType
78
from .vector import VectorSimilarityMetric
89
from .lib import *

python/cocoindex/flow.py

Lines changed: 21 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -238,6 +238,12 @@ def add_collector(self, name: str | None = None) -> DataCollector:
238238
)
239239
)
240240

241+
class GeneratedField(Enum):
242+
"""
243+
A generated field is automatically set by the engine.
244+
"""
245+
UUID = "Uuid"
246+
241247
class DataCollector:
242248
"""A data collector is used to collect data into a collector."""
243249
_flow_builder_state: _FlowBuilderState
@@ -248,12 +254,25 @@ def __init__(self, flow_builder_state: _FlowBuilderState,
248254
self._flow_builder_state = flow_builder_state
249255
self._engine_data_collector = data_collector
250256

251-
def collect(self, **kwargs: DataSlice):
257+
def collect(self, **kwargs: DataSlice | GeneratedField):
252258
"""
253259
Collect data into the collector.
254260
"""
261+
regular_kwargs = []
262+
auto_uuid_field = None
263+
for k, v in kwargs.items():
264+
if isinstance(v, GeneratedField):
265+
if v == GeneratedField.UUID:
266+
if auto_uuid_field is not None:
267+
raise ValueError("Only one generated UUID field is allowed")
268+
auto_uuid_field = k
269+
else:
270+
raise ValueError(f"Unexpected generated field: {v}")
271+
else:
272+
regular_kwargs.append((k, _data_slice_state(v).engine_data_slice))
273+
255274
self._flow_builder_state.engine_flow_builder.collect(
256-
self._engine_data_collector, [(k, _data_slice_state(v).engine_data_slice) for k, v in kwargs.items()])
275+
self._engine_data_collector, regular_kwargs, auto_uuid_field)
257276

258277
def export(self, name: str, target_spec: op.StorageSpec, /, *,
259278
primary_key_fields: Sequence[str] | None = None,

src/base/schema.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -360,13 +360,13 @@ impl std::fmt::Display for CollectorSchema {
360360
}
361361

362362
impl CollectorSchema {
363-
pub fn from_fields(fields: Vec<FieldSchema>, has_auto_uuid_field: bool) -> Self {
363+
pub fn from_fields(fields: Vec<FieldSchema>, auto_uuid_field: Option<FieldName>) -> Self {
364364
let mut fields = fields;
365-
let auto_uuid_field_idx = if has_auto_uuid_field {
365+
let auto_uuid_field_idx = if let Some(auto_uuid_field) = auto_uuid_field {
366366
fields.insert(
367367
0,
368368
FieldSchema::new(
369-
"uuid".to_string(),
369+
auto_uuid_field,
370370
EnrichedValueType {
371371
typ: ValueType::Basic(BasicValueType::Uuid),
372372
nullable: false,

src/builder/analyzer.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -799,7 +799,7 @@ impl AnalyzerContext<'_> {
799799
collector_ref: add_collector(
800800
&op.scope_name,
801801
op.collector_name.clone(),
802-
CollectorSchema::from_fields(fields_schema, has_auto_uuid_field),
802+
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
803803
scopes,
804804
)?,
805805
fingerprinter,

src/builder/flow_builder.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -543,7 +543,7 @@ impl FlowBuilder {
543543
},
544544
scope_name: collector.scope.scope_name.clone(),
545545
collector_name: collector.name.clone(),
546-
auto_uuid_field,
546+
auto_uuid_field: auto_uuid_field.clone(),
547547
}),
548548
};
549549

@@ -565,7 +565,7 @@ impl FlowBuilder {
565565
value_type: ds.data_type.schema,
566566
})
567567
.collect(),
568-
has_auto_uuid_field,
568+
auto_uuid_field,
569569
);
570570
{
571571
let mut collector = collector.collector.lock().unwrap();

0 commit comments

Comments
 (0)