| 
 | 1 | +from collections import defaultdict  | 
 | 2 | +from typing import Any, Dict, Optional, Sequence, Tuple, Type  | 
 | 3 | + | 
 | 4 | +from django.apps import apps  | 
 | 5 | +from django.db import models as django_models  | 
 | 6 | +from django.db.models.options import Options  | 
 | 7 | +from django.utils.translation import gettext_lazy as _  | 
 | 8 | + | 
 | 9 | +from ..remote import get_remote_object_class, get_local_resource_prefix  | 
 | 10 | + | 
 | 11 | + | 
 | 12 | +class DABContentTypeManager(django_models.Manager["DABContentType"]):  | 
 | 13 | +    """Manager storing DABContentType objects in a local cache like original ContentType.  | 
 | 14 | +
  | 
 | 15 | +    The major structural difference is that the cache keys have to add the service reference.  | 
 | 16 | +    """  | 
 | 17 | + | 
 | 18 | +    use_in_migrations = True  | 
 | 19 | + | 
 | 20 | +    def __init__(self, *args: Any, **kwargs: Any) -> None:  | 
 | 21 | +        super().__init__(*args, **kwargs)  | 
 | 22 | +        self._cache: Dict[str, Dict[Tuple[str, str, str] | int, "DABContentType"]] = {}  | 
 | 23 | + | 
 | 24 | +    def clear_cache(self) -> None:  | 
 | 25 | +        self._cache.clear()  | 
 | 26 | + | 
 | 27 | +    def create(self, *args: Any, **kwargs: Any) -> "DABContentType":  | 
 | 28 | +        obj = super().create(*args, **kwargs)  | 
 | 29 | +        self._add_to_cache(self.db, obj)  | 
 | 30 | +        return obj  | 
 | 31 | + | 
 | 32 | +    def _add_to_cache(self, using: str, ct: "DABContentType") -> None:  | 
 | 33 | +        """Store ``ct`` in the manager cache for the given database alias."""  | 
 | 34 | +        key = (ct.service, ct.app_label, ct.model)  | 
 | 35 | +        self._cache.setdefault(using, {})[key] = ct  | 
 | 36 | +        self._cache.setdefault(using, {})[ct.id] = ct  | 
 | 37 | + | 
 | 38 | +    def _get_from_cache(self, opts: Options, service: str) -> "DABContentType":  | 
 | 39 | +        """Return a cached ``DABContentType`` for ``opts`` and ``service``."""  | 
 | 40 | +        key = (service, opts.app_label, opts.model_name)  | 
 | 41 | +        return self._cache[self.db][key]  | 
 | 42 | + | 
 | 43 | +    def _get_opts(self, model: Type[django_models.Model], for_concrete_model: bool) -> Options:  | 
 | 44 | +        """Return the ``Options`` object for ``model``."""  | 
 | 45 | +        return model._meta.concrete_model._meta if for_concrete_model else model._meta  | 
 | 46 | + | 
 | 47 | +    def get_for_model(  | 
 | 48 | +        self,  | 
 | 49 | +        model: Type[django_models.Model],  | 
 | 50 | +        for_concrete_model: bool = True,  | 
 | 51 | +        service: Optional[str] = None,  | 
 | 52 | +    ) -> "DABContentType":  | 
 | 53 | +        if service is None:  | 
 | 54 | +            service = get_local_resource_prefix()  | 
 | 55 | +        opts = self._get_opts(model, for_concrete_model)  | 
 | 56 | +        try:  | 
 | 57 | +            return self._get_from_cache(opts, service)  | 
 | 58 | +        except KeyError:  | 
 | 59 | +            pass  | 
 | 60 | + | 
 | 61 | +        try:  | 
 | 62 | +            ct = self.get(service=service, app_label=opts.app_label, model=opts.model_name)  | 
 | 63 | +        except self.model.DoesNotExist:  | 
 | 64 | +            ct, _ = self.get_or_create(  | 
 | 65 | +                service=service,  | 
 | 66 | +                app_label=opts.app_label,  | 
 | 67 | +                model=opts.model_name,  | 
 | 68 | +            )  | 
 | 69 | +        self._add_to_cache(self.db, ct)  | 
 | 70 | +        return ct  | 
 | 71 | + | 
 | 72 | +    def get_for_models(  | 
 | 73 | +        self,  | 
 | 74 | +        *model_list: Type[django_models.Model],  | 
 | 75 | +        for_concrete_models: bool = True,  | 
 | 76 | +        service: Optional[str] = None,  | 
 | 77 | +    ) -> Dict[Type[django_models.Model], "DABContentType"]:  | 
 | 78 | +        """Return ``DABContentType`` objects for each model in ``model_list``."""  | 
 | 79 | +        if service is None:  | 
 | 80 | +            service = get_local_resource_prefix()  | 
 | 81 | +        results: Dict[Type[django_models.Model], "DABContentType"] = {}  | 
 | 82 | +        needed_models: Dict[str, set[str]] = defaultdict(set)  | 
 | 83 | +        needed_opts: Dict[Tuple[str, str], list[Type[django_models.Model]]] = defaultdict(list)  | 
 | 84 | +        for model in model_list:  | 
 | 85 | +            opts = self._get_opts(model, for_concrete_models)  | 
 | 86 | +            try:  | 
 | 87 | +                ct = self._get_from_cache(opts, service)  | 
 | 88 | +            except KeyError:  | 
 | 89 | +                needed_models[opts.app_label].add(opts.model_name)  | 
 | 90 | +                needed_opts[(opts.app_label, opts.model_name)].append(model)  | 
 | 91 | +            else:  | 
 | 92 | +                results[model] = ct  | 
 | 93 | + | 
 | 94 | +        if needed_opts:  | 
 | 95 | +            condition = django_models.Q(  | 
 | 96 | +                *(  | 
 | 97 | +                    django_models.Q(  | 
 | 98 | +                        ("service", service),  | 
 | 99 | +                        ("app_label", app_label),  | 
 | 100 | +                        ("model__in", models),  | 
 | 101 | +                    )  | 
 | 102 | +                    for app_label, models in needed_models.items()  | 
 | 103 | +                ),  | 
 | 104 | +                _connector=django_models.Q.OR,  | 
 | 105 | +            )  | 
 | 106 | +            cts = self.filter(condition)  | 
 | 107 | +            for ct in cts:  | 
 | 108 | +                opts_models = needed_opts.pop((ct.app_label, ct.model), [])  | 
 | 109 | +                for model in opts_models:  | 
 | 110 | +                    results[model] = ct  | 
 | 111 | +                self._add_to_cache(self.db, ct)  | 
 | 112 | +            for (app_label, model_name), opts_models in needed_opts.items():  | 
 | 113 | +                ct = self.create(service=service, app_label=app_label, model=model_name)  | 
 | 114 | +                self._add_to_cache(self.db, ct)  | 
 | 115 | +                for model in opts_models:  | 
 | 116 | +                    results[model] = ct  | 
 | 117 | +        return results  | 
 | 118 | + | 
 | 119 | +    def get_by_natural_key(self, *args: str) -> "DABContentType":  | 
 | 120 | +        """Return the content type identified by its natural key."""  | 
 | 121 | +        if len(args) == 2:  | 
 | 122 | +            service = get_local_resource_prefix()  | 
 | 123 | +            app_label, model = args  | 
 | 124 | +        else:  | 
 | 125 | +            service, app_label, model = args  | 
 | 126 | +        key = (service, app_label, model)  | 
 | 127 | +        try:  | 
 | 128 | +            return self._cache[self.db][key]  | 
 | 129 | +        except KeyError:  | 
 | 130 | +            ct = self.get(service=service, app_label=app_label, model=model)  | 
 | 131 | +            self._add_to_cache(self.db, ct)  | 
 | 132 | +            return ct  | 
 | 133 | + | 
 | 134 | +    def get_for_id(self, id: int) -> "DABContentType":  | 
 | 135 | +        """Return the content type with primary key ``id`` from the cache."""  | 
 | 136 | +        try:  | 
 | 137 | +            return self._cache[self.db][id]  | 
 | 138 | +        except KeyError:  | 
 | 139 | +            ct = self.get(pk=id)  | 
 | 140 | +            self._add_to_cache(self.db, ct)  | 
 | 141 | +            return ct  | 
 | 142 | + | 
 | 143 | + | 
 | 144 | +class DABContentType(django_models.Model):  | 
 | 145 | +    """Like Django ContentType model but scoped by service."""  | 
 | 146 | + | 
 | 147 | +    service = django_models.CharField(  | 
 | 148 | +        max_length=100,  | 
 | 149 | +        default=get_local_resource_prefix,  | 
 | 150 | +        help_text=_("service namespace to track what service this type is for. Can have a value of shared, which indicates it is synchronized."),  | 
 | 151 | +    )  | 
 | 152 | +    app_label = django_models.CharField(  | 
 | 153 | +        max_length=100,  | 
 | 154 | +        help_text=_("Django app that the model is in. This is an internal technical detail that does not affect API use."),  | 
 | 155 | +    )  | 
 | 156 | +    model = django_models.CharField(  | 
 | 157 | +        max_length=100,  | 
 | 158 | +        help_text=_("Name of the type according to the Django ORM Meta model_name convention. Comes from the python class, but lowercase with no spaces."),  | 
 | 159 | +    )  | 
 | 160 | + | 
 | 161 | +    objects = DABContentTypeManager()  | 
 | 162 | + | 
 | 163 | +    class Meta:  | 
 | 164 | +        unique_together = [  | 
 | 165 | +            ("service", "app_label", "model"),  | 
 | 166 | +        ]  | 
 | 167 | + | 
 | 168 | +    def __str__(self) -> str:  | 
 | 169 | +        return self.app_labeled_name  | 
 | 170 | + | 
 | 171 | +    @property  | 
 | 172 | +    def name(self) -> str:  | 
 | 173 | +        model = self.model_class()  | 
 | 174 | +        if not model:  | 
 | 175 | +            return self.model  | 
 | 176 | +        return str(model._meta.verbose_name)  | 
 | 177 | + | 
 | 178 | +    @property  | 
 | 179 | +    def app_labeled_name(self) -> str:  | 
 | 180 | +        model = self.model_class()  | 
 | 181 | +        if not model:  | 
 | 182 | +            return self.model  | 
 | 183 | +        return f"{model._meta.app_config.verbose_name} | {model._meta.verbose_name}"  | 
 | 184 | + | 
 | 185 | +    def model_class(self) -> Optional[Type[django_models.Model]]:  | 
 | 186 | +        """Return the model class if available for the current service."""  | 
 | 187 | +        if self.service not in ("shared", get_local_resource_prefix()):  | 
 | 188 | +            return None  | 
 | 189 | +        try:  | 
 | 190 | +            return apps.get_model(self.app_label, self.model)  | 
 | 191 | +        except LookupError:  | 
 | 192 | +            return None  | 
 | 193 | + | 
 | 194 | +    def get_object_for_this_type(self, **kwargs: Any) -> django_models.Model:  | 
 | 195 | +        """Return the object referenced by this content type."""  | 
 | 196 | +        model = self.model_class()  | 
 | 197 | +        if model is None:  | 
 | 198 | +            object_id = kwargs.get("pk") or kwargs.get("id") or kwargs.get("pk__exact") or kwargs.get("id__exact")  | 
 | 199 | +            if object_id is None:  | 
 | 200 | +                raise LookupError("Model not available in this service")  | 
 | 201 | +            return get_remote_object_class()(self, object_id)  | 
 | 202 | +        return model._base_manager.get(**kwargs)  | 
 | 203 | + | 
 | 204 | +    def get_all_objects_for_this_type(self, **kwargs: Any) -> django_models.QuerySet | Sequence[django_models.Model]:  | 
 | 205 | +        """Return all objects referenced by this content type."""  | 
 | 206 | +        model = self.model_class()  | 
 | 207 | +        if model is None:  | 
 | 208 | +            ids = kwargs.get("pk__in") or kwargs.get("id__in") or (kwargs.get("pk") and [kwargs["pk"]]) or (kwargs.get("id") and [kwargs["id"]])  | 
 | 209 | +            if not ids:  | 
 | 210 | +                return []  | 
 | 211 | +            return [get_remote_object_class()(self, obj_id) for obj_id in ids]  | 
 | 212 | +        return list(model._base_manager.filter(**kwargs))  | 
 | 213 | + | 
 | 214 | +    def natural_key(self) -> Tuple[str, str, str]:  | 
 | 215 | +        return (self.service, self.app_label, self.model)  | 
0 commit comments