Merged
2 changes: 1 addition & 1 deletion docs/docs/core/basics.md
@@ -23,7 +23,7 @@ Each piece of data has a **data type**, falling into one of the following catego

 * *Basic type*.
 * *Struct type*: a collection of **fields**, each with a name and a type.
-* *Table type*: a collection of **rows**, each of which is a struct with specified schema. A table type can be a *KTable* (which has a key field) or a *LTable* (ordered but without key field).
+* *Table type*: a collection of **rows**, each of which is a struct with specified schema. A table type can be a *KTable* (with key columns that uniquely identify each row) or a *LTable* (rows are ordered but without keys).

 An indexing flow always has a top-level struct, containing all data within and managed by the flow.

26 changes: 16 additions & 10 deletions docs/docs/core/data_types.mdx
@@ -148,21 +148,27 @@ We have two specific types of *Table* types: *KTable* and *LTable*.

 #### KTable

-*KTable* is a *Table* type whose first column serves as the key.
+*KTable* is a *Table* type whose one or more columns together serve as the key.
 The row order of a *KTable* is not preserved.
-Type of the first column (key column) must be a [key type](#key-types).
+Each key column must be a [key type](#key-types). When multiple key columns are present, they form a composite key.

-In Python, a *KTable* type is represented by `dict[K, V]`.
-The `K` should be the type binding to a key type,
-and the `V` should be the type binding to a *Struct* type representing the value fields of each row.
-When the specific type annotation is not provided,
-the key type is bound to a tuple with its key parts when it's a *Struct* type, the value type is bound to `dict[str, Any]`.
+In Python, a *KTable* type is represented by `dict[K, V]`.
+`K` represents the key and `V` represents the value for each row:
+
+- `K` can be a Struct type (either a frozen dataclass or a `NamedTuple`) that contains all key parts as fields. This is the general way to model multi-part keys.
+- When there is only a single key part and it is a basic type (e.g. `str`, `int`), you may use that basic type directly as the dictionary key instead of wrapping it in a Struct.
+- `V` should be the type bound to a *Struct* representing the non-key value fields of each row.
+
+When a specific type annotation is not provided:
+- For composite keys (multiple key parts), the key binds to a Python tuple of the key parts, e.g. `tuple[str, str]`.
+- For a single basic key part, the key binds to that basic Python type.
+- The value binds to `dict[str, Any]`.
+

 For example, you can use `dict[str, Person]` or `dict[str, PersonTuple]` to represent a *KTable*, with 4 columns: key (*Str*), `first_name` (*Str*), `last_name` (*Str*), `dob` (*Date*).
 It's bound to `dict[str, dict[str, Any]]` if you don't annotate the function argument with a specific type.

-Note that if you want to use a *Struct* as the key, you need to ensure its value in Python is immutable. For `dataclass`, annotate it with `@dataclass(frozen=True)`. For `NamedTuple`, immutability is built-in. For example:
+Note that when using a Struct as the key, it must be immutable in Python. For a dataclass, annotate it with `@dataclass(frozen=True)`. For `NamedTuple`, immutability is built-in. For example:

 ```python
 @dataclass(frozen=True)
@@ -175,8 +181,8 @@ class PersonKeyTuple(NamedTuple):
     id: str
 ```

-Then you can use `dict[PersonKey, Person]` or `dict[PersonKeyTuple, PersonTuple]` to represent a KTable keyed by `PersonKey` or `PersonKeyTuple`.
-It's bound to `dict[(str, str), dict[str, Any]]` if you don't annotate the function argument with a specific type.
+Then you can use `dict[PersonKey, Person]` or `dict[PersonKeyTuple, PersonTuple]` to represent a KTable keyed by both `id_kind` and `id`.
+If you don't annotate the function argument with a specific type, it's bound to `dict[tuple[str, str], dict[str, Any]]`.


 #### LTable

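Reviewer's sketch of the composite-key binding described in the `data_types.mdx` change, using only the standard library. The `PersonKey` and `Person` fields follow the docs text; the sample values are made up, and the conversion to the unannotated shape only illustrates the resulting types, not CocoIndex's own conversion path.

```python
import dataclasses
import datetime
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)  # frozen makes instances hashable, so they can be dict keys
class PersonKey:
    id_kind: str
    id: str


@dataclass
class Person:
    first_name: str
    last_name: str
    dob: datetime.date


# Annotated binding: dict[PersonKey, Person]
people: dict[PersonKey, Person] = {
    PersonKey(id_kind="passport", id="X123"): Person(
        first_name="Ada", last_name="Lovelace", dob=datetime.date(1815, 12, 10)
    ),
}

# Shape of the unannotated binding: dict[tuple[str, str], dict[str, Any]]
people_untyped: dict[tuple[str, str], dict[str, Any]] = {
    dataclasses.astuple(key): dataclasses.asdict(row) for key, row in people.items()
}
```

A plain (non-frozen) dataclass defines `__eq__` without `__hash__`, so using it as a dictionary key raises `TypeError`; that is why the docs require `frozen=True`.
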
2 changes: 1 addition & 1 deletion docs/docs/getting_started/quickstart.md
@@ -105,7 +105,7 @@ Notes:
     * `chunk`, representing each row of `chunks`.

 3. A *data source* extracts data from an external source.
-    In this example, the `LocalFile` data source imports local files as a KTable (table with a key field, see [KTable](../core/data_types#ktable) for details), each row has `"filename"` and `"content"` fields.
+    In this example, the `LocalFile` data source imports local files as a KTable (table with key columns, see [KTable](../core/data_types#ktable) for details), each row has `"filename"` and `"content"` fields.

 4. After defining the KTable, we extend a new field `"chunks"` to each row by *transforming* the `"content"` field using `SplitRecursively`. The output of the `SplitRecursively` is also a KTable representing each chunk of the document, with `"location"` and `"text"` fields.

8 changes: 4 additions & 4 deletions examples/postgres_source/main.py
@@ -97,8 +97,8 @@ def postgres_product_indexing_flow(
     with data_scope["products"].row() as product:
         product["full_description"] = flow_builder.transform(
             make_full_description,
-            product["_key"]["product_category"],
-            product["_key"]["product_name"],
+            product["product_category"],
+            product["product_name"],
             product["description"],
         )
         product["total_value"] = flow_builder.transform(
@@ -112,8 +112,8 @@ def postgres_product_indexing_flow(
             )
         )
         indexed_product.collect(
-            product_category=product["_key"]["product_category"],
-            product_name=product["_key"]["product_name"],
+            product_category=product["product_category"],
+            product_name=product["product_name"],
             description=product["description"],
             price=product["price"],
             amount=product["amount"],
61 changes: 46 additions & 15 deletions python/cocoindex/convert.py
@@ -14,7 +14,6 @@
 import numpy as np

 from .typing import (
-    KEY_FIELD_NAME,
     TABLE_TYPES,
     AnalyzedAnyType,
     AnalyzedBasicType,
@@ -96,14 +95,24 @@ def encode_struct_list(value: Any) -> Any:
                 f"Value type for dict is required to be a struct (e.g. dataclass or NamedTuple), got {variant.value_type}. "
                 f"If you want a free-formed dict, use `cocoindex.Json` instead."
             )
+        value_encoder = make_engine_value_encoder(value_type_info)

-        key_encoder = make_engine_value_encoder(analyze_type_info(variant.key_type))
-        value_encoder = make_engine_value_encoder(analyze_type_info(variant.value_type))
+        key_type_info = analyze_type_info(variant.key_type)
+        key_encoder = make_engine_value_encoder(key_type_info)
+        if isinstance(key_type_info.variant, AnalyzedBasicType):
+
+            def encode_row(k: Any, v: Any) -> Any:
+                return [key_encoder(k)] + value_encoder(v)
+
+        else:
+
+            def encode_row(k: Any, v: Any) -> Any:
+                return key_encoder(k) + value_encoder(v)

         def encode_struct_dict(value: Any) -> Any:
             if not value:
                 return []
-            return [[key_encoder(k)] + value_encoder(v) for k, v in value.items()]
+            return [encode_row(k, v) for k, v in value.items()]

         return encode_struct_dict

@@ -234,25 +243,47 @@ def decode(value: Any) -> Any | None:
                         f"declared `{dst_type_info.core_type}`, a dict type expected"
                     )

-                key_field_schema = engine_fields_schema[0]
-                field_path.append(f".{key_field_schema.get('name', KEY_FIELD_NAME)}")
-                key_decoder = make_engine_value_decoder(
-                    field_path,
-                    key_field_schema["type"],
-                    analyze_type_info(key_type),
-                    for_key=True,
-                )
-                field_path.pop()
+                num_key_parts = src_type.get("num_key_parts", 1)
+                key_type_info = analyze_type_info(key_type)
+                key_decoder: Callable[..., Any] | None = None
+                if (
+                    isinstance(
+                        key_type_info.variant, (AnalyzedBasicType, AnalyzedAnyType)
+                    )
+                    and num_key_parts == 1
+                ):
+                    single_key_decoder = make_engine_value_decoder(
+                        field_path,
+                        engine_fields_schema[0]["type"],
+                        key_type_info,
+                        for_key=True,
+                    )
+
+                    def key_decoder(value: list[Any]) -> Any:
+                        return single_key_decoder(value[0])
+
+                else:
+                    key_decoder = make_engine_struct_decoder(
+                        field_path,
+                        engine_fields_schema[0:num_key_parts],
+                        key_type_info,
+                        for_key=True,
+                    )
                 value_decoder = make_engine_struct_decoder(
                     field_path,
-                    engine_fields_schema[1:],
+                    engine_fields_schema[num_key_parts:],
                     analyze_type_info(value_type),
                 )

                 def decode(value: Any) -> Any | None:
                     if value is None:
                         return None
-                    return {key_decoder(v[0]): value_decoder(v[1:]) for v in value}
+                    return {
+                        key_decoder(v[0:num_key_parts]): value_decoder(
+                            v[num_key_parts:]
+                        )
+                        for v in value
+                    }

                 return decode

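Reviewer's sketch of the row layout the `convert.py` change implies: key parts come first, value fields follow, and `num_key_parts` marks the split. `encode_rows` and `decode_rows` are hypothetical helpers that work on plain tuples and lists. The real code dispatches on analyzed type info through `make_engine_value_encoder` and `make_engine_struct_decoder`, which this sketch deliberately skips, so a one-field Struct key is not modeled here.

```python
from typing import Any


def encode_rows(table: dict[Any, list[Any]]) -> list[list[Any]]:
    """Flatten each (key, value fields) pair into one row: key parts, then values."""
    rows = []
    for key, value_fields in table.items():
        # A composite key (tuple) is flattened; a single basic key is wrapped.
        key_parts = list(key) if isinstance(key, tuple) else [key]
        rows.append(key_parts + list(value_fields))
    return rows


def decode_rows(rows: list[list[Any]], num_key_parts: int = 1) -> dict[Any, list[Any]]:
    """Split each row at num_key_parts to recover the key and the value fields."""
    table: dict[Any, list[Any]] = {}
    for row in rows:
        key_parts = row[:num_key_parts]
        key = key_parts[0] if num_key_parts == 1 else tuple(key_parts)
        table[key] = row[num_key_parts:]
    return table


# Hypothetical sample data, loosely modeled on the postgres example's columns.
products = {("electronics", "phone"): ["A phone", 499.0]}
rows = encode_rows(products)
restored = decode_rows(rows, num_key_parts=2)
```

The default of `1` for `num_key_parts` mirrors the `src_type.get("num_key_parts", 1)` fallback in the diff, which keeps schemas that lack the field decoding as single-key tables.
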
59 changes: 37 additions & 22 deletions python/cocoindex/typing.py
@@ -330,35 +330,50 @@ def analyze_type_info(t: Any) -> AnalyzedTypeInfo:

 def _encode_struct_schema(
     struct_type: type, key_type: type | None = None
-) -> dict[str, Any]:
+) -> tuple[dict[str, Any], int | None]:
     fields = []

-    def add_field(name: str, t: Any) -> None:
+    def add_field(name: str, analyzed_type: AnalyzedTypeInfo) -> None:
         try:
-            type_info = encode_enriched_type_info(analyze_type_info(t))
+            type_info = encode_enriched_type_info(analyzed_type)
         except ValueError as e:
             e.add_note(
                 f"Failed to encode annotation for field - "
-                f"{struct_type.__name__}.{name}: {t}"
+                f"{struct_type.__name__}.{name}: {analyzed_type.core_type}"
             )
             raise
         type_info["name"] = name
         fields.append(type_info)

+    def add_fields_from_struct(struct_type: type) -> None:
+        if dataclasses.is_dataclass(struct_type):
+            for field in dataclasses.fields(struct_type):
+                add_field(field.name, analyze_type_info(field.type))
+        elif is_namedtuple_type(struct_type):
+            for name, field_type in struct_type.__annotations__.items():
+                add_field(name, analyze_type_info(field_type))
+        else:
+            raise ValueError(f"Unsupported struct type: {struct_type}")
+
+    result: dict[str, Any] = {}
+    num_key_parts = None
     if key_type is not None:
-        add_field(KEY_FIELD_NAME, key_type)
+        key_type_info = analyze_type_info(key_type)
+        if isinstance(key_type_info.variant, AnalyzedBasicType):
+            add_field(KEY_FIELD_NAME, key_type_info)
+            num_key_parts = 1
+        elif isinstance(key_type_info.variant, AnalyzedStructType):
+            add_fields_from_struct(key_type_info.variant.struct_type)
+            num_key_parts = len(fields)
+        else:
+            raise ValueError(f"Unsupported key type: {key_type}")

-    if dataclasses.is_dataclass(struct_type):
-        for field in dataclasses.fields(struct_type):
-            add_field(field.name, field.type)
-    elif is_namedtuple_type(struct_type):
-        for name, field_type in struct_type.__annotations__.items():
-            add_field(name, field_type)
+    add_fields_from_struct(struct_type)

-    result: dict[str, Any] = {"fields": fields}
+    result["fields"] = fields
     if doc := inspect.getdoc(struct_type):
         result["description"] = doc
-    return result
+    return result, num_key_parts


 def _encode_type(type_info: AnalyzedTypeInfo) -> dict[str, Any]:
@@ -374,7 +389,7 @@ def _encode_type(type_info: AnalyzedTypeInfo) -> dict[str, Any]:
         return {"kind": variant.kind}

     if isinstance(variant, AnalyzedStructType):
-        encoded_type = _encode_struct_schema(variant.struct_type)
+        encoded_type, _ = _encode_struct_schema(variant.struct_type)
         encoded_type["kind"] = "Struct"
         return encoded_type

@@ -384,10 +399,8 @@ def _encode_type(type_info: AnalyzedTypeInfo) -> dict[str, Any]:
         if isinstance(elem_type_info.variant, AnalyzedStructType):
             if variant.vector_info is not None:
                 raise ValueError("LTable type must not have a vector info")
-            return {
-                "kind": "LTable",
-                "row": _encode_struct_schema(elem_type_info.variant.struct_type),
-            }
+            row_type, _ = _encode_struct_schema(elem_type_info.variant.struct_type)
+            return {"kind": "LTable", "row": row_type}
         else:
             vector_info = variant.vector_info
             return {
@@ -402,12 +415,14 @@ def _encode_type(type_info: AnalyzedTypeInfo) -> dict[str, Any]:
             raise ValueError(
                 f"KTable value must have a Struct type, got {value_type_info.core_type}"
             )
+        row_type, num_key_parts = _encode_struct_schema(
+            value_type_info.variant.struct_type,
+            variant.key_type,
+        )
         return {
             "kind": "KTable",
-            "row": _encode_struct_schema(
-                value_type_info.variant.struct_type,
-                variant.key_type,
-            ),
+            "row": row_type,
+            "num_key_parts": num_key_parts,
         }

     if isinstance(variant, AnalyzedUnionType):