|
15 | 15 |
|
16 | 16 | import logging |
17 | 17 | from inspect import isawaitable |
18 | | -from typing import TYPE_CHECKING, Optional, Type, cast |
| 18 | +from typing import TYPE_CHECKING, Generic, Optional, Type, TypeVar, cast |
19 | 19 |
|
| 20 | +import attr |
20 | 21 | import txredisapi |
21 | 22 |
|
22 | 23 | from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable |
|
42 | 43 |
|
43 | 44 | logger = logging.getLogger(__name__) |
44 | 45 |
|
| 46 | +T = TypeVar("T") |
| 47 | +V = TypeVar("V") |
| 48 | + |
| 49 | + |
| 50 | +@attr.s |
| 51 | +class ConstantProperty(Generic[T, V]): |
| 52 | + """A descriptor that returns the given constant, ignoring attempts to set |
| 53 | + it. |
| 54 | + """ |
| 55 | + |
| 56 | + constant = attr.ib() # type: V |
| 57 | + |
| 58 | + def __get__(self, obj: Optional[T], objtype: Type[T] = None) -> V: |
| 59 | + return self.constant |
| 60 | + |
| 61 | + def __set__(self, obj: Optional[T], value: V): |
| 62 | + pass |
| 63 | + |
45 | 64 |
|
46 | 65 | class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): |
47 | 66 | """Connection to redis subscribed to replication stream. |
@@ -195,6 +214,10 @@ class SynapseRedisFactory(txredisapi.RedisFactory): |
195 | 214 | we detect dead connections. |
196 | 215 | """ |
197 | 216 |
|
| 217 | + # We want to *always* retry connecting, txredisapi will stop if there is a |
| 218 | + # failure during certain operations, e.g. during AUTH. |
| 219 | + continueTrying = cast(bool, ConstantProperty(True)) |
| 220 | + |
198 | 221 | def __init__( |
199 | 222 | self, |
200 | 223 | hs: "HomeServer", |
@@ -243,7 +266,6 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): |
243 | 266 | """ |
244 | 267 |
|
245 | 268 | maxDelay = 5 |
246 | | - continueTrying = True |
247 | 269 | protocol = RedisSubscriber |
248 | 270 |
|
249 | 271 | def __init__( |
|
0 commit comments