|
| 1 | +"""Lazy loading proxy for ORM models. |
| 2 | +
|
| 3 | +Enables deferred loading of ORM models when the ORM is not initialized |
| 4 | +during task deserialization. The actual model is fetched when accessed. |
| 5 | +
|
| 6 | +This allows users to pass ORM model instances as task parameters without |
| 7 | +requiring ORM initialization before worker startup. Models are automatically |
| 8 | +fetched when tasks execute. |
| 9 | +""" |
| 10 | + |
| 11 | +from __future__ import annotations |
| 12 | + |
| 13 | +import asyncio |
| 14 | +from typing import Any |
| 15 | + |
| 16 | + |
class LazyOrmProxy:
    """Proxy that defers ORM model loading until first access.

    This allows task parameters to reference ORM models even when the ORM
    is not initialized during deserialization. The actual model is fetched
    when the task accesses the parameter (or awaits the proxy).

    Attributes:
        _model_class: The ORM model class to fetch
        _pk: The primary key value
        _fetch_callback: Async callback to fetch the model
        _resolved: Cached resolved model instance (``_UNRESOLVED`` until fetched)
    """

    __slots__ = ("_model_class", "_pk", "_fetch_callback", "_resolved")

    # Sentinel marking "not fetched yet". Using None for this would re-run
    # the fetch on every access whenever the callback legitimately returns
    # None (e.g. no row with that primary key), and attribute access on a
    # resolved-to-None proxy would wrongly report "not resolved".
    _UNRESOLVED: Any = object()

    def __init__(
        self,
        model_class: type,
        pk: Any,
        fetch_callback: Any,  # Async callable: (model_class, pk) -> model
    ) -> None:
        """Initialize lazy proxy.

        Args:
            model_class: The ORM model class
            pk: The primary key value
            fetch_callback: Async callable that fetches the model
        """
        # object.__setattr__ bypasses our own __setattr__, which forbids
        # writes while unresolved.
        object.__setattr__(self, "_model_class", model_class)
        object.__setattr__(self, "_pk", pk)
        object.__setattr__(self, "_fetch_callback", fetch_callback)
        object.__setattr__(self, "_resolved", LazyOrmProxy._UNRESOLVED)

    @staticmethod
    async def _auto_init_tortoise() -> None:
        """Best-effort auto-initialization of Tortoise ORM from app config.

        Never raises: every failure is logged as a warning and the fetch
        callback is left to surface any real error.
        """
        import logging

        logger = logging.getLogger(__name__)
        try:
            from asynctasq.config import Config

            tortoise_config = Config.get().tortoise_orm
        except Exception as e:
            # Config unavailable (e.g. asynctasq not initialized) — log and
            # let fetch_callback handle it.
            logger.warning("Failed to access Tortoise config: %s", e)
            return
        if tortoise_config is None:
            return
        try:
            from tortoise import Tortoise
        except ImportError:
            # Tortoise not installed; fetch_callback will handle the error.
            return
        try:
            # NOTE(review): Tortoise._inited is private API — there appears
            # to be no public "is initialized" check; confirm against the
            # installed Tortoise version.
            if not Tortoise._inited:
                await Tortoise.init(**tortoise_config)
                # Generate schemas to ensure tables exist.
                await Tortoise.generate_schemas(safe=True)
        except Exception as e:
            # Log (lazily, %-style) but don't crash — fetch_callback decides
            # whether the missing ORM is fatal.
            logger.warning(
                "Failed to auto-initialize Tortoise ORM: %s\n"
                "Make sure the modules specified in tortoise_config are importable.\n"
                "If running a script directly, you may need to set PYTHONPATH or use proper module paths.",
                e,
            )

    async def _resolve(self) -> Any:
        """Resolve the proxy by fetching the actual model.

        The fetched result — including a legitimate None — is cached, so the
        fetch callback runs at most once per proxy.

        Returns:
            The resolved ORM model instance

        Raises:
            Whatever the fetch callback raises (e.g. if the ORM is still
            not initialized).
        """
        resolved = object.__getattribute__(self, "_resolved")
        if resolved is not LazyOrmProxy._UNRESOLVED:
            return resolved

        # Try to bring Tortoise up first if configuration is available.
        await LazyOrmProxy._auto_init_tortoise()

        fetch_callback = object.__getattribute__(self, "_fetch_callback")
        model_class = object.__getattribute__(self, "_model_class")
        pk = object.__getattribute__(self, "_pk")

        resolved = await fetch_callback(model_class, pk)
        object.__setattr__(self, "_resolved", resolved)
        return resolved

    def __getattribute__(self, name: str) -> Any:
        """Intercept attribute access to delegate to the resolved model.

        This is synchronous and will raise an error if the model hasn't
        been resolved yet. The proxy will auto-resolve when awaited.
        """
        # Dunder/private names and the explicit resolve entry point belong
        # to the proxy itself; everything else is delegated to the model.
        # (Any name starting with "_" covers __class__/__dict__/__slots__.)
        if name.startswith("_") or name == "await_resolve":
            return object.__getattribute__(self, name)

        resolved = object.__getattribute__(self, "_resolved")
        if resolved is not LazyOrmProxy._UNRESOLVED:
            return getattr(resolved, name)

        # Not resolved - provide helpful error message
        model_class = object.__getattribute__(self, "_model_class")
        raise RuntimeError(
            f"LazyOrmProxy for {model_class.__name__} has not been resolved yet.\n\n"
            f"The ORM model is lazy-loaded because the ORM was not initialized during deserialization.\n"
            f"To fix this, pass your Tortoise configuration to asynctasq.init():\n\n"
            f"  from asynctasq import init\n\n"
            f"  init(\n"
            f"      {{'driver': 'redis', 'redis': RedisConfig(url='redis://localhost')}},\n"
            f"      tortoise_config={{\n"
            f"          'db_url': 'postgres://user:pass@localhost/db',\n"
            f"          'modules': {{'models': ['myapp.models']}}\n"
            f"      }}\n"
            f"  )\n\n"
            f"This will auto-initialize Tortoise when the worker deserializes tasks.\n"
        )

    def __setattr__(self, name: str, value: Any) -> None:
        """Set attribute on the resolved model (never on the proxy itself)."""
        resolved = object.__getattribute__(self, "_resolved")
        if resolved is not LazyOrmProxy._UNRESOLVED:
            setattr(resolved, name, value)
        else:
            model_class = object.__getattribute__(self, "_model_class")
            raise RuntimeError(
                f"Cannot set attribute on unresolved LazyOrmProxy for {model_class.__name__}"
            )

    async def await_resolve(self) -> Any:
        """Public method to explicitly resolve the proxy.

        Returns:
            The resolved ORM model instance
        """
        return await self._resolve()

    def __await__(self):
        """Make the proxy awaitable.

        This allows users to resolve proxies by awaiting them:
            product = await product  # Resolves the proxy

        Returns:
            Generator that resolves to the ORM model instance
        """
        return self._resolve().__await__()

    def __repr__(self) -> str:
        """Return string representation."""
        model_class = object.__getattribute__(self, "_model_class")
        pk = object.__getattribute__(self, "_pk")
        resolved = object.__getattribute__(self, "_resolved")
        status = "unresolved" if resolved is LazyOrmProxy._UNRESOLVED else "resolved"
        return f"<LazyOrmProxy({model_class.__name__}, pk={pk}, {status})>"
| 179 | + |
| 180 | + |
def is_lazy_proxy(obj: Any) -> bool:
    """Tell whether *obj* is a lazy ORM proxy.

    Args:
        obj: Object to check

    Returns:
        True if obj is a LazyOrmProxy instance
    """
    return isinstance(obj, LazyOrmProxy)
| 191 | + |
| 192 | + |
async def resolve_lazy_proxies(obj: Any) -> Any:
    """Recursively resolve all LazyOrmProxy instances in a data structure.

    Resolves proxies in parallel when possible for better performance.

    Args:
        obj: Object that may contain lazy proxies (dict, list, tuple, or single value)

    Returns:
        The same structure with all lazy proxies resolved
    """
    if is_lazy_proxy(obj):
        return await obj.await_resolve()

    if isinstance(obj, dict):
        # Resolve all values in parallel; gather preserves input order, so
        # strict zip is safe and guards the key/value alignment invariant.
        keys = list(obj.keys())
        values = await asyncio.gather(*(resolve_lazy_proxies(v) for v in obj.values()))
        return dict(zip(keys, values, strict=True))

    if isinstance(obj, (list, tuple)):
        # Resolve all items in parallel.
        resolved_items = await asyncio.gather(*(resolve_lazy_proxies(item) for item in obj))
        if isinstance(obj, tuple) and hasattr(obj, "_fields"):
            # Namedtuple constructors take positional fields, not a single
            # iterable — type(obj)(resolved_items) would raise TypeError.
            return type(obj)(*resolved_items)
        return type(obj)(resolved_items)

    return obj
0 commit comments