|
1 | 1 | import json |
| 2 | +import logging |
2 | 3 | import pathlib |
| 4 | +import random |
3 | 5 | from functools import lru_cache |
4 | | -from typing import NotRequired, TypedDict |
| 6 | +from itertools import cycle |
| 7 | +from typing import Iterator, Literal, NotRequired, TypedDict, TypeVar |
5 | 8 |
|
6 | 9 | from django.conf import settings |
7 | 10 | from django.contrib.auth import get_user_model |
8 | 11 | from django.contrib.auth.models import AbstractBaseUser |
9 | | -from django.db.models import Manager |
| 12 | +from django.db import connections |
| 13 | +from django.db.models import Manager, Model |
| 14 | +from django.db.utils import OperationalError |
| 15 | + |
| 16 | +from common.core import ReplicaReadStrategy |
| 17 | + |
| 18 | +logger = logging.getLogger(__name__) |
10 | 19 |
|
11 | 20 | UNKNOWN = "unknown" |
12 | 21 | VERSIONS_INFO_FILE_LOCATION = ".versions.json" |
13 | 22 |
|
| 23 | +ManagerType = TypeVar("ManagerType", bound=Manager[Model]) |
| 24 | + |
| 25 | +ReplicaNamePrefix = Literal["replica_", "cross_region_replica_"] |
| 26 | +_replica_sequential_names_by_prefix: dict[ReplicaNamePrefix, Iterator[str]] = {} |
| 27 | + |
14 | 28 |
|
15 | 29 | class SelfHostedData(TypedDict): |
16 | 30 | has_users: bool |
@@ -114,3 +128,53 @@ def get_file_contents(file_path: str) -> str | None: |
114 | 128 | return f.read().replace("\n", "") |
115 | 129 | except FileNotFoundError: |
116 | 130 | return None |
| 131 | + |
| 132 | + |
| 133 | +def using_database_replica( |
| 134 | + manager: ManagerType, |
| 135 | + replica_prefix: ReplicaNamePrefix = "replica_", |
| 136 | +) -> ManagerType: |
| 137 | + """Attempts to bind a manager to a healthy database replica""" |
| 138 | + local_replicas = [name for name in connections if name.startswith(replica_prefix)] |
| 139 | + |
| 140 | + if not local_replicas: |
| 141 | + logger.info("No replicas set up.") |
| 142 | + return manager |
| 143 | + |
| 144 | + chosen_replica = None |
| 145 | + |
| 146 | + if settings.REPLICA_READ_STRATEGY == ReplicaReadStrategy.SEQUENTIAL: |
| 147 | + sequence = _replica_sequential_names_by_prefix.setdefault( |
| 148 | + replica_prefix, cycle(local_replicas) |
| 149 | + ) |
| 150 | + for _ in range(len(local_replicas)): |
| 151 | + attempted_replica = next(sequence) |
| 152 | + try: |
| 153 | + connections[attempted_replica].ensure_connection() |
| 154 | + chosen_replica = attempted_replica |
| 155 | + break |
| 156 | + except OperationalError: |
| 157 | + logger.exception(f"Replica '{attempted_replica}' is not available.") |
| 158 | + continue |
| 159 | + |
| 160 | + if settings.REPLICA_READ_STRATEGY == ReplicaReadStrategy.DISTRIBUTED: |
| 161 | + for _ in range(len(local_replicas)): |
| 162 | + attempted_replica = random.choice(local_replicas) |
| 163 | + try: |
| 164 | + connections[attempted_replica].ensure_connection() |
| 165 | + chosen_replica = attempted_replica |
| 166 | + break |
| 167 | + except OperationalError: |
| 168 | + logger.exception(f"Replica '{attempted_replica}' is not available.") |
| 169 | + local_replicas.remove(attempted_replica) |
| 170 | + continue |
| 171 | + |
| 172 | + if not chosen_replica: |
| 173 | + if replica_prefix == "replica_": |
| 174 | + logger.warning("Falling back to cross-region replicas, if any.") |
| 175 | + return using_database_replica(manager, "cross_region_replica_") |
| 176 | + |
| 177 | + logger.warning("No replicas available.") |
| 178 | + return manager |
| 179 | + |
| 180 | + return manager.db_manager(chosen_replica) |
0 commit comments