|
1 | 1 | import logging |
2 | | -from abc import abstractmethod |
3 | 2 | from typing import Any, Dict, List, Optional, Tuple, Type |
4 | 3 |
|
5 | 4 | from django.db import models, transaction |
6 | | -from django.db.models.query import QuerySet |
7 | 5 | from ninja_extra.shortcuts import get_object_or_none |
8 | 6 |
|
9 | 7 | from easy.controller.meta_conf import ModelMetaConfig |
| 8 | +from easy.domain.meta import CrudModel |
10 | 9 | from easy.exception import BaseAPIException |
11 | 10 |
|
12 | 11 | logger = logging.getLogger(__name__) |
13 | 12 |
|
14 | 13 |
|
15 | | -class CrudModel(object): |
16 | | - Meta: Dict = {} |
17 | | - |
18 | | - def __init__(self, model: Any): |
19 | | - self.model = model |
20 | | - |
21 | | - @abstractmethod |
22 | | - def crud_add_obj(self, **payload: Dict) -> Any: |
23 | | - raise NotImplementedError |
24 | | - |
25 | | - @abstractmethod |
26 | | - def crud_del_obj(self, pk: int) -> bool: |
27 | | - raise NotImplementedError |
28 | | - |
29 | | - @abstractmethod |
30 | | - def crud_update_obj(self, pk: int, payload: Dict) -> bool: |
31 | | - raise NotImplementedError |
32 | | - |
33 | | - @abstractmethod |
34 | | - def crud_get_obj(self, pk: int) -> Any: |
35 | | - raise NotImplementedError |
36 | | - |
37 | | - @abstractmethod |
38 | | - def crud_get_objs_all(self, maximum: Optional[int] = None, **filters: Any) -> Any: |
39 | | - raise NotImplementedError |
40 | | - |
41 | | - @abstractmethod |
42 | | - def crud_filter(self, **kwargs: Any) -> QuerySet: |
43 | | - raise NotImplementedError |
44 | | - |
45 | | - @abstractmethod |
46 | | - def crud_filter_exclude(self, **kwargs: Any) -> QuerySet: |
47 | | - raise NotImplementedError |
48 | | - |
49 | | - |
50 | 14 | class DjangoOrmModel(CrudModel): |
51 | 15 | def __init__(self, model: Type[models.Model]): |
52 | 16 | self.model = model |
@@ -164,3 +128,128 @@ def crud_filter(self, **kwargs: Any) -> Any: |
164 | 128 |
|
165 | 129 | def crud_filter_exclude(self, **kwargs: Any) -> Any: |
166 | 130 | return self.model.objects.all().exclude(**kwargs) |
| 131 | + |
| 132 | + |
| 133 | +class DjangoSerializer(ModelMetaConfig): |
| 134 | + @staticmethod |
| 135 | + def is_model_instance(data: Any) -> bool: |
| 136 | + return isinstance(data, models.Model) |
| 137 | + |
| 138 | + @staticmethod |
| 139 | + def is_queryset(data: Any) -> bool: |
| 140 | + return isinstance(data, models.query.QuerySet) |
| 141 | + |
| 142 | + @staticmethod |
| 143 | + def is_one_relationship(data: Any) -> bool: |
| 144 | + return isinstance(data, models.ForeignKey) or isinstance( |
| 145 | + data, models.OneToOneRel |
| 146 | + ) |
| 147 | + |
| 148 | + @staticmethod |
| 149 | + def is_many_relationship(data: Any) -> bool: |
| 150 | + return ( |
| 151 | + isinstance(data, models.ManyToManyRel) |
| 152 | + or isinstance(data, models.ManyToManyField) |
| 153 | + or isinstance(data, models.ManyToOneRel) |
| 154 | + ) |
| 155 | + |
| 156 | + @staticmethod |
| 157 | + def is_paginated(data: Any) -> bool: |
| 158 | + return isinstance(data, dict) and isinstance( |
| 159 | + data.get("items", None), models.query.QuerySet |
| 160 | + ) |
| 161 | + |
| 162 | + def serialize_model_instance( |
| 163 | + self, obj: models.Model, referrers: Any = tuple() |
| 164 | + ) -> Dict[Any, Any]: |
| 165 | + """Serializes Django model instance to dictionary""" |
| 166 | + out = {} |
| 167 | + for field in obj._meta.get_fields(): |
| 168 | + if self.show_field(obj, field.name): |
| 169 | + if self.is_one_relationship(field): |
| 170 | + out.update( |
| 171 | + self.serialize_foreign_key(obj, field, referrers + (obj,)) |
| 172 | + ) |
| 173 | + |
| 174 | + elif self.is_many_relationship(field): |
| 175 | + out.update( |
| 176 | + self.serialize_many_relationship(obj, referrers + (obj,)) |
| 177 | + ) |
| 178 | + |
| 179 | + else: |
| 180 | + out.update(self.serialize_value_field(obj, field)) |
| 181 | + return out |
| 182 | + |
| 183 | + def serialize_queryset( |
| 184 | + self, data: models.query.QuerySet, referrers: Tuple[Any, ...] = tuple() |
| 185 | + ) -> List[Dict[Any, Any]]: |
| 186 | + """Serializes Django Queryset to dictionary""" |
| 187 | + return [self.serialize_model_instance(obj, referrers) for obj in data] |
| 188 | + |
| 189 | + def serialize_foreign_key( |
| 190 | + self, obj: models.Model, field: Any, referrers: Any = tuple() |
| 191 | + ) -> Dict[Any, Any]: |
| 192 | + """Serializes foreign key field of Django model instance""" |
| 193 | + try: |
| 194 | + if not hasattr(obj, field.name): |
| 195 | + return {field.name: None} # pragma: no cover |
| 196 | + related_instance = getattr(obj, field.name) |
| 197 | + if related_instance is None: |
| 198 | + return {field.name: None} |
| 199 | + if related_instance in referrers: |
| 200 | + return {} # pragma: no cover |
| 201 | + field_value = getattr(related_instance, "pk") |
| 202 | + except Exception as exc: # pragma: no cover |
| 203 | + logger.error(f"serialize_foreign_key error - {obj}", exc_info=exc) |
| 204 | + return {field.name: None} |
| 205 | + |
| 206 | + if self.get_model_recursive(obj): |
| 207 | + return { |
| 208 | + field.name: self.serialize_model_instance(related_instance, referrers) |
| 209 | + } |
| 210 | + return {field.name: field_value} |
| 211 | + |
| 212 | + def serialize_many_relationship( |
| 213 | + self, obj: models.Model, referrers: Any = tuple() |
| 214 | + ) -> Dict[Any, Any]: |
| 215 | + """ |
| 216 | + Serializes many relationship (ManyToMany, ManyToOne) of Django model instance |
| 217 | + """ |
| 218 | + if not hasattr(obj, "_prefetched_objects_cache"): |
| 219 | + return {} |
| 220 | + out = {} |
| 221 | + try: |
| 222 | + for k, v in obj._prefetched_objects_cache.items(): # type: ignore |
| 223 | + field_name = k if hasattr(obj, k) else k + "_set" |
| 224 | + if v: |
| 225 | + if self.get_model_join(obj): |
| 226 | + out[field_name] = self.serialize_queryset(v, referrers + (obj,)) |
| 227 | + else: |
| 228 | + out[field_name] = [o.pk for o in v] |
| 229 | + else: |
| 230 | + out[field_name] = [] |
| 231 | + except Exception as exc: # pragma: no cover |
| 232 | + logger.error(f"serialize_many_relationship error - {obj}", exc_info=exc) |
| 233 | + return out |
| 234 | + |
| 235 | + def serialize_value_field(self, obj: models.Model, field: Any) -> Dict[Any, Any]: |
| 236 | + """ |
| 237 | + Serializes regular 'jsonable' field (Char, Int, etc.) of Django model instance |
| 238 | + """ |
| 239 | + return {field.name: getattr(obj, field.name)} |
| 240 | + |
| 241 | + def serialize_data(self, data: Any) -> Any: |
| 242 | + out = data |
| 243 | + # Queryset |
| 244 | + if self.is_queryset(data): |
| 245 | + out = self.serialize_queryset(data) |
| 246 | + # Model |
| 247 | + elif self.is_model_instance(data): |
| 248 | + out = self.serialize_model_instance(data) |
| 249 | + # Add limit_off pagination support |
| 250 | + elif self.is_paginated(data): |
| 251 | + out = self.serialize_queryset(data.get("items")) |
| 252 | + return out |
| 253 | + |
| 254 | + |
| 255 | +django_serializer = DjangoSerializer() |
0 commit comments