-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcached_data.py
More file actions
330 lines (264 loc) · 10.4 KB
/
cached_data.py
File metadata and controls
330 lines (264 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
"""Caching utilities for the Vacasa integration."""
import asyncio
import json
import logging
import os
import random
import time
from collections.abc import Callable
from typing import Any, TypeVar
from .const import DEFAULT_CACHE_TTL, PROPERTY_CACHE_FILE
_LOGGER = logging.getLogger(__name__)
T = TypeVar("T")
class CachedData:
    """Manages cached data with TTL (Time To Live) support.

    Entries live in an in-memory dict and are mirrored to a JSON file so
    they survive restarts. Public accessors are coroutines serialized by an
    asyncio.Lock; blocking file I/O is pushed to an executor whenever a
    Home Assistant instance is available.
    """

    def __init__(
        self,
        cache_file_path: str | None = None,
        default_ttl: int = DEFAULT_CACHE_TTL,
        hass=None,
    ) -> None:
        """Initialize the cached data manager.

        Args:
            cache_file_path: Optional path to cache file
            default_ttl: Default TTL in seconds
            hass: Optional Home Assistant instance for async file operations
        """
        self._cache_file = cache_file_path or PROPERTY_CACHE_FILE
        self._default_ttl = default_ttl
        self._hass = hass
        # key -> {"data": <value>, "timestamp": <epoch secs>, "ttl": <secs>}
        self._cache: dict[str, dict[str, Any]] = {}
        self._lock = asyncio.Lock()
        _LOGGER.debug("Initialized CachedData with TTL %s seconds", default_ttl)

    def _is_expired(
        self, cache_entry: dict[str, Any], ttl_override: int | None = None
    ) -> bool:
        """Check if a cache entry is expired.

        Args:
            cache_entry: Cache entry with 'timestamp' and 'ttl' fields
            ttl_override: Optional TTL (seconds) used instead of the entry's own

        Returns:
            True if expired, False otherwise
        """
        entry_time = cache_entry.get("timestamp", 0)
        if ttl_override is not None:
            ttl = ttl_override
        else:
            ttl = cache_entry.get("ttl", self._default_ttl)
        return (time.time() - entry_time) > ttl

    async def get(self, key: str, default: T | None = None, ttl: int | None = None) -> T | None:
        """Get a value from cache.

        Args:
            key: Cache key
            default: Default value if not found or expired
            ttl: Optional TTL override applied when checking expiry

        Returns:
            Cached value or default
        """
        async with self._lock:
            cache_entry = self._cache.get(key)
            if cache_entry is None:
                _LOGGER.debug("Cache miss for key: %s", key)
                return default
            # BUGFIX: `ttl` was accepted but silently ignored; honor it so
            # callers can demand fresher data than the stored TTL allows.
            if self._is_expired(cache_entry, ttl_override=ttl):
                _LOGGER.debug("Cache expired for key: %s", key)
                del self._cache[key]
                return default
            _LOGGER.debug("Cache hit for key: %s", key)
            return cache_entry.get("data", default)

    async def set(self, key: str, value: Any, ttl: int | None = None) -> None:
        """Set a value in cache and persist the cache to disk.

        Args:
            key: Cache key
            value: Value to cache (must be JSON-serializable to persist)
            ttl: Optional TTL override (in seconds)
        """
        async with self._lock:
            cache_entry = {
                "data": value,
                "timestamp": time.time(),
                # BUGFIX: `ttl or self._default_ttl` turned an explicit
                # ttl=0 into the default; compare against None instead.
                "ttl": ttl if ttl is not None else self._default_ttl,
            }
            self._cache[key] = cache_entry
            _LOGGER.debug("Cached value for key: %s (TTL: %s)", key, cache_entry["ttl"])
            # Persist while still holding the lock so json.dump never
            # iterates a dict another task is concurrently mutating.
            await self._save_to_disk()

    async def delete(self, key: str) -> bool:
        """Delete a key from cache.

        Args:
            key: Cache key

        Returns:
            True if key existed, False otherwise
        """
        async with self._lock:
            existed = key in self._cache
            if existed:
                del self._cache[key]
                _LOGGER.debug("Deleted cache key: %s", key)
                # Only rewrite the file when something actually changed.
                await self._save_to_disk()
        return existed

    async def clear(self) -> None:
        """Clear all cached data in memory and remove the cache file."""
        async with self._lock:
            self._cache.clear()
            _LOGGER.debug("Cleared all cache data")
            # The existence check lives in the sync helper, so no blocking
            # os.path.exists call happens on the event loop here.
            await self._clear_disk_cache()

    async def cleanup_expired(self) -> int:
        """Remove expired entries from cache.

        Returns:
            Number of entries removed
        """
        async with self._lock:
            expired_keys = [
                key for key, entry in self._cache.items() if self._is_expired(entry)
            ]
            for key in expired_keys:
                del self._cache[key]
            if expired_keys:
                _LOGGER.debug("Cleaned up %s expired cache entries", len(expired_keys))
                await self._save_to_disk()
            return len(expired_keys)

    async def _run_io_task(self, func: Callable[..., T], *args, **kwargs) -> T:
        """Execute a blocking IO task safely when hass is available.

        NOTE(review): hass.async_add_executor_job takes positional args only;
        internal callers pass no kwargs, but verify before passing any.
        """
        if self._hass:
            return await self._hass.async_add_executor_job(func, *args, **kwargs)
        return func(*args, **kwargs)

    def _save_to_disk_sync(self) -> None:
        """Save cache to disk (synchronous helper)."""
        try:
            # BUGFIX: pin the encoding; the platform default is locale-dependent.
            with open(self._cache_file, "w", encoding="utf-8") as f:
                json.dump(self._cache, f, indent=2)
            # Set file permissions to be readable only by the owner
            os.chmod(self._cache_file, 0o600)
            _LOGGER.debug("Cache saved to disk: %s", self._cache_file)
        except Exception as e:
            _LOGGER.warning("Failed to save cache to disk: %s", e)

    async def _save_to_disk(self) -> None:
        """Save cache to disk."""
        try:
            await self._run_io_task(self._save_to_disk_sync)
        except Exception as e:
            _LOGGER.warning("Failed to save cache to disk: %s", e)

    def _load_from_disk_sync(self) -> bool:
        """Load cache from disk (synchronous helper).

        Returns:
            True if loaded successfully, False otherwise
        """
        if not os.path.exists(self._cache_file):
            _LOGGER.debug("Cache file does not exist: %s", self._cache_file)
            return False
        try:
            with open(self._cache_file, "r", encoding="utf-8") as f:
                cache_data = json.load(f)
            if not isinstance(cache_data, dict):
                _LOGGER.warning("Invalid cache file format")
                return False
            self._cache = cache_data
            _LOGGER.debug("Cache loaded from disk: %s entries", len(self._cache))
            return True
        except json.JSONDecodeError:
            _LOGGER.warning("Failed to parse cache file (invalid JSON)")
            return False
        except Exception as e:
            _LOGGER.warning("Failed to load cache from disk: %s", e)
            return False

    async def load_from_disk(self) -> bool:
        """Load cache from disk.

        Returns:
            True if loaded successfully, False otherwise
        """
        try:
            return await self._run_io_task(self._load_from_disk_sync)
        except Exception as e:
            _LOGGER.warning("Failed to load cache from disk: %s", e)
            return False

    def _remove_cache_file_sync(self) -> bool:
        """Remove the cache file if it exists (synchronous helper).

        Returns:
            True if a file was removed, False if none existed
        """
        if not os.path.exists(self._cache_file):
            return False
        os.remove(self._cache_file)
        return True

    async def _clear_disk_cache(self) -> None:
        """Clear the disk cache file."""
        try:
            # BUGFIX: the os.path.exists check now runs via the executor
            # too, keeping all blocking file I/O off the event loop.
            if await self._run_io_task(self._remove_cache_file_sync):
                _LOGGER.debug("Cache file removed: %s", self._cache_file)
        except Exception as e:
            _LOGGER.warning("Failed to remove cache file: %s", e)

    def get_stats(self) -> dict[str, Any]:
        """Get cache statistics.

        Returns:
            Dictionary with cache statistics
        """
        total_entries = len(self._cache)
        expired_entries = sum(1 for entry in self._cache.values() if self._is_expired(entry))
        return {
            "total_entries": total_entries,
            "expired_entries": expired_entries,
            "valid_entries": total_entries - expired_entries,
            "cache_file": self._cache_file,
            "default_ttl": self._default_ttl,
        }
class RetryWithBackoff:
    """Retries async operations with exponentially growing, jittered delays."""

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        backoff_multiplier: float = 2.0,
        max_jitter: float = 1.0,
    ):
        """Initialize retry handler.

        Args:
            max_retries: Maximum number of retries
            base_delay: Base delay in seconds
            backoff_multiplier: Multiplier for exponential backoff
            max_jitter: Maximum jitter to add in seconds
        """
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.backoff_multiplier = backoff_multiplier
        self.max_jitter = max_jitter

    def calculate_delay(self, attempt: int) -> float:
        """Calculate delay for a given attempt with exponential backoff and jitter.

        Args:
            attempt: Current attempt number (0-based)

        Returns:
            Delay in seconds
        """
        # base * multiplier^attempt, plus random jitter so that many
        # clients retrying at once do not all wake simultaneously.
        backoff = self.base_delay * (self.backoff_multiplier ** attempt)
        return backoff + random.uniform(0, self.max_jitter)

    async def retry(self, func, *args, **kwargs):
        """Retry a function with exponential backoff and jitter.

        Args:
            func: Async function to retry
            *args: Arguments to pass to function
            **kwargs: Keyword arguments to pass to function

        Returns:
            Result of the function

        Raises:
            The last exception encountered if all retries fail
        """
        failure = None
        total_attempts = self.max_retries + 1
        for attempt_index in range(total_attempts):
            try:
                return await func(*args, **kwargs)
            except Exception as err:
                failure = err
                # Last attempt: log the overall failure and fall through.
                if attempt_index == self.max_retries:
                    _LOGGER.error(
                        "All retry attempts failed after %s tries: %s",
                        total_attempts,
                        err,
                    )
                    break
                wait = self.calculate_delay(attempt_index)
                _LOGGER.debug(
                    "Retry attempt %s/%s failed: %s. Retrying in %.2fs",
                    attempt_index + 1,
                    self.max_retries,
                    err,
                    wait,
                )
                await asyncio.sleep(wait)
        if failure is None:
            # Unreachable in normal operation; defensive guard only.
            raise RuntimeError("All retry attempts failed, but no exception was captured")
        raise failure