-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathredis_helper.py
More file actions
153 lines (133 loc) · 5.71 KB
/
redis_helper.py
File metadata and controls
153 lines (133 loc) · 5.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import json
import logging
import os
from datetime import datetime
from typing import Dict, Any, Optional
# Configure logging before anything in this module logs. The import fallback
# below emits a warning; a module-level logging call made while the root logger
# has no handlers implicitly runs basicConfig() with defaults, which would turn
# this explicit basicConfig() into a no-op and drop the INFO level and format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("redis_helper")

# Try to import redis, but provide fallback for local testing
try:
    import redis.asyncio as redis
    REDIS_AVAILABLE = True
except ImportError:
    REDIS_AVAILABLE = False
    logging.warning("Redis package not available, using in-memory mock instead")

# Get Redis connection details from environment variables with defaults
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
REDIS_DB = int(os.getenv("REDIS_DB", "0"))
class InMemoryRedis:
    """Mock Redis implementation for local testing.

    Implements only the hash commands used in this module (HSET, HGET,
    HGETALL) on top of a plain dict of dicts. Values are stored exactly as
    given; nothing is serialized or persisted. Methods are coroutines so the
    object can be awaited like the async Redis client.
    """

    def __init__(self):
        # hash name -> {field: value}
        self.data = {}

    async def hset(self, hash_name, key, value):
        """Emulate Redis HSET for a single field.

        Args:
            hash_name: Name of the hash; created on first write.
            key: Field name inside the hash.
            value: Value to store under the field.

        Returns:
            1 if the field was newly created, 0 if an existing field was
            overwritten (the "number of fields added" count of HSET).
        """
        fields = self.data.setdefault(hash_name, {})
        added = 0 if key in fields else 1
        fields[key] = value
        return added

    async def hget(self, hash_name, key):
        """Emulate Redis HGET.

        Returns:
            The stored value, or None when the hash or the field is missing.
        """
        return self.data.get(hash_name, {}).get(key)

    async def hgetall(self, hash_name):
        """Emulate Redis HGETALL.

        Returns:
            A new dict of all fields in the hash (empty when the hash does
            not exist). A copy is returned so callers cannot modify the
            stored hash through the result.
        """
        return dict(self.data.get(hash_name, {}))
class RedisManager:
    """Helper class for Redis operations related to worker management.

    Each worker is stored as a JSON string in the "workers" hash, keyed by
    worker id. Call connect() before any other method: until then
    ``redis_pool`` is None, and the data methods log an error and return
    their failure value (False, None or {}).
    """

    def __init__(self):
        # Backend client, assigned by connect(): a redis.asyncio client or an
        # InMemoryRedis instance.
        self.redis_pool = None
        # True when the redis package is missing or connecting has failed.
        self.use_mock = not REDIS_AVAILABLE

    async def connect(self) -> bool:
        """Connect to Redis or initialize mock.

        When the redis package is available, a client is created and checked
        with PING. If that fails for any reason, the manager switches to the
        in-memory mock instead.

        Returns:
            Always True; a failed connection is logged and replaced by the
            in-memory mock rather than reported to the caller.
        """
        if self.use_mock:
            self.redis_pool = InMemoryRedis()
            logger.info("Using in-memory mock for Redis")
            return True

        try:
            client = await redis.Redis(
                host=REDIS_HOST,
                port=REDIS_PORT,
                db=REDIS_DB,
                encoding="utf-8",
                decode_responses=True,
            )
            # Creating the client does not contact the server. PING forces a
            # real round trip so an unreachable server is detected here and
            # the fallback below can actually run.
            await client.ping()
        except Exception as e:
            logger.error(f"Failed to connect to Redis: {e}")
            # Fall back to mock if Redis connection fails
            self.redis_pool = InMemoryRedis()
            self.use_mock = True
            logger.info("Falling back to in-memory mock for Redis")
            return True

        self.redis_pool = client
        logger.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
        return True

    async def register_worker(self, worker_id: str, worker_data: Dict[str, Any]) -> bool:
        """Register a worker in Redis.

        Args:
            worker_id: Field name under which the worker is stored.
            worker_data: JSON-serializable worker description. If it has no
                "last_heartbeat" key, one is added in place (the caller's
                dict is modified) with the current local time as a string.

        Returns:
            True on success, False if storing failed (the error is logged).
        """
        try:
            # Ensure last_heartbeat is set
            if "last_heartbeat" not in worker_data:
                worker_data["last_heartbeat"] = str(datetime.now())
            await self.redis_pool.hset("workers", worker_id, json.dumps(worker_data))
            logger.info(f"Registered worker: {worker_id}")
            return True
        except Exception as e:
            logger.error(f"Error registering worker: {e}")
            return False

    async def update_worker_heartbeat(self, worker_id: str, status: str = "alive") -> bool:
        """Update worker heartbeat timestamp and status.

        Reads the stored record, sets "status" and "last_heartbeat" (current
        local time as a string) and writes the record back.

        Returns:
            True if the worker existed and was updated; False if it was not
            found or an error occurred (both cases are logged).
        """
        try:
            worker_data = await self.redis_pool.hget("workers", worker_id)
            if not worker_data:
                logger.warning(f"Worker {worker_id} not found in Redis")
                return False

            worker_info = json.loads(worker_data)
            worker_info["status"] = status
            worker_info["last_heartbeat"] = str(datetime.now())
            await self.redis_pool.hset("workers", worker_id, json.dumps(worker_info))
            logger.debug(f"Updated heartbeat for worker {worker_id}")
            return True
        except Exception as e:
            logger.error(f"Error updating worker heartbeat: {e}")
            return False

    async def update_worker_status(self, worker_id: str, status: str) -> bool:
        """Update worker status without touching the heartbeat timestamp.

        Returns:
            True if the worker existed and was updated; False if it was not
            found or an error occurred (both cases are logged).
        """
        try:
            worker_data = await self.redis_pool.hget("workers", worker_id)
            if not worker_data:
                logger.warning(f"Worker {worker_id} not found in Redis when updating status")
                return False

            worker_info = json.loads(worker_data)
            worker_info["status"] = status
            await self.redis_pool.hset("workers", worker_id, json.dumps(worker_info))
            logger.info(f"Updated status of worker {worker_id} to {status}")
            return True
        except Exception as e:
            logger.error(f"Error updating worker status: {e}")
            return False

    async def get_worker(self, worker_id: str) -> Optional[Dict[str, Any]]:
        """Get worker data from Redis.

        Returns:
            The decoded worker record, or None if the worker is missing or
            an error occurred (errors are logged).
        """
        try:
            worker_data = await self.redis_pool.hget("workers", worker_id)
            if worker_data:
                return json.loads(worker_data)
            return None
        except Exception as e:
            logger.error(f"Error getting worker: {e}")
            return None

    async def get_all_workers(self) -> Dict[str, Dict[str, Any]]:
        """Get all workers from Redis.

        Returns:
            A dict mapping worker id to its decoded record. An empty dict is
            returned when there are no workers or when reading or decoding
            fails (the error is logged).
        """
        try:
            workers = await self.redis_pool.hgetall("workers")
            return {
                worker_id: json.loads(worker_data)
                for worker_id, worker_data in workers.items()
            }
        except Exception as e:
            logger.error(f"Error getting all workers: {e}")
            return {}