diff --git a/python/cocoindex/auth_registry.py b/python/cocoindex/auth_registry.py
index f37f82f1f..925c071fa 100644
--- a/python/cocoindex/auth_registry.py
+++ b/python/cocoindex/auth_registry.py
@@ -6,7 +6,7 @@
 from typing import Generic, TypeVar
 
 from . import _engine  # type: ignore
-from .convert import dump_engine_object, load_engine_object
+from .engine_object import dump_engine_object, load_engine_object
 
 T = TypeVar("T")
 
diff --git a/python/cocoindex/engine_object.py b/python/cocoindex/engine_object.py
new file mode 100644
index 000000000..b371d8010
--- /dev/null
+++ b/python/cocoindex/engine_object.py
@@ -0,0 +1,272 @@
+"""
+Utilities to dump/load objects (for configs, specs).
+"""
+
+from __future__ import annotations
+
+import datetime
+import dataclasses
+from enum import Enum
+from typing import Any, Mapping, TypeVar, overload, get_origin
+
+import numpy as np
+
+from .typing import (
+    AnalyzedAnyType,
+    AnalyzedBasicType,
+    AnalyzedDictType,
+    AnalyzedListType,
+    AnalyzedStructType,
+    AnalyzedTypeInfo,
+    AnalyzedUnionType,
+    EnrichedValueType,
+    FieldSchema,
+    analyze_type_info,
+    encode_enriched_type,
+    is_namedtuple_type,
+    is_pydantic_model,
+    extract_ndarray_elem_dtype,
+)
+
+
+T = TypeVar("T")
+
+try:
+    import pydantic, pydantic_core
+except ImportError:
+    pass
+
+
+def get_auto_default_for_type(
+    type_info: AnalyzedTypeInfo,
+) -> tuple[Any, bool]:
+    """
+    Get an auto-default value for a type annotation if it's safe to do so.
+
+    Returns:
+        A tuple of (default_value, is_supported) where:
+        - default_value: The default value if auto-defaulting is supported
+        - is_supported: True if auto-defaulting is supported for this type
+    """
+    # Case 1: Nullable types (Optional[T] or T | None)
+    if type_info.nullable:
+        return None, True
+
+    # Case 2: Table types (KTable or LTable) - check if it's a list or dict type
+    if isinstance(type_info.variant, AnalyzedListType):
+        return [], True
+    elif isinstance(type_info.variant, AnalyzedDictType):
+        return {}, True
+
+    return None, False
+
+
+def dump_engine_object(v: Any) -> Any:
+    """Recursively dump an object into plain Python values for the engine.
+    The engine side uses `Pythonized` to deserialize the result.
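+
+    Illustrative example (grounded in the branches below; 1.5s is exact in
+    binary, so the nanosecond arithmetic loses no precision):
+
+        dump_engine_object(datetime.timedelta(seconds=1, milliseconds=500))
+        # -> {"secs": 1, "nanos": 500000000}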
+    """
+    if v is None:
+        return None
+    elif isinstance(v, EnrichedValueType):
+        return v.encode()
+    elif isinstance(v, FieldSchema):
+        return v.encode()
+    elif isinstance(v, type) or get_origin(v) is not None:
+        return encode_enriched_type(v)
+    elif isinstance(v, Enum):
+        return v.value
+    elif isinstance(v, datetime.timedelta):
+        total_secs = v.total_seconds()
+        secs = int(total_secs)
+        nanos = int((total_secs - secs) * 1e9)
+        return {"secs": secs, "nanos": nanos}
+    elif is_namedtuple_type(type(v)):
+        # Handle NamedTuple objects specifically to use dict format
+        field_names = list(getattr(type(v), "_fields", ()))
+        result = {}
+        for name in field_names:
+            val = getattr(v, name)
+            result[name] = dump_engine_object(val)  # Include all values, including None
+        if hasattr(v, "kind") and "kind" not in result:
+            result["kind"] = v.kind
+        return result
+    elif hasattr(v, "__dict__"):  # for dataclass-like objects
+        s = {}
+        for k, val in v.__dict__.items():
+            if val is None:
+                # Skip None values
+                continue
+            s[k] = dump_engine_object(val)
+        if hasattr(v, "kind") and "kind" not in s:
+            s["kind"] = v.kind
+        return s
+    elif isinstance(v, (list, tuple)):
+        return [dump_engine_object(item) for item in v]
+    elif isinstance(v, np.ndarray):
+        return v.tolist()
+    elif isinstance(v, dict):
+        return {key: dump_engine_object(val) for key, val in v.items()}
+    return v
+
+
+@overload
+def load_engine_object(expected_type: type[T], v: Any) -> T: ...
+@overload
+def load_engine_object(expected_type: Any, v: Any) -> Any: ...
+def load_engine_object(expected_type: Any, v: Any) -> Any:
+    """Recursively load an object that was produced by dump_engine_object().
+
+    Args:
+        expected_type: The Python type annotation to reconstruct to.
+        v: The engine-facing Pythonized object (e.g., dict/list/primitive) to convert.
+
+    Returns:
+        A Python object matching the expected_type where possible.
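+
+    Example (illustrative; `Point` is a hypothetical dataclass):
+
+        @dataclasses.dataclass
+        class Point:
+            x: int
+            y: int | None = None
+
+        load_engine_object(Point, {"x": 1})  # -> Point(x=1, y=None)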
+    """
+    # Fast path
+    if v is None:
+        return None
+
+    type_info = analyze_type_info(expected_type)
+    variant = type_info.variant
+
+    if type_info.core_type is EnrichedValueType:
+        return EnrichedValueType.decode(v)
+    if type_info.core_type is FieldSchema:
+        return FieldSchema.decode(v)
+
+    # Any or unknown → return as-is
+    if isinstance(variant, AnalyzedAnyType) or type_info.base_type is Any:
+        return v
+
+    # Enum handling
+    if isinstance(expected_type, type) and issubclass(expected_type, Enum):
+        return expected_type(v)
+
+    # TimeDelta special form {secs, nanos}
+    if isinstance(variant, AnalyzedBasicType) and variant.kind == "TimeDelta":
+        if isinstance(v, Mapping) and "secs" in v and "nanos" in v:
+            secs = int(v["secs"])  # type: ignore[index]
+            nanos = int(v["nanos"])  # type: ignore[index]
+            return datetime.timedelta(seconds=secs, microseconds=nanos / 1_000)
+        return v
+
+    # List, NDArray (Vector-ish), or general sequences
+    if isinstance(variant, AnalyzedListType):
+        elem_type = variant.elem_type if variant.elem_type else Any
+        if type_info.base_type is np.ndarray:
+            # Reconstruct NDArray with appropriate dtype if available
+            try:
+                dtype = extract_ndarray_elem_dtype(type_info.core_type)
+            except (TypeError, ValueError, AttributeError):
+                dtype = None
+            return np.array(v, dtype=dtype)
+        # Regular Python list
+        return [load_engine_object(elem_type, item) for item in v]
+
+    # Dict / Mapping
+    if isinstance(variant, AnalyzedDictType):
+        key_t = variant.key_type
+        val_t = variant.value_type
+        return {
+            load_engine_object(key_t, k): load_engine_object(val_t, val)
+            for k, val in v.items()
+        }
+
+    # Structs (dataclass, NamedTuple, or Pydantic)
+    if isinstance(variant, AnalyzedStructType):
+        struct_type = variant.struct_type
+        init_kwargs: dict[str, Any] = {}
+        missing_fields: list[tuple[str, Any]] = []
+        if dataclasses.is_dataclass(struct_type):
+            if not isinstance(v, Mapping):
+                raise ValueError(f"Expected dict for dataclass, got {type(v)}")
+
+            for dc_field in dataclasses.fields(struct_type):
+                if dc_field.name in v:
+                    init_kwargs[dc_field.name] = load_engine_object(
+                        dc_field.type, v[dc_field.name]
+                    )
+                else:
+                    if (
+                        dc_field.default is dataclasses.MISSING
+                        and dc_field.default_factory is dataclasses.MISSING
+                    ):
+                        missing_fields.append((dc_field.name, dc_field.type))
+
+        elif is_namedtuple_type(struct_type):
+            if not isinstance(v, Mapping):
+                raise ValueError(f"Expected dict for NamedTuple, got {type(v)}")
+            # Dict format (from dump/load functions)
+            annotations = getattr(struct_type, "__annotations__", {})
+            field_names = list(getattr(struct_type, "_fields", ()))
+            field_defaults = getattr(struct_type, "_field_defaults", {})
+
+            for name in field_names:
+                f_type = annotations.get(name, Any)
+                if name in v:
+                    init_kwargs[name] = load_engine_object(f_type, v[name])
+                elif name not in field_defaults:
+                    missing_fields.append((name, f_type))
+
+        elif is_pydantic_model(struct_type):
+            if not isinstance(v, Mapping):
+                raise ValueError(f"Expected dict for Pydantic model, got {type(v)}")
+
+            model_fields: dict[str, pydantic.fields.FieldInfo]
+            if hasattr(struct_type, "model_fields"):
+                model_fields = struct_type.model_fields  # type: ignore[attr-defined]
+            else:
+                model_fields = {}
+
+            for name, pyd_field in model_fields.items():
+                if name in v:
+                    init_kwargs[name] = load_engine_object(
+                        pyd_field.annotation, v[name]
+                    )
+                elif (
+                    getattr(pyd_field, "default", pydantic_core.PydanticUndefined)
+                    is pydantic_core.PydanticUndefined
+                    and getattr(pyd_field, "default_factory") is None
+                ):
+                    missing_fields.append((name, pyd_field.annotation))
+        else:
+            assert False, "Unsupported struct type"
+
+        for name, f_type in missing_fields:
+            type_info = analyze_type_info(f_type)
+            auto_default, is_supported = get_auto_default_for_type(type_info)
+            if is_supported:
+                init_kwargs[name] = auto_default
+        return struct_type(**init_kwargs)
+
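+    # Illustrative note (hypothetical types): given dataclasses A and B that
+    # declare class-level kind = "A" / kind = "B", loading {"kind": "A", ...}
+    # into A | B dispatches to A, stripping "kind" before construction.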
+    # Union with discriminator support via "kind"
+    if isinstance(variant, AnalyzedUnionType):
+        if isinstance(v, Mapping) and "kind" in v:
+            discriminator = v["kind"]
+            for typ in variant.variant_types:
+                t_info = analyze_type_info(typ)
+                if isinstance(t_info.variant, AnalyzedStructType):
+                    t_struct = t_info.variant.struct_type
+                    candidate_kind = getattr(t_struct, "kind", None)
+                    if candidate_kind == discriminator:
+                        # Remove discriminator for constructor
+                        v_wo_kind = dict(v)
+                        v_wo_kind.pop("kind", None)
+                        return load_engine_object(t_struct, v_wo_kind)
+        # Fallback: try each variant until one succeeds
+        for typ in variant.variant_types:
+            try:
+                return load_engine_object(typ, v)
+            except (TypeError, ValueError):
+                continue
+        return v
+
+    # Basic types and everything else: handle numpy scalars and passthrough
+    if isinstance(v, np.ndarray) and type_info.base_type is list:
+        return v.tolist()
+    if isinstance(v, (list, tuple)) and type_info.base_type not in (list, tuple):
+        # If a non-sequence basic type is expected, attempt a direct cast
+        try:
+            return type_info.core_type(v)
+        except (TypeError, ValueError):
+            return v
+    return v
diff --git a/python/cocoindex/convert.py b/python/cocoindex/engine_value.py
similarity index 70%
rename from python/cocoindex/convert.py
rename to python/cocoindex/engine_value.py
index bb5263553..6effdf39d 100644
--- a/python/cocoindex/convert.py
+++ b/python/cocoindex/engine_value.py
@@ -1,18 +1,15 @@
 """
-Utilities to convert between Python and engine values.
+Utilities to encode/decode values in cocoindex (for data).
 """
 
 from __future__ import annotations
 
 import dataclasses
-import datetime
 import inspect
 import warnings
-from enum import Enum
-from typing import Any, Callable, Mapping, get_origin, TypeVar, overload
+from typing import Any, Callable, Mapping, TypeVar
 
 import numpy as np
-
 from .typing import (
     AnalyzedAnyType,
     AnalyzedBasicType,
@@ -22,28 +19,21 @@
     AnalyzedTypeInfo,
     AnalyzedUnionType,
     AnalyzedUnknownType,
-    EnrichedValueType,
     analyze_type_info,
-    encode_enriched_type,
     is_namedtuple_type,
     is_pydantic_model,
     is_numpy_number_type,
-    extract_ndarray_elem_dtype,
     ValueType,
     FieldSchema,
     BasicValueType,
     StructType,
     TableType,
 )
+from .engine_object import get_auto_default_for_type
 
 T = TypeVar("T")
 
-try:
-    import pydantic, pydantic_core
-except ImportError:
-    pass
-
 
 class ChildFieldPath:
     """Context manager to append a field to field_path on enter and pop it on exit."""
@@ -449,30 +439,6 @@ def decode_scalar(value: Any) -> Any | None:
     return lambda value: value
 
 
-def _get_auto_default_for_type(
-    type_info: AnalyzedTypeInfo,
-) -> tuple[Any, bool]:
-    """
-    Get an auto-default value for a type annotation if it's safe to do so.
-
-    Returns:
-        A tuple of (default_value, is_supported) where:
-        - default_value: The default value if auto-defaulting is supported
-        - is_supported: True if auto-defaulting is supported for this type
-    """
-    # Case 1: Nullable types (Optional[T] or T | None)
-    if type_info.nullable:
-        return None, True
-
-    # Case 2: Table types (KTable or LTable) - check if it's a list or dict type
-    if isinstance(type_info.variant, AnalyzedListType):
-        return [], True
-    elif isinstance(type_info.variant, AnalyzedDictType):
-        return {}, True
-
-    return None, False
-
-
 def make_engine_struct_decoder(
     field_path: list[str],
     src_fields: list[FieldSchema],
@@ -567,7 +533,7 @@ def make_closure_for_field(
         if default_value is not inspect.Parameter.empty:
             return lambda _: default_value
 
-        auto_default, is_supported = _get_auto_default_for_type(type_info)
+        auto_default, is_supported = get_auto_default_for_type(type_info)
         if is_supported:
             warnings.warn(
                 f"Field '{name}' (type {param.annotation}) without default value is missing in input: "
@@ -667,214 +633,3 @@ def decode_to_tuple(values: list[Any] | None) -> tuple[Any, ...] | None:
         )
 
     return decode_to_tuple
-
-
-def dump_engine_object(v: Any) -> Any:
-    """Recursively dump an object for engine. Engine side uses `Pythonized` to catch."""
-    if v is None:
-        return None
-    elif isinstance(v, EnrichedValueType):
-        return v.encode()
-    elif isinstance(v, FieldSchema):
-        return v.encode()
-    elif isinstance(v, type) or get_origin(v) is not None:
-        return encode_enriched_type(v)
-    elif isinstance(v, Enum):
-        return v.value
-    elif isinstance(v, datetime.timedelta):
-        total_secs = v.total_seconds()
-        secs = int(total_secs)
-        nanos = int((total_secs - secs) * 1e9)
-        return {"secs": secs, "nanos": nanos}
-    elif is_namedtuple_type(type(v)):
-        # Handle NamedTuple objects specifically to use dict format
-        field_names = list(getattr(type(v), "_fields", ()))
-        result = {}
-        for name in field_names:
-            val = getattr(v, name)
-            result[name] = dump_engine_object(val)  # Include all values, including None
-        if hasattr(v, "kind") and "kind" not in result:
-            result["kind"] = v.kind
-        return result
-    elif hasattr(v, "__dict__"):  # for dataclass-like objects
-        s = {}
-        for k, val in v.__dict__.items():
-            if val is None:
-                # Skip None values
-                continue
-            s[k] = dump_engine_object(val)
-        if hasattr(v, "kind") and "kind" not in s:
-            s["kind"] = v.kind
-        return s
-    elif isinstance(v, (list, tuple)):
-        return [dump_engine_object(item) for item in v]
-    elif isinstance(v, np.ndarray):
-        return v.tolist()
-    elif isinstance(v, dict):
-        return {k: dump_engine_object(v) for k, v in v.items()}
-    return v
-
-
-@overload
-def load_engine_object(expected_type: type[T], v: Any) -> T: ...
-@overload
-def load_engine_object(expected_type: Any, v: Any) -> Any: ...
-def load_engine_object(expected_type: Any, v: Any) -> Any:
-    """Recursively load an object that was produced by dump_engine_object().
-
-    Args:
-        expected_type: The Python type annotation to reconstruct to.
-        v: The engine-facing Pythonized object (e.g., dict/list/primitive) to convert.
-
-    Returns:
-        A Python object matching the expected_type where possible.
-    """
-    # Fast path
-    if v is None:
-        return None
-
-    type_info = analyze_type_info(expected_type)
-    variant = type_info.variant
-
-    if type_info.core_type is EnrichedValueType:
-        return EnrichedValueType.decode(v)
-    if type_info.core_type is FieldSchema:
-        return FieldSchema.decode(v)
-
-    # Any or unknown → return as-is
-    if isinstance(variant, AnalyzedAnyType) or type_info.base_type is Any:
-        return v
-
-    # Enum handling
-    if isinstance(expected_type, type) and issubclass(expected_type, Enum):
-        return expected_type(v)
-
-    # TimeDelta special form {secs, nanos}
-    if isinstance(variant, AnalyzedBasicType) and variant.kind == "TimeDelta":
-        if isinstance(v, Mapping) and "secs" in v and "nanos" in v:
-            secs = int(v["secs"])  # type: ignore[index]
-            nanos = int(v["nanos"])  # type: ignore[index]
-            return datetime.timedelta(seconds=secs, microseconds=nanos / 1_000)
-        return v
-
-    # List, NDArray (Vector-ish), or general sequences
-    if isinstance(variant, AnalyzedListType):
-        elem_type = variant.elem_type if variant.elem_type else Any
-        if type_info.base_type is np.ndarray:
-            # Reconstruct NDArray with appropriate dtype if available
-            try:
-                dtype = extract_ndarray_elem_dtype(type_info.core_type)
-            except (TypeError, ValueError, AttributeError):
-                dtype = None
-            return np.array(v, dtype=dtype)
-        # Regular Python list
-        return [load_engine_object(elem_type, item) for item in v]
-
-    # Dict / Mapping
-    if isinstance(variant, AnalyzedDictType):
-        key_t = variant.key_type
-        val_t = variant.value_type
-        return {
-            load_engine_object(key_t, k): load_engine_object(val_t, val)
-            for k, val in v.items()
-        }
-
-    # Structs (dataclass, NamedTuple, or Pydantic)
-    if isinstance(variant, AnalyzedStructType):
-        struct_type = variant.struct_type
-        init_kwargs: dict[str, Any] = {}
-        missing_fields: list[tuple[str, Any]] = []
-        if dataclasses.is_dataclass(struct_type):
-            if not isinstance(v, Mapping):
-                raise ValueError(f"Expected dict for dataclass, got {type(v)}")
-
-            for dc_field in dataclasses.fields(struct_type):
-                if dc_field.name in v:
-                    init_kwargs[dc_field.name] = load_engine_object(
-                        dc_field.type, v[dc_field.name]
-                    )
-                else:
-                    if (
-                        dc_field.default is dataclasses.MISSING
-                        and dc_field.default_factory is dataclasses.MISSING
-                    ):
-                        missing_fields.append((dc_field.name, dc_field.type))
-
-        elif is_namedtuple_type(struct_type):
-            if not isinstance(v, Mapping):
-                raise ValueError(f"Expected dict for NamedTuple, got {type(v)}")
-            # Dict format (from dump/load functions)
-            annotations = getattr(struct_type, "__annotations__", {})
-            field_names = list(getattr(struct_type, "_fields", ()))
-            field_defaults = getattr(struct_type, "_field_defaults", {})
-
-            for name in field_names:
-                f_type = annotations.get(name, Any)
-                if name in v:
-                    init_kwargs[name] = load_engine_object(f_type, v[name])
-                elif name not in field_defaults:
-                    missing_fields.append((name, f_type))
-
-        elif is_pydantic_model(struct_type):
-            if not isinstance(v, Mapping):
-                raise ValueError(f"Expected dict for Pydantic model, got {type(v)}")
-
-            model_fields: dict[str, pydantic.fields.FieldInfo]
-            if hasattr(struct_type, "model_fields"):
-                model_fields = struct_type.model_fields  # type: ignore[attr-defined]
-            else:
-                model_fields = {}
-
-            for name, pyd_field in model_fields.items():
-                if name in v:
-                    init_kwargs[name] = load_engine_object(
-                        pyd_field.annotation, v[name]
-                    )
-                elif (
-                    getattr(pyd_field, "default", pydantic_core.PydanticUndefined)
-                    is pydantic_core.PydanticUndefined
-                    and getattr(pyd_field, "default_factory") is None
-                ):
-                    missing_fields.append((name, pyd_field.annotation))
-        else:
-            assert False, "Unsupported struct type"
-
-        for name, f_type in missing_fields:
-            type_info = analyze_type_info(f_type)
-            auto_default, is_supported = _get_auto_default_for_type(type_info)
-            if is_supported:
-                init_kwargs[name] = auto_default
-        return struct_type(**init_kwargs)
-
-    # Union with discriminator support via "kind"
-    if isinstance(variant, AnalyzedUnionType):
-        if isinstance(v, Mapping) and "kind" in v:
-            discriminator = v["kind"]
-            for typ in variant.variant_types:
-                t_info = analyze_type_info(typ)
-                if isinstance(t_info.variant, AnalyzedStructType):
-                    t_struct = t_info.variant.struct_type
-                    candidate_kind = getattr(t_struct, "kind", None)
-                    if candidate_kind == discriminator:
-                        # Remove discriminator for constructor
-                        v_wo_kind = dict(v)
-                        v_wo_kind.pop("kind", None)
-                        return load_engine_object(t_struct, v_wo_kind)
-        # Fallback: try each variant until one succeeds
-        for typ in variant.variant_types:
-            try:
-                return load_engine_object(typ, v)
-            except (TypeError, ValueError):
-                continue
-        return v
-
-    # Basic types and everything else: handle numpy scalars and passthrough
-    if isinstance(v, np.ndarray) and type_info.base_type is list:
-        return v.tolist()
-    if isinstance(v, (list, tuple)) and type_info.base_type not in (list, tuple):
-        # If a non-sequence basic type expected, attempt direct cast
-        try:
-            return type_info.core_type(v)
-        except (TypeError, ValueError):
-            return v
-    return v
diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py
index 7b1f8d9a5..9f7badd85 100644
--- a/python/cocoindex/flow.py
+++ b/python/cocoindex/flow.py
@@ -31,8 +31,8 @@
 from . import index
 from . import op
 from . import setting
-from .convert import (
-    dump_engine_object,
+from .engine_object import dump_engine_object
+from .engine_value import (
     make_engine_value_decoder,
     make_engine_value_encoder,
 )
diff --git a/python/cocoindex/lib.py b/python/cocoindex/lib.py
index 70fe68b6e..54745bc70 100644
--- a/python/cocoindex/lib.py
+++ b/python/cocoindex/lib.py
@@ -7,7 +7,7 @@
 from . import _engine  # type: ignore
 from . import flow, setting
-from .convert import dump_engine_object
+from .engine_object import dump_engine_object
 from .validation import validate_app_namespace_name
 
 from typing import Any, Callable, overload
diff --git a/python/cocoindex/op.py b/python/cocoindex/op.py
index 1bd12fdee..380f60188 100644
--- a/python/cocoindex/op.py
+++ b/python/cocoindex/op.py
@@ -17,9 +17,8 @@
 from . import _engine  # type: ignore
 
 from .subprocess_exec import executor_stub
-from .convert import (
-    dump_engine_object,
-    load_engine_object,
+from .engine_object import dump_engine_object, load_engine_object
+from .engine_value import (
     make_engine_value_encoder,
     make_engine_value_decoder,
     make_engine_key_decoder,
diff --git a/python/cocoindex/tests/test_load_convert.py b/python/cocoindex/tests/test_engine_object.py
similarity index 62%
rename from python/cocoindex/tests/test_load_convert.py
rename to python/cocoindex/tests/test_engine_object.py
index 6f67926a0..b8b8b2658 100644
--- a/python/cocoindex/tests/test_load_convert.py
+++ b/python/cocoindex/tests/test_engine_object.py
@@ -1,12 +1,13 @@
 import dataclasses
 import datetime
-from typing import TypedDict, NamedTuple
+from typing import TypedDict, NamedTuple, Literal
 
 import numpy as np
 from numpy.typing import NDArray
 import pytest
 
-from cocoindex.convert import dump_engine_object, load_engine_object
+from cocoindex.typing import Vector
+from cocoindex.engine_object import dump_engine_object, load_engine_object
 
 # Optional Pydantic support for testing
 try:
@@ -192,6 +193,30 @@
         pass  # Expected behavior
 
 
+def test_dump_vector_type_annotation_with_dim() -> None:
+    """Test dumping a vector type annotation with a specified dimension."""
+    expected_dump = {
+        "type": {
+            "kind": "Vector",
+            "element_type": {"kind": "Float32"},
+            "dimension": 3,
+        }
+    }
+    assert dump_engine_object(Vector[np.float32, Literal[3]]) == expected_dump
+
+
+def test_dump_vector_type_annotation_no_dim() -> None:
+    """Test dumping a vector type annotation with no dimension."""
+    expected_dump_no_dim = {
+        "type": {
+            "kind": "Vector",
+            "element_type": {"kind": "Float64"},
+            "dimension": None,
+        }
+    }
+    assert dump_engine_object(Vector[np.float64]) == expected_dump_no_dim
+
+
 @pytest.mark.skipif(not PYDANTIC_AVAILABLE, reason="Pydantic not available")
 def test_pydantic_unsupported_type_still_fails() -> None:
     """Test that fields with unsupported types still cause errors when missing."""
@@ -221,3 +246,86 @@
         list_field=[],
         dict_field={},
     )
+
+
+@pytest.mark.skipif(not PYDANTIC_AVAILABLE, reason="Pydantic not available")
+def test_pydantic_field_descriptions() -> None:
+    """Test that Pydantic field descriptions are extracted and included in schema."""
+    from pydantic import BaseModel, Field
+
+    class UserWithDescriptions(BaseModel):
+        """A user model with field descriptions."""
+
+        name: str = Field(description="The user's full name")
+        age: int = Field(description="The user's age in years", ge=0, le=150)
+        email: str = Field(description="The user's email address")
+        is_active: bool = Field(
+            description="Whether the user account is active", default=True
+        )
+
+    # Test that field descriptions are extracted
+    encoded_schema = dump_engine_object(UserWithDescriptions)
+
+    # Check that the schema contains field descriptions
+    assert "fields" in encoded_schema["type"]
+    fields = encoded_schema["type"]["fields"]
+
+    # Find fields by name and check descriptions
+    field_descriptions = {field["name"]: field.get("description") for field in fields}
+
+    assert field_descriptions["name"] == "The user's full name"
+    assert field_descriptions["age"] == "The user's age in years"
+    assert field_descriptions["email"] == "The user's email address"
+    assert field_descriptions["is_active"] == "Whether the user account is active"
+
+
+@pytest.mark.skipif(not PYDANTIC_AVAILABLE, reason="Pydantic not available")
+def test_pydantic_field_descriptions_without_field() -> None:
+    """Test that Pydantic models without field descriptions work correctly."""
+    from pydantic import BaseModel
+
+    class UserWithoutDescriptions(BaseModel):
+        """A user model without field descriptions."""
+
+        name: str
+        age: int
+        email: str
+
+    # Test that the schema works without descriptions
+    encoded_schema = dump_engine_object(UserWithoutDescriptions)
+
+    # Check that the schema contains fields but no descriptions
+    assert "fields" in encoded_schema["type"]
+    fields = encoded_schema["type"]["fields"]
+
+    # Verify no descriptions are present
+    for field in fields:
+        assert "description" not in field or field["description"] is None
+
+
+@pytest.mark.skipif(not PYDANTIC_AVAILABLE, reason="Pydantic not available")
+def test_pydantic_mixed_descriptions() -> None:
+    """Test Pydantic model with some fields having descriptions and others not."""
+    from pydantic import BaseModel, Field
+
+    class MixedDescriptions(BaseModel):
+        """A model with mixed field descriptions."""
+
+        name: str = Field(description="The name field")
+        age: int  # No description
+        email: str = Field(description="The email field")
+        active: bool  # No description
+
+    # Test that only fields with descriptions have them in the schema
+    encoded_schema = dump_engine_object(MixedDescriptions)
+
+    assert "fields" in encoded_schema["type"]
+    fields = encoded_schema["type"]["fields"]
+
+    # Find fields by name and check descriptions
+    field_descriptions = {field["name"]: field.get("description") for field in fields}
+
+    assert field_descriptions["name"] == "The name field"
+    assert field_descriptions["age"] is None
+    assert field_descriptions["email"] == "The email field"
+    assert field_descriptions["active"] is None
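+
+
+@dataclasses.dataclass
+class _AutoDefaultSketch:
+    # Hypothetical spec used only by the illustrative test below; `tags` is
+    # nullable and has no default, so it exercises auto-defaulting.
+    name: str
+    tags: list[str] | None
+
+
+def test_load_engine_object_auto_default_sketch() -> None:
+    """Illustrative sketch (not part of the original change): a missing
+    nullable field is auto-defaulted to None on load."""
+    loaded = load_engine_object(_AutoDefaultSketch, {"name": "a"})
+    assert loaded == _AutoDefaultSketch(name="a", tags=None)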
diff --git a/python/cocoindex/tests/test_convert.py b/python/cocoindex/tests/test_engine_value.py
similarity index 93%
rename from python/cocoindex/tests/test_convert.py
rename to python/cocoindex/tests/test_engine_value.py
index c9a3eb590..ec603c13c 100644
--- a/python/cocoindex/tests/test_convert.py
+++ b/python/cocoindex/tests/test_engine_value.py
@@ -19,8 +19,7 @@
 PYDANTIC_AVAILABLE = False
 
 import cocoindex
-from cocoindex.convert import (
-    dump_engine_object,
+from cocoindex.engine_value import (
     make_engine_value_encoder,
     make_engine_value_decoder,
 )
@@ -1006,30 +1005,6 @@ def test_decode_error_non_nullable_or_non_list_vector() -> None:
         decoder("not a list")
 
 
-def test_dump_vector_type_annotation_with_dim() -> None:
-    """Test dumping a vector type annotation with a specified dimension."""
-    expected_dump = {
-        "type": {
-            "kind": "Vector",
-            "element_type": {"kind": "Float32"},
-            "dimension": 3,
-        }
-    }
-    assert dump_engine_object(Float32VectorType) == expected_dump
-
-
-def test_dump_vector_type_annotation_no_dim() -> None:
-    """Test dumping a vector type annotation with no dimension."""
-    expected_dump_no_dim = {
-        "type": {
-            "kind": "Vector",
-            "element_type": {"kind": "Float64"},
-            "dimension": None,
-        }
-    }
-    assert dump_engine_object(Float64VectorTypeNoDim) == expected_dump_no_dim
-
-
 def test_full_roundtrip_vector_numeric_types() -> None:
     """Test full roundtrip for numeric vector types using NDArray."""
     value_f32 = np.array([1.0, 2.0, 3.0], dtype=np.float32)
@@ -1715,86 +1690,3 @@ class MixedStruct:
     order = OrderPydantic(order_id="O1", name="item1", price=10.0)
     mixed = MixedStruct(name="test", pydantic_order=order)
     validate_full_roundtrip(mixed, MixedStruct)
-
-
-@pytest.mark.skipif(not PYDANTIC_AVAILABLE, reason="Pydantic not available")
-def test_pydantic_field_descriptions() -> None:
-    """Test that Pydantic field descriptions are extracted and included in schema."""
-    from pydantic import BaseModel, Field
-
-    class UserWithDescriptions(BaseModel):
-        """A user model with field descriptions."""
-
-        name: str = Field(description="The user's full name")
-        age: int = Field(description="The user's age in years", ge=0, le=150)
-        email: str = Field(description="The user's email address")
-        is_active: bool = Field(
-            description="Whether the user account is active", default=True
-        )
-
-    # Test that field descriptions are extracted
-    encoded_schema = dump_engine_object(UserWithDescriptions)
-
-    # Check that the schema contains field descriptions
-    assert "fields" in encoded_schema["type"]
-    fields = encoded_schema["type"]["fields"]
-
-    # Find fields by name and check descriptions
-    field_descriptions = {field["name"]: field.get("description") for field in fields}
-
-    assert field_descriptions["name"] == "The user's full name"
-    assert field_descriptions["age"] == "The user's age in years"
-    assert field_descriptions["email"] == "The user's email address"
-    assert field_descriptions["is_active"] == "Whether the user account is active"
-
-
-@pytest.mark.skipif(not PYDANTIC_AVAILABLE, reason="Pydantic not available")
-def test_pydantic_field_descriptions_without_field() -> None:
-    """Test that Pydantic models without field descriptions work correctly."""
-    from pydantic import BaseModel
-
-    class UserWithoutDescriptions(BaseModel):
-        """A user model without field descriptions."""
-
-        name: str
-        age: int
-        email: str
-
-    # Test that the schema works without descriptions
-    encoded_schema = dump_engine_object(UserWithoutDescriptions)
-
-    # Check that the schema contains fields but no descriptions
-    assert "fields" in encoded_schema["type"]
-    fields = encoded_schema["type"]["fields"]
-
-    # Verify no descriptions are present
-    for field in fields:
-        assert "description" not in field or field["description"] is None
-
-
-@pytest.mark.skipif(not PYDANTIC_AVAILABLE, reason="Pydantic not available")
-def test_pydantic_mixed_descriptions() -> None:
-    """Test Pydantic model with some fields having descriptions and others not."""
-    from pydantic import BaseModel, Field
-
-    class MixedDescriptions(BaseModel):
-        """A model with mixed field descriptions."""
-
-        name: str = Field(description="The name field")
-        age: int  # No description
-        email: str = Field(description="The email field")
-        active: bool  # No description
-
-    # Test that only fields with descriptions have them in the schema
-    encoded_schema = dump_engine_object(MixedDescriptions)
-
-    assert "fields" in encoded_schema["type"]
-    fields = encoded_schema["type"]["fields"]
-
-    # Find fields by name and check descriptions
-    field_descriptions = {field["name"]: field.get("description") for field in fields}
-
-    assert field_descriptions["name"] == "The name field"
-    assert field_descriptions["age"] is None
-    assert field_descriptions["email"] == "The email field"
-    assert field_descriptions["active"] is None
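+
+
+def test_full_roundtrip_basic_scalars_sketch() -> None:
+    """Illustrative sketch (not part of the original change): basic scalars
+    should roundtrip unchanged through the engine value encoding."""
+    validate_full_roundtrip("hello", str)
+    validate_full_roundtrip(42, int)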