|
54 | 54 | from sqlalchemy.orm.instrumentation import is_instrumented |
55 | 55 | from sqlalchemy.sql.schema import MetaData |
56 | 56 | from sqlalchemy.sql.sqltypes import LargeBinary, Time, Uuid |
57 | | -from typing_extensions import Literal, TypeAlias, deprecated, get_origin |
| 57 | +from typing_extensions import Annotated, Literal, TypeAlias, deprecated, get_args, get_origin |
58 | 58 |
|
59 | 59 | from ._compat import ( # type: ignore[attr-defined] |
60 | 60 | IS_PYDANTIC_V2, |
@@ -562,7 +562,8 @@ def get_config(name: str) -> Any: |
562 | 562 | # If it was passed by kwargs, ensure it's also set in config |
563 | 563 | set_config_value(model=new_cls, parameter="table", value=config_table) |
564 | 564 | for k, v in get_model_fields(new_cls).items(): |
565 | | - col = get_column_from_field(v) |
| 565 | + original_annotation = new_cls.__annotations__.get(k) |
| 566 | + col = get_column_from_field(v, original_annotation) |
566 | 567 | setattr(new_cls, k, col) |
567 | 568 | # Set a config flag to tell FastAPI that this should be read with a field |
568 | 569 | # in orm_mode instead of preemptively converting it to a dict. |
@@ -646,12 +647,44 @@ def __init__( |
646 | 647 | ModelMetaclass.__init__(cls, classname, bases, dict_, **kw) |
647 | 648 |
|
648 | 649 |
|
649 | | -def get_sqlalchemy_type(field: Any) -> Any: |
| 650 | +def _get_sqlmodel_field_info_from_annotation(annotation: Any) -> Optional["FieldInfo"]: |
| 651 | + """Extract SQLModel FieldInfo from an Annotated type's metadata. |
| 652 | +
|
| 653 | + When using Annotated[type, Field(...), Validator(...)], Pydantic V2 may create |
| 654 | + a new pydantic.fields.FieldInfo that doesn't preserve SQLModel-specific attributes |
| 655 | + like sa_column and sa_type. This function looks through the Annotated metadata |
| 656 | + to find the original SQLModel FieldInfo. |
| 657 | + """ |
| 658 | + if get_origin(annotation) is not Annotated: |
| 659 | + return None |
| 660 | + for arg in get_args(annotation)[1:]: # Skip the first arg (the actual type) |
| 661 | + if isinstance(arg, FieldInfo): |
| 662 | + return arg |
| 663 | + return None |
| 664 | + |
| 665 | + |
| 666 | +def get_sqlalchemy_type(field: Any, original_annotation: Any = None) -> Any: |
650 | 667 | if IS_PYDANTIC_V2: |
651 | 668 | field_info = field |
652 | 669 | else: |
653 | 670 | field_info = field.field_info |
654 | 671 | sa_type = getattr(field_info, "sa_type", Undefined) # noqa: B009 |
| 672 | + # If sa_type not found on field_info, check if it's in the Annotated metadata |
| 673 | + # This handles the case where Pydantic V2 creates a new FieldInfo losing SQLModel attrs |
| 674 | + if sa_type is Undefined and IS_PYDANTIC_V2: |
| 675 | + # First try field_info.annotation (may be unpacked by Pydantic) |
| 676 | + annotation = getattr(field_info, "annotation", None) |
| 677 | + if annotation is not None: |
| 678 | + sqlmodel_field_info = _get_sqlmodel_field_info_from_annotation(annotation) |
| 679 | + if sqlmodel_field_info is not None: |
| 680 | + sa_type = getattr(sqlmodel_field_info, "sa_type", Undefined) |
| 681 | + # If still not found, try the original annotation from the class |
| 682 | + if sa_type is Undefined and original_annotation is not None: |
| 683 | + sqlmodel_field_info = _get_sqlmodel_field_info_from_annotation( |
| 684 | + original_annotation |
| 685 | + ) |
| 686 | + if sqlmodel_field_info is not None: |
| 687 | + sa_type = getattr(sqlmodel_field_info, "sa_type", Undefined) |
655 | 688 | if sa_type is not Undefined: |
656 | 689 | return sa_type |
657 | 690 |
|
@@ -703,15 +736,33 @@ def get_sqlalchemy_type(field: Any) -> Any: |
703 | 736 | raise ValueError(f"{type_} has no matching SQLAlchemy type") |
704 | 737 |
|
705 | 738 |
|
706 | | -def get_column_from_field(field: Any) -> Column: # type: ignore |
| 739 | +def get_column_from_field( |
| 740 | + field: Any, original_annotation: Any = None |
| 741 | +) -> Column: # type: ignore |
707 | 742 | if IS_PYDANTIC_V2: |
708 | 743 | field_info = field |
709 | 744 | else: |
710 | 745 | field_info = field.field_info |
711 | 746 | sa_column = getattr(field_info, "sa_column", Undefined) |
| 747 | + # If sa_column not found on field_info, check if it's in the Annotated metadata |
| 748 | + # This handles the case where Pydantic V2 creates a new FieldInfo losing SQLModel attrs |
| 749 | + if sa_column is Undefined and IS_PYDANTIC_V2: |
| 750 | + # First try field_info.annotation (may be unpacked by Pydantic) |
| 751 | + annotation = getattr(field_info, "annotation", None) |
| 752 | + if annotation is not None: |
| 753 | + sqlmodel_field_info = _get_sqlmodel_field_info_from_annotation(annotation) |
| 754 | + if sqlmodel_field_info is not None: |
| 755 | + sa_column = getattr(sqlmodel_field_info, "sa_column", Undefined) |
| 756 | + # If still not found, try the original annotation from the class |
| 757 | + if sa_column is Undefined and original_annotation is not None: |
| 758 | + sqlmodel_field_info = _get_sqlmodel_field_info_from_annotation( |
| 759 | + original_annotation |
| 760 | + ) |
| 761 | + if sqlmodel_field_info is not None: |
| 762 | + sa_column = getattr(sqlmodel_field_info, "sa_column", Undefined) |
712 | 763 | if isinstance(sa_column, Column): |
713 | 764 | return sa_column |
714 | | - sa_type = get_sqlalchemy_type(field) |
| 765 | + sa_type = get_sqlalchemy_type(field, original_annotation) |
715 | 766 | primary_key = getattr(field_info, "primary_key", Undefined) |
716 | 767 | if primary_key is Undefined: |
717 | 768 | primary_key = False |
|
0 commit comments