|
28 | 28 | is_struct_type, |
29 | 29 | ) |
30 | 30 |
|
| 31 | +_CONVERTIBLE_KINDS = { |
| 32 | + ("Float32", "Float64"), |
| 33 | + ("LocalDateTime", "OffsetDateTime"), |
| 34 | +} |
31 | 35 |
|
32 | | -def encode_engine_value( |
33 | | - value: Any, in_struct: bool = False, type_hint: Type[Any] | str | None = None |
| 36 | + |
| 37 | +def _is_type_kind_convertible_to(src_type_kind: str, dst_type_kind: str) -> bool: |
| 38 | + return ( |
| 39 | + src_type_kind == dst_type_kind |
| 40 | + or (src_type_kind, dst_type_kind) in _CONVERTIBLE_KINDS |
| 41 | + ) |
| 42 | + |
| 43 | + |
| 44 | +def _encode_engine_value_core( |
| 45 | + value: Any, |
| 46 | + in_struct: bool = False, |
| 47 | + type_hint: Type[Any] | str | None = None, |
| 48 | + type_variant: AnalyzedTypeInfo | None = None, |
34 | 49 | ) -> Any: |
35 | | - """Encode a Python value to an engine value.""" |
| 50 | + """Core encoding logic for converting Python values to engine values.""" |
| 51 | + |
36 | 52 | if dataclasses.is_dataclass(value): |
| 53 | + fields = dataclasses.fields(value) |
37 | 54 | return [ |
38 | 55 | encode_engine_value( |
39 | | - getattr(value, f.name), in_struct=True, type_hint=f.type |
| 56 | + getattr(value, f.name), |
| 57 | + in_struct=True, |
| 58 | + type_hint=f.type, |
40 | 59 | ) |
41 | | - for f in dataclasses.fields(value) |
| 60 | + for f in fields |
42 | 61 | ] |
| 62 | + |
43 | 63 | if is_namedtuple_type(type(value)): |
44 | 64 | annotations = type(value).__annotations__ |
45 | 65 | return [ |
46 | 66 | encode_engine_value( |
47 | | - getattr(value, name), in_struct=True, type_hint=annotations.get(name) |
| 67 | + getattr(value, name), |
| 68 | + in_struct=True, |
| 69 | + type_hint=annotations.get(name), |
48 | 70 | ) |
49 | 71 | for name in value._fields |
50 | 72 | ] |
| 73 | + |
51 | 74 | if isinstance(value, np.number): |
52 | 75 | return value.item() |
| 76 | + |
53 | 77 | if isinstance(value, np.ndarray): |
54 | 78 | return value |
| 79 | + |
55 | 80 | if isinstance(value, (list, tuple)): |
56 | | - return [encode_engine_value(v, in_struct) for v in value] |
| 81 | + if ( |
| 82 | + type_variant |
| 83 | + and isinstance(type_variant.variant, AnalyzedListType) |
| 84 | + and type_variant.variant.elem_type |
| 85 | + ): |
| 86 | + elem_encoder = make_engine_value_encoder(type_variant.variant.elem_type) |
| 87 | + return [elem_encoder(v) for v in value] |
| 88 | + else: |
| 89 | + return [encode_engine_value(v, in_struct) for v in value] |
| 90 | + |
57 | 91 | if isinstance(value, dict): |
| 92 | + # Determine if this is a JSON type |
58 | 93 | is_json_type = False |
59 | | - if type_hint: |
60 | | - type_info = analyze_type_info(type_hint) |
| 94 | + if type_variant and isinstance(type_variant.variant, AnalyzedBasicType): |
| 95 | + is_json_type = type_variant.variant.kind == "Json" |
| 96 | + elif type_hint: |
| 97 | + hint_type_info = analyze_type_info(type_hint) |
61 | 98 | is_json_type = ( |
62 | | - isinstance(type_info.variant, AnalyzedBasicType) |
63 | | - and type_info.variant.kind == "Json" |
| 99 | + isinstance(hint_type_info.variant, AnalyzedBasicType) |
| 100 | + and hint_type_info.variant.kind == "Json" |
64 | 101 | ) |
65 | 102 |
|
66 | | - # For empty dicts, check type hints if in a struct context |
67 | | - # when no contexts are provided, return an empty dict as default |
68 | | - # TODO: always pass in the type annotation to make this robust |
| 103 | + # Handle empty dict |
69 | 104 | if not value: |
70 | 105 | if in_struct: |
71 | 106 | return value if is_json_type else [] |
72 | | - return {} |
| 107 | + return {} if is_json_type else value |
73 | 108 |
|
| 109 | + # Handle KTable |
74 | 110 | first_val = next(iter(value.values())) |
75 | | - if is_struct_type(type(first_val)): # KTable |
| 111 | + if is_struct_type(type(first_val)): |
76 | 112 | return [ |
77 | 113 | [encode_engine_value(k, in_struct)] + encode_engine_value(v, in_struct) |
78 | 114 | for k, v in value.items() |
79 | 115 | ] |
| 116 | + |
| 117 | + # Handle regular dict |
| 118 | + if ( |
| 119 | + type_variant |
| 120 | + and isinstance(type_variant.variant, AnalyzedDictType) |
| 121 | + and type_variant.variant.value_type |
| 122 | + ): |
| 123 | + value_encoder = make_engine_value_encoder(type_variant.variant.value_type) |
| 124 | + return {k: value_encoder(v) for k, v in value.items()} |
| 125 | + |
80 | 126 | return value |
81 | 127 |
|
82 | 128 |
|
83 | | -_CONVERTIBLE_KINDS = { |
84 | | - ("Float32", "Float64"), |
85 | | - ("LocalDateTime", "OffsetDateTime"), |
86 | | -} |
| 129 | +def make_engine_value_encoder(type_annotation: Any) -> Callable[[Any], Any]: |
| 130 | + """ |
| 131 | + Make an encoder from a Python value to an engine value. |
87 | 132 |
|
| 133 | + Args: |
| 134 | + type_annotation: The type annotation of the Python value. |
88 | 135 |
|
89 | | -def _is_type_kind_convertible_to(src_type_kind: str, dst_type_kind: str) -> bool: |
90 | | - return ( |
91 | | - src_type_kind == dst_type_kind |
92 | | - or (src_type_kind, dst_type_kind) in _CONVERTIBLE_KINDS |
93 | | - ) |
| 136 | + Returns: |
| 137 | + An encoder from a Python value to an engine value. |
| 138 | + """ |
| 139 | + type_info = analyze_type_info(type_annotation) |
| 140 | + |
| 141 | + if isinstance(type_info.variant, AnalyzedUnknownType): |
| 142 | + raise ValueError(f"Type annotation `{type_info.core_type}` is unsupported") |
| 143 | + |
| 144 | + def encode_value(value: Any, in_struct: bool = False) -> Any: |
| 145 | + return _encode_engine_value_core( |
| 146 | + value, in_struct=in_struct, type_variant=type_info |
| 147 | + ) |
| 148 | + |
| 149 | + return lambda value: encode_value(value, in_struct=False) |
| 150 | + |
| 151 | + |
| 152 | +def encode_engine_value( |
| 153 | + value: Any, in_struct: bool = False, type_hint: Type[Any] | str | None = None |
| 154 | +) -> Any: |
| 155 | + """ |
| 156 | + Encode a Python value to an engine value. |
| 157 | +
|
| 158 | + Args: |
| 159 | + value: The Python value to encode |
| 160 | + in_struct: Whether this value is being encoded within a struct context |
| 161 | + type_hint: Type annotation for the value. When provided, enables optimized |
| 162 | + type-aware encoding. For top-level calls, this should always be provided. |
| 163 | +
|
| 164 | + Returns: |
| 165 | + The encoded engine value |
| 166 | + """ |
| 167 | + if type_hint is not None: |
| 168 | + encoder = make_engine_value_encoder(type_hint) |
| 169 | + return encoder(value) |
| 170 | + |
| 171 | + return _encode_engine_value_core(value, in_struct=in_struct) |
94 | 172 |
|
95 | 173 |
|
96 | 174 | def make_engine_value_decoder( |
|
0 commit comments