|
| 1 | +import typing as tp |
| 2 | +from datetime import timedelta |
| 3 | + |
| 4 | +import httpx |
| 5 | +from aiorwlock import RWLock as AsyncRWLock |
| 6 | +from fasteners import ReaderWriterLock as RWLock |
| 7 | +from redis import Redis |
| 8 | +from redis.asyncio import Redis as AsyncRedis |
| 9 | + |
| 10 | +from httpx_cache.cache.base import BaseCache |
| 11 | +from httpx_cache.serializer.base import BaseSerializer |
| 12 | +from httpx_cache.serializer.common import MsgPackSerializer |
| 13 | +from httpx_cache.utils import get_cache_key |
| 14 | + |
| 15 | +__all__ = ["RedisCache"] |
| 16 | + |
| 17 | + |
| 18 | +class RedisCache(BaseCache): |
| 19 | + """Redis cache that stores cached responses in Redis. |
| 20 | +
|
| 21 | + Uses a lock/async_lock to make sure each get/set/delete operation is safe. |
| 22 | +
|
| 23 | + You can either provide an instance of 'Redis'/'AsyncRedis' or a redis url to |
| 24 | + have RedisCache create the connection for you. |
| 25 | +
|
| 26 | + Args: |
| 27 | + serializer: Optional serializer for the data to cache, defaults to: |
| 28 | + httpx_cache.MsgPackSerializer |
| 29 | + namespace: Optional namespace for the cache keys, defaults to "httpx_cache" |
| 30 | + redis_url: Optional redis url, defaults to empty string |
| 31 | + redis: Optional redis instance, defaults to None |
| 32 | + aredis: Optional async redis instance, defaults to None |
| 33 | + default_ttl: Optional default ttl for cached responses, defaults to None |
| 34 | + """ |
| 35 | + |
| 36 | + lock = RWLock() |
| 37 | + |
| 38 | + def __init__( |
| 39 | + self, |
| 40 | + serializer: tp.Optional[BaseSerializer] = None, |
| 41 | + namespace: str = "httpx_cache", |
| 42 | + redis_url: str = "", |
| 43 | + redis: tp.Optional["Redis[bytes]"] = None, |
| 44 | + aredis: tp.Optional["AsyncRedis[bytes]"] = None, |
| 45 | + default_ttl: tp.Optional[timedelta] = None, |
| 46 | + ) -> None: |
| 47 | + self.namespace = namespace |
| 48 | + # redis connection is lazy loaded |
| 49 | + self.redis = redis or Redis.from_url(redis_url) |
| 50 | + self.aredis = aredis or AsyncRedis.from_url(redis_url) |
| 51 | + self.serializer = serializer or MsgPackSerializer() |
| 52 | + self.default_ttl = default_ttl |
| 53 | + if not isinstance(self.serializer, BaseSerializer): |
| 54 | + raise TypeError( |
| 55 | + "Expected serializer of type 'httpx_cache.BaseSerializer', " |
| 56 | + f"got {type(self.serializer)}" |
| 57 | + ) |
| 58 | + |
| 59 | + self._async_lock: tp.Optional[AsyncRWLock] = None |
| 60 | + |
| 61 | + @property |
| 62 | + def async_lock(self) -> AsyncRWLock: |
| 63 | + if self._async_lock is None: |
| 64 | + self._async_lock = AsyncRWLock() |
| 65 | + return self._async_lock |
| 66 | + |
| 67 | + def _get_namespaced_cache_key(self, request: httpx.Request) -> str: |
| 68 | + key = get_cache_key(request) |
| 69 | + if self.namespace: |
| 70 | + key = f"{self.namespace}:{key}" |
| 71 | + return key |
| 72 | + |
| 73 | + def get(self, request: httpx.Request) -> tp.Optional[httpx.Response]: |
| 74 | + key = self._get_namespaced_cache_key(request) |
| 75 | + with self.lock.read_lock(): |
| 76 | + cached = self.redis.get(key) |
| 77 | + if cached is not None: |
| 78 | + return self.serializer.loads(cached=cached, request=request) |
| 79 | + return None |
| 80 | + |
| 81 | + async def aget(self, request: httpx.Request) -> tp.Optional[httpx.Response]: |
| 82 | + key = self._get_namespaced_cache_key(request) |
| 83 | + async with self.async_lock.reader: |
| 84 | + cached_data = await self.aredis.get(key) |
| 85 | + if cached_data is not None: |
| 86 | + return self.serializer.loads(cached=cached_data, request=request) |
| 87 | + return None |
| 88 | + |
| 89 | + def set( |
| 90 | + self, |
| 91 | + *, |
| 92 | + request: httpx.Request, |
| 93 | + response: httpx.Response, |
| 94 | + content: tp.Optional[bytes] = None, |
| 95 | + ) -> None: |
| 96 | + key = self._get_namespaced_cache_key(request) |
| 97 | + to_cache = self.serializer.dumps(response=response, content=content) |
| 98 | + with self.lock.write_lock(): |
| 99 | + if self.default_ttl: |
| 100 | + self.redis.setex(key, self.default_ttl, to_cache) |
| 101 | + else: |
| 102 | + self.redis.set(key, to_cache) |
| 103 | + |
| 104 | + async def aset( |
| 105 | + self, |
| 106 | + *, |
| 107 | + request: httpx.Request, |
| 108 | + response: httpx.Response, |
| 109 | + content: tp.Optional[bytes] = None, |
| 110 | + ) -> None: |
| 111 | + to_cache = self.serializer.dumps(response=response, content=content) |
| 112 | + key = self._get_namespaced_cache_key(request) |
| 113 | + async with self.async_lock.writer: |
| 114 | + if self.default_ttl: |
| 115 | + await self.aredis.setex(key, self.default_ttl, to_cache) |
| 116 | + else: |
| 117 | + await self.aredis.set(key, to_cache) |
| 118 | + |
| 119 | + def delete(self, request: httpx.Request) -> None: |
| 120 | + key = self._get_namespaced_cache_key(request) |
| 121 | + with self.lock.write_lock(): |
| 122 | + self.redis.delete(key) |
| 123 | + |
| 124 | + async def adelete(self, request: httpx.Request) -> None: |
| 125 | + key = self._get_namespaced_cache_key(request) |
| 126 | + async with self.async_lock.writer: |
| 127 | + await self.aredis.delete(key) |
| 128 | + |
| 129 | + def close(self) -> None: |
| 130 | + self.redis.close() |
| 131 | + |
| 132 | + async def aclose(self) -> None: |
| 133 | + await self.aredis.close() |
0 commit comments