|
72 | 72 |
|
73 | 73 | NamesType = List[str] |
74 | 74 |
|
| 75 | +RUNTIME_TYPE_KEY = "_runtime_type" |
| 76 | + |
75 | 77 |
|
76 | 78 | class TypeNotSupportedError(TypeError): |
77 | 79 | """Error raised when a Avro schema cannot be generated for a given Python type""" |
@@ -137,6 +139,9 @@ class Option(enum.Flag): |
137 | 139 | # the two cases. |
138 | 140 | MARK_NON_TOTAL_TYPED_DICTS = enum.auto() |
139 | 141 |
|
| 142 | + #: Adds a _runtime_type field to the record schemas that contains the name of the class |
| 143 | + ADD_RUNTIME_TYPE_FIELD = enum.auto() |
| 144 | + |
140 | 145 |
|
141 | 146 | JSON_OPTIONS = [opt for opt in Option if opt.name and opt.name.startswith("JSON_")] |
142 | 147 |
|
@@ -1105,6 +1110,13 @@ def _record_field(self, py_field: dataclasses.Field) -> RecordField: |
1105 | 1110 |
|
1106 | 1111 | return field_obj |
1107 | 1112 |
|
| 1113 | + def data_before_deduplication(self, names: NamesType) -> JSONObj: |
| 1114 | + """Return the schema data""" |
| 1115 | + data = super().data_before_deduplication(names) |
| 1116 | + if Option.ADD_RUNTIME_TYPE_FIELD in self.options: |
| 1117 | + data["fields"].append({"name": RUNTIME_TYPE_KEY, "type": ["null", "string"]}) |
| 1118 | + return data |
| 1119 | + |
1108 | 1120 |
|
1109 | 1121 | @register_schema |
1110 | 1122 | class PydanticSchema(RecordSchema): |
@@ -1239,6 +1251,13 @@ def _record_field(self, py_field: tuple[str, Type]) -> RecordField: |
1239 | 1251 | ) |
1240 | 1252 | return field_obj |
1241 | 1253 |
|
| 1254 | + def data_before_deduplication(self, names: NamesType) -> JSONObj: |
| 1255 | + """Return the schema data""" |
| 1256 | + data = super().data_before_deduplication(names) |
| 1257 | + if Option.ADD_RUNTIME_TYPE_FIELD in self.options: |
| 1258 | + data["fields"].append({"name": RUNTIME_TYPE_KEY, "type": ["null", "string"]}) |
| 1259 | + return data |
| 1260 | + |
1242 | 1261 |
|
1243 | 1262 | @register_schema |
1244 | 1263 | class TypedDictSchema(RecordSchema): |
|
0 commit comments